MEDIUM: spoe: Be sure to wakeup the good entity waiting for a buffer

This happens when buffer allocation failed. In the SPOE context, buffers are
allocated by streams and SPOE applets at different time. First, by streams, when
messages need to be encoded before sending them in a NOTIFY frame. Then, by SPOE
applets, when a ACK frame is received.

The first case works as expected, we wake up the stream. But for the second one,
we must wake up the waiting SPOE applet.
This commit is contained in:
Christopher Faulet 2017-01-11 14:05:19 +01:00 committed by Willy Tarreau
parent a21b064f81
commit 4596fb7056

View File

@ -246,8 +246,8 @@ struct spoe_context {
struct stream *strm; /* The stream that should be offloaded */
struct list *messages; /* List of messages that will be sent during the stream processing */
struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */
struct buffer *buffer; /* Buffer used to store a encoded messages */
struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
struct list list;
enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
@ -270,6 +270,8 @@ struct spoe_appctx {
unsigned int flags; /* SPOE_APPCTX_FL_* */
unsigned int status_code; /* SPOE_FRM_ERR_* */
struct buffer *buffer; /* Buffer used to store a encoded messages */
struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
struct list list; /* next spoe appctx for the same agent */
};
@ -309,8 +311,8 @@ char spoe_reason[256];
struct flt_ops spoe_ops;
static int queue_spoe_context(struct spoe_context *ctx);
static int acquire_spoe_buffer(struct spoe_context *ctx);
static void release_spoe_buffer(struct spoe_context *ctx);
static int acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
static void release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
/********************************************************************
* helper functions/globals
@ -913,15 +915,15 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
idx += encode_spoe_varint(ctx->stream_id, frame+idx);
idx += encode_spoe_varint(ctx->frame_id, frame+idx);
/* Copy encoded messages */
if (idx + ctx->buffer->i > size) {
/* check the buffer size */
if (idx + SPOE_APPCTX(appctx)->buffer->i > size) {
spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
return 0;
}
/* Copy encoded messages */
memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
idx += ctx->buffer->i;
memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i);
idx += SPOE_APPCTX(appctx)->buffer->i;
return idx;
}
@ -1230,9 +1232,13 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
return 0;
found:
if (!acquire_spoe_buffer(ctx))
if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait))
return 1; /* Retry later */
/* Transfer the buffer ownership to the SPOE context */
ctx->buffer = SPOE_APPCTX(appctx)->buffer;
SPOE_APPCTX(appctx)->buffer = &buf_empty;
/* Copy encoded actions */
memcpy(ctx->buffer->p, frame+idx, size-idx);
ctx->buffer->i = size-idx;
@ -1379,6 +1385,15 @@ recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
/********************************************************************
* Functions that manage the SPOE applet
********************************************************************/
static int
wakeup_spoe_appctx(struct appctx *appctx)
{
si_applet_want_get(appctx->owner);
si_applet_want_put(appctx->owner);
appctx_wakeup(appctx);
return 1;
}
/* Callback function that catches applet timeouts. If a timeout occurred, we set
* <appctx->st1> flag and the SPOE applet is woken up. */
static struct task *
@ -1391,9 +1406,7 @@ process_spoe_applet(struct task * task)
task->expire = TICK_ETERNITY;
appctx->st1 = SPOE_APPCTX_ERR_TOUT;
}
si_applet_want_get(appctx->owner);
si_applet_want_put(appctx->owner);
appctx_wakeup(appctx);
wakeup_spoe_appctx(appctx);
return task;
}
@ -1441,6 +1454,7 @@ release_spoe_applet(struct appctx *appctx)
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
if (!LIST_ISEMPTY(&agent->applets))
@ -1633,6 +1647,11 @@ handle_processing_spoe_applet(struct appctx *appctx)
}
ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
/* Transfer the buffer ownership to the SPOE appctx */
SPOE_APPCTX(appctx)->buffer = ctx->buffer;
ctx->buffer = &buf_empty;
ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
if (ret > 1)
ret = send_spoe_frame(appctx, frame, ret);
@ -1646,7 +1665,7 @@ handle_processing_spoe_applet(struct appctx *appctx)
agent->sending_rate++;
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_status_code + 0x100);
release_spoe_buffer(ctx);
release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
@ -1661,7 +1680,7 @@ handle_processing_spoe_applet(struct appctx *appctx)
default:
agent->sending_rate++;
ctx->state = SPOE_CTX_ST_WAITING_ACK;
release_spoe_buffer(ctx);
release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC)
@ -1963,6 +1982,11 @@ create_spoe_appctx(struct spoe_config *conf)
SPOE_APPCTX(appctx)->max_frame_size = conf->agent->max_frame_size;
SPOE_APPCTX(appctx)->flags = 0;
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
SPOE_APPCTX(appctx)->buffer = &buf_empty;
LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
SPOE_APPCTX(appctx)->buffer_wait.target = appctx;
SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_appctx;
LIST_INIT(&SPOE_APPCTX(appctx)->list);
LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue);
@ -2093,9 +2117,7 @@ queue_spoe_context(struct spoe_context *ctx)
list_for_each_entry(spoe_appctx, &agent->applets, list) {
appctx = spoe_appctx->owner;
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
si_applet_want_get(appctx->owner);
si_applet_want_put(appctx->owner);
appctx_wakeup(appctx);
wakeup_spoe_appctx(appctx);
LIST_DEL(&spoe_appctx->list);
LIST_ADDQ(&agent->applets, &spoe_appctx->list);
break;
@ -2379,7 +2401,7 @@ start_event_processing(struct spoe_context *ctx, int dir)
if (ctx->flags & SPOE_CTX_FL_PROCESS)
goto wait;
if (!acquire_spoe_buffer(ctx))
if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait))
goto wait;
/* Set the right flag to prevent request and response processing
@ -2405,7 +2427,7 @@ stop_event_processing(struct spoe_context *ctx)
/* Reset processing timer */
ctx->process_exp = TICK_ETERNITY;
release_spoe_buffer(ctx);
release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
if (!LIST_ISEMPTY(&ctx->list)) {
LIST_DEL(&ctx->list);
@ -2539,39 +2561,41 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
* Functions that create/destroy SPOE contexts
**************************************************************************/
static int
acquire_spoe_buffer(struct spoe_context *ctx)
acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
{
if (ctx->buffer != &buf_empty)
if (*buf != &buf_empty)
return 1;
if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
LIST_DEL(&ctx->buffer_wait.list);
LIST_INIT(&ctx->buffer_wait.list);
if (!LIST_ISEMPTY(&buffer_wait->list)) {
LIST_DEL(&buffer_wait->list);
LIST_INIT(&buffer_wait->list);
}
if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs))
if (b_alloc_margin(buf, global.tune.reserved_bufs))
return 1;
LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
LIST_ADDQ(&buffer_wq, &buffer_wait->list);
return 0;
}
static void
release_spoe_buffer(struct spoe_context *ctx)
release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
{
if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
LIST_DEL(&ctx->buffer_wait.list);
LIST_INIT(&ctx->buffer_wait.list);
if (!LIST_ISEMPTY(&buffer_wait->list)) {
LIST_DEL(&buffer_wait->list);
LIST_INIT(&buffer_wait->list);
}
/* Release the buffer if needed */
if (ctx->buffer != &buf_empty) {
b_free(&ctx->buffer);
offer_buffers(ctx, tasks_run_queue + applets_active_queue);
if (*buf != &buf_empty) {
b_free(buf);
offer_buffers(buffer_wait->target,
tasks_run_queue + applets_active_queue);
}
}
static int wakeup_spoe_context(struct spoe_context *ctx)
static int
wakeup_spoe_context(struct spoe_context *ctx)
{
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
return 1;
@ -2643,7 +2667,6 @@ sig_stop_spoe(struct sig_handler *sh)
list_for_each_entry(fconf, &p->filter_configs, list) {
struct spoe_config *conf;
struct spoe_agent *agent;
struct appctx *appctx;
struct spoe_appctx *spoe_appctx;
if (fconf->id != spoe_filter_id)
@ -2653,10 +2676,7 @@ sig_stop_spoe(struct sig_handler *sh)
agent = conf->agent;
list_for_each_entry(spoe_appctx, &agent->applets, list) {
appctx = spoe_appctx->owner;
si_applet_want_get(appctx->owner);
si_applet_want_put(appctx->owner);
appctx_wakeup(appctx);
wakeup_spoe_appctx(spoe_appctx->owner);
}
}
p = p->next;
@ -2818,7 +2838,7 @@ spoe_check_timeouts(struct stream *s, struct filter *filter)
if (tick_is_expired(ctx->process_exp, now_ms)) {
s->pending_events |= TASK_WOKEN_MSG;
release_spoe_buffer(ctx);
release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
}
}