]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: ring: unlock the ring's tail earlier
authorWilly Tarreau <w@1wt.eu>
Wed, 28 Feb 2024 08:57:00 +0000 (09:57 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
We know we can continue to protect the message area so we can unlock the
tail as soon as we know its new value. Now we're seeing ~6.4M msg/s vs
5.4M previously on 3C6T of a 3rd gen EPYC, and 1.88M vs 1.54M for 24C48T
threads, which is a significant gain!

This requires to carefully write the new head counter before releasing
the writers, and to change the calculation of the work area from
tail..head to tail...new_tail while writing the message.

src/ring.c

index 97d10ee27a9c949b0cc5f21ad32c93cf0d8228ab..94ce0fe5ee4a5d8b4b2513c633b2129d43b5bda5 100644 (file)
@@ -172,7 +172,7 @@ void ring_free(struct ring *ring)
  */
 ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
 {
-       size_t head_ofs, tail_ofs;
+       size_t head_ofs, tail_ofs, new_tail_ofs;
        size_t ring_size;
        char *ring_area;
        struct ist v1, v2;
@@ -273,8 +273,25 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
                vp_skip(&v1, &v2, 1 + dellenlen + dellen);
        }
 
-       /* now let's update the buffer with the new head and size */
-       vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
+       /* now let's update the buffer with the new tail if our message will fit */
+       new_tail_ofs = tail_ofs;
+       if (vp_size(v1, v2) <= ring_size - needed - 1) {
+               vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
+
+               /* update the new space in the buffer */
+               HA_ATOMIC_STORE(&ring->storage->head, head_ofs);
+
+               /* calculate next tail pointer */
+               new_tail_ofs += needed;
+               if (new_tail_ofs >= ring_size)
+                       new_tail_ofs -= ring_size;
+
+               /* reset next read counter before releasing writers */
+               HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0);
+       }
+
+       /* and release other writers */
+       HA_ATOMIC_STORE(&ring->storage->tail, new_tail_ofs);
 
        if (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
                /* we had to stop due to readers blocking the head,
@@ -283,8 +300,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
                goto done_update_buf;
        }
 
-       /* now focus on free room */
-       vp_ring_to_room(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
+       /* now focus on free room between the old and the new tail */
+       vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
 
        /* let's write the message size */
        vp_put_varint(&v1, &v2, msglen);
@@ -311,20 +328,16 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
                msglen += len;
        }
 
-       vp_putchr(&v1, &v2, 0); // new read counter
+       /* we must not write the read counter, it was already done,
+        * plus we could ruin the one of the next writer.
+        */
        sent = lenlen + msglen + 1;
        BUG_ON_HOT(sent != needed);
 
-       vp_room_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
-
  done_update_buf:
        /* unlock the message area */
        HA_ATOMIC_STORE(lock_ptr, readers);
 
-       /* update the new space in the buffer */
-       ring->storage->head = head_ofs;
-       HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs);
-
        /* notify potential readers */
        if (sent) {
                HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);