MEDIUM: ring: change the ring reader to use the new vector-based API now

The code now looks cleaner and more easily shows what still needs to be
addressed. There are not that many changes in practice, these are mostly
mechanical, essentially hiding the buffer from the callers.
This commit is contained in:
Willy Tarreau 2024-02-27 07:58:26 +01:00
parent 4e6fadb8a1
commit 01aa0a057c
6 changed files with 69 additions and 49 deletions

View File

@ -58,7 +58,7 @@ size_t appctx_raw_snd_buf(struct appctx *appctx, struct buffer *buf, size_t coun
size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags);
int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags);
ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size_t len);
ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len);
static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc)
{

View File

@ -87,7 +87,7 @@ void app_log(struct list *loggers, struct buffer *tag, int level, const char *fo
*/
int add_to_logformat_list(char *start, char *end, int type, struct list *list_format, char **err);
ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t ofs, size_t len);
ssize_t syslog_applet_append_event(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len);
/*
* Parse the log_format string and fill a linked list.

View File

@ -42,7 +42,7 @@ void cli_io_release_show_ring(struct appctx *appctx);
size_t ring_max_payload(const struct ring *ring);
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len));
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len));
/* returns the ring storage's area */
static inline void *ring_area(const struct ring *ring)

View File

@ -24,6 +24,7 @@
#include <haproxy/stream.h>
#include <haproxy/task.h>
#include <haproxy/trace.h>
#include <haproxy/vecpair.h>
#include <haproxy/xref.h>
unsigned int nb_applets = 0;
@ -725,15 +726,15 @@ end:
return ret;
}
/* Atomically append a line to applet <ctx>'s output, appending a trailing 'LF'.
* The line is read from <buf> at offset <ofs> relative to the buffer's origin,
* for <len> bytes. It returns the number of bytes consumed from the input
* buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
* never be able to (too large msg). The input buffer is not modified. The
* caller is responsible for making sure that there are at least ofs+len bytes
* in the input buffer.
/* Atomically append a line to applet <ctx>'s output, appending a trailing LF.
* The line is read from vectors <v1> and <v2> at offset <ofs> relative to the
* area's origin, for <len> bytes. It returns the number of bytes consumed from
* the input vectors on success, -1 if it temporarily cannot (buffer full), -2
* if it will never be able to (too large msg). The vectors are not modified.
* The caller is responsible for making sure that there are at least ofs+len
* bytes in the input vectors.
*/
ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size_t len)
ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)
{
struct appctx *appctx = ctx;
@ -743,7 +744,7 @@ ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size
}
chunk_reset(&trash);
b_getblk_ofs(buf, trash.area, len, ofs);
vp_peek_ofs(v1, v2, ofs, trash.area, len);
trash.data += len;
trash.area[trash.data++] = '\n';
if (applet_putchk(appctx, &trash) == -1)

View File

