[MAJOR] rework of the server FSM

srv_state has been removed from HTTP state machines, and states
have been split in either TCP states or analyzers. For instance,
the TARPIT state has just become a simple analyzer.

New flags have been added to the struct buffer to compensate this.
The high-level stream processors sometimes need to force a disconnection
without touching a file-descriptor (eg: report an error). But if
they touched BF_SHUTW or BF_SHUTR, the file descriptor would not
be closed. Thus, the two SHUT?_NOW flags have been added so that
an application can request a forced close which the stream interface
will be forced to obey.

During this change, a new BF_HIJACK flag was added. It will
be used for data generation, eg during a stats dump. It
prevents the producer on a buffer from sending data into it.

  BF_SHUTR_NOW  /* the producer must shut down for reads ASAP  */
  BF_SHUTW_NOW  /* the consumer must shut down for writes ASAP */
  BF_HIJACK     /* the producer is temporarily replaced        */

BF_SHUTW_NOW has precedence over BF_HIJACK. BF_HIJACK has
precedence over BF_MAY_FORWARD (so that it does not need it).

New functions buffer_shutr_now(), buffer_shutw_now(), buffer_abort()
are provided to manipulate BF_SHUT* flags.

A new type "stream_interface" has been added to describe both
sides of a buffer. A stream interface has states and error
reporting. The session now has two stream interfaces (one per
side). Each buffer has stream_interface pointers to both
consumer and producer sides.

The server-side file descriptor has moved to its stream interface,
so that even the buffer has access to it.

process_srv() has been split into three parts :
  - tcp_get_connection() obtains a connection to the server
  - tcp_connection_failed() tests if a previously attempted
    connection has succeeded or not.
  - process_srv_data() only manages the data phase, and in
    this sense should be roughly equivalent to process_cli.

Little code has been removed, and a lot of old code has been
left in comments for now.
This commit is contained in:
Willy Tarreau 2008-10-19 07:30:41 +02:00
parent 41f40ede3b
commit fa7e10251d
12 changed files with 1792 additions and 631 deletions

View File

