MEDIUM: ring: make the offset relative to the head/tail instead of absolute
The ring's offset currently contains a perpetually growing custor which is the number of bytes written from the start. It's used by readers to know where to (re)start reading from. It was made absolute because both the head and the tail can change during writes and we needed a fixed position to know where the reader was attached. But this is complicated, error-prone, and limits the ability to reduce the lock's coverage. In fact what is needed is to know where the reader is currently waiting, if at all. And this location is exactly where it stored its count, so the absolute position in the buffer (the seek offset from the first storage byte) does represent exactly this, as it doesn't move (we don't realign the buffer), and is stable regardless of how head/tail changes with writes. This patch modifies this so that the application code now uses this representation instead. The most noticeable change is the initialization, where we've kept ~0 as a marker to go to the end, and it's now set to the tail offset instead of trying to resolve the current write offset against the current ring's position. The offset was also used at the end of the consuming loop, to detect if a new write had happened between the lock being released and taken again, so as to wake the consumer(s) up again. For this we used to take a copy of the ring->ofs before unlocking and comparing with the new value read in the next lock. Since it's not possible to write past the current reader's location, there's no risk of complete rollover, so it's sufficient to check if the tail has changed. Note that the change also has an impact on the haring consumer which needs to adapt as well. But that's good in fact because it will rely on one less variable, and will use offsets relative to the buffer's head, and the change remains backward-compatible.
This commit is contained in:
parent
d0d85d2e36
commit
d9c7188633
@ -123,20 +123,14 @@ int dump_ring(struct ring *ring, size_t ofs, int flags)
|
||||
ofs = 0;
|
||||
|
||||
/* going to the end means looking at tail-1 */
|
||||
if (flags & RING_WF_SEEK_NEW)
|
||||
ofs += b_data(&buf) - 1;
|
||||
ofs = (flags & RING_WF_SEEK_NEW) ? buf.data - 1 : 0;
|
||||
|
||||
//HA_ATOMIC_INC(b_peek(&buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
//HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
|
||||
/* we were already there, adjust the offset to be relative to
|
||||
* the buffer's head and remove us from the counter.
|
||||
*/
|
||||
ofs -= ring->ofs;
|
||||
if (ofs >= buf.size) {
|
||||
fprintf(stderr, "FATAL error at %d\n", __LINE__);
|
||||
return 1;
|
||||
@ -203,7 +197,6 @@ int dump_ring(struct ring *ring, size_t ofs, int flags)
|
||||
}
|
||||
|
||||
//HA_ATOMIC_INC(b_peek(&buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
//HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
|
||||
if (!(flags & RING_WF_WAIT_MODE))
|
||||
|
@ -330,13 +330,11 @@ static void dns_resolve_send(struct dgram_conn *dgram)
|
||||
if (unlikely(ofs == ~0)) {
|
||||
ofs = 0;
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
}
|
||||
|
||||
/* we were already there, adjust the offset to be relative to
|
||||
* the buffer's head and remove us from the counter.
|
||||
*/
|
||||
ofs -= ring->ofs;
|
||||
BUG_ON(ofs >= buf->size);
|
||||
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
||||
|
||||
@ -380,7 +378,6 @@ static void dns_resolve_send(struct dgram_conn *dgram)
|
||||
out:
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
ns->dgram->ofs_req = ofs;
|
||||
HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);
|
||||
HA_SPIN_UNLOCK(DNS_LOCK, &dgram->lock);
|
||||
@ -498,7 +495,6 @@ static void dns_session_io_handler(struct appctx *appctx)
|
||||
ofs = 0;
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
}
|
||||
|
||||
/* in this loop, ofs always points to the counter byte that precedes
|
||||
@ -509,7 +505,6 @@ static void dns_session_io_handler(struct appctx *appctx)
|
||||
/* we were already there, adjust the offset to be relative to
|
||||
* the buffer's head and remove us from the counter.
|
||||
*/
|
||||
ofs -= ring->ofs;
|
||||
BUG_ON(ofs >= buf->size);
|
||||
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
||||
|
||||
@ -637,7 +632,6 @@ static void dns_session_io_handler(struct appctx *appctx)
|
||||
}
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
ds->ofs = ofs;
|
||||
}
|
||||
HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);
|
||||
@ -1129,13 +1123,11 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
|
||||
if (unlikely(ofs == ~0)) {
|
||||
ofs = 0;
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
}
|
||||
|
||||
/* we were already there, adjust the offset to be relative to
|
||||
* the buffer's head and remove us from the counter.
|
||||
*/
|
||||
ofs -= ring->ofs;
|
||||
BUG_ON(ofs >= buf->size);
|
||||
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
||||
|
||||
@ -1224,7 +1216,6 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
|
||||
}
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
dss->ofs_req = ofs;
|
||||
HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);
|
||||
|
||||
|
38
src/ring.c
38
src/ring.c
@ -31,7 +31,7 @@
|
||||
/* context used to dump the contents of a ring via "show events" or "show errors" */
|
||||
struct show_ring_ctx {
|
||||
struct ring *ring; /* ring to be dumped */
|
||||
size_t ofs; /* offset to restart from, ~0 = end */
|
||||
size_t ofs; /* storage offset to restart from; ~0=oldest */
|
||||
uint flags; /* set of RING_WF_* */
|
||||
};
|
||||
|
||||
@ -278,8 +278,9 @@ int ring_attach(struct ring *ring)
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* detach an appctx from a ring. The appctx is expected to be waiting at
|
||||
* offset <ofs>. Nothing is done if <ring> is NULL.
|
||||
/* detach an appctx from a ring. The appctx is expected to be waiting at offset
|
||||
* <ofs> relative to the beginning of the storage, or ~0 if not waiting yet.
|
||||
* Nothing is done if <ring> is NULL.
|
||||
*/
|
||||
void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
|
||||
{
|
||||
@ -289,7 +290,11 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
|
||||
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
if (ofs != ~0) {
|
||||
/* reader was still attached */
|
||||
ofs -= ring->ofs;
|
||||
if (ofs < b_head_ofs(&ring->buf))
|
||||
ofs += b_size(&ring->buf) - b_head_ofs(&ring->buf);
|
||||
else
|
||||
ofs -= b_head_ofs(&ring->buf);
|
||||
|
||||
BUG_ON(ofs >= b_size(&ring->buf));
|
||||
LIST_DEL_INIT(&appctx->wait_entry);
|
||||
HA_ATOMIC_DEC(b_peek(&ring->buf, ofs));
|
||||
@ -340,7 +345,7 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
||||
struct stconn *sc = appctx_sc(appctx);
|
||||
struct ring *ring = ctx->ring;
|
||||
struct buffer *buf = &ring->buf;
|
||||
size_t ofs = ctx->ofs;
|
||||
size_t ofs;
|
||||
size_t last_ofs;
|
||||
uint64_t msg_len;
|
||||
size_t len, cnt;
|
||||
@ -363,21 +368,19 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
||||
* existing messages before grabbing a reference to a location. This
|
||||
* value cannot be produced after initialization.
|
||||
*/
|
||||
if (unlikely(ofs == ~0)) {
|
||||
ofs = 0;
|
||||
|
||||
if (unlikely(ctx->ofs == ~0)) {
|
||||
/* going to the end means looking at tail-1 */
|
||||
if (ctx->flags & RING_WF_SEEK_NEW)
|
||||
ofs += b_data(buf) - 1;
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
ctx->ofs = b_peek_ofs(buf, (ctx->flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0);
|
||||
HA_ATOMIC_INC(b_orig(buf) + ctx->ofs);
|
||||
}
|
||||
|
||||
/* we were already there, adjust the offset to be relative to
|
||||
* the buffer's head and remove us from the counter.
|
||||
*/
|
||||
ofs -= ring->ofs;
|
||||
ofs = ctx->ofs - b_head_ofs(buf);
|
||||
if (ctx->ofs < b_head_ofs(buf))
|
||||
ofs += b_size(buf);
|
||||
|
||||
BUG_ON(ofs >= buf->size);
|
||||
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
||||
|
||||
@ -413,9 +416,8 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
||||
}
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
last_ofs = ring->ofs;
|
||||
ctx->ofs = ofs;
|
||||
last_ofs = b_tail_ofs(buf);
|
||||
ctx->ofs = b_peek_ofs(buf, ofs);
|
||||
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
|
||||
if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
|
||||
@ -426,7 +428,7 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
||||
/* let's be woken up once new data arrive */
|
||||
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
|
||||
ofs = ring->ofs;
|
||||
ofs = b_tail_ofs(&ring->buf);
|
||||
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
if (ofs != last_ofs) {
|
||||
/* more data was added into the ring between the
|
||||
|
12
src/sink.c
12
src/sink.c
@ -362,9 +362,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
|
||||
*/
|
||||
if (unlikely(ofs == ~0)) {
|
||||
ofs = 0;
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
}
|
||||
|
||||
/* in this loop, ofs always points to the counter byte that precedes
|
||||
@ -375,7 +373,6 @@ static void sink_forward_io_handler(struct appctx *appctx)
|
||||
/* we were already there, adjust the offset to be relative to
|
||||
* the buffer's head and remove us from the counter.
|
||||
*/
|
||||
ofs -= ring->ofs;
|
||||
BUG_ON(ofs >= buf->size);
|
||||
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
||||
|
||||
@ -407,9 +404,8 @@ static void sink_forward_io_handler(struct appctx *appctx)
|
||||
}
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
last_ofs = b_tail_ofs(buf);
|
||||
sft->ofs = ofs;
|
||||
last_ofs = ring->ofs;
|
||||
}
|
||||
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
|
||||
@ -417,7 +413,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
|
||||
/* let's be woken up once new data arrive */
|
||||
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
|
||||
ofs = ring->ofs;
|
||||
ofs = b_tail_ofs(buf);
|
||||
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
if (ofs != last_ofs) {
|
||||
/* more data was added into the ring between the
|
||||
@ -502,9 +498,7 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
|
||||
*/
|
||||
if (unlikely(ofs == ~0)) {
|
||||
ofs = 0;
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
}
|
||||
|
||||
/* in this loop, ofs always points to the counter byte that precedes
|
||||
@ -515,7 +509,6 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
|
||||
/* we were already there, adjust the offset to be relative to
|
||||
* the buffer's head and remove us from the counter.
|
||||
*/
|
||||
ofs -= ring->ofs;
|
||||
BUG_ON(ofs >= buf->size);
|
||||
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
||||
|
||||
@ -551,7 +544,6 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
|
||||
}
|
||||
|
||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||
ofs += ring->ofs;
|
||||
sft->ofs = ofs;
|
||||
}
|
||||
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||
|
Loading…
Reference in New Issue
Block a user