MINOR: spoe: Count the number of frames waiting for an ack for each applet

So it is easier to respect the max_fpa value. This is no more the maximum frames
processed by an applet at each loop but the maximum frames waiting for an ack
for a specific applet.

The function spoe_handle_processing_appctx has been rewritten accordingly.
This commit is contained in:
Christopher Faulet 2018-01-24 16:23:03 +01:00 committed by Willy Tarreau
parent 6f9ea4f87b
commit 8f82b203d5
2 changed files with 60 additions and 66 deletions

View File

@ -336,6 +336,7 @@ struct spoe_appctx {
struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
struct list list; /* next spoe appctx for the same agent */
unsigned int cur_fpa;
struct {
struct spoe_context *ctx; /* SPOE context owning the fragmented frame */

View File

@ -1523,6 +1523,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
SPOE_APPCTX(appctx)->cur_fpa++;
ctx->state = SPOE_CTX_ST_WAITING_ACK;
goto end;
@ -1571,8 +1572,10 @@ spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
default:
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
if (ctx->spoe_appctx)
if (ctx->spoe_appctx) {
ctx->spoe_appctx->cur_fpa--;
ctx->spoe_appctx = NULL;
}
if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY &&
ctx == SPOE_APPCTX(appctx)->frag_ctx.ctx) {
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
@ -1599,8 +1602,7 @@ spoe_handle_processing_appctx(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
unsigned int fpa = 0;
int ret, skip_sending = 0, skip_receiving = 0;
int ret, skip_sending = 0, skip_receiving = 0, active_s = 0, active_r = 0;
if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
@ -1614,86 +1616,76 @@ spoe_handle_processing_appctx(struct appctx *appctx)
goto next;
}
process:
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
" - process: fpa=%u/%u - skip_sending=%d - skip_receiving=%d"
" - appctx-state=%s\n",
" - process: fpa=%u/%u - appctx-state=%s - flags=0x%08x\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, appctx, fpa, agent->max_fpa,
skip_sending, skip_receiving,
spoe_appctx_state_str[appctx->st0]);
__FUNCTION__, appctx, SPOE_APPCTX(appctx)->cur_fpa,
agent->max_fpa, spoe_appctx_state_str[appctx->st0],
SPOE_APPCTX(appctx)->flags);
if (fpa > agent->max_fpa)
goto stop;
else if (skip_sending || appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
if (skip_receiving)
goto stop;
goto recv_frame;
if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
skip_sending = 1;
/* receiving_frame loop */
while (!skip_receiving) {
ret = spoe_handle_receiving_frame_appctx(appctx, &skip_receiving);
switch (ret) {
case -1: /* error */
goto next;
case 0: /* ignore */
active_r = 1;
break;
case 1: /* retry */
break;
default:
active_r = 1;
break;
}
}
/* send_frame */
ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
switch (ret) {
case -1: /* error */
goto next;
/* send_frame loop */
while (!skip_sending && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
switch (ret) {
case -1: /* error */
goto next;
case 0: /* ignore */
update_freq_ctr(&agent->rt[tid].processing_per_sec, 1);
fpa++;
break;
case 0: /* ignore */
active_s++;
break;
case 1: /* retry */
break;
case 1: /* retry */
break;
default:
update_freq_ctr(&agent->rt[tid].processing_per_sec, 1);
fpa++;
break;
default:
active_s++;
break;
}
}
if (fpa > agent->max_fpa)
goto stop;
recv_frame:
if (skip_receiving)
goto process;
ret = spoe_handle_receiving_frame_appctx(appctx, &skip_receiving);
switch (ret) {
case -1: /* error */
goto next;
case 0: /* ignore */
fpa++;
break;
case 1: /* retry */
break;
default:
fpa++;
break;
}
goto process;
next:
SPOE_APPCTX(appctx)->task->expire =
tick_add_ifset(now_ms, agent->timeout.idle);
return 0;
stop:
if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) {
appctx->st0 = SPOE_APPCTX_ST_IDLE;
agent->rt[tid].applets_idle++;
}
if (fpa) {
if (active_s || active_r) {
HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
LIST_DEL(&SPOE_APPCTX(appctx)->list);
LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
if (fpa)
SPOE_APPCTX(appctx)->task->expire =
tick_add_ifset(now_ms, agent->timeout.idle);
update_freq_ctr(&agent->rt[tid].processing_per_sec, active_s);
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
}
if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
appctx->st0 = SPOE_APPCTX_ST_IDLE;
agent->rt[tid].applets_idle++;
}
return 1;
next:
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
return 0;
exit:
appctx->st0 = SPOE_APPCTX_ST_EXIT;
return 0;
@ -1935,6 +1927,7 @@ spoe_create_appctx(struct spoe_config *conf)
SPOE_APPCTX(appctx)->flags = 0;
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
SPOE_APPCTX(appctx)->buffer = &buf_empty;
SPOE_APPCTX(appctx)->cur_fpa = 0;
LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
SPOE_APPCTX(appctx)->buffer_wait.target = appctx;