diff --git a/src/ring.c b/src/ring.c index 97d10ee27..94ce0fe5e 100644 --- a/src/ring.c +++ b/src/ring.c @@ -172,7 +172,7 @@ void ring_free(struct ring *ring) */ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg) { - size_t head_ofs, tail_ofs; + size_t head_ofs, tail_ofs, new_tail_ofs; size_t ring_size; char *ring_area; struct ist v1, v2; @@ -273,8 +273,25 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz vp_skip(&v1, &v2, 1 + dellenlen + dellen); } - /* now let's update the buffer with the new head and size */ - vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); + /* now let's update the buffer with the new tail if our message will fit */ + new_tail_ofs = tail_ofs; + if (vp_size(v1, v2) <= ring_size - needed - 1) { + vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); + + /* update the new space in the buffer */ + HA_ATOMIC_STORE(&ring->storage->head, head_ofs); + + /* calculate next tail pointer */ + new_tail_ofs += needed; + if (new_tail_ofs >= ring_size) + new_tail_ofs -= ring_size; + + /* reset next read counter before releasing writers */ + HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0); + } + + /* and release other writers */ + HA_ATOMIC_STORE(&ring->storage->tail, new_tail_ofs); if (vp_size(v1, v2) > ring_size - needed - 1 - 1) { /* we had to stop due to readers blocking the head, @@ -283,8 +300,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz goto done_update_buf; } - /* now focus on free room */ - vp_ring_to_room(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); + /* now focus on free room between the old and the new tail */ + vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs); /* let's write the message size */ vp_put_varint(&v1, &v2, msglen); @@ -311,20 +328,16 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz msglen += len; } - vp_putchr(&v1, &v2, 0); // new read counter + /* we must not write the read counter, it was already done, + * plus we could ruin the one of the next writer. + */ sent = lenlen + msglen + 1; BUG_ON_HOT(sent != needed); - vp_room_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); - done_update_buf: /* unlock the message area */ HA_ATOMIC_STORE(lock_ptr, readers); - /* update the new space in the buffer */ - ring->storage->head = head_ofs; - HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs); - /* notify potential readers */ if (sent) { HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);