diff --git a/include/common/buffer.h b/include/common/buffer.h index ca90fbe4b..ce3eb40a9 100644 --- a/include/common/buffer.h +++ b/include/common/buffer.h @@ -39,9 +39,18 @@ struct buffer { char data[0]; /* bytes */ }; +/* an element of the list. It represents an object that need to + * acquire a buffer to continue its process. */ +struct buffer_wait { + void *target; /* The waiting object that should be woken up */ + int (*wakeup_cb)(void *); /* The function used to wake up the , passed as argument */ + struct list list; /* Next element in the list */ +}; + extern struct pool_head *pool2_buffer; extern struct buffer buf_empty; extern struct buffer buf_wanted; +extern struct list buffer_wq; int init_buffer(); int buffer_replace2(struct buffer *b, char *pos, char *end, const char *str, int len); @@ -522,6 +531,16 @@ static inline struct buffer *b_alloc_margin(struct buffer **buf, int margin) return next; } + +void __offer_buffer(void *from, unsigned int threshold); + +static inline void offer_buffers(void *from, unsigned int threshold) +{ + if (LIST_ISEMPTY(&buffer_wq)) + return; + __offer_buffer(from, threshold); +} + #endif /* _COMMON_BUFFER_H */ /* diff --git a/include/proto/applet.h b/include/proto/applet.h index 5a503b43c..653be31e4 100644 --- a/include/proto/applet.h +++ b/include/proto/applet.h @@ -36,6 +36,10 @@ extern struct list applet_active_queue; void applet_run_active(); + +static int inline appctx_res_wakeup(struct appctx *appctx); + + /* Initializes all required fields for a new appctx. Note that it does the * minimum acceptable initialization for an appctx. This means only the * 3 integer states st0, st1, st2 are zeroed. @@ -61,6 +65,9 @@ static inline struct appctx *appctx_new(struct applet *applet) appctx->applet = applet; appctx_init(appctx); LIST_INIT(&appctx->runq); + LIST_INIT(&appctx->buffer_wait.list); + appctx->buffer_wait.target = appctx; + appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup; nb_applets++; } return appctx; @@ -75,6 +82,10 @@ static inline void appctx_free(struct appctx *appctx) LIST_DEL(&appctx->runq); applets_active_queue--; } + if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) { + LIST_DEL(&appctx->buffer_wait.list); + LIST_INIT(&appctx->buffer_wait.list); + } pool_free2(pool2_connection, appctx); nb_applets--; } @@ -98,6 +109,19 @@ static inline void appctx_pause(struct appctx *appctx) } } +/* Callback used to wake up an applet when a buffer is available. The applet + * is woken up is if it is not already in the list of "active" + * applets. This functions returns 1 is the stream is woken up, otherwise it + * returns 0. */ +static inline int appctx_res_wakeup(struct appctx *appctx) +{ + if (!LIST_ISEMPTY(&appctx->runq)) + return 0; + appctx_wakeup(appctx); + return 1; +} + + #endif /* _PROTO_APPLET_H */ /* diff --git a/include/proto/channel.h b/include/proto/channel.h index 3d435c47c..304a9357a 100644 --- a/include/proto/channel.h +++ b/include/proto/channel.h @@ -36,6 +36,9 @@ #include #include +#include +#include + /* perform minimal intializations, report 0 in case of error, 1 if OK. */ int init_channel(); @@ -439,6 +442,41 @@ static inline int channel_recv_max(const struct channel *chn) return ret; } +/* Allocates a buffer for channel , but only if it's guaranteed that it's + * not the last available buffer or it's the response buffer. Unless the buffer + * is the response buffer, an extra control is made so that we always keep + * buffers available after this allocation. Returns 0 in + * case of failure, non-zero otherwise. + * + * If no buffer are available, the requester, represented by pointer, + * will be added in the list of objects waiting for an available buffer. + */ +static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait *wait) +{ + int margin = 0; + + if (!(chn->flags & CF_ISRESP)) + margin = global.tune.reserved_bufs; + + if (b_alloc_margin(&chn->buf, margin) != NULL) + return 1; + + if (LIST_ISEMPTY(&wait->list)) + LIST_ADDQ(&buffer_wq, &wait->list); + return 0; +} + +/* Releases a possibly allocated buffer for channel . If it was not + * allocated, this function does nothing. Else the buffer is released and we try + * to wake up as many streams/applets as possible. */ +static inline void channel_release_buffer(struct channel *chn, struct buffer_wait *wait) +{ + if (chn->buf->size && buffer_empty(chn->buf)) { + b_free(&chn->buf); + offer_buffers(wait->target, tasks_run_queue + applets_active_queue); + } +} + /* Truncate any unread data in the channel's buffer, and disable forwarding. * Outgoing data are left intact. This is mainly to be used to send error * messages after existing data. diff --git a/include/proto/stream.h b/include/proto/stream.h index b439344f2..85c234edc 100644 --- a/include/proto/stream.h +++ b/include/proto/stream.h @@ -32,7 +32,6 @@ extern struct pool_head *pool2_stream; extern struct list streams; -extern struct list buffer_wq; extern struct data_cb sess_conn_cb; @@ -55,11 +54,7 @@ int parse_track_counters(char **args, int *arg, /* Update the stream's backend and server time stats */ void stream_update_time_stats(struct stream *s); -void __stream_offer_buffers(int rqlimit); -static inline void stream_offer_buffers(); -int stream_alloc_work_buffer(struct stream *s); void stream_release_buffers(struct stream *s); -int stream_alloc_recv_buffer(struct channel *chn); /* returns the session this stream belongs to */ static inline struct session *strm_sess(const struct stream *strm) @@ -285,25 +280,16 @@ static void inline stream_init_srv_conn(struct stream *sess) LIST_INIT(&sess->by_srv); } -static inline void stream_offer_buffers() +/* Callback used to wake up a stream when a buffer is available. The stream + * is woken up is if it is not already running and if it is not already in the + * task run queue. This functions returns 1 is the stream is woken up, otherwise + * it returns 0. */ +static int inline stream_res_wakeup(struct stream *s) { - int avail; - - if (LIST_ISEMPTY(&buffer_wq)) - return; - - /* all streams will need 1 buffer, so we can stop waking up streams - * once we have enough of them to eat all the buffers. Note that we - * don't really know if they are streams or just other tasks, but - * that's a rough estimate. Similarly, for each cached event we'll need - * 1 buffer. If no buffer is currently used, always wake up the number - * of tasks we can offer a buffer based on what is allocated, and in - * any case at least one task per two reserved buffers. - */ - avail = pool2_buffer->allocated - pool2_buffer->used - global.tune.reserved_bufs / 2; - - if (avail > (int)tasks_run_queue) - __stream_offer_buffers(avail); + if (s->task->state & TASK_RUNNING || task_in_rq(s->task)) + return 0; + task_wakeup(s->task, TASK_WOKEN_RES); + return 1; } void service_keywords_register(struct action_kw_list *kw_list); diff --git a/include/types/applet.h b/include/types/applet.h index da9f787a2..89602aac5 100644 --- a/include/types/applet.h +++ b/include/types/applet.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -58,6 +59,7 @@ struct appctx { void (*io_release)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK, if the command is terminated or the session released */ void *private; + struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */ union { struct { diff --git a/include/types/stream.h b/include/types/stream.h index 2cc903cfa..26e8dd590 100644 --- a/include/types/stream.h +++ b/include/types/stream.h @@ -135,7 +135,7 @@ struct stream { struct list list; /* position in global streams list */ struct list by_srv; /* position in server stream list */ struct list back_refs; /* list of users tracking this stream */ - struct list buffer_wait; /* position in the list of streams waiting for a buffer */ + struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */ struct { struct stksess *ts; diff --git a/src/applet.c b/src/applet.c index ad40e1f9f..f5bc79d80 100644 --- a/src/applet.c +++ b/src/applet.c @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -48,13 +49,12 @@ void applet_run_active() curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq); si = curr->owner; - /* now we'll need a buffer */ - if (!stream_alloc_recv_buffer(si_ic(si))) { - si->flags |= SI_FL_WAIT_ROOM; - LIST_DEL(&curr->runq); - LIST_INIT(&curr->runq); - continue; - } + /* Now we'll try to allocate the input buffer. We wake up the + * applet in all cases. So this is the applet responsibility to + * check if this buffer was allocated or not. This let a chance + * for applets to do some other processing if needed. */ + if (!channel_alloc_buffer(si_ic(si), &curr->buffer_wait)) + si_applet_cant_put(si); /* We always pretend the applet can't get and doesn't want to * put, it's up to it to change this if needed. This ensures @@ -65,6 +65,7 @@ void applet_run_active() curr->applet->fct(curr); si_applet_wake_cb(si); + channel_release_buffer(si_ic(si), &curr->buffer_wait); if (applet_cur_queue.n == &curr->runq) { /* curr was left in the list, move it back to the active list */ diff --git a/src/buffer.c b/src/buffer.c index f47fbddbd..4f8f6474c 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -31,6 +31,9 @@ struct pool_head *pool2_buffer; struct buffer buf_empty = { .p = buf_empty.data }; struct buffer buf_wanted = { .p = buf_wanted.data }; +/* list of objects waiting for at least one buffer */ +struct list buffer_wq = LIST_HEAD_INIT(buffer_wq); + /* perform minimal intializations, report 0 in case of error, 1 if OK. */ int init_buffer() { @@ -278,6 +281,35 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to) fflush(o); } +void __offer_buffer(void *from, unsigned int threshold) +{ + struct buffer_wait *wait, *bak; + int avail; + + /* For now, we consider that all objects need 1 buffer, so we can stop + * waking up them once we have enough of them to eat all the available + * buffers. Note that we don't really know if they are streams or just + * other tasks, but that's a rough estimate. Similarly, for each cached + * event we'll need 1 buffer. If no buffer is currently used, always + * wake up the number of tasks we can offer a buffer based on what is + * allocated, and in any case at least one task per two reserved + * buffers. + */ + avail = pool2_buffer->allocated - pool2_buffer->used - global.tune.reserved_bufs / 2; + + list_for_each_entry_safe(wait, bak, &buffer_wq, list) { + if (avail <= threshold) + break; + + if (wait->target == from || !wait->wakeup_cb(wait->target)) + continue; + + LIST_DEL(&wait->list); + LIST_INIT(&wait->list); + + avail--; + } +} /* * Local variables: diff --git a/src/cli.c b/src/cli.c index a1923bc4e..3d537ba54 100644 --- a/src/cli.c +++ b/src/cli.c @@ -488,6 +488,12 @@ static void cli_io_handler(struct appctx *appctx) if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO)) goto out; + /* Check if the input buffer is avalaible. */ + if (res->buf->size == 0) { + si_applet_cant_put(si); + goto out; + } + while (1) { if (appctx->st0 == CLI_ST_INIT) { /* Stats output not initialized yet */ diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 776848e48..aa6414abf 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -224,7 +224,7 @@ struct spoe_context { struct appctx *appctx; /* The SPOE appctx */ struct list *messages; /* List of messages that will be sent during the stream processing */ struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */ - struct list buffer_wait; /* position in the list of streams waiting for a buffer */ + struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */ struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */ enum spoe_ctx_state state; /* SPOE_CTX_ST_* */ @@ -1232,6 +1232,9 @@ send_spoe_frame(struct appctx *appctx, int framesz, ret; uint32_t netint; + if (si_ic(si)->buf->size == 0) + return -1; + ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size); if (ret <= 0) goto skip_or_error; @@ -1524,7 +1527,7 @@ handle_spoe_applet(struct appctx *appctx) /* fall through */ case SPOE_APPCTX_ST_END: - break; + return; } out: @@ -1693,13 +1696,13 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir) /* If needed, initialize the buffer that will be used to encode messages * and decode actions. */ if (ctx->buffer == &buf_empty) { - if (!LIST_ISEMPTY(&ctx->buffer_wait)) { - LIST_DEL(&ctx->buffer_wait); - LIST_INIT(&ctx->buffer_wait); + if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) { + LIST_DEL(&ctx->buffer_wait.list); + LIST_INIT(&ctx->buffer_wait.list); } - if (!b_alloc_margin(&ctx->buffer, 0)) { - LIST_ADDQ(&buffer_wq, &ctx->buffer_wait); + if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) { + LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list); goto wait; } } @@ -1794,8 +1797,7 @@ release_spoe_appctx(struct spoe_context *ctx) /* Release the buffer if needed */ if (ctx->buffer != &buf_empty) { b_free(&ctx->buffer); - if (!LIST_ISEMPTY(&buffer_wq)) - stream_offer_buffers(); + offer_buffers(ctx, tasks_run_queue + applets_active_queue); } /* If there is no SPOE applet, all is done */ @@ -2213,6 +2215,12 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, /*************************************************************************** * Functions that create/destroy SPOE contexts **************************************************************************/ +static int wakeup_spoe_context(struct spoe_context *ctx) +{ + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + return 1; +} + static struct spoe_context * create_spoe_context(struct filter *filter) { @@ -2229,7 +2237,9 @@ create_spoe_context(struct filter *filter) ctx->flags = 0; ctx->messages = conf->agent->messages; ctx->buffer = &buf_empty; - LIST_INIT(&ctx->buffer_wait); + LIST_INIT(&ctx->buffer_wait.list); + ctx->buffer_wait.target = ctx; + ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context; LIST_INIT(&ctx->applet_wait); ctx->stream_id = 0; @@ -2247,8 +2257,8 @@ destroy_spoe_context(struct spoe_context *ctx) if (ctx->appctx) APPCTX_SPOE(ctx->appctx).ctx = NULL; - if (!LIST_ISEMPTY(&ctx->buffer_wait)) - LIST_DEL(&ctx->buffer_wait); + if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) + LIST_DEL(&ctx->buffer_wait.list); if (!LIST_ISEMPTY(&ctx->applet_wait)) LIST_DEL(&ctx->applet_wait); pool_free2(pool2_spoe_ctx, ctx); @@ -2459,8 +2469,13 @@ spoe_check_timeouts(struct stream *s, struct filter *filter) { struct spoe_context *ctx = filter->ctx; - if (tick_is_expired(ctx->process_exp, now_ms)) - s->task->state |= TASK_WOKEN_MSG; + if (tick_is_expired(ctx->process_exp, now_ms)) { + s->pending_events |= TASK_WOKEN_MSG; + if (ctx->buffer != &buf_empty) { + b_free(&ctx->buffer); + offer_buffers(ctx, tasks_run_queue + applets_active_queue); + } + } } /* Called when we are ready to filter data on a channel */ diff --git a/src/hlua.c b/src/hlua.c index 0ca3ec21e..10ed8ee47 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -1884,10 +1884,8 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext * the request buffer if its not required. */ if (socket->s->req.buf->size == 0) { - if (!stream_alloc_recv_buffer(&socket->s->req)) { - socket->s->si[0].flags |= SI_FL_WAIT_ROOM; - goto hlua_socket_write_yield_return; - } + si_applet_cant_put(&socket->s->si[0]); + goto hlua_socket_write_yield_return; } /* Check for avalaible space. */ @@ -2610,6 +2608,14 @@ __LJMP static int hlua_channel_append_yield(lua_State *L, int status, lua_KConte int ret; int max; + /* Check if the buffer is avalaible because HAProxy doesn't allocate + * the request buffer if its not required. + */ + if (chn->buf->size == 0) { + si_applet_cant_put(chn_prod(chn)); + WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_append_yield, TICK_ETERNITY, 0)); + } + max = channel_recv_limit(chn) - buffer_len(chn->buf); if (max > len - l) max = len - l; @@ -2700,10 +2706,8 @@ __LJMP static int hlua_channel_send_yield(lua_State *L, int status, lua_KContext * the request buffer if its not required. */ if (chn->buf->size == 0) { - if (!stream_alloc_recv_buffer(chn)) { - chn_prod(chn)->flags |= SI_FL_WAIT_ROOM; - WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_send_yield, TICK_ETERNITY, 0)); - } + si_applet_cant_put(chn_prod(chn)); + WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_send_yield, TICK_ETERNITY, 0)); } /* the writed data will be immediatly sent, so we can check diff --git a/src/peers.c b/src/peers.c index 1a80ab34a..1a280a570 100644 --- a/src/peers.c +++ b/src/peers.c @@ -547,6 +547,10 @@ static void peer_io_handler(struct appctx *appctx) size_t proto_len = strlen(PEER_SESSION_PROTO_NAME); unsigned int maj_ver, min_ver; + /* Check if the input buffer is avalaible. */ + if (si_ic(si)->buf->size == 0) + goto full; + while (1) { switchstate: maj_ver = min_ver = (unsigned int)-1; diff --git a/src/stats.c b/src/stats.c index 1a842e8d9..8ad983df6 100644 --- a/src/stats.c +++ b/src/stats.c @@ -2766,6 +2766,12 @@ static void http_stats_io_handler(struct appctx *appctx) if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO)) goto out; + /* Check if the input buffer is avalaible. */ + if (res->buf->size == 0) { + si_applet_cant_put(si); + goto out; + } + /* check that the output is not closed */ if (res->flags & (CF_SHUTW|CF_SHUTW_NOW)) appctx->st0 = STAT_HTTP_DONE; diff --git a/src/stream.c b/src/stream.c index db8702dd4..298830d2f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -62,9 +62,6 @@ struct pool_head *pool2_stream; struct list streams; -/* list of streams waiting for at least one buffer */ -struct list buffer_wq = LIST_HEAD_INIT(buffer_wq); - /* List of all use-service keywords. */ static struct list service_keywords = LIST_HEAD_INIT(service_keywords); @@ -139,7 +136,10 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o /* OK, we're keeping the stream, so let's properly initialize the stream */ LIST_ADDQ(&streams, &s->list); LIST_INIT(&s->back_refs); - LIST_INIT(&s->buffer_wait); + + LIST_INIT(&s->buffer_wait.list); + s->buffer_wait.target = s; + s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup; s->flags |= SF_INITIALIZED; s->unique_id = NULL; @@ -289,15 +289,15 @@ static void stream_free(struct stream *s) put_pipe(s->res.pipe); /* We may still be present in the buffer wait queue */ - if (!LIST_ISEMPTY(&s->buffer_wait)) { - LIST_DEL(&s->buffer_wait); - LIST_INIT(&s->buffer_wait); + if (!LIST_ISEMPTY(&s->buffer_wait.list)) { + LIST_DEL(&s->buffer_wait.list); + LIST_INIT(&s->buffer_wait.list); + } + if (s->req.buf->size || s->res.buf->size) { + b_drop(&s->req.buf); + b_drop(&s->res.buf); + offer_buffers(NULL, tasks_run_queue + applets_active_queue); } - - b_drop(&s->req.buf); - b_drop(&s->res.buf); - if (!LIST_ISEMPTY(&buffer_wq)) - stream_offer_buffers(); hlua_ctx_destroy(&s->hlua); if (s->txn) @@ -370,33 +370,6 @@ static void stream_free(struct stream *s) } } -/* Allocates a receive buffer for channel , but only if it's guaranteed - * that it's not the last available buffer or it's the response buffer. Unless - * the buffer is the response buffer, an extra control is made so that we always - * keep buffers available after this allocation. To be - * called at the beginning of recv() callbacks to ensure that the required - * buffers are properly allocated. Returns 0 in case of failure, non-zero - * otherwise. - */ -int stream_alloc_recv_buffer(struct channel *chn) -{ - struct stream *s; - struct buffer *b; - int margin = 0; - - if (!(chn->flags & CF_ISRESP)) - margin = global.tune.reserved_bufs; - - s = chn_strm(chn); - - b = b_alloc_margin(&chn->buf, margin); - if (b) - return 1; - - if (LIST_ISEMPTY(&s->buffer_wait)) - LIST_ADDQ(&buffer_wq, &s->buffer_wait); - return 0; -} /* Allocates a work buffer for stream . It is meant to be called inside * process_stream(). It will only allocate the side needed for the function @@ -406,60 +379,44 @@ int stream_alloc_recv_buffer(struct channel *chn) * server from releasing a connection. Returns 0 in case of failure, non-zero * otherwise. */ -int stream_alloc_work_buffer(struct stream *s) +static int stream_alloc_work_buffer(struct stream *s) { - if (!LIST_ISEMPTY(&s->buffer_wait)) { - LIST_DEL(&s->buffer_wait); - LIST_INIT(&s->buffer_wait); + if (!LIST_ISEMPTY(&s->buffer_wait.list)) { + LIST_DEL(&s->buffer_wait.list); + LIST_INIT(&s->buffer_wait.list); } if (b_alloc_margin(&s->res.buf, 0)) return 1; - LIST_ADDQ(&buffer_wq, &s->buffer_wait); + LIST_ADDQ(&buffer_wq, &s->buffer_wait.list); return 0; } /* releases unused buffers after processing. Typically used at the end of the - * update() functions. It will try to wake up as many tasks as the number of - * buffers that it releases. In practice, most often streams are blocked on - * a single buffer, so it makes sense to try to wake two up when two buffers - * are released at once. + * update() functions. It will try to wake up as many tasks/applets as the + * number of buffers that it releases. In practice, most often streams are + * blocked on a single buffer, so it makes sense to try to wake two up when two + * buffers are released at once. */ void stream_release_buffers(struct stream *s) { - if (s->req.buf->size && buffer_empty(s->req.buf)) - b_free(&s->req.buf); + int offer = 0; - if (s->res.buf->size && buffer_empty(s->res.buf)) + if (s->req.buf->size && buffer_empty(s->req.buf)) { + offer = 1; + b_free(&s->req.buf); + } + if (s->res.buf->size && buffer_empty(s->res.buf)) { + offer = 1; b_free(&s->res.buf); + } /* if we're certain to have at least 1 buffer available, and there is * someone waiting, we can wake up a waiter and offer them. */ - if (!LIST_ISEMPTY(&buffer_wq)) - stream_offer_buffers(); -} - -/* Runs across the list of pending streams waiting for a buffer and wakes one - * up if buffers are available. Will stop when the run queue reaches . - * Should not be called directly, use stream_offer_buffers() instead. - */ -void __stream_offer_buffers(int rqlimit) -{ - struct stream *sess, *bak; - - list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) { - if (rqlimit <= tasks_run_queue) - break; - - if (sess->task->state & TASK_RUNNING) - continue; - - LIST_DEL(&sess->buffer_wait); - LIST_INIT(&sess->buffer_wait); - task_wakeup(sess->task, TASK_WOKEN_RES); - } + if (offer) + offer_buffers(s, tasks_run_queue + applets_active_queue); } /* perform minimal intializations, report 0 in case of error, 1 if OK. */ @@ -2817,7 +2774,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st chunk_appendf(&trash, " txn=%p flags=0x%x meth=%d status=%d req.st=%s rsp.st=%s waiting=%d\n", strm->txn, strm->txn->flags, strm->txn->meth, strm->txn->status, - http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait)); + http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait.list)); chunk_appendf(&trash, " si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s, et=0x%03x)\n", diff --git a/src/stream_interface.c b/src/stream_interface.c index e3e6cc66b..d5f2c87ea 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -538,8 +538,6 @@ void stream_int_notify(struct stream_interface *si) } if (ic->flags & CF_READ_ACTIVITY) ic->flags &= ~CF_READ_DONTWAIT; - - stream_release_buffers(si_strm(si)); } @@ -571,6 +569,7 @@ static int si_conn_wake_cb(struct connection *conn) * stream-int status. */ stream_int_notify(si); + channel_release_buffer(ic, &(si_strm(si)->buffer_wait)); /* Third step : update the connection's polling status based on what * was done above (eg: maybe some buffers got emptied). @@ -1128,8 +1127,8 @@ static void si_conn_recv_cb(struct connection *conn) ic->pipe = NULL; } - /* now we'll need a buffer */ - if (!stream_alloc_recv_buffer(ic)) { + /* now we'll need a input buffer for the stream */ + if (!channel_alloc_buffer(ic, &(si_strm(si)->buffer_wait))) { si->flags |= SI_FL_WAIT_ROOM; goto end_recv; }