BUG/MAJOR: Fix how the list of entities waiting for a buffer is handled

When an entity tries to get a buffer, if it cannot be allocted, for example
because the number of buffers which may be allocated per process is limited,
this entity is added in a list (called <buffer_wq>) and wait for an available
buffer.

Historically, the <buffer_wq> list was logically attached to streams because it
were the only entities likely to be added in it. Now, applets can also be
waiting for a free buffer. And with filters, we could imagine to have more other
entities waiting for a buffer. So it make sense to have a generic list.

Anyway, with the current design there is a bug. When an applet failed to get a
buffer, it will wait. But we add the stream attached to the applet in
<buffer_wq>, instead of the applet itself. So when a buffer is available, we
wake up the stream and not the waiting applet. So, it is possible to have
waiting applets and never awakened.

So, now, <buffer_wq> is independant from streams. And we really add the waiting
entity in <buffer_wq>. To be generic, the entity is responsible to define the
callback used to awaken it.

In addition, applets will still request an input buffer when they become
active. But they will not be sleeped anymore if no buffer are available. So this
is the responsibility to the applet I/O handler to check if this buffer is
allocated or not. This way, an applet can decide if this buffer is required or
not and can do additional processing if not.

[wt: backport to 1.7 and 1.6]
This commit is contained in:
Christopher Faulet 2016-12-09 17:30:18 +01:00 committed by Willy Tarreau
parent 9d810cae11
commit a73e59b690
15 changed files with 225 additions and 132 deletions

View File

