MEDIUM: ring: implement a wait mode for watchers
Now it is possible for a reader to subscribe and wait for new events sent to a ring buffer. When new events are written to a ring buffer, the applets that are subscribed are woken up to display new events. For now we only support this with the CLI applet called by "show events" since the I/O handler is indeed a CLI I/O handler. But it's not complicated to add other mechanisms to consume events and forward them to external log servers for example. The wait mode is enabled by adding "-w" after "show events <sink>". An extra "-n" was added to directly seek to new events only.
This commit is contained in:
parent
70b1e50feb
commit
1d181e489c
@ -1964,10 +1964,16 @@ show errors [<iid>|<proxy>] [request|response]
|
|||||||
is the slash ('/') in header name "header/bizarre", which is not a valid
|
is the slash ('/') in header name "header/bizarre", which is not a valid
|
||||||
HTTP character for a header name.
|
HTTP character for a header name.
|
||||||
|
|
||||||
show events [<sink>]
|
show events [<sink>] [-w] [-n]
|
||||||
With no option, this lists all known event sinks and their types. With an
|
With no option, this lists all known event sinks and their types. With an
|
||||||
option, it will dump all available events in the designated sink if it is of
|
option, it will dump all available events in the designated sink if it is of
|
||||||
type buffer.
|
type buffer. If option "-w" is passed after the sink name, then once the end
|
||||||
|
of the buffer is reached, the command will wait for new events and display
|
||||||
|
them. It is possible to stop the operation by entering any input (which will
|
||||||
|
be discarded) or by closing the session. Finally, option "-n" is used to
|
||||||
|
directly seek to the end of the buffer, which is often convenient when
|
||||||
|
combined with "-w" to only report new events. For convenience, "-wn" or "-nw"
|
||||||
|
may be used to enable both options at once.
|
||||||
|
|
||||||
show fd [<fd>]
|
show fd [<fd>]
|
||||||
Dump the list of either all open file descriptors or just the one number <fd>
|
Dump the list of either all open file descriptors or just the one number <fd>
|
||||||
|
@ -96,6 +96,7 @@
|
|||||||
struct ring {
|
struct ring {
|
||||||
struct buffer buf; // storage area
|
struct buffer buf; // storage area
|
||||||
size_t ofs; // absolute offset in history of the buffer's head
|
size_t ofs; // absolute offset in history of the buffer's head
|
||||||
|
struct list waiters; // list of waiters, for now, CLI "show event"
|
||||||
__decl_hathreads(HA_RWLOCK_T lock);
|
__decl_hathreads(HA_RWLOCK_T lock);
|
||||||
int readers_count;
|
int readers_count;
|
||||||
};
|
};
|
||||||
|
43
src/ring.c
43
src/ring.c
@ -48,6 +48,7 @@ struct ring *ring_new(size_t size)
|
|||||||
goto fail;
|
goto fail;
|
||||||
|
|
||||||
HA_RWLOCK_INIT(&ring->lock);
|
HA_RWLOCK_INIT(&ring->lock);
|
||||||
|
LIST_INIT(&ring->waiters);
|
||||||
ring->readers_count = 0;
|
ring->readers_count = 0;
|
||||||
ring->ofs = 0;
|
ring->ofs = 0;
|
||||||
ring->buf = b_make(area, size, 0, 0);
|
ring->buf = b_make(area, size, 0, 0);
|
||||||
@ -113,6 +114,7 @@ void ring_free(struct ring *ring)
|
|||||||
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
|
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
|
||||||
{
|
{
|
||||||
struct buffer *buf = &ring->buf;
|
struct buffer *buf = &ring->buf;
|
||||||
|
struct appctx *appctx;
|
||||||
size_t totlen = 0;
|
size_t totlen = 0;
|
||||||
size_t lenlen;
|
size_t lenlen;
|
||||||
size_t dellen;
|
size_t dellen;
|
||||||
@ -187,6 +189,11 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
|
|||||||
|
|
||||||
*b_tail(buf) = 0; buf->data++;; // new read counter
|
*b_tail(buf) = 0; buf->data++;; // new read counter
|
||||||
sent = lenlen + totlen + 1;
|
sent = lenlen + totlen + 1;
|
||||||
|
|
||||||
|
/* notify potential readers */
|
||||||
|
list_for_each_entry(appctx, &ring->waiters, ctx.cli.l0)
|
||||||
|
appctx_wakeup(appctx);
|
||||||
|
|
||||||
done_buf:
|
done_buf:
|
||||||
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
|
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
return sent;
|
return sent;
|
||||||
@ -216,9 +223,12 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx)
|
|||||||
|
|
||||||
/* This function dumps all events from the ring whose pointer is in <p0> into
|
/* This function dumps all events from the ring whose pointer is in <p0> into
|
||||||
* the appctx's output buffer, and takes from <o0> the seek offset into the
|
* the appctx's output buffer, and takes from <o0> the seek offset into the
|
||||||
* buffer's history (0 for oldest known event). It returns 0 if the output
|
* buffer's history (0 for oldest known event). It looks at <i0> for boolean
|
||||||
* buffer is full and it needs to be called again, otherwise non-zero. It is
|
* options: bit0 means it must wait for new data or any key to be pressed. Bit1
|
||||||
* meant to be used with cli_release_show_ring() to clean up.
|
* means it must seek directly to the end to wait for new contents. It returns
|
||||||
|
* 0 if the output buffer or events are missing is full and it needs to be
|
||||||
|
* called again, otherwise non-zero. It is meant to be used with
|
||||||
|
* cli_release_show_ring() to clean up.
|
||||||
*/
|
*/
|
||||||
int cli_io_handler_show_ring(struct appctx *appctx)
|
int cli_io_handler_show_ring(struct appctx *appctx)
|
||||||
{
|
{
|
||||||
@ -235,6 +245,8 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
|||||||
|
|
||||||
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
|
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
|
||||||
|
LIST_DEL_INIT(&appctx->ctx.cli.l0);
|
||||||
|
|
||||||
/* explanation for the initialization below: it would be better to do
|
/* explanation for the initialization below: it would be better to do
|
||||||
* this in the parsing function but this would occasionally result in
|
* this in the parsing function but this would occasionally result in
|
||||||
* dropped events because we'd take a reference on the oldest message
|
* dropped events because we'd take a reference on the oldest message
|
||||||
@ -244,8 +256,14 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
|||||||
* value cannot be produced after initialization.
|
* value cannot be produced after initialization.
|
||||||
*/
|
*/
|
||||||
if (unlikely(ofs == ~0)) {
|
if (unlikely(ofs == ~0)) {
|
||||||
HA_ATOMIC_ADD(b_head(buf), 1);
|
ofs = 0;
|
||||||
ofs = ring->ofs;
|
|
||||||
|
/* going to the end means looking at tail-1 */
|
||||||
|
if (appctx->ctx.cli.i0 & 2)
|
||||||
|
ofs += b_data(buf) - 1;
|
||||||
|
|
||||||
|
HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
|
||||||
|
ofs += ring->ofs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we were already there, adjust the offset to be relative to
|
/* we were already there, adjust the offset to be relative to
|
||||||
@ -291,6 +309,20 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
|||||||
ofs += ring->ofs;
|
ofs += ring->ofs;
|
||||||
appctx->ctx.cli.o0 = ofs;
|
appctx->ctx.cli.o0 = ofs;
|
||||||
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
|
||||||
|
if (ret && (appctx->ctx.cli.i0 & 1)) {
|
||||||
|
/* we've drained everything and are configured to wait for more
|
||||||
|
* data or an event (keypress, close)
|
||||||
|
*/
|
||||||
|
if (!si_oc(si)->output && !(si_oc(si)->flags & CF_SHUTW)) {
|
||||||
|
/* let's be woken up once new data arrive */
|
||||||
|
LIST_ADDQ(&ring->waiters, &appctx->ctx.cli.l0);
|
||||||
|
si_rx_endp_done(si);
|
||||||
|
ret = 0;
|
||||||
|
}
|
||||||
|
/* always drain all the request */
|
||||||
|
co_skip(si_oc(si), si_oc(si)->output);
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -308,6 +340,7 @@ void cli_io_release_show_ring(struct appctx *appctx)
|
|||||||
/* reader was still attached */
|
/* reader was still attached */
|
||||||
ofs -= ring->ofs;
|
ofs -= ring->ofs;
|
||||||
BUG_ON(ofs >= b_size(&ring->buf));
|
BUG_ON(ofs >= b_size(&ring->buf));
|
||||||
|
LIST_DEL_INIT(&appctx->ctx.cli.l0);
|
||||||
HA_ATOMIC_SUB(b_peek(&ring->buf, ofs), 1);
|
HA_ATOMIC_SUB(b_peek(&ring->buf, ofs), 1);
|
||||||
}
|
}
|
||||||
HA_ATOMIC_SUB(&ring->readers_count, 1);
|
HA_ATOMIC_SUB(&ring->readers_count, 1);
|
||||||
|
13
src/sink.c
13
src/sink.c
@ -203,12 +203,13 @@ int sink_announce_dropped(struct sink *sink)
|
|||||||
static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
|
static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
|
||||||
{
|
{
|
||||||
struct sink *sink;
|
struct sink *sink;
|
||||||
|
int arg;
|
||||||
|
|
||||||
args++; // make args[1] the 1st arg
|
args++; // make args[1] the 1st arg
|
||||||
|
|
||||||
if (!*args[1]) {
|
if (!*args[1]) {
|
||||||
/* no arg => report the list of supported sink */
|
/* no arg => report the list of supported sink */
|
||||||
chunk_printf(&trash, "Supported events sinks:\n");
|
chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
|
||||||
list_for_each_entry(sink, &sink_list, sink_list) {
|
list_for_each_entry(sink, &sink_list, sink_list) {
|
||||||
chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
|
chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
|
||||||
sink->name,
|
sink->name,
|
||||||
@ -232,6 +233,16 @@ static int cli_parse_show_events(char **args, char *payload, struct appctx *appc
|
|||||||
if (sink->type != SINK_TYPE_BUFFER)
|
if (sink->type != SINK_TYPE_BUFFER)
|
||||||
return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
|
return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
|
||||||
|
|
||||||
|
for (arg = 2; *args[arg]; arg++) {
|
||||||
|
if (strcmp(args[arg], "-w") == 0)
|
||||||
|
appctx->ctx.cli.i0 |= 1; // wait mode
|
||||||
|
else if (strcmp(args[arg], "-n") == 0)
|
||||||
|
appctx->ctx.cli.i0 |= 2; // seek to new
|
||||||
|
else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
|
||||||
|
appctx->ctx.cli.i0 |= 3; // seek to new + wait
|
||||||
|
else
|
||||||
|
return cli_err(appctx, "unknown option");
|
||||||
|
}
|
||||||
return ring_attach_cli(sink->ctx.ring, appctx);
|
return ring_attach_cli(sink->ctx.ring, appctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user