MINOR: spoe: Add counters to log info about SPOE agents

In addition to metrics about time spent in the SPOE, following counters have
been added:

  * applets : number of SPOE applets.
  * idles : number of idle applets.
  * nb_sending : number of streams waiting to send data.
  * nb_waiting : number of streams waiting for a ack.
  * nb_processed : number of events/groups processed by the SPOE (from the
                   stream point of view).
  * nb_errors : number of errors during the processing (from the stream point of
                view).

Log messages has been updated to report these counters. Following pattern has
been added at the end of the log message:

    ... <idles>/<applets> <nb_sending>/<nb_waiting> <nb_error>/<nb_processed>
This commit is contained in:
Christopher Faulet 2018-04-04 10:25:50 +02:00 committed by Willy Tarreau
parent 3b8e34902b
commit caf2feca62
3 changed files with 146 additions and 92 deletions

View File

@ -1200,7 +1200,8 @@ LOG_NOTICE. Otherwise, the message is logged with the level LOG_WARNING.
The messages are logged using the agent's logger, if defined, and use the
following format:
SPOE: [AGENT] <TYPE:NAME> sid=STREAM-ID st=STATUC-CODE reqT/qT/wT/resT/pT
SPOE: [AGENT] <TYPE:NAME> sid=STREAM-ID st=STATUC-CODE reqT/qT/wT/resT/pT \
<idles>/<applets> <nb_sending>/<nb_waiting> <nb_error>/<nb_processed>
AGENT is the agent name
TYPE is EVENT of GROUP
@ -1221,6 +1222,14 @@ following format:
point of view, it is the latency added by the SPOE processing.
It is more or less the sum of values above.
<idle> is the numbers of idle SPOE applets
<applets> is the numbers of SPOE applets
<nb_sending> is the numbers of streams waiting to send data
<nb_waiting> is the numbers of streams waiting for a ack
<nb_error> is the numbers of processing errors
<nb_processed> is the numbers of events/groups processed
For all these time events, -1 means the processing was interrupted before the
end. So -1 for the queue time means the request was never dequeued. For
fragmented frames it is harder to know when the interruption happened.

View File

@ -282,6 +282,14 @@ struct spoe_agent {
__decl_hathreads(HA_SPINLOCK_T lock);
} *rt;
struct {
unsigned int applets; /* # of SPOE applets */
unsigned int idles; /* # of idle applets */
unsigned int nb_sending; /* # of streams waiting to send data */
unsigned int nb_waiting; /* # of streams waiting for a ack */
unsigned long long nb_processed; /* # of frames processed by the SPOE */
unsigned long long nb_errors; /* # of errors during the processing */
} counters;
};
/* SPOE filter configuration */

View File

