]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: ring: use the topmost bit of the tail as a lock
authorWilly Tarreau <w@1wt.eu>
Wed, 28 Feb 2024 08:37:47 +0000 (09:37 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
We're now locking the tail while looking for some room in the ring. In
fact it's still while writing to it, but the goal definitely is to get
rid of the lock ASAP. For this we reserve the topmost bit of the tail
as a lock, which may have as a possible visible effect that buffers will
be limited to 2GB instead of 4GB on 32-bit machines (though in practise,
good luck for allocating more than 2GB contiguous on 32-bit), but in
practice since the size is read with atol() and some operating systems
limit it to LONG_MAX unless passing negative numbers, the limit is
already there.

For now the impact on x86_64 is significant (drop from 2.35 to 1.4M/s
on 48 threads on EPYC 24 cores) but this situation is only temporary
so that changes can be reviewable and bisectable.

Other approaches were attempted, such as using XCHG instead, which is
slightly faster on x86 with low thread counts (but causes more write
contention), and forces readers to stall under heavy traffic because
they can't access a valid value for the queue anymore. A CAS requires
preloading the value and is les good on ARMv8.1. XADD could also be
considered with 12-13 upper bits of the offset dedicated to locking,
but that looks overkill.

dev/haring/haring.c
include/haproxy/ring-t.h
include/haproxy/ring.h
src/ring.c
src/sink.c

index 4e52b0a9e219cc1687a32479baf38392db52b743..4dfdafab552c291faa58e9b99772d9eb051d34f5 100644 (file)
@@ -218,7 +218,7 @@ int dump_ring_v2(struct ring_v2 *ring, size_t ofs, int flags)
        /* Now make our own buffer pointing to that area */
        size = ring->size;
        head = ring->head;
-       tail = ring->tail;
+       tail = ring->tail & ~RING_TAIL_LOCK;
        data = (head <= tail ? 0 : size) + tail - head;
        buf = b_make((void *)ring + ring->rsvd, size, head, data);
        return dump_ring_as_buf(buf, ofs, flags);
index 0acbee8e726f9a15c1ad5799dbec7c3437180cbc..b719c9cad6b8886b5dffb80a00f66a765baa0b12 100644 (file)
 #define RING_WRITING_SIZE  255  /* the next message's size is being written */
 #define RING_MAX_READERS   254  /* highest supported value for RC */
 
+/* mask used to lock the tail */
+#define RING_TAIL_LOCK     (1ULL << ((sizeof(size_t) * 8) - 1))
+
 /* this is the mmapped part */
 struct ring_storage {
        size_t size;         // storage size
index 53b0b22c2be5bb87e7f36768bff2f36d461c3d1f..7fb9bc6b828848d8ae4ef7d0fb9635c0ce0b528d 100644 (file)
@@ -54,8 +54,10 @@ static inline void *ring_area(const struct ring *ring)
 /* returns the number of bytes in the ring */
 static inline size_t ring_data(const struct ring *ring)
 {
-       return ((ring->storage->head <= ring->storage->tail) ?
-               0 : ring->storage->size) + ring->storage->tail - ring->storage->head;
+       size_t tail = HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK;
+
+       return ((ring->storage->head <= tail) ?
+               0 : ring->storage->size) + tail - ring->storage->head;
 }
 
 /* returns the allocated size in bytes for the ring */
@@ -70,10 +72,10 @@ static inline size_t ring_head(const struct ring *ring)
        return ring->storage->head;
 }
 
-/* returns the tail offset of the ring */
+/* returns the ring's tail offset without the lock bit */
 static inline size_t ring_tail(const struct ring *ring)
 {
-       return ring->storage->tail;
+       return HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK;
 }
 
 /* duplicates ring <src> over ring <dst> for no more than <max> bytes or no
index f56c8f16854b569bd9139dc6a5a752f574cd514d..9af3daaf25b93409ef1630d23cd7e06c338aa90a 100644 (file)
@@ -222,12 +222,32 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
        ring_area = ring->storage->area;
        ring_size = ring->storage->size;
 
-       HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
        if (needed + 1 > ring_size)
                goto leave;
 
-       head_ofs = ring_head(ring);
-       tail_ofs = ring_tail(ring);
+       /* try to get exclusivity on the ring's tail. For this we set the
+        * tail's highest bit, and the one that gets it wins. Many tests were
+        * run on this and the approach below is optimal for armv8.1 atomics,
+        * second-to-optimal with both x86_64 and second-to-optimal on armv8.0.
+        * x86_64 would benefit slightly more from an xchg() which would
+        * require the readers to loop during changes, and armv8.0 is slightly
+        * better there as well (+5%). The CAS is bad for both (requires a
+        * preload), though it might degrade better on large x86 compared to
+        * a busy loop that the compiler would implement for the FETCH_OR.
+        * Alternately we could kill 12 upper bits on a 64-bit tail ofs and
+        * use XADD. Not tested, and would require to undo or watch for the
+        * change (use it as a ticket).
+        */
+       while (1) {
+               tail_ofs = HA_ATOMIC_FETCH_OR(&ring->storage->tail, RING_TAIL_LOCK);
+               if (!(tail_ofs & RING_TAIL_LOCK))
+                       break;
+               pl_wait_unlock_long(&ring->storage->tail, RING_TAIL_LOCK);
+       }
+
+       HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
+
+       head_ofs = ring->storage->head;
 
        /* this is the byte before tail, it contains the users count */
        lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
@@ -305,7 +325,7 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
        /* update the new space in the buffer */
        ring->storage->head = head_ofs;
-       ring->storage->tail = tail_ofs;
+       HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs);
 
        /* notify potential readers */
        if (sent) {
@@ -313,8 +333,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
                        appctx_wakeup(appctx);
        }
 
- leave:
        HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+ leave:
        return sent;
 }
 
@@ -417,8 +437,8 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
 
        HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
 
-       head_ofs = ring->storage->head;
-       tail_ofs = ring->storage->tail;
+       head_ofs = ring_head(ring);
+       tail_ofs = ring_tail(ring);
 
        /* explanation for the initialization below: it would be better to do
         * this in the parsing function but this would occasionally result in
index d43f7fb8aa328342e436e984cc5ca7b6385ac489..b5c681ad72939585193d62d1f74d4bece2e47326 100644 (file)
@@ -905,6 +905,12 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
                        goto err;
                }
 
+               if (size > RING_TAIL_LOCK) {
+                       ha_alert("parsing [%s:%d] : too large size '%llu' for new sink buffer, the limit on this platform is %llu bytes.\n", file, linenum, (ullong)size, (ullong)RING_TAIL_LOCK);
+                       err_code |= ERR_ALERT | ERR_FATAL;
+                       goto err;
+               }
+
                if (cfg_sink->store) {
                        ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum);
                        err_code |= ERR_ALERT | ERR_FATAL;