MEDIUM: ring: lock the tail's readers counters before proceeding with the changes

The goal here is to start to protect the writing area inside the area
itself so that we'll later be able to release the ring's lock. We're not
there yet, but at least the tail is marked as protected for as long as the
message is not fully written.
This commit is contained in:
Willy Tarreau 2024-02-28 09:20:54 +01:00
parent d336d71cbb
commit 73b2436fe6

View File

@ -182,6 +182,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
size_t needed;
uint64_t dellen;
int dellenlen;
uint8_t *lock_ptr;
uint8_t readers;
ssize_t sent = 0;
int i;
@ -227,6 +229,13 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
head_ofs = ring_head(ring);
tail_ofs = ring_tail(ring);
/* this is the byte before tail, it contains the users count */
lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
/* take the lock on the area. Normally we're alone */
readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE);
BUG_ON_HOT(readers == RING_WRITING_SIZE);
vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
while (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
@ -249,8 +258,12 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
/* now let's update the buffer with the new head and size */
vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
if (vp_size(v1, v2) > ring_size - needed - 1 - 1)
if (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
/* we had to stop due to readers blocking the head,
* let's give up.
*/
goto done_update_buf;
}
/* now focus on free room */
vp_ring_to_room(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
@ -287,6 +300,9 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
vp_room_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
done_update_buf:
/* unlock the message area */
HA_ATOMIC_STORE(lock_ptr, readers);
/* update the new space in the buffer */
ring->storage->head = head_ofs;
ring->storage->tail = tail_ofs;