@ -1233,6 +1233,7 @@ spoe_release_appctx(struct appctx *appctx)
/* Remove applet from the list of running applets */
SPOE_DEBUG_STMT(agent->rt[tid].applets_act--);
HA_ATOMIC_SUB(&agent->counters.applets, 1);
HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
if (!LIST_ISEMPTY(&spoe_appctx->list)) {
LIST_DEL(&spoe_appctx->list);
@ -1245,6 +1246,7 @@ spoe_release_appctx(struct appctx *appctx)
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
eb32_delete(&spoe_appctx->node);
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
HA_ATOMIC_SUB(&agent->counters.idles, 1);
}
appctx->st0 = SPOE_APPCTX_ST_END;
@ -1266,6 +1268,7 @@ spoe_release_appctx(struct appctx *appctx)
list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
@ -1294,6 +1297,7 @@ spoe_release_appctx(struct appctx *appctx)
list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
@ -1302,6 +1306,7 @@ spoe_release_appctx(struct appctx *appctx)
list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
@ -1426,6 +1431,7 @@ spoe_handle_connecting_appctx(struct appctx *appctx)
/* HELLO handshake is finished, set the idle timeout and
* add the applet in the list of running applets. */
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
HA_ATOMIC_ADD(&agent->counters.idles, 1);
appctx->st0 = SPOE_APPCTX_ST_IDLE;
SPOE_APPCTX(appctx)->node.key = 0;
eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
@ -1495,6 +1501,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
ctx->spoe_appctx = NULL;
ctx->state = SPOE_CTX_ST_ERROR;
@ -1514,6 +1521,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
ctx->spoe_appctx = SPOE_APPCTX(appctx);
if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
@ -1548,6 +1556,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
*skip = 1;
LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
}
HA_ATOMIC_ADD(&agent->counters.nb_waiting, 1);
ctx->stats.tv_wait = now;
SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
@ -1571,6 +1580,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
static int
spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
{
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
struct spoe_context *ctx = NULL;
char *frame;
int ret;
@ -1602,6 +1612,7 @@ spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
default:
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
ctx->stats.tv_response = now;
if (ctx->spoe_appctx) {
@ -1708,6 +1719,7 @@ spoe_handle_processing_appctx(struct appctx *appctx)
if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
HA_ATOMIC_ADD(&agent->counters.idles, 1);
appctx->st0 = SPOE_APPCTX_ST_IDLE;
eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
}
@ -1871,6 +1883,7 @@ spoe_handle_appctx(struct appctx *appctx)
case SPOE_APPCTX_ST_IDLE:
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
HA_ATOMIC_SUB(&agent->counters.idles, 1);
eb32_delete(&SPOE_APPCTX(appctx)->node);
if (stopping &&
LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
@ -1988,6 +2001,7 @@ spoe_create_appctx(struct spoe_config *conf)
LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
SPOE_DEBUG_STMT(conf->agent->rt[tid].applets_act++);
HA_ATOMIC_ADD(&conf->agent->counters.applets, 1);
task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
task_wakeup(strm->task, TASK_WOKEN_INIT);
@ -2072,6 +2086,7 @@ spoe_queue_context(struct spoe_context *ctx)
/* Add the SPOE context in the sending queue */
LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
HA_ATOMIC_ADD(&agent->counters.nb_sending, 1);
spoe_update_stat_time(&ctx->stats.tv_request, &ctx->stats.t_request);
ctx->stats.tv_queue = now;
@ -2459,71 +2474,6 @@ spoe_process_actions(struct stream *s, struct spoe_context *ctx, int dir)
/***************************************************************************
* Functions that process SPOE events
**************************************************************************/
static inline int
spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir)
{
/* If a process is already started for this SPOE context, retry
* later. */
if (ctx->flags & SPOE_CTX_FL_PROCESS)
return 0;
agent->rt[tid].processing++;
ctx->stats.tv_start = now;
ctx->stats.tv_request = now;
ctx->stats.t_request = -1;
ctx->stats.t_queue = -1;
ctx->stats.t_waiting = -1;
ctx->stats.t_response = -1;
ctx->stats.t_process = -1;
ctx->status_code = 0;
/* Set the right flag to prevent request and response processing
* in same time. */
ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
? SPOE_CTX_FL_REQ_PROCESS
: SPOE_CTX_FL_RSP_PROCESS);
return 1;
}
static inline void
spoe_stop_processing(struct spoe_agent *agent, struct spoe_context *ctx)
{
struct spoe_appctx *sa = ctx->spoe_appctx;
if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
return;
if (sa) {
if (sa->frag_ctx.ctx == ctx) {
sa->frag_ctx.ctx = NULL;
spoe_wakeup_appctx(sa->owner);
}
else
sa->cur_fpa--;
}
/* Reset the flag to allow next processing */
agent->rt[tid].processing--;
ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
/* Reset processing timer */
ctx->process_exp = TICK_ETERNITY;
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
ctx->spoe_appctx = NULL;
ctx->frag_ctx.curmsg = NULL;
ctx->frag_ctx.curarg = NULL;
ctx->frag_ctx.curoff = 0;
ctx->frag_ctx.flags = 0;
if (!LIST_ISEMPTY(&ctx->list)) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
}
}
static void
spoe_update_stats(struct stream *s, struct spoe_agent *agent,
struct spoe_context *ctx, int dir)
@ -2586,6 +2536,76 @@ spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
: SPOE_CTX_ST_NONE);
}
static inline int
spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir)
{
/* If a process is already started for this SPOE context, retry
* later. */
if (ctx->flags & SPOE_CTX_FL_PROCESS)
return 0;
agent->rt[tid].processing++;
ctx->stats.tv_start = now;
ctx->stats.tv_request = now;
ctx->stats.t_request = -1;
ctx->stats.t_queue = -1;
ctx->stats.t_waiting = -1;
ctx->stats.t_response = -1;
ctx->stats.t_process = -1;
ctx->status_code = 0;
/* Set the right flag to prevent request and response processing
* in same time. */
ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
? SPOE_CTX_FL_REQ_PROCESS
: SPOE_CTX_FL_RSP_PROCESS);
return 1;
}
static inline void
spoe_stop_processing(struct spoe_agent *agent, struct spoe_context *ctx)
{
struct spoe_appctx *sa = ctx->spoe_appctx;
if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
return;
HA_ATOMIC_ADD(&agent->counters.nb_processed, 1);
if (sa) {
if (sa->frag_ctx.ctx == ctx) {
sa->frag_ctx.ctx = NULL;
spoe_wakeup_appctx(sa->owner);
}
else
sa->cur_fpa--;
}
/* Reset the flag to allow next processing */
agent->rt[tid].processing--;
ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
/* Reset processing timer */
ctx->process_exp = TICK_ETERNITY;
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
ctx->spoe_appctx = NULL;
ctx->frag_ctx.curmsg = NULL;
ctx->frag_ctx.curarg = NULL;
ctx->frag_ctx.curoff = 0;
ctx->frag_ctx.flags = 0;
if (!LIST_ISEMPTY(&ctx->list)) {
if (ctx->state == SPOE_CTX_ST_SENDING_MSGS)
HA_ATOMIC_ADD(&agent->counters.nb_sending, 1);
else
HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
}
}
/* Process a list of SPOE messages. First, this functions will process messages
* and send them to an agent in a NOTIFY frame. Then, it will wait a ACK frame
* to process corresponding actions. During all the processing, it returns 0
@ -2600,7 +2620,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
int ret = 1;
if (ctx->state == SPOE_CTX_ST_ERROR)
goto error;
goto end;
if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
@ -2608,7 +2628,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s);
ctx->status_code = SPOE_CTX_ERR_TOUT;
goto error;
goto end;
}
if (ctx->state == SPOE_CTX_ST_READY) {
@ -2642,11 +2662,11 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
goto out;
ret = spoe_encode_messages(s, ctx, messages, dir, type);
if (ret < 0)
goto error;
goto end;
if (!ret)
goto skip;
if (spoe_queue_context(ctx) < 0)
goto error;
goto end;
ctx->state = SPOE_CTX_ST_SENDING_MSGS;
}
@ -2674,19 +2694,20 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
out:
return ret;
error:
spoe_handle_processing_error(s, agent, ctx, dir);
ret = 1;
goto end;
skip:
tv_zero(&ctx->stats.tv_start);
ctx->state = SPOE_CTX_ST_READY;
ret = 1;
spoe_stop_processing(agent, ctx);
return 1;
end:
spoe_update_stats(s, agent, ctx, dir);
spoe_stop_processing(agent, ctx);
if (ctx->status_code) {
HA_ATOMIC_ADD(&agent->counters.nb_errors, 1);
spoe_handle_processing_error(s, agent, ctx, dir);
ret = 1;
}
return ret;
}
@ -2712,16 +2733,24 @@ spoe_process_group(struct stream *s, struct spoe_context *ctx,
ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP);
if (ret && ctx->stats.t_process != -1) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
" - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, s, s->uniq_id, ctx->status_code,
__FUNCTION__, s, group->id, s->uniq_id, ctx->status_code,
ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
ctx->stats.t_response, ctx->stats.t_process);
send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
"SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
agent->id, group->id, s->uniq_id, ctx->status_code,
ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
ctx->stats.t_response, ctx->stats.t_process);
ctx->stats.t_response, ctx->stats.t_process,
agent->counters.idles, agent->counters.applets,
agent->counters.nb_sending, agent->counters.nb_waiting,
agent->counters.nb_errors, agent->counters.nb_processed,
agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
"SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
agent->id, group->id, s->uniq_id, ctx->status_code,
ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
ctx->stats.t_response, ctx->stats.t_process,
agent->counters.idles, agent->counters.applets,
agent->counters.nb_sending, agent->counters.nb_waiting,
agent->counters.nb_errors, agent->counters.nb_processed);
}
return ret;
}
@ -2750,16 +2779,24 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx,
ret = spoe_process_messages(s, ctx, &(ctx->events[ev]), dir, SPOE_MSGS_BY_EVENT);
if (ret && ctx->stats.t_process != -1) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
" - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, s, spoe_event_str[ev], s->uniq_id, ctx->status_code,
ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
ctx->stats.t_response, ctx->stats.t_process);
send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
"SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
ctx->stats.t_response, ctx->stats.t_process);
ctx->stats.t_response, ctx->stats.t_process,
agent->counters.idles, agent->counters.applets,
agent->counters.nb_sending, agent->counters.nb_waiting,
agent->counters.nb_errors, agent->counters.nb_processed,
agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
"SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
ctx->stats.t_response, ctx->stats.t_process,
agent->counters.idles, agent->counters.applets,
agent->counters.nb_sending, agent->counters.nb_waiting,
agent->counters.nb_errors, agent->counters.nb_processed);
}
return ret;
}
@ -4451,8 +4488,8 @@ spoe_send_group(struct act_rule *rule, struct proxy *px,
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, group->id);
ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
spoe_handle_processing_error(s, agent, ctx, dir);
spoe_stop_processing(agent, ctx);
spoe_handle_processing_error(s, agent, ctx, dir);
return ACT_RET_CONT;
}
return ACT_RET_YIELD;