From 73b2436fe65c1245440d685e92a6f1c5103a7704 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Wed, 28 Feb 2024 09:20:54 +0100 Subject: [PATCH] MEDIUM: ring: lock the tail's readers counters before proceeding with the changes The goal here is to start to protect the writing area inside the area itself so that we'll later be able to release the ring's lock. We're not there yet, but at least the tail is marked as protected for as long as the message is not fully written. --- src/ring.c | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/ring.c b/src/ring.c index fb19efc46..d798f27db 100644 --- a/src/ring.c +++ b/src/ring.c @@ -182,6 +182,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz size_t needed; uint64_t dellen; int dellenlen; + uint8_t *lock_ptr; + uint8_t readers; ssize_t sent = 0; int i; @@ -227,6 +229,13 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz head_ofs = ring_head(ring); tail_ofs = ring_tail(ring); + /* this is the byte before tail, it contains the users count */ + lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1); + + /* take the lock on the area. Normally we're alone */ + readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE); + BUG_ON_HOT(readers == RING_WRITING_SIZE); + vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); while (vp_size(v1, v2) > ring_size - needed - 1 - 1) { @@ -249,8 +258,12 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz /* now let's update the buffer with the new head and size */ vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); - if (vp_size(v1, v2) > ring_size - needed - 1 - 1) + if (vp_size(v1, v2) > ring_size - needed - 1 - 1) { + /* we had to stop due to readers blocking the head, + * let's give up. + */ goto done_update_buf; + } /* now focus on free room */ vp_ring_to_room(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); @@ -287,6 +300,9 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz vp_room_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); done_update_buf: + /* unlock the message area */ + HA_ATOMIC_STORE(lock_ptr, readers); + /* update the new space in the buffer */ ring->storage->head = head_ofs; ring->storage->tail = tail_ofs;