@ -45,6 +45,7 @@
#include <haproxy/time.h>
#include <haproxy/hash.h>
#include <haproxy/tools.h>
#include <haproxy/vecpair.h>
/* global recv logs counter */
int cum_log_messages;
@ -4346,15 +4347,14 @@ static struct applet syslog_applet = {
};
/* Atomically append an event to applet >ctx>'s output, prepending it with its
* size in decimal followed by a space.
* The line is read from <buf> at offset <ofs> relative to the buffer's origin,
* for <len> bytes. It returns the number of bytes consumed from the input
* buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
* never be able to (too large msg). The input buffer is not modified. The
* caller is responsible for making sure that there are at least ofs+len bytes
* in the input buffer.
* size in decimal followed by a space. The line is read from vectors <v1> and
* <v2> at offset <ofs> relative to the area's origin, for <len> bytes. It
* returns the number of bytes consumed from the input vectors on success, -1
* if it temporarily cannot (buffer full), -2 if it will never be able to (too
* large msg). The input vectors are not modified. The caller is responsible for
* making sure that there are at least ofs+len bytes in the input buffer.
*/
ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t ofs, size_t len)
ssize_t syslog_applet_append_event(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)
{
struct appctx *appctx = ctx;
char *p;
@ -4372,7 +4372,7 @@ ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t o
return -2;
/* try to transfer it or report full */
trash.data += b_getblk_ofs(buf, trash.area + trash.data, len, ofs);
trash.data += vp_peek_ofs(v1, v2, ofs, trash.area, len);
if (applet_putchk(appctx, &trash) == -1)
return -1;

View File

@ -325,14 +325,11 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
if (ofs != ~0) {
/* reader was still attached */
if (ofs < ring_head(ring))
ofs += ring_size(ring) - ring_head(ring);
else
ofs -= ring_head(ring);
char *area = ring_area(ring);
BUG_ON(ofs >= ring_size(ring));
LIST_DEL_INIT(&appctx->wait_entry);
HA_ATOMIC_DEC(b_peek(&ring->storage->buf, ofs));
HA_ATOMIC_DEC(area + ofs);
}
HA_ATOMIC_DEC(&ring->readers_count);
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
@ -374,18 +371,26 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
* if it needs to pause, 1 once finished.
*/
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len))
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
{
struct buffer *buf = &ring->storage->buf;
size_t head_ofs, tail_ofs;
size_t ring_size;
char *ring_area;
struct ist v1, v2;
uint64_t msg_len;
ssize_t copied;
size_t len, cnt;
size_t ofs; /* absolute offset from the buffer's origin */
size_t pos; /* relative position from head (0..data-1) */
ssize_t copied;
int ret;
ring_area = b_orig(buf);
ring_size = b_size(buf);
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
head_ofs = b_head_ofs(buf);
tail_ofs = b_tail_ofs(buf);
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
* dropped events because we'd take a reference on the oldest message
@ -395,37 +400,49 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
* value cannot be produced after initialization.
*/
if (unlikely(*ofs_ptr == ~0)) {
/* going to the end means looking at tail-1 */
*ofs_ptr = b_peek_ofs(buf, (flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0);
HA_ATOMIC_INC(b_orig(buf) + *ofs_ptr);
if (flags & RING_WF_SEEK_NEW) {
/* going to the end means looking at tail-1 */
head_ofs = tail_ofs + ring_size - 1;
if (head_ofs >= ring_size)
head_ofs -= ring_size;
}
/* make ctx->ofs relative to the beginning of the buffer now */
*ofs_ptr = head_ofs;
/* and reserve our slot here */
HA_ATOMIC_INC(ring_area + head_ofs);
}
ofs = *ofs_ptr;
BUG_ON(ofs >= buf->size);
HA_ATOMIC_DEC(b_orig(buf) + ofs);
/* we have the guarantee we can restart from our own head */
head_ofs = *ofs_ptr;
BUG_ON(head_ofs >= ring_size);
/* in this loop, ofs always points to the counter byte that precedes
HA_ATOMIC_DEC(ring_area + head_ofs);
/* in this loop, head_ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
* stop before the end (ret=0).
* stop before the end (ret=0). The reference is relative to the ring's
* origin, while pos is relative to the ring's head.
*/
ret = 1;
while (1) {
/* relative position in the buffer */
pos = b_rel_ofs(buf, ofs);
vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
if (pos + 1 >= b_data(buf)) {
while (1) {
if (vp_size(v1, v2) <= 1) {
/* no more data */
break;
}
cnt = 1;
len = b_peek_varint(buf, pos + cnt, &msg_len);
len = vp_peek_varint_ofs(v1, v2, cnt, &msg_len);
if (!len)
break;
cnt += len;
BUG_ON(msg_len + pos + cnt + 1 > b_data(buf));
copied = msg_handler(ctx, buf, ofs + cnt, msg_len);
BUG_ON(msg_len + cnt + 1 > vp_size(v1, v2));
copied = msg_handler(ctx, v1, v2, cnt, msg_len);
if (copied == -2) {
/* too large a message to ever fit, let's skip it */
goto skip;
@ -436,13 +453,15 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
break;
}
skip:
ofs = b_add_ofs(buf, ofs, cnt + msg_len);
vp_skip(&v1, &v2, cnt + msg_len);
}
HA_ATOMIC_INC(b_orig(buf) + ofs);
vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
HA_ATOMIC_INC(ring_area + head_ofs);
if (last_ofs_ptr)
*last_ofs_ptr = b_tail_ofs(buf);
*ofs_ptr = ofs;
*last_ofs_ptr = tail_ofs;
*ofs_ptr = head_ofs;
HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
return ret;
}