MEDIUM: stream-int: use si_rx_buff_{rdy,blk} to report buffer readiness

The stream interface used to conflate a missing buffer and lack of
buffer space into SI_FL_WAIT_ROOM but this causes difficulties as
these cannot be checked at the same moment and are not resolved at
the same moment either. Now we instead mark the buffer as presumably
available using si_rx_buff_rdy() and mark it as unavailable+requested
using si_rx_buff_blk().

The call to si_alloc_buf() was moved after si_stop_put(). This makes
sure that the SI_FL_RX_WAIT_EP flag is cleared on allocation failure so
that the function is called again if the callee fails to do its work.
This commit is contained in:
Willy Tarreau 2018-11-14 15:12:08 +01:00
parent 32742fdf45
commit 8be7cd7b92
3 changed files with 44 additions and 18 deletions

View File

@ -285,6 +285,22 @@ static inline void si_done_put(struct stream_interface *si)
si->flags |= SI_FL_RX_WAIT_EP;
}
/* The stream interface just got the input buffer it was waiting for */
static inline void si_rx_buff_rdy(struct stream_interface *si)
{
si->flags &= ~SI_FL_RXBLK_BUFF;
}
/* The stream interface failed to get an input buffer and is waiting for it.
* Since it indicates a willingness to deliver data to the buffer that will
* have to be retried, we automatically clear RXBLK_ENDP to be called again
* as soon as RXBLK_BUFF is cleared.
*/
static inline void si_rx_buff_blk(struct stream_interface *si)
{
si->flags |= SI_FL_RXBLK_BUFF;
}
/* Returns non-zero if the stream interface's Rx path is blocked */
static inline int si_tx_blocked(const struct stream_interface *si)
{
@ -342,10 +358,10 @@ static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struc
/* Try to allocate a buffer for the stream-int's input channel. It relies on
* channel_alloc_buffer() for this so it abides by its rules. It returns 0 on
* failure, non-zero otherwise. If no buffer is available, the requester,
* represented by <wait> pointer, will be added in the list of objects waiting
* for an available buffer, and SI_FL_RXBLK_ROOM will be set on the stream-int.
* The requester will be responsible for calling this function to try again
* once woken up.
* represented by the <wait> pointer, will be added in the list of objects
* waiting for an available buffer, and SI_FL_RXBLK_BUFF will be set on the
* stream-int and SI_FL_RX_WAIT_EP cleared. The requester will be responsible
* for calling this function to try again once woken up.
*/
static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait *wait)
{
@ -353,7 +369,7 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait
ret = channel_alloc_buffer(si_ic(si), wait);
if (!ret)
si_cant_put(si);
si_rx_buff_blk(si);
return ret;
}

View File

@ -36,14 +36,21 @@ int appctx_buf_available(void *arg)
struct stream_interface *si = appctx->owner;
/* allocation requested ? */
if (!(si->flags & SI_FL_RXBLK_ROOM) || c_size(si_ic(si)) || si_ic(si)->pipe)
if (!(si->flags & SI_FL_RXBLK_BUFF))
return 0;
si_rx_buff_rdy(si);
/* was already allocated another way ? if so, don't take this one */
if (c_size(si_ic(si)) || si_ic(si)->pipe)
return 0;
/* allocation possible now ? */
if (!b_alloc_margin(&si_ic(si)->buf, global.tune.reserved_bufs))
if (!b_alloc_margin(&si_ic(si)->buf, global.tune.reserved_bufs)) {
si_rx_buff_blk(si);
return 0;
}
si->flags &= ~SI_FL_RXBLK_ROOM;
task_wakeup(appctx->t, TASK_WOKEN_RES);
return 1;
}
@ -58,12 +65,6 @@ struct task *task_run_applet(struct task *t, void *context, unsigned short state
__appctx_free(app);
return NULL;
}
/* Now we'll try to allocate the input buffer. We wake up the
* applet in all cases. So this is the applet responsibility to
* check if this buffer was allocated or not. This let a chance
* for applets to do some other processing if needed. */
if (!si_alloc_ibuf(si, &app->buffer_wait))
si_cant_put(si);
/* We always pretend the applet can't get and doesn't want to
* put, it's up to it to change this if needed. This ensures
@ -72,6 +73,15 @@ struct task *task_run_applet(struct task *t, void *context, unsigned short state
si_cant_get(si);
si_stop_put(si);
/* Now we'll try to allocate the input buffer. We wake up the applet in
* all cases. So this is the applet's responsibility to check if this
* buffer was allocated or not. This leaves a chance for applets to do
* some other processing if needed. The applet doesn't have anything to
* do if it needs the buffer, it will be called again upon readiness.
*/
if (!si_alloc_ibuf(si, &app->buffer_wait))
si_want_put(si);
app->applet->fct(app);
si_applet_wake_cb(si);
channel_release_buffer(si_ic(si), &app->buffer_wait);

View File

@ -96,12 +96,12 @@ int stream_buf_available(void *arg)
{
struct stream *s = arg;
if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_RXBLK_ROOM) &&
if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_RXBLK_BUFF) &&
b_alloc_margin(&s->req.buf, global.tune.reserved_bufs))
s->si[0].flags &= ~SI_FL_RXBLK_ROOM;
else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_RXBLK_ROOM) &&
si_rx_buff_rdy(&s->si[0]);
else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_RXBLK_BUFF) &&
b_alloc_margin(&s->res.buf, 0))
s->si[1].flags &= ~SI_FL_RXBLK_ROOM;
si_rx_buff_rdy(&s->si[1]);
else
return 0;