MAJOR: channel: add a new flag CF_WAKE_WRITE to notify the task of writes
Since commit 6b66f3e ([MAJOR] implement autonomous inter-socket forwarding) introduced in 1.3.16-rc1, we've been relying on a stupid mechanism to wake up the task after a write, which was an exact copy-paste of the reader side. The principle was that if we empty a buffer and there's no forwarding scheduled or if the *producer* is not in a connected state, then we wake the task up. That does not make any sense. It happens to wake up too late sometimes (eg, when the request analyser waits for some room in the buffer to start to work), and leads to unneeded wakeups in client-side keep-alive, because the task is woken up when the response is sent, while the analysers are simply waiting for a new request. In order to fix this, we introduce a new channel flag : CF_WAKE_WRITE. It is designed so that an analyser can explicitly request being notified when some data were written. It is used only when the HTTP request or response analysers need to wait for more room in the buffers. It is automatically cleared upon wake up. The flag is also automatically set by the functions which try to write into a buffer from an applet when they fail (bi_putblk() etc...). That allows us to remove the stupid condition above and avoid some wakeups. In http-server-close and in http-keep-alive modes, this reduces from 4 to 3 the average number of wakeups per request, and increases the overall performance by about 1.5%.
This commit is contained in:
parent
51437d2c59
commit
d7ad9f5b0d
@ -70,7 +70,7 @@
|
||||
#define CF_WRITE_ERROR 0x00000800 /* unrecoverable error on consumer side */
|
||||
#define CF_WRITE_ACTIVITY (CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_WRITE_ERROR)
|
||||
|
||||
/* unused: 0x00001000 */
|
||||
#define CF_WAKE_WRITE 0x00001000 /* wake the task up when there's write activity */
|
||||
#define CF_SHUTW 0x00002000 /* consumer has already shut down */
|
||||
#define CF_SHUTW_NOW 0x00004000 /* the consumer must shut down for writes ASAP */
|
||||
#define CF_AUTO_CLOSE 0x00008000 /* producer can forward shutdown to other side */
|
||||
@ -120,11 +120,6 @@
|
||||
#define CF_WAKE_ONCE 0x10000000 /* pretend there is activity on this channel (one-shoot) */
|
||||
/* unused: 0x20000000, 0x40000000, 0x80000000 */
|
||||
|
||||
/* Use these masks to clear the flags before going back to lower layers */
|
||||
#define CF_CLEAR_READ (~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ERROR|CF_READ_ATTACHED))
|
||||
#define CF_CLEAR_WRITE (~(CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_WRITE_ERROR))
|
||||
#define CF_CLEAR_TIMEOUT (~(CF_READ_TIMEOUT|CF_WRITE_TIMEOUT|CF_ANA_TIMEOUT))
|
||||
|
||||
/* Masks which define input events for stream analysers */
|
||||
#define CF_MASK_ANALYSER (CF_READ_ATTACHED|CF_READ_ACTIVITY|CF_READ_TIMEOUT|CF_ANA_TIMEOUT|CF_WRITE_ACTIVITY|CF_WAKE_ONCE)
|
||||
|
||||
|
@ -113,14 +113,18 @@ int bo_inject(struct channel *chn, const char *msg, int len)
|
||||
* input is closed, -2 is returned. If there is not enough room left in the
|
||||
* buffer, -1 is returned. Otherwise the number of bytes copied is returned
|
||||
* (1). Channel flag READ_PARTIAL is updated if some data can be transferred.
|
||||
* Channel flag CF_WAKE_WRITE is set if the write fails because the buffer is
|
||||
* full.
|
||||
*/
|
||||
int bi_putchr(struct channel *chn, char c)
|
||||
{
|
||||
if (unlikely(channel_input_closed(chn)))
|
||||
return -2;
|
||||
|
||||
if (channel_full(chn))
|
||||
if (channel_full(chn)) {
|
||||
chn->flags |= CF_WAKE_WRITE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
*bi_end(chn->buf) = c;
|
||||
|
||||
@ -143,7 +147,8 @@ int bi_putchr(struct channel *chn, char c)
|
||||
* -3 is returned. If there is not enough room left in the buffer, -1 is
|
||||
* returned. Otherwise the number of bytes copied is returned (0 being a valid
|
||||
* number). Channel flag READ_PARTIAL is updated if some data can be
|
||||
* transferred.
|
||||
* transferred. Channel flag CF_WAKE_WRITE is set if the write fails because
|
||||
* the buffer is full.
|
||||
*/
|
||||
int bi_putblk(struct channel *chn, const char *blk, int len)
|
||||
{
|
||||
@ -161,6 +166,7 @@ int bi_putblk(struct channel *chn, const char *blk, int len)
|
||||
if (len > max)
|
||||
return -3;
|
||||
|
||||
chn->flags |= CF_WAKE_WRITE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1965,8 +1965,10 @@ static void cli_io_handler(struct stream_interface *si)
|
||||
/* ensure we have some output room left in the event we
|
||||
* would want to return some info right after parsing.
|
||||
*/
|
||||
if (buffer_almost_full(si->ib->buf))
|
||||
if (buffer_almost_full(si->ib->buf)) {
|
||||
si->ib->flags |= CF_WAKE_WRITE;
|
||||
break;
|
||||
}
|
||||
|
||||
reql = bo_getline(si->ob, trash.str, trash.size);
|
||||
if (reql <= 0) { /* closed or EOL not found */
|
||||
@ -3360,8 +3362,10 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy
|
||||
case STAT_PX_ST_LI:
|
||||
/* stats.l has been initialized above */
|
||||
for (; appctx->ctx.stats.l != &px->conf.listeners; appctx->ctx.stats.l = l->by_fe.n) {
|
||||
if (buffer_almost_full(rep->buf))
|
||||
if (buffer_almost_full(rep->buf)) {
|
||||
rep->flags |= CF_WAKE_WRITE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
l = LIST_ELEM(appctx->ctx.stats.l, struct listener *, by_fe);
|
||||
if (!l->counters)
|
||||
@ -3390,8 +3394,10 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy
|
||||
for (; appctx->ctx.stats.sv != NULL; appctx->ctx.stats.sv = sv->next) {
|
||||
int sv_state; /* 0=DOWN, 1=going up, 2=going down, 3=UP, 4,5=NOLB, 6=unchecked */
|
||||
|
||||
if (buffer_almost_full(rep->buf))
|
||||
if (buffer_almost_full(rep->buf)) {
|
||||
rep->flags |= CF_WAKE_WRITE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
sv = appctx->ctx.stats.sv;
|
||||
|
||||
@ -3874,8 +3880,10 @@ static int stats_dump_stat_to_buffer(struct stream_interface *si, struct uri_aut
|
||||
case STAT_ST_LIST:
|
||||
/* dump proxies */
|
||||
while (appctx->ctx.stats.px) {
|
||||
if (buffer_almost_full(rep->buf))
|
||||
if (buffer_almost_full(rep->buf)) {
|
||||
rep->flags |= CF_WAKE_WRITE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
px = appctx->ctx.stats.px;
|
||||
/* skip the disabled proxies, global frontend and non-networked ones */
|
||||
|
@ -2307,6 +2307,7 @@ int http_wait_for_request(struct session *s, struct channel *req, int an_bit)
|
||||
/* some data has still not left the buffer, wake us once that's done */
|
||||
channel_dont_connect(req);
|
||||
req->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
|
||||
req->flags |= CF_WAKE_WRITE;
|
||||
return 0;
|
||||
}
|
||||
if (unlikely(bi_end(req->buf) < b_ptr(req->buf, msg->next) ||
|
||||
@ -2331,6 +2332,7 @@ int http_wait_for_request(struct session *s, struct channel *req, int an_bit)
|
||||
/* don't let a connection request be initiated */
|
||||
channel_dont_connect(req);
|
||||
s->rep->flags &= ~CF_EXPECT_MORE; /* speed up sending a previous response */
|
||||
s->rep->flags |= CF_WAKE_WRITE;
|
||||
s->rep->analysers |= an_bit; /* wake us up once it changes */
|
||||
return 0;
|
||||
}
|
||||
@ -5115,6 +5117,7 @@ int http_wait_for_response(struct session *s, struct channel *rep, int an_bit)
|
||||
goto abort_response;
|
||||
channel_dont_close(rep);
|
||||
rep->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
|
||||
rep->flags |= CF_WAKE_WRITE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1605,7 +1605,8 @@ struct task *process_session(struct task *t)
|
||||
memset(&s->txn.auth, 0, sizeof(s->txn.auth));
|
||||
|
||||
/* This flag must explicitly be set every time */
|
||||
s->req->flags &= ~CF_READ_NOEXP;
|
||||
s->req->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
|
||||
s->rep->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
|
||||
|
||||
/* Keep a copy of req/rep flags so that we can detect shutdowns */
|
||||
rqf_last = s->req->flags & ~CF_MASK_ANALYSER;
|
||||
|
@ -205,9 +205,7 @@ static void stream_int_update_embedded(struct stream_interface *si)
|
||||
/* changes on the consumption side */
|
||||
(si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
|
||||
((si->ob->flags & CF_WRITE_ACTIVITY) &&
|
||||
((si->ob->flags & CF_SHUTW) ||
|
||||
si->ob->prod->state != SI_ST_EST ||
|
||||
(channel_is_empty(si->ob) && !si->ob->to_forward)))) {
|
||||
(si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
|
||||
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
@ -630,9 +628,7 @@ static int si_conn_wake_cb(struct connection *conn)
|
||||
/* changes on the consumption side */
|
||||
(si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
|
||||
((si->ob->flags & CF_WRITE_ACTIVITY) &&
|
||||
((si->ob->flags & CF_SHUTW) ||
|
||||
si->ob->prod->state != SI_ST_EST ||
|
||||
(channel_is_empty(si->ob) && !si->ob->to_forward)))) {
|
||||
(si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
if (si->ib->flags & CF_READ_ACTIVITY)
|
||||
@ -1045,9 +1041,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
|
||||
/* in case of special condition (error, shutdown, end of write...), we
|
||||
* have to notify the task.
|
||||
*/
|
||||
if (likely((ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
|
||||
(channel_is_empty(ob) && !ob->to_forward) ||
|
||||
si->state != SI_ST_EST)) {
|
||||
if (likely(ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW|CF_WAKE_WRITE))) {
|
||||
out_wakeup:
|
||||
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
|
Loading…
x
Reference in New Issue
Block a user