]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: ring: make the reader check the readers count before inc/dec
authorWilly Tarreau <w@1wt.eu>
Wed, 28 Feb 2024 08:03:46 +0000 (09:03 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
We'll want to reserve some special values for the readers count to
temporary lock the following message, but for this it will be mandatory
that readers check for them before incrementing/decrementing the counter.
Let'sdo that using a CAS. The readers performance is not as critical as
the writer's anyway so the slight overhead is not a problem.

src/ring.c

index 13d75dfb2ec3b778abad0a6337bdfb2640ddc0a8..fb19efc4663fcac0fcc5b7413219043fe9c9244a 100644 (file)
@@ -332,11 +332,17 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
        HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
        if (ofs != ~0) {
                /* reader was still attached */
-               char *area = ring_area(ring);
+               uint8_t *area = (uint8_t *)ring_area(ring);
+               uint8_t readers;
 
                BUG_ON(ofs >= ring_size(ring));
                LIST_DEL_INIT(&appctx->wait_entry);
-               HA_ATOMIC_DEC(area + ofs);
+
+               /* dec readers count */
+               do {
+                       readers = _HA_ATOMIC_LOAD(area + ofs);
+               } while ((readers > RING_MAX_READERS ||
+                         !_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax());
        }
        HA_ATOMIC_DEC(&ring->readers_count);
        HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
@@ -382,14 +388,15 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
 {
        size_t head_ofs, tail_ofs;
        size_t ring_size;
-       char *ring_area;
+       uint8_t *ring_area;
        struct ist v1, v2;
        uint64_t msg_len;
        size_t len, cnt;
        ssize_t copied;
+       uint8_t readers;
        int ret;
 
-       ring_area = ring->storage->area;
+       ring_area = (uint8_t *)ring->storage->area;
        ring_size = ring->storage->size;
 
        HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
@@ -416,15 +423,22 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
                /* make ctx->ofs relative to the beginning of the buffer now */
                *ofs_ptr = head_ofs;
 
-               /* and reserve our slot here */
-               HA_ATOMIC_INC(ring_area + head_ofs);
+               /* and reserve our slot here (inc readers count) */
+               do {
+                       readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
+               } while ((readers > RING_MAX_READERS ||
+                         !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
        }
 
        /* we have the guarantee we can restart from our own head */
        head_ofs = *ofs_ptr;
        BUG_ON(head_ofs >= ring_size);
 
-       HA_ATOMIC_DEC(ring_area + head_ofs);
+       /* dec readers count */
+       do {
+               readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
+       } while ((readers > RING_MAX_READERS ||
+                 !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax());
 
        /* in this loop, head_ofs always points to the counter byte that precedes
         * the message so that we can take our reference there if we have to
@@ -432,7 +446,7 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
         * origin, while pos is relative to the ring's head.
         */
        ret = 1;
-       vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
+       vp_ring_to_data(&v1, &v2, (char *)ring_area, ring_size, head_ofs, tail_ofs);
 
        while (1) {
                if (vp_size(v1, v2) <= 1) {
@@ -462,9 +476,14 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
                vp_skip(&v1, &v2, cnt + msg_len);
        }
 
-       vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
+       vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs);
+
+       /* inc readers count */
+       do {
+               readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
+       } while ((readers > RING_MAX_READERS ||
+                 !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
 
-       HA_ATOMIC_INC(ring_area + head_ofs);
        if (last_ofs_ptr)
                *last_ofs_ptr = tail_ofs;
        *ofs_ptr = head_ofs;