MEDIUM: ring: replace the buffer API in ring_write() with the vec<->ring API

This is the start of the replacement of the buffer API calls. Only the
ring_write() function was touched. Instead of manipulating a buffer all
along, we now extract the ring buffer's head and tail upon entry, store
them locally and use them using the vec<->ring API until the last moment
where we can update the buffer with the new values. One subtle point is
that we must never fill the buffer past the last byte otherwise the
vec-to-ring conversion gets lost and there's no more possibility to know
where's the beginning nor the end (just like when dealing with head+tail
in fact), because it then becomes impossible to distinguish between an
empty and a full buffer.
This commit is contained in:
Willy Tarreau 2024-02-26 11:03:03 +01:00
parent 4e6de42b27
commit 4e6fadb8a1

View File

@ -27,6 +27,7 @@
#include <haproxy/sc_strm.h>
#include <haproxy/stconn.h>
#include <haproxy/thread.h>
#include <haproxy/vecpair.h>
/* context used to dump the contents of a ring via "show events" or "show errors" */
struct show_ring_ctx {
@ -164,6 +165,10 @@ void ring_free(struct ring *ring)
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
{
struct buffer *buf = &ring->storage->buf;
size_t head_ofs, tail_ofs;
size_t ring_size;
char *ring_area;
struct ist v1, v2;
struct appctx *appctx;
size_t msglen = 0;
size_t lenlen;
@ -195,14 +200,29 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
* - lenlen bytes for the size encoding
* - msglen for the message
* - one byte for the new marker
*
* Note that we'll also reserve one extra byte to make sure we never
* leave a full buffer (the vec-to-ring conversion cannot be done if
* both areas are of size 0).
*/
needed = lenlen + msglen + 1;
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
if (needed + 1 > b_size(buf))
goto done_buf;
/* these ones do not change under us (only resize affects them and it
* must be done under thread isolation).
*/
ring_area = b_orig(buf);
ring_size = b_size(buf);
while (b_room(buf) < needed) {
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
if (needed + 1 > ring_size)
goto leave;
head_ofs = b_head_ofs(buf);
tail_ofs = b_tail_ofs(buf);
vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
while (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
/* we need to delete the oldest message (from the end),
* and we have to stop if there's a reader stuck there.
* Unless there's corruption in the buffer it's guaranteed
@ -210,19 +230,28 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
* varint-encoded length (1 byte min) and the message
* payload (0 bytes min).
*/
if (*b_head(buf))
goto done_buf;
dellenlen = b_peek_varint(buf, 1, &dellen);
if (*_vp_head(v1, v2))
break;
dellenlen = vp_peek_varint_ofs(v1, v2, 1, &dellen);
if (!dellenlen)
goto done_buf;
BUG_ON(b_data(buf) < 1 + dellenlen + dellen);
b_del(buf, 1 + dellenlen + dellen);
break;
BUG_ON_HOT(vp_size(v1, v2) < 1 + dellenlen + dellen);
vp_skip(&v1, &v2, 1 + dellenlen + dellen);
}
/* OK now we do have room */
__b_put_varint(buf, msglen);
/* now let's update the buffer with the new head and size */
vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
if (vp_size(v1, v2) > ring_size - needed - 1 - 1)
goto done_update_buf;
/* now focus on free room */
vp_ring_to_room(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
/* let's write the message size */
vp_put_varint(&v1, &v2, msglen);
/* then write the messages */
msglen = 0;
for (i = 0; i < npfx; i++) {
size_t len = pfx[i].len;
@ -230,7 +259,7 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
if (len + msglen > maxlen)
len = maxlen - msglen;
if (len)
__b_putblk(buf, pfx[i].ptr, len);
vp_putblk(&v1, &v2, pfx[i].ptr, len);
msglen += len;
}
@ -240,19 +269,28 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
if (len + msglen > maxlen)
len = maxlen - msglen;
if (len)
__b_putblk(buf, msg[i].ptr, len);
vp_putblk(&v1, &v2, msg[i].ptr, len);
msglen += len;
}
*b_tail(buf) = 0; buf->data++; // new read counter
vp_putchr(&v1, &v2, 0); // new read counter
sent = lenlen + msglen + 1;
BUG_ON_HOT(sent != needed);
/* notify potential readers */
list_for_each_entry(appctx, &ring->waiters, wait_entry)
appctx_wakeup(appctx);
vp_room_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
done_buf:
done_update_buf:
/* update the new space in the buffer */
buf->head = head_ofs;
buf->data = ((tail_ofs >= head_ofs) ? 0 : ring_size) + tail_ofs - head_ofs;
/* notify potential readers */
if (sent) {
list_for_each_entry(appctx, &ring->waiters, wait_entry)
appctx_wakeup(appctx);
}
leave:
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
return sent;
}