From: Willy Tarreau Date: Wed, 28 Feb 2024 08:03:46 +0000 (+0100) Subject: MINOR: ring: make the reader check the readers count before inc/dec X-Git-Tag: v3.0-dev6~28 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d336d71cbb211f91e750c0fcc4dc21ed9bbf8a20;p=thirdparty%2Fhaproxy.git MINOR: ring: make the reader check the readers count before inc/dec We'll want to reserve some special values for the readers count to temporary lock the following message, but for this it will be mandatory that readers check for them before incrementing/decrementing the counter. Let'sdo that using a CAS. The readers performance is not as critical as the writer's anyway so the slight overhead is not a problem. --- diff --git a/src/ring.c b/src/ring.c index 13d75dfb2e..fb19efc466 100644 --- a/src/ring.c +++ b/src/ring.c @@ -332,11 +332,17 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs) HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); if (ofs != ~0) { /* reader was still attached */ - char *area = ring_area(ring); + uint8_t *area = (uint8_t *)ring_area(ring); + uint8_t readers; BUG_ON(ofs >= ring_size(ring)); LIST_DEL_INIT(&appctx->wait_entry); - HA_ATOMIC_DEC(area + ofs); + + /* dec readers count */ + do { + readers = _HA_ATOMIC_LOAD(area + ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax()); } HA_ATOMIC_DEC(&ring->readers_count); HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); @@ -382,14 +388,15 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t { size_t head_ofs, tail_ofs; size_t ring_size; - char *ring_area; + uint8_t *ring_area; struct ist v1, v2; uint64_t msg_len; size_t len, cnt; ssize_t copied; + uint8_t readers; int ret; - ring_area = ring->storage->area; + ring_area = (uint8_t *)ring->storage->area; ring_size = ring->storage->size; HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); @@ -416,15 +423,22 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t /* make ctx->ofs relative to the beginning of the buffer now */ *ofs_ptr = head_ofs; - /* and reserve our slot here */ - HA_ATOMIC_INC(ring_area + head_ofs); + /* and reserve our slot here (inc readers count) */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); } /* we have the guarantee we can restart from our own head */ head_ofs = *ofs_ptr; BUG_ON(head_ofs >= ring_size); - HA_ATOMIC_DEC(ring_area + head_ofs); + /* dec readers count */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax()); /* in this loop, head_ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to @@ -432,7 +446,7 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t * origin, while pos is relative to the ring's head. */ ret = 1; - vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); + vp_ring_to_data(&v1, &v2, (char *)ring_area, ring_size, head_ofs, tail_ofs); while (1) { if (vp_size(v1, v2) <= 1) { @@ -462,9 +476,14 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t vp_skip(&v1, &v2, cnt + msg_len); } - vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); + vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs); + + /* inc readers count */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); - HA_ATOMIC_INC(ring_area + head_ofs); if (last_ofs_ptr) *last_ofs_ptr = tail_ofs; *ofs_ptr = head_ofs;