@ -39,9 +39,18 @@ struct buffer {
char data[0]; /* <size> bytes */
};
/* an element of the <buffer_wq> list. It represents an object that need to
* acquire a buffer to continue its process. */
struct buffer_wait {
void *target; /* The waiting object that should be woken up */
int (*wakeup_cb)(void *); /* The function used to wake up the <target>, passed as argument */
struct list list; /* Next element in the <buffer_wq> list */
};
extern struct pool_head *pool2_buffer;
extern struct buffer buf_empty;
extern struct buffer buf_wanted;
extern struct list buffer_wq;
int init_buffer();
int buffer_replace2(struct buffer *b, char *pos, char *end, const char *str, int len);
@ -522,6 +531,16 @@ static inline struct buffer *b_alloc_margin(struct buffer **buf, int margin)
return next;
}
void __offer_buffer(void *from, unsigned int threshold);
static inline void offer_buffers(void *from, unsigned int threshold)
{
if (LIST_ISEMPTY(&buffer_wq))
return;
__offer_buffer(from, threshold);
}
#endif /* _COMMON_BUFFER_H */
/*

View File

@ -36,6 +36,10 @@ extern struct list applet_active_queue;
void applet_run_active();
static int inline appctx_res_wakeup(struct appctx *appctx);
/* Initializes all required fields for a new appctx. Note that it does the
* minimum acceptable initialization for an appctx. This means only the
* 3 integer states st0, st1, st2 are zeroed.
@ -61,6 +65,9 @@ static inline struct appctx *appctx_new(struct applet *applet)
appctx->applet = applet;
appctx_init(appctx);
LIST_INIT(&appctx->runq);
LIST_INIT(&appctx->buffer_wait.list);
appctx->buffer_wait.target = appctx;
appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
nb_applets++;
}
return appctx;
@ -75,6 +82,10 @@ static inline void appctx_free(struct appctx *appctx)
LIST_DEL(&appctx->runq);
applets_active_queue--;
}
if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
LIST_DEL(&appctx->buffer_wait.list);
LIST_INIT(&appctx->buffer_wait.list);
}
pool_free2(pool2_connection, appctx);
nb_applets--;
}
@ -98,6 +109,19 @@ static inline void appctx_pause(struct appctx *appctx)
}
}
/* Callback used to wake up an applet when a buffer is available. The applet
* <appctx> is woken up is if it is not already in the list of "active"
* applets. This functions returns 1 is the stream is woken up, otherwise it
* returns 0. */
static inline int appctx_res_wakeup(struct appctx *appctx)
{
if (!LIST_ISEMPTY(&appctx->runq))
return 0;
appctx_wakeup(appctx);
return 1;
}
#endif /* _PROTO_APPLET_H */
/*

View File

@ -36,6 +36,9 @@
#include <types/stream.h>
#include <types/stream_interface.h>
#include <proto/applet.h>
#include <proto/task.h>
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
int init_channel();
@ -439,6 +442,41 @@ static inline int channel_recv_max(const struct channel *chn)
return ret;
}
/* Allocates a buffer for channel <chn>, but only if it's guaranteed that it's
* not the last available buffer or it's the response buffer. Unless the buffer
* is the response buffer, an extra control is made so that we always keep
* <tune.buffers.reserved> buffers available after this allocation. Returns 0 in
* case of failure, non-zero otherwise.
*
* If no buffer are available, the requester, represented by <wait> pointer,
* will be added in the list of objects waiting for an available buffer.
*/
static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait *wait)
{
int margin = 0;
if (!(chn->flags & CF_ISRESP))
margin = global.tune.reserved_bufs;
if (b_alloc_margin(&chn->buf, margin) != NULL)
return 1;
if (LIST_ISEMPTY(&wait->list))
LIST_ADDQ(&buffer_wq, &wait->list);
return 0;
}
/* Releases a possibly allocated buffer for channel <chn>. If it was not
* allocated, this function does nothing. Else the buffer is released and we try
* to wake up as many streams/applets as possible. */
static inline void channel_release_buffer(struct channel *chn, struct buffer_wait *wait)
{
if (chn->buf->size && buffer_empty(chn->buf)) {
b_free(&chn->buf);
offer_buffers(wait->target, tasks_run_queue + applets_active_queue);
}
}
/* Truncate any unread data in the channel's buffer, and disable forwarding.
* Outgoing data are left intact. This is mainly to be used to send error
* messages after existing data.

View File

@ -32,7 +32,6 @@
extern struct pool_head *pool2_stream;
extern struct list streams;
extern struct list buffer_wq;
extern struct data_cb sess_conn_cb;
@ -55,11 +54,7 @@ int parse_track_counters(char **args, int *arg,
/* Update the stream's backend and server time stats */
void stream_update_time_stats(struct stream *s);
void __stream_offer_buffers(int rqlimit);
static inline void stream_offer_buffers();
int stream_alloc_work_buffer(struct stream *s);
void stream_release_buffers(struct stream *s);
int stream_alloc_recv_buffer(struct channel *chn);
/* returns the session this stream belongs to */
static inline struct session *strm_sess(const struct stream *strm)
@ -285,25 +280,16 @@ static void inline stream_init_srv_conn(struct stream *sess)
LIST_INIT(&sess->by_srv);
}
static inline void stream_offer_buffers()
/* Callback used to wake up a stream when a buffer is available. The stream <s>
* is woken up is if it is not already running and if it is not already in the
* task run queue. This functions returns 1 is the stream is woken up, otherwise
* it returns 0. */
static int inline stream_res_wakeup(struct stream *s)
{
int avail;
if (LIST_ISEMPTY(&buffer_wq))
return;
/* all streams will need 1 buffer, so we can stop waking up streams
* once we have enough of them to eat all the buffers. Note that we
* don't really know if they are streams or just other tasks, but
* that's a rough estimate. Similarly, for each cached event we'll need
* 1 buffer. If no buffer is currently used, always wake up the number
* of tasks we can offer a buffer based on what is allocated, and in
* any case at least one task per two reserved buffers.
*/
avail = pool2_buffer->allocated - pool2_buffer->used - global.tune.reserved_bufs / 2;
if (avail > (int)tasks_run_queue)
__stream_offer_buffers(avail);
if (s->task->state & TASK_RUNNING || task_in_rq(s->task))
return 0;
task_wakeup(s->task, TASK_WOKEN_RES);
return 1;
}
void service_keywords_register(struct action_kw_list *kw_list);

View File

