*/
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
{
- size_t head_ofs, tail_ofs;
+ size_t head_ofs, tail_ofs, new_tail_ofs;
size_t ring_size;
char *ring_area;
struct ist v1, v2;
vp_skip(&v1, &v2, 1 + dellenlen + dellen);
}
- /* now let's update the buffer with the new head and size */
- vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
+ /* now let's update the buffer with the new tail if our message will fit */
+ new_tail_ofs = tail_ofs;
+ if (vp_size(v1, v2) <= ring_size - needed - 1) {
+ vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
+
+ /* update the new space in the buffer */
+ HA_ATOMIC_STORE(&ring->storage->head, head_ofs);
+
+ /* calculate next tail pointer */
+ new_tail_ofs += needed;
+ if (new_tail_ofs >= ring_size)
+ new_tail_ofs -= ring_size;
+
+ /* reset next read counter before releasing writers */
+ HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0);
+ }
+
+ /* and release other writers */
+ HA_ATOMIC_STORE(&ring->storage->tail, new_tail_ofs);
if (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
/* we had to stop due to readers blocking the head,
goto done_update_buf;
}
- /* now focus on free room */
- vp_ring_to_room(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
+ /* now focus on free room between the old and the new tail */
+ vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
/* let's write the message size */
vp_put_varint(&v1, &v2, msglen);
msglen += len;
}
- vp_putchr(&v1, &v2, 0); // new read counter
+ /* we must not write the read counter, it was already done,
+ * plus we could ruin the one of the next writer.
+ */
sent = lenlen + msglen + 1;
BUG_ON_HOT(sent != needed);
- vp_room_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
-
done_update_buf:
/* unlock the message area */
HA_ATOMIC_STORE(lock_ptr, readers);
- /* update the new space in the buffer */
- ring->storage->head = head_ofs;
- HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs);
-
/* notify potential readers */
if (sent) {
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);