MEDIUM: stream-int: make stream_int_update() aware of the lower layers

It's far from being clean, but at least it allows to resync both CS and
applets from the same place, taking into account the fact that CS are
processed synchronously for the send side while appletx are processed
outside of the process_stream() loop. The arrangement is optimised to
minimize the amount of iteration by handling send first, then updating
the SI_FL_WAIT_ROOM flags and only then dealing with si_chk_rcv() on
both sides. The SI_FL_WANT_PUT flag is set if needed before calling
si_chk_rcv() since this is done prior to calling stream_int_update().

Now there's no risk that stream_int_notify() is called anymore during
such operations, thus we cannot have any spurious wake-up anymore. The
case where a successful send() could complete a pending connect() is
handled by taking any stream-int state changes into account at the
call place, which is normal since process_stream() is designed to
iterate till stabilisation.

Doing this solves most of the remaining inconsistencies between CS and
applets.
This commit is contained in:
Willy Tarreau 2018-11-09 14:59:25 +01:00
parent d14844a734
commit bf89ff3db8
2 changed files with 56 additions and 12 deletions

View File

@ -2476,7 +2476,8 @@ redo:
si_update_both(si_f, si_b); si_update_both(si_f, si_b);
if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS || if (si_f->state == SI_ST_DIS || si_f->state != si_f->prev_state ||
si_b->state == SI_ST_DIS || si_b->state != si_b->prev_state ||
(((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER)) (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER))
goto redo; goto redo;

View File

@ -817,13 +817,7 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
{ {
struct channel *req = si_ic(si_f); struct channel *req = si_ic(si_f);
struct channel *res = si_oc(si_f); struct channel *res = si_oc(si_f);
struct conn_stream *cs;
/* let's recompute both sides states */
if (si_f->state == SI_ST_EST)
stream_int_update(si_f);
if (si_b->state == SI_ST_EST)
stream_int_update(si_b);
req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
@ -834,11 +828,60 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
si_f->prev_state = si_f->state; si_f->prev_state = si_f->state;
si_b->prev_state = si_b->state; si_b->prev_state = si_b->state;
if (si_f->ops->update && si_f->state == SI_ST_EST) /* front stream-int */
si_f->ops->update(si_f); cs = objt_cs(si_f->end);
if (cs &&
si_f->state == SI_ST_EST &&
!(res->flags & CF_SHUTW) && /* Write not closed */
!channel_is_empty(res) &&
!(cs->flags & CS_FL_ERROR) &&
!(cs->conn->flags & CO_FL_ERROR)) {
if (si_cs_send(cs))
si_b->flags &= ~SI_FL_WAIT_ROOM;
}
if (si_b->ops->update && (si_b->state == SI_ST_EST || si_b->state == SI_ST_CON)) /* back stream-int */
si_b->ops->update(si_b); cs = objt_cs(si_b->end);
if (cs &&
(si_b->state == SI_ST_EST || si_b->state == SI_ST_CON) &&
!(req->flags & CF_SHUTW) && /* Write not closed */
!channel_is_empty(req) &&
!(cs->flags & CS_FL_ERROR) &&
!(cs->conn->flags & CO_FL_ERROR)) {
if (si_cs_send(cs))
si_f->flags &= ~SI_FL_WAIT_ROOM;
}
/* it's time to try to receive */
if (!(req->flags & (CF_SHUTR|CF_DONT_READ)))
si_want_put(si_f);
si_chk_rcv(si_f);
if (!(res->flags & (CF_SHUTR|CF_DONT_READ)))
si_want_put(si_b);
si_chk_rcv(si_b);
/* let's recompute both sides states */
if (si_f->state == SI_ST_EST)
stream_int_update(si_f);
if (si_b->state == SI_ST_EST)
stream_int_update(si_b);
/* stream ints are processed outside of process_stream() and must be
* handled at the latest moment.
*/
if (obj_type(si_f->end) == OBJ_TYPE_APPCTX &&
(((si_f->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) ||
((si_f->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET)))
appctx_wakeup(si_appctx(si_f));
if (obj_type(si_b->end) == OBJ_TYPE_APPCTX &&
(((si_b->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) ||
((si_b->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET)))
appctx_wakeup(si_appctx(si_b));
} }
/* Updates the active status of a connection outside of the connection handler /* Updates the active status of a connection outside of the connection handler