@ -26,6 +26,7 @@
#include <types/obj_type.h>
#include <types/proxy.h>
#include <types/stream.h>
#include <common/buffer.h>
#include <common/chunk.h>
#include <common/config.h>
@ -58,6 +59,7 @@ struct appctx {
void (*io_release)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK,
if the command is terminated or the session released */
void *private;
struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
union {
struct {

View File

@ -135,7 +135,7 @@ struct stream {
struct list list; /* position in global streams list */
struct list by_srv; /* position in server stream list */
struct list back_refs; /* list of users tracking this stream */
struct list buffer_wait; /* position in the list of streams waiting for a buffer */
struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
struct {
struct stksess *ts;

View File

@ -16,6 +16,7 @@
#include <common/config.h>
#include <common/mini-clist.h>
#include <proto/applet.h>
#include <proto/channel.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
@ -48,13 +49,12 @@ void applet_run_active()
curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq);
si = curr->owner;
/* now we'll need a buffer */
if (!stream_alloc_recv_buffer(si_ic(si))) {
si->flags |= SI_FL_WAIT_ROOM;
LIST_DEL(&curr->runq);
LIST_INIT(&curr->runq);
continue;
}
/* Now we'll try to allocate the input buffer. We wake up the
* applet in all cases. So this is the applet responsibility to
* check if this buffer was allocated or not. This let a chance
* for applets to do some other processing if needed. */
if (!channel_alloc_buffer(si_ic(si), &curr->buffer_wait))
si_applet_cant_put(si);
/* We always pretend the applet can't get and doesn't want to
* put, it's up to it to change this if needed. This ensures
@ -65,6 +65,7 @@ void applet_run_active()
curr->applet->fct(curr);
si_applet_wake_cb(si);
channel_release_buffer(si_ic(si), &curr->buffer_wait);
if (applet_cur_queue.n == &curr->runq) {
/* curr was left in the list, move it back to the active list */

View File

@ -31,6 +31,9 @@ struct pool_head *pool2_buffer;
struct buffer buf_empty = { .p = buf_empty.data };
struct buffer buf_wanted = { .p = buf_wanted.data };
/* list of objects waiting for at least one buffer */
struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
int init_buffer()
{
@ -278,6 +281,35 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to)
fflush(o);
}
void __offer_buffer(void *from, unsigned int threshold)
{
struct buffer_wait *wait, *bak;
int avail;
/* For now, we consider that all objects need 1 buffer, so we can stop
* waking up them once we have enough of them to eat all the available
* buffers. Note that we don't really know if they are streams or just
* other tasks, but that's a rough estimate. Similarly, for each cached
* event we'll need 1 buffer. If no buffer is currently used, always
* wake up the number of tasks we can offer a buffer based on what is
* allocated, and in any case at least one task per two reserved
* buffers.
*/
avail = pool2_buffer->allocated - pool2_buffer->used - global.tune.reserved_bufs / 2;
list_for_each_entry_safe(wait, bak, &buffer_wq, list) {
if (avail <= threshold)
break;
if (wait->target == from || !wait->wakeup_cb(wait->target))
continue;
LIST_DEL(&wait->list);
LIST_INIT(&wait->list);
avail--;
}
}
/*
* Local variables:

View File

@ -488,6 +488,12 @@ static void cli_io_handler(struct appctx *appctx)
if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO))
goto out;
/* Check if the input buffer is avalaible. */
if (res->buf->size == 0) {
si_applet_cant_put(si);
goto out;
}
while (1) {
if (appctx->st0 == CLI_ST_INIT) {
/* Stats output not initialized yet */

View File

@ -224,7 +224,7 @@ struct spoe_context {
struct appctx *appctx; /* The SPOE appctx */
struct list *messages; /* List of messages that will be sent during the stream processing */
struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
struct list buffer_wait; /* position in the list of streams waiting for a buffer */
struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */
struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
@ -1232,6 +1232,9 @@ send_spoe_frame(struct appctx *appctx,
int framesz, ret;
uint32_t netint;
if (si_ic(si)->buf->size == 0)
return -1;
ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
if (ret <= 0)
goto skip_or_error;
@ -1524,7 +1527,7 @@ handle_spoe_applet(struct appctx *appctx)
/* fall through */
case SPOE_APPCTX_ST_END:
break;
return;
}
out:
@ -1693,13 +1696,13 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir)
/* If needed, initialize the buffer that will be used to encode messages
* and decode actions. */
if (ctx->buffer == &buf_empty) {
if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
LIST_DEL(&ctx->buffer_wait);
LIST_INIT(&ctx->buffer_wait);
if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
LIST_DEL(&ctx->buffer_wait.list);
LIST_INIT(&ctx->buffer_wait.list);
}
if (!b_alloc_margin(&ctx->buffer, 0)) {
LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
goto wait;
}
}
@ -1794,8 +1797,7 @@ release_spoe_appctx(struct spoe_context *ctx)
/* Release the buffer if needed */
if (ctx->buffer != &buf_empty) {
b_free(&ctx->buffer);
if (!LIST_ISEMPTY(&buffer_wq))
stream_offer_buffers();
offer_buffers(ctx, tasks_run_queue + applets_active_queue);
}
/* If there is no SPOE applet, all is done */
@ -2213,6 +2215,12 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
/***************************************************************************
* Functions that create/destroy SPOE contexts
**************************************************************************/
static int wakeup_spoe_context(struct spoe_context *ctx)
{
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
return 1;
}
static struct spoe_context *
create_spoe_context(struct filter *filter)
{
@ -2229,7 +2237,9 @@ create_spoe_context(struct filter *filter)
ctx->flags = 0;
ctx->messages = conf->agent->messages;
ctx->buffer = &buf_empty;
LIST_INIT(&ctx->buffer_wait);
LIST_INIT(&ctx->buffer_wait.list);
ctx->buffer_wait.target = ctx;
ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
LIST_INIT(&ctx->applet_wait);
ctx->stream_id = 0;
@ -2247,8 +2257,8 @@ destroy_spoe_context(struct spoe_context *ctx)
if (ctx->appctx)
APPCTX_SPOE(ctx->appctx).ctx = NULL;
if (!LIST_ISEMPTY(&ctx->buffer_wait))
LIST_DEL(&ctx->buffer_wait);
if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
LIST_DEL(&ctx->buffer_wait.list);
if (!LIST_ISEMPTY(&ctx->applet_wait))
LIST_DEL(&ctx->applet_wait);
pool_free2(pool2_spoe_ctx, ctx);
@ -2459,8 +2469,13 @@ spoe_check_timeouts(struct stream *s, struct filter *filter)
{
struct spoe_context *ctx = filter->ctx;
if (tick_is_expired(ctx->process_exp, now_ms))
s->task->state |= TASK_WOKEN_MSG;
if (tick_is_expired(ctx->process_exp, now_ms)) {
s->pending_events |= TASK_WOKEN_MSG;
if (ctx->buffer != &buf_empty) {
b_free(&ctx->buffer);
offer_buffers(ctx, tasks_run_queue + applets_active_queue);
}
}
}
/* Called when we are ready to filter data on a channel */

View File

@ -1884,10 +1884,8 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
* the request buffer if its not required.
*/
if (socket->s->req.buf->size == 0) {
if (!stream_alloc_recv_buffer(&socket->s->req)) {
socket->s->si[0].flags |= SI_FL_WAIT_ROOM;
goto hlua_socket_write_yield_return;
}
si_applet_cant_put(&socket->s->si[0]);
goto hlua_socket_write_yield_return;
}
/* Check for avalaible space. */
@ -2610,6 +2608,14 @@ __LJMP static int hlua_channel_append_yield(lua_State *L, int status, lua_KConte
int ret;
int max;
/* Check if the buffer is avalaible because HAProxy doesn't allocate
* the request buffer if its not required.
*/
if (chn->buf->size == 0) {
si_applet_cant_put(chn_prod(chn));
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_append_yield, TICK_ETERNITY, 0));
}
max = channel_recv_limit(chn) - buffer_len(chn->buf);
if (max > len - l)
max = len - l;
@ -2700,10 +2706,8 @@ __LJMP static int hlua_channel_send_yield(lua_State *L, int status, lua_KContext
* the request buffer if its not required.
*/
if (chn->buf->size == 0) {
if (!stream_alloc_recv_buffer(chn)) {
chn_prod(chn)->flags |= SI_FL_WAIT_ROOM;
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_send_yield, TICK_ETERNITY, 0));
}
si_applet_cant_put(chn_prod(chn));
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_send_yield, TICK_ETERNITY, 0));
}
/* the writed data will be immediatly sent, so we can check

View File

@ -547,6 +547,10 @@ static void peer_io_handler(struct appctx *appctx)
size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
unsigned int maj_ver, min_ver;
/* Check if the input buffer is avalaible. */
if (si_ic(si)->buf->size == 0)
goto full;
while (1) {
switchstate:
maj_ver = min_ver = (unsigned int)-1;

View File

@ -2766,6 +2766,12 @@ static void http_stats_io_handler(struct appctx *appctx)
if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO))
goto out;
/* Check if the input buffer is avalaible. */
if (res->buf->size == 0) {
si_applet_cant_put(si);
goto out;
}
/* check that the output is not closed */
if (res->flags & (CF_SHUTW|CF_SHUTW_NOW))
appctx->st0 = STAT_HTTP_DONE;

