MEDIUM: stream: implement stream_buf_available()

This function replaces stream_res_available(), which is used as a callback
for the buffer allocator. It now carefully checks which stream interface
was blocked on a buffer allocation, tries to allocate the input buffer to
this stream interface, and wakes the task up once such a buffer was found.
It will automatically remove the SI_FL_WAIT_ROOM flag upon success since
the info this flag indicates becomes wrong as soon as the buffer is
allocated.

The code is still far from being perfect because if a call to si_cs_recv()
fails to allocate a buffer, we'll still end up passing via process_stream()
again, but this could be improved in the future by using finer-grained
wake-up notifications.
This commit is contained in:
Willy Tarreau 2018-11-06 15:50:21 +01:00
parent 745f15eba9
commit b882dd88cc
2 changed files with 27 additions and 13 deletions

View File

@ -57,6 +57,7 @@ int parse_track_counters(char **args, int *arg,
/* Update the stream's backend and server time stats */
void stream_update_time_stats(struct stream *s);
void stream_release_buffers(struct stream *s);
int stream_buf_available(void *arg);
/* returns the session this stream belongs to */
static inline struct session *strm_sess(const struct stream *strm)
@ -347,18 +348,6 @@ static void inline stream_init_srv_conn(struct stream *sess)
LIST_INIT(&sess->by_srv);
}
/* Callback used to wake up a stream when a buffer is available. The stream <s>
* is woken up is if it is not already running and if it is not already in the
* task run queue. This functions returns 1 is the stream is woken up, otherwise
* it returns 0. */
static int inline stream_res_wakeup(struct stream *s)
{
if (s->task->state & TASK_RUNNING)
return 0;
task_wakeup(s->task, TASK_WOKEN_RES);
return 1;
}
void service_keywords_register(struct action_kw_list *kw_list);
#endif /* _PROTO_STREAM_H */

View File

@ -85,6 +85,31 @@ int stream_create_from_cs(struct conn_stream *cs)
return 0;
}
/* Callback used to wake up a stream when an input buffer is available. The
* stream <s>'s stream interfaces are checked for a failed buffer allocation
* as indicated by the presence of the SI_FL_WAIT_ROOM flag and the lack of a
* buffer, and and input buffer is assigned there (at most one). The function
* returns 1 and wakes the stream up if a buffer was taken, otherwise zero.
* It's designed to be called from __offer_buffer().
*/
int stream_buf_available(void *arg)
{
struct stream *s = arg;
if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_WAIT_ROOM) &&
b_alloc_margin(&s->req.buf, global.tune.reserved_bufs))
s->si[0].flags &= ~SI_FL_WAIT_ROOM;
else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_WAIT_ROOM) &&
b_alloc_margin(&s->res.buf, 0))
s->si[1].flags &= ~SI_FL_WAIT_ROOM;
else
return 0;
task_wakeup(s->task, TASK_WOKEN_RES);
return 1;
}
/* This function is called from the session handler which detects the end of
* handshake, in order to complete initialization of a valid stream. It must be
* called with a completley initialized session. It returns the pointer to
@ -157,7 +182,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
LIST_INIT(&s->buffer_wait.list);
s->buffer_wait.target = s;
s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup;
s->buffer_wait.wakeup_cb = stream_buf_available;
s->flags |= SF_INITIALIZED;
s->pcli_next_pid = 0;