MINOR: stream-int: implement the stream_int_notify() function

stream_int_notify() was taken from the common part between si_conn_wake_cb()
and si_applet_done(). It is designed to report activity to a stream from
outside its handler. It'll generally be used by lower layers to report I/O
completion but may also be used by remote streams if the buffer processing
is shared.
This commit is contained in:
Willy Tarreau 2015-09-23 18:40:09 +02:00
parent ea3cc48d64
commit 615f28bec1
2 changed files with 110 additions and 0 deletions

View File

@ -51,6 +51,7 @@ void si_applet_done(struct stream_interface *si);
void stream_int_update(struct stream_interface *si);
void stream_int_update_conn(struct stream_interface *si);
void stream_int_update_applet(struct stream_interface *si);
void stream_int_notify(struct stream_interface *si);
/* returns the channel which receives data from this stream interface (input channel) */
static inline struct channel *si_ic(struct stream_interface *si)

View File

@ -519,6 +519,115 @@ static int si_idle_conn_wake_cb(struct connection *conn)
return 0;
}
/* This function is the equivalent to stream_int_update() except that it's
* designed to be called from outside the stream handlers, typically the lower
* layers (applets, connections) after I/O completion. After updating the stream
* interface and timeouts, it will try to forward what can be forwarded, then to
* wake the associated task up if an important event requires special handling.
* It should not be called from within the stream itself, stream_int_update()
* is designed for this.
*/
void stream_int_notify(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
/* process consumer side */
if (channel_is_empty(oc)) {
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
(si->state == SI_ST_EST))
si_shutw(si);
oc->wex = TICK_ETERNITY;
}
/* indicate that we may be waiting for data from the output channel */
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
si->flags |= SI_FL_WAIT_DATA;
/* update OC timeouts and wake the other side up if it's waiting for room */
if (oc->flags & CF_WRITE_ACTIVITY) {
if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
!channel_is_empty(oc))
if (tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!(si->flags & SI_FL_INDEP_STR))
if (tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
channel_may_recv(oc) &&
(si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
si_chk_rcv(si_opposite(si));
}
/* Notify the other side when we've injected data into the IC that
* needs to be forwarded. We can do fast-forwarding as soon as there
* are output data, but we avoid doing this if some of the data are
* not yet scheduled for being forwarded, because it is very likely
* that it will be done again immediately afterwards once the following
* data are parsed (eg: HTTP chunking). We only SI_FL_WAIT_ROOM once
* we've emptied *some* of the output buffer, and not just when there
* is available room, because applets are often forced to stop before
* the buffer is full. We must not stop based on input data alone because
* an HTTP parser might need more data to complete the parsing.
*/
if (!channel_is_empty(ic) &&
(si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
(ic->buf->i == 0 || ic->pipe)) {
int new_len, last_len;
last_len = ic->buf->o;
if (ic->pipe)
last_len += ic->pipe->data;
si_chk_snd(si_opposite(si));
new_len = ic->buf->o;
if (ic->pipe)
new_len += ic->pipe->data;
/* check if the consumer has freed some space either in the
* buffer or in the pipe.
*/
if (channel_may_recv(ic) && new_len < last_len)
si->flags &= ~SI_FL_WAIT_ROOM;
}
if (si->flags & SI_FL_WAIT_ROOM) {
ic->rex = TICK_ETERNITY;
}
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
channel_may_recv(ic)) {
/* we must re-enable reading if si_chk_snd() has freed some space */
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
/* wake the task up only when needed */
if (/* changes on the production side */
(ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
si->state != SI_ST_EST ||
(si->flags & SI_FL_ERR) ||
((ic->flags & CF_READ_PARTIAL) &&
(!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
/* changes on the consumption side */
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
((oc->flags & CF_WRITE_ACTIVITY) &&
((oc->flags & CF_SHUTW) ||
((oc->flags & CF_WAKE_WRITE) &&
(si_opposite(si)->state != SI_ST_EST ||
(channel_is_empty(oc) && !oc->to_forward)))))) {
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
if (ic->flags & CF_READ_ACTIVITY)
ic->flags &= ~CF_READ_DONTWAIT;
stream_release_buffers(si_strm(si));
}
/* Callback to be used by connection I/O handlers upon completion. It differs from
* the update function in that it is designed to be called by lower layers after I/O
* events have been completed. It will also try to wake the associated task up if