View File

@ -62,9 +62,6 @@
struct pool_head *pool2_stream;
struct list streams;
/* list of streams waiting for at least one buffer */
struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
/* List of all use-service keywords. */
static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
@ -139,7 +136,10 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
/* OK, we're keeping the stream, so let's properly initialize the stream */
LIST_ADDQ(&streams, &s->list);
LIST_INIT(&s->back_refs);
LIST_INIT(&s->buffer_wait);
LIST_INIT(&s->buffer_wait.list);
s->buffer_wait.target = s;
s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup;
s->flags |= SF_INITIALIZED;
s->unique_id = NULL;
@ -289,15 +289,15 @@ static void stream_free(struct stream *s)
put_pipe(s->res.pipe);
/* We may still be present in the buffer wait queue */
if (!LIST_ISEMPTY(&s->buffer_wait)) {
LIST_DEL(&s->buffer_wait);
LIST_INIT(&s->buffer_wait);
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
LIST_DEL(&s->buffer_wait.list);
LIST_INIT(&s->buffer_wait.list);
}
if (s->req.buf->size || s->res.buf->size) {
b_drop(&s->req.buf);
b_drop(&s->res.buf);
offer_buffers(NULL, tasks_run_queue + applets_active_queue);
}
b_drop(&s->req.buf);
b_drop(&s->res.buf);
if (!LIST_ISEMPTY(&buffer_wq))
stream_offer_buffers();
hlua_ctx_destroy(&s->hlua);
if (s->txn)
@ -370,33 +370,6 @@ static void stream_free(struct stream *s)
}
}
/* Allocates a receive buffer for channel <chn>, but only if it's guaranteed
* that it's not the last available buffer or it's the response buffer. Unless
* the buffer is the response buffer, an extra control is made so that we always
* keep <tune.buffers.reserved> buffers available after this allocation. To be
* called at the beginning of recv() callbacks to ensure that the required
* buffers are properly allocated. Returns 0 in case of failure, non-zero
* otherwise.
*/
int stream_alloc_recv_buffer(struct channel *chn)
{
struct stream *s;
struct buffer *b;
int margin = 0;
if (!(chn->flags & CF_ISRESP))
margin = global.tune.reserved_bufs;
s = chn_strm(chn);
b = b_alloc_margin(&chn->buf, margin);
if (b)
return 1;
if (LIST_ISEMPTY(&s->buffer_wait))
LIST_ADDQ(&buffer_wq, &s->buffer_wait);
return 0;
}
/* Allocates a work buffer for stream <s>. It is meant to be called inside
* process_stream(). It will only allocate the side needed for the function
@ -406,60 +379,44 @@ int stream_alloc_recv_buffer(struct channel *chn)
* server from releasing a connection. Returns 0 in case of failure, non-zero
* otherwise.
*/
int stream_alloc_work_buffer(struct stream *s)
static int stream_alloc_work_buffer(struct stream *s)
{
if (!LIST_ISEMPTY(&s->buffer_wait)) {
LIST_DEL(&s->buffer_wait);
LIST_INIT(&s->buffer_wait);
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
LIST_DEL(&s->buffer_wait.list);
LIST_INIT(&s->buffer_wait.list);
}
if (b_alloc_margin(&s->res.buf, 0))
return 1;
LIST_ADDQ(&buffer_wq, &s->buffer_wait);
LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
return 0;
}
/* releases unused buffers after processing. Typically used at the end of the
* update() functions. It will try to wake up as many tasks as the number of
* buffers that it releases. In practice, most often streams are blocked on
* a single buffer, so it makes sense to try to wake two up when two buffers
* are released at once.
* update() functions. It will try to wake up as many tasks/applets as the
* number of buffers that it releases. In practice, most often streams are
* blocked on a single buffer, so it makes sense to try to wake two up when two
* buffers are released at once.
*/
void stream_release_buffers(struct stream *s)
{
if (s->req.buf->size && buffer_empty(s->req.buf))
b_free(&s->req.buf);
int offer = 0;
if (s->res.buf->size && buffer_empty(s->res.buf))
if (s->req.buf->size && buffer_empty(s->req.buf)) {
offer = 1;
b_free(&s->req.buf);
}
if (s->res.buf->size && buffer_empty(s->res.buf)) {
offer = 1;
b_free(&s->res.buf);
}
/* if we're certain to have at least 1 buffer available, and there is
* someone waiting, we can wake up a waiter and offer them.
*/
if (!LIST_ISEMPTY(&buffer_wq))
stream_offer_buffers();
}
/* Runs across the list of pending streams waiting for a buffer and wakes one
* up if buffers are available. Will stop when the run queue reaches <rqlimit>.
* Should not be called directly, use stream_offer_buffers() instead.
*/
void __stream_offer_buffers(int rqlimit)
{
struct stream *sess, *bak;
list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) {
if (rqlimit <= tasks_run_queue)
break;
if (sess->task->state & TASK_RUNNING)
continue;
LIST_DEL(&sess->buffer_wait);
LIST_INIT(&sess->buffer_wait);
task_wakeup(sess->task, TASK_WOKEN_RES);
}
if (offer)
offer_buffers(s, tasks_run_queue + applets_active_queue);
}
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
@ -2817,7 +2774,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
chunk_appendf(&trash,
" txn=%p flags=0x%x meth=%d status=%d req.st=%s rsp.st=%s waiting=%d\n",
strm->txn, strm->txn->flags, strm->txn->meth, strm->txn->status,
http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait));
http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait.list));
chunk_appendf(&trash,
" si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s, et=0x%03x)\n",

View File

@ -538,8 +538,6 @@ void stream_int_notify(struct stream_interface *si)
}
if (ic->flags & CF_READ_ACTIVITY)
ic->flags &= ~CF_READ_DONTWAIT;
stream_release_buffers(si_strm(si));
}
@ -571,6 +569,7 @@ static int si_conn_wake_cb(struct connection *conn)
* stream-int status.
*/
stream_int_notify(si);
channel_release_buffer(ic, &(si_strm(si)->buffer_wait));
/* Third step : update the connection's polling status based on what
* was done above (eg: maybe some buffers got emptied).
@ -1128,8 +1127,8 @@ static void si_conn_recv_cb(struct connection *conn)
ic->pipe = NULL;
}
/* now we'll need a buffer */
if (!stream_alloc_recv_buffer(ic)) {
/* now we'll need a input buffer for the stream */
if (!channel_alloc_buffer(ic, &(si_strm(si)->buffer_wait))) {
si->flags |= SI_FL_WAIT_ROOM;
goto end_recv;
}