@ -47,6 +47,7 @@ static inline void buffer_init(struct buffer *buf)
{ {
buf->l = buf->total = 0; buf->l = buf->total = 0;
buf->analysers = 0; buf->analysers = 0;
buf->cons = NULL;
buf->flags = BF_EMPTY; buf->flags = BF_EMPTY;
buf->r = buf->lr = buf->w = buf->data; buf->r = buf->lr = buf->w = buf->data;
buf->rlim = buf->data + BUFSIZE; buf->rlim = buf->data + BUFSIZE;
@ -89,6 +90,24 @@ static inline void buffer_shutw(struct buffer *buf)
buf->flags |= BF_SHUTW; buf->flags |= BF_SHUTW;
} }
/* marks the buffer as "shutdown" ASAP for reads */
static inline void buffer_shutr_now(struct buffer *buf)
{
buf->flags |= BF_SHUTR_NOW;
}
/* marks the buffer as "shutdown" ASAP for writes */
static inline void buffer_shutw_now(struct buffer *buf)
{
buf->flags |= BF_SHUTW_NOW;
}
/* marks the buffer as "shutdown" ASAP in both directions */
static inline void buffer_abort(struct buffer *buf)
{
buf->flags |= BF_SHUTR_NOW | BF_SHUTW_NOW;
}
/* returns the maximum number of bytes writable at once in this buffer */ /* returns the maximum number of bytes writable at once in this buffer */
static inline int buffer_max(const struct buffer *buf) static inline int buffer_max(const struct buffer *buf)
{ {

View File

@ -60,7 +60,8 @@ extern const char http_is_ver_token[256];
int event_accept(int fd); int event_accept(int fd);
void process_session(struct task *t, int *next); void process_session(struct task *t, int *next);
int process_cli(struct session *t); int process_cli(struct session *t);
int process_srv(struct session *t); int process_srv_data(struct session *t);
int process_srv_conn(struct session *t);
int process_request(struct session *t); int process_request(struct session *t);
int process_response(struct session *t); int process_response(struct session *t);

View File

@ -2,7 +2,7 @@
include/proto/stream_sock.h include/proto/stream_sock.h
This file contains client-side definitions. This file contains client-side definitions.
Copyright (C) 2000-2006 Willy Tarreau - w@1wt.eu Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
This library is free software; you can redistribute it and/or This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public modify it under the terms of the GNU Lesser General Public
@ -27,6 +27,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <common/config.h> #include <common/config.h>
#include <types/stream_interface.h>
/* main event functions used to move data between sockets and buffers */ /* main event functions used to move data between sockets and buffers */

View File

@ -57,6 +57,15 @@
#define BF_READ_TIMEOUT 32768 /* timeout while waiting for producer */ #define BF_READ_TIMEOUT 32768 /* timeout while waiting for producer */
#define BF_WRITE_TIMEOUT 65536 /* timeout while waiting for consumer */ #define BF_WRITE_TIMEOUT 65536 /* timeout while waiting for consumer */
/* When either BF_SHUTR_NOW or BF_HIJACK is set, it is strictly forbidden for
* the stream interface to alter the buffer contents. When BF_SHUTW_NOW is set,
* it is strictly forbidden for the stream interface to send anything from the
* buffer.
*/
#define BF_SHUTR_NOW 131072 /* the producer must shut down for reads ASAP */
#define BF_SHUTW_NOW 262144 /* the consumer must shut down for writes ASAP */
#define BF_HIJACK 524288 /* the producer is temporarily replaced */
/* Analysers (buffer->analysers). /* Analysers (buffer->analysers).
* Those bits indicate that there are some processing to do on the buffer * Those bits indicate that there are some processing to do on the buffer
@ -68,7 +77,8 @@
#define AN_REQ_INSPECT 0x00000001 /* inspect request contents */ #define AN_REQ_INSPECT 0x00000001 /* inspect request contents */
#define AN_REQ_HTTP_HDR 0x00000002 /* inspect HTTP request headers */ #define AN_REQ_HTTP_HDR 0x00000002 /* inspect HTTP request headers */
#define AN_REQ_HTTP_BODY 0x00000004 /* inspect HTTP request body */ #define AN_REQ_HTTP_BODY 0x00000004 /* inspect HTTP request body */
#define AN_RTR_HTTP_HDR 0x00000008 /* inspect HTTP response headers */ #define AN_REQ_HTTP_TARPIT 0x00000008 /* wait for end of HTTP tarpit */
#define AN_RTR_HTTP_HDR 0x00000010 /* inspect HTTP response headers */
/* describes a chunk of string */ /* describes a chunk of string */
struct chunk { struct chunk {
@ -91,6 +101,8 @@ struct buffer {
unsigned char xfer_large; /* number of consecutive large xfers */ unsigned char xfer_large; /* number of consecutive large xfers */
unsigned char xfer_small; /* number of consecutive small xfers */ unsigned char xfer_small; /* number of consecutive small xfers */
unsigned long long total; /* total data read */ unsigned long long total; /* total data read */
struct stream_interface *prod; /* producer attached to this buffer */
struct stream_interface *cons; /* consumer attached to this buffer */
char data[BUFSIZE]; char data[BUFSIZE];
}; };

View File

@ -36,6 +36,7 @@
#include <types/proxy.h> #include <types/proxy.h>
#include <types/queue.h> #include <types/queue.h>
#include <types/server.h> #include <types/server.h>
#include <types/stream_interface.h>
#include <types/task.h> #include <types/task.h>
@ -156,7 +157,6 @@ struct session {
struct proxy *fe; /* the proxy this session depends on for the client side */ struct proxy *fe; /* the proxy this session depends on for the client side */
struct proxy *be; /* the proxy this session depends on for the server side */ struct proxy *be; /* the proxy this session depends on for the server side */
int cli_fd; /* the client side fd */ int cli_fd; /* the client side fd */
int srv_fd; /* the server side fd */
int cli_state; /* state of the client side */ int cli_state; /* state of the client side */
int srv_state; /* state of the server side */ int srv_state; /* state of the server side */
int conn_retries; /* number of connect retries left */ int conn_retries; /* number of connect retries left */
@ -164,6 +164,7 @@ struct session {
unsigned term_trace; /* term trace: 4*8 bits indicating which part of the code closed */ unsigned term_trace; /* term trace: 4*8 bits indicating which part of the code closed */
struct buffer *req; /* request buffer */ struct buffer *req; /* request buffer */
struct buffer *rep; /* response buffer */ struct buffer *rep; /* response buffer */
struct stream_interface si[2]; /* client and server stream interfaces */
struct sockaddr_storage cli_addr; /* the client address */ struct sockaddr_storage cli_addr; /* the client address */
struct sockaddr_storage frt_addr; /* the frontend address reached by the client if SN_FRT_ADDR_SET is set */ struct sockaddr_storage frt_addr; /* the frontend address reached by the client if SN_FRT_ADDR_SET is set */
struct sockaddr_in srv_addr; /* the address to connect to */ struct sockaddr_in srv_addr; /* the address to connect to */

View File

@ -0,0 +1,73 @@
/*
include/types/stream_interface.h
This file describes the stream_interface struct and associated constants.
Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation, version 2.1
exclusively.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef _TYPES_STREAM_INTERFACE_H
#define _TYPES_STREAM_INTERFACE_H
#include <stdlib.h>
#include <common/config.h>
/* A stream interface must have its own errors independantly of the buffer's,
* so that applications can rely on what the buffer reports while the stream
* interface is performing some retries (eg: connection error).
*/
enum {
SI_ST_INI = 0, /* interface not initialized yet and might not exist */
SI_ST_QUE, /* interface waiting in queue */
SI_ST_TAR, /* interface in turn-around state after failed connect attempt */
SI_ST_ASS, /* server just assigned to this interface */
SI_ST_CON, /* initiated connection request (resource exists) */
SI_ST_EST, /* connection established (resource exists) */
SI_ST_CLO, /* stream interface closed, might not existing anymore */
};
/* error types reported on the streams interface for more accurate reporting */
enum {
SI_ET_NONE = 0, /* no error yet, leave it to zero */
SI_ET_QUEUE_TO, /* queue timeout */
SI_ET_QUEUE_ERR, /* queue error (eg: full) */
SI_ET_QUEUE_ABRT, /* aborted in queue by external cause */
SI_ET_CONN_TO, /* connection timeout */
SI_ET_CONN_ERR, /* connection error (eg: no server available) */
SI_ET_CONN_ABRT, /* connection aborted by external cause (eg: abort) */
SI_ET_CONN_OTHER, /* connection aborted for other reason (eg: 500) */
SI_ET_DATA_TO, /* timeout during data phase */
SI_ET_DATA_ERR, /* error during data phase */
SI_ET_DATA_ABRT, /* data phase aborted by external cause */
};
struct stream_interface {
unsigned int state; /* SI_ST* */
int err_type; /* first error detected, one of SI_ET_* */
void *err_loc; /* commonly the server, NULL when SI_ET_NONE */
int fd; /* file descriptor for a stream driver when known */
};
#endif /* _TYPES_STREAM_INTERFACE_H */
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/

View File

@ -1646,7 +1646,7 @@ int connect_server(struct session *s)
return SN_ERR_INTERNAL; return SN_ERR_INTERNAL;
} }
if ((fd = s->srv_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { if ((fd = s->req->cons->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
qfprintf(stderr, "Cannot get a server socket.\n"); qfprintf(stderr, "Cannot get a server socket.\n");
if (errno == ENFILE) if (errno == ENFILE)
@ -1818,6 +1818,7 @@ int connect_server(struct session *s)
fd_insert(fd); fd_insert(fd);
EV_FD_SET(fd, DIR_WR); /* for connect status */ EV_FD_SET(fd, DIR_WR); /* for connect status */
s->req->cons->state = SI_ST_CON;
if (s->srv) { if (s->srv) {
s->srv->cur_sess++; s->srv->cur_sess++;
if (s->srv->cur_sess > s->srv->cur_sess_max) if (s->srv->cur_sess > s->srv->cur_sess_max)
@ -1833,8 +1834,8 @@ int connect_server(struct session *s)
/* /*
* This function checks the retry count during the connect() job. * This function checks the retry count during the connect() job.
* It updates the session's srv_state and retries, so that the caller knows * It updates the session's retries, so that the caller knows what it
* what it has to do. It uses the last connection error to set the log when * has to do. It uses the last connection error to set the log when
* it expires. It returns 1 when it has expired, and 0 otherwise. * it expires. It returns 1 when it has expired, and 0 otherwise.
*/ */
int srv_count_retry_down(struct session *t, int conn_err) int srv_count_retry_down(struct session *t, int conn_err)
@ -1844,9 +1845,15 @@ int srv_count_retry_down(struct session *t, int conn_err)
if (t->conn_retries < 0) { if (t->conn_retries < 0) {
/* if not retryable anymore, let's abort */ /* if not retryable anymore, let's abort */
t->req->wex = TICK_ETERNITY; //t->req->wex = TICK_ETERNITY;
srv_close_with_err(t, conn_err, SN_FINST_C, //srv_close_with_err(t, conn_err, SN_FINST_C,
503, error_message(t, HTTP_ERR_503)); // 503, error_message(t, HTTP_ERR_503));
if (!t->req->cons->err_type) {
t->req->cons->err_type = SI_ET_CONN_ERR;
t->req->cons->err_loc = t->srv;
}
if (t->srv) if (t->srv)
t->srv->failed_conns++; t->srv->failed_conns++;
t->be->failed_conns++; t->be->failed_conns++;
@ -1864,9 +1871,9 @@ int srv_count_retry_down(struct session *t, int conn_err)
/* /*
* This function performs the retryable part of the connect() job. * This function performs the retryable part of the connect() job.
* It updates the session's srv_state and retries, so that the caller knows * It updates the session's and retries, so that the caller knows
* what it has to do. It returns 1 when it breaks out of the loop, or 0 if * what it has to do. It returns 1 when it breaks out of the loop,
* it needs to redispatch. * or 0 if it needs to redispatch.
*/ */
int srv_retryable_connect(struct session *t) int srv_retryable_connect(struct session *t)
{ {
@ -1882,15 +1889,19 @@ int srv_retryable_connect(struct session *t)
case SN_ERR_NONE: case SN_ERR_NONE:
//fprintf(stderr,"0: c=%d, s=%d\n", c, s); //fprintf(stderr,"0: c=%d, s=%d\n", c, s);
t->srv_state = SV_STCONN;
if (t->srv) if (t->srv)
t->srv->cum_sess++; t->srv->cum_sess++;
return 1; return 1;
case SN_ERR_INTERNAL: case SN_ERR_INTERNAL:
t->req->wex = TICK_ETERNITY; if (!t->req->cons->err_type) {
srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C, t->req->cons->err_type = SI_ET_CONN_OTHER;
500, error_message(t, HTTP_ERR_500)); t->req->cons->err_loc = t->srv;
}
//t->req->wex = TICK_ETERNITY;
//srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C,
// 500, error_message(t, HTTP_ERR_500));
if (t->srv) if (t->srv)
t->srv->cum_sess++; t->srv->cum_sess++;
if (t->srv) if (t->srv)
@ -1902,9 +1913,8 @@ int srv_retryable_connect(struct session *t)
return 1; return 1;
} }
/* ensure that we have enough retries left */ /* ensure that we have enough retries left */
if (srv_count_retry_down(t, conn_err)) { if (srv_count_retry_down(t, conn_err))
return 1; return 1;
}
} while (t->srv == NULL || t->conn_retries > 0 || !(t->be->options & PR_O_REDISP)); } while (t->srv == NULL || t->conn_retries > 0 || !(t->be->options & PR_O_REDISP));
/* We're on our last chance, and the REDISP option was specified. /* We're on our last chance, and the REDISP option was specified.
@ -1959,9 +1969,14 @@ int srv_redispatch_connect(struct session *t)
goto redispatch; goto redispatch;
} }
t->req->wex = TICK_ETERNITY; //t->req->wex = TICK_ETERNITY;
srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_Q, //srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_Q,
503, error_message(t, HTTP_ERR_503)); // 503, error_message(t, HTTP_ERR_503));
if (!t->req->cons->err_type) {
t->req->cons->err_type = SI_ET_QUEUE_ERR;
t->req->cons->err_loc = t->srv;
}
t->srv->failed_conns++; t->srv->failed_conns++;
t->be->failed_conns++; t->be->failed_conns++;
@ -1969,24 +1984,35 @@ int srv_redispatch_connect(struct session *t)
case SRV_STATUS_NOSRV: case SRV_STATUS_NOSRV:
/* note: it is guaranteed that t->srv == NULL here */ /* note: it is guaranteed that t->srv == NULL here */
t->req->wex = TICK_ETERNITY; //t->req->wex = TICK_ETERNITY;
srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_C, //srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_C,
503, error_message(t, HTTP_ERR_503)); // 503, error_message(t, HTTP_ERR_503));
if (!t->req->cons->err_type) {
t->req->cons->err_type = SI_ET_CONN_ERR;
t->req->cons->err_loc = NULL;
}
t->be->failed_conns++; t->be->failed_conns++;
return 1; return 1;
case SRV_STATUS_QUEUED: case SRV_STATUS_QUEUED:
t->req->wex = tick_add_ifset(now_ms, t->be->timeout.queue); t->req->wex = tick_add_ifset(now_ms, t->be->timeout.queue);
t->srv_state = SV_STIDLE; t->req->cons->state = SI_ST_QUE;
/* do nothing else and do not wake any other session up */ /* do nothing else and do not wake any other session up */
return 1; return 1;
case SRV_STATUS_INTERNAL: case SRV_STATUS_INTERNAL:
default: default:
t->req->wex = TICK_ETERNITY; //t->req->wex = TICK_ETERNITY;
srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C, //srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C,
500, error_message(t, HTTP_ERR_500)); // 500, error_message(t, HTTP_ERR_500));
if (!t->req->cons->err_type) {
t->req->cons->err_type = SI_ET_CONN_OTHER;
t->req->cons->err_loc = t->srv;
}
if (t->srv) if (t->srv)
t->srv->cum_sess++; t->srv->cum_sess++;
if (t->srv) if (t->srv)

View File

@ -168,11 +168,19 @@ int event_accept(int fd) {
} }
s->cli_state = CL_STDATA; s->cli_state = CL_STDATA;
s->srv_state = SV_STIDLE;
s->req = s->rep = NULL; /* will be allocated later */ s->req = s->rep = NULL; /* will be allocated later */
s->si[0].state = SI_ST_EST;
s->si[0].err_type = SI_ET_NONE;
s->si[0].err_loc = NULL;
s->si[0].fd = cfd;
s->cli_fd = cfd; s->cli_fd = cfd;
s->srv_fd = -1;
s->si[1].state = SI_ST_INI;
s->si[1].err_type = SI_ET_NONE;
s->si[1].err_loc = NULL;
s->si[1].fd = -1; /* just to help with debugging */
s->srv = s->prev_srv = s->srv_conn = NULL; s->srv = s->prev_srv = s->srv_conn = NULL;
s->pend_pos = NULL; s->pend_pos = NULL;
s->conn_retries = s->be->conn_retries; s->conn_retries = s->be->conn_retries;
@ -326,6 +334,9 @@ int event_accept(int fd) {
goto out_fail_req; /* no memory */ goto out_fail_req; /* no memory */
buffer_init(s->req); buffer_init(s->req);
s->req->prod = &s->si[0];
s->req->cons = &s->si[1];
if (p->mode == PR_MODE_HTTP) /* reserve some space for header rewriting */ if (p->mode == PR_MODE_HTTP) /* reserve some space for header rewriting */
s->req->rlim -= MAXREWRITE; s->req->rlim -= MAXREWRITE;
@ -346,6 +357,8 @@ int event_accept(int fd) {
goto out_fail_rep; /* no memory */ goto out_fail_rep; /* no memory */
buffer_init(s->rep); buffer_init(s->rep);
s->rep->prod = &s->si[1];
s->rep->cons = &s->si[0];
s->rep->rto = s->be->timeout.server; s->rep->rto = s->be->timeout.server;
s->rep->wto = s->fe->timeout.client; s->rep->wto = s->fe->timeout.client;

File diff suppressed because it is too large Load Diff

View File

@ -454,7 +454,6 @@ int uxst_event_accept(int fd) {
s->req = s->rep = NULL; /* will be allocated later */ s->req = s->rep = NULL; /* will be allocated later */
s->cli_fd = cfd; s->cli_fd = cfd;
s->srv_fd = -1;
s->srv = NULL; s->srv = NULL;
s->pend_pos = NULL; s->pend_pos = NULL;
@ -791,7 +790,7 @@ static int process_uxst_cli(struct session *t)
if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) { if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) {
int len; int len;
len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n", t->uniq_id, t->be?t->be->id:"", len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n", t->uniq_id, t->be?t->be->id:"",
(unsigned short)t->cli_fd, (unsigned short)t->srv_fd); (unsigned short)t->cli_fd, (unsigned short)t->req->cons->fd);
write(1, trash, len); write(1, trash, len);
} }
return 0; return 0;

View File

@ -48,16 +48,19 @@
*/ */
void client_retnclose(struct session *s, const struct chunk *msg) void client_retnclose(struct session *s, const struct chunk *msg)
{ {
EV_FD_CLR(s->cli_fd, DIR_RD); //FIXME: must move to lower level
EV_FD_SET(s->cli_fd, DIR_WR); //EV_FD_CLR(s->cli_fd, DIR_RD);
buffer_shutr(s->req); //EV_FD_SET(s->cli_fd, DIR_WR);
buffer_flush(s->req); buffer_abort(s->req);
s->rep->wex = tick_add_ifset(now_ms, s->rep->wto);
s->rep->flags |= BF_MAY_FORWARD;
s->cli_state = CL_STSHUTR; // FIXME: still used by unix sockets s->cli_state = CL_STSHUTR; // FIXME: still used by unix sockets
buffer_flush(s->rep); buffer_flush(s->rep);
buffer_shutr_now(s->rep);
if (msg && msg->len) if (msg && msg->len)
buffer_write(s->rep, msg->str, msg->len); buffer_write(s->rep, msg->str, msg->len);
s->rep->wex = tick_add_ifset(now_ms, s->rep->wto);
s->rep->flags |= BF_MAY_FORWARD;
} }

View File

@ -223,9 +223,12 @@ int stream_sock_read(int fd) {
if (tick_isset(b->rex) && b->flags & BF_PARTIAL_READ) if (tick_isset(b->rex) && b->flags & BF_PARTIAL_READ)
b->rex = tick_add_ifset(now_ms, b->rto); b->rex = tick_add_ifset(now_ms, b->rto);
if (!(b->flags & BF_READ_STATUS))
goto out_skip_wakeup;
out_wakeup: out_wakeup:
if (b->flags & BF_READ_STATUS) task_wakeup(fdtab[fd].owner);
task_wakeup(fdtab[fd].owner);
out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_IN; fdtab[fd].ev &= ~FD_POLL_IN;
return retval; return retval;
@ -241,7 +244,6 @@ int stream_sock_read(int fd) {
*/ */
fdtab[fd].state = FD_STERROR; fdtab[fd].state = FD_STERROR;
fdtab[fd].ev &= ~FD_POLL_STICKY; fdtab[fd].ev &= ~FD_POLL_STICKY;
b->flags |= BF_READ_ERROR;
b->rex = TICK_ETERNITY; b->rex = TICK_ETERNITY;
goto out_wakeup; goto out_wakeup;
} }
@ -296,7 +298,7 @@ int stream_sock_write(int fd) {
if (errno == EALREADY || errno == EINPROGRESS) { if (errno == EALREADY || errno == EINPROGRESS) {
retval = 0; retval = 0;
goto out_wakeup; goto out_may_wakeup;
} }
if (errno && errno != EISCONN) if (errno && errno != EISCONN)
@ -392,9 +394,13 @@ int stream_sock_write(int fd) {
} }
} }
out_may_wakeup:
if (!(b->flags & BF_WRITE_STATUS))
goto out_skip_wakeup;
out_wakeup: out_wakeup:
if (b->flags & BF_WRITE_STATUS) task_wakeup(fdtab[fd].owner);
task_wakeup(fdtab[fd].owner);
out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_OUT; fdtab[fd].ev &= ~FD_POLL_OUT;
return retval; return retval;
@ -404,7 +410,6 @@ int stream_sock_write(int fd) {
*/ */
fdtab[fd].state = FD_STERROR; fdtab[fd].state = FD_STERROR;
fdtab[fd].ev &= ~FD_POLL_STICKY; fdtab[fd].ev &= ~FD_POLL_STICKY;
b->flags |= BF_WRITE_ERROR;
b->wex = TICK_ETERNITY; b->wex = TICK_ETERNITY;
goto out_wakeup; goto out_wakeup;
} }