diff --git a/include/proto/stream.h b/include/proto/stream.h index 0e1cf66e0..8c6773eeb 100644 --- a/include/proto/stream.h +++ b/include/proto/stream.h @@ -57,6 +57,7 @@ int parse_track_counters(char **args, int *arg, /* Update the stream's backend and server time stats */ void stream_update_time_stats(struct stream *s); void stream_release_buffers(struct stream *s); +int stream_buf_available(void *arg); /* returns the session this stream belongs to */ static inline struct session *strm_sess(const struct stream *strm) @@ -347,18 +348,6 @@ static void inline stream_init_srv_conn(struct stream *sess) LIST_INIT(&sess->by_srv); } -/* Callback used to wake up a stream when a buffer is available. The stream - * is woken up is if it is not already running and if it is not already in the - * task run queue. This functions returns 1 is the stream is woken up, otherwise - * it returns 0. */ -static int inline stream_res_wakeup(struct stream *s) -{ - if (s->task->state & TASK_RUNNING) - return 0; - task_wakeup(s->task, TASK_WOKEN_RES); - return 1; -} - void service_keywords_register(struct action_kw_list *kw_list); #endif /* _PROTO_STREAM_H */ diff --git a/src/stream.c b/src/stream.c index a3a65e17f..2dfb2f9be 100644 --- a/src/stream.c +++ b/src/stream.c @@ -85,6 +85,31 @@ int stream_create_from_cs(struct conn_stream *cs) return 0; } +/* Callback used to wake up a stream when an input buffer is available. The + * stream 's stream interfaces are checked for a failed buffer allocation + * as indicated by the presence of the SI_FL_WAIT_ROOM flag and the lack of a + * buffer, and and input buffer is assigned there (at most one). The function + * returns 1 and wakes the stream up if a buffer was taken, otherwise zero. + * It's designed to be called from __offer_buffer(). + */ +int stream_buf_available(void *arg) +{ + struct stream *s = arg; + + if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_WAIT_ROOM) && + b_alloc_margin(&s->req.buf, global.tune.reserved_bufs)) + s->si[0].flags &= ~SI_FL_WAIT_ROOM; + else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_WAIT_ROOM) && + b_alloc_margin(&s->res.buf, 0)) + s->si[1].flags &= ~SI_FL_WAIT_ROOM; + else + return 0; + + task_wakeup(s->task, TASK_WOKEN_RES); + return 1; + +} + /* This function is called from the session handler which detects the end of * handshake, in order to complete initialization of a valid stream. It must be * called with a completley initialized session. It returns the pointer to @@ -157,7 +182,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) LIST_INIT(&s->buffer_wait.list); s->buffer_wait.target = s; - s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup; + s->buffer_wait.wakeup_cb = stream_buf_available; s->flags |= SF_INITIALIZED; s->pcli_next_pid = 0;