From: Willy Tarreau Date: Wed, 28 Feb 2024 16:42:56 +0000 (+0100) Subject: MEDIUM: ring: protect the initialization of the initial reader offset X-Git-Tag: v3.0-dev6~20 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=31b93b40b0200fe2d288e169126249a38289f123;p=thirdparty%2Fhaproxy.git MEDIUM: ring: protect the initialization of the initial reader offset Since we're going to remove the lock, there's no more way to prevent the ring from being fed while we're attaching a client to it. We need to freeze the buffer while looking at its head so that we can attach there and have a trustable one. We could do it by setting the lock bit on the tail offset but quite frankly we don't need to bother with that, attaching a client is rare enough to permit a thread_isolate(). --- diff --git a/src/ring.c b/src/ring.c index 613c8c6b91..963a70f31c 100644 --- a/src/ring.c +++ b/src/ring.c @@ -450,18 +450,22 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t ring_area = (uint8_t *)ring->storage->area; ring_size = ring->storage->size; - tail_ofs = ring_tail(ring); - head_ofs = HA_ATOMIC_LOAD(&ring->storage->head); - /* explanation for the initialization below: it would be better to do * this in the parsing function but this would occasionally result in * dropped events because we'd take a reference on the oldest message * and keep it while being scheduled. Thus instead let's take it the * first time we enter here so that we have a chance to pass many * existing messages before grabbing a reference to a location. This - * value cannot be produced after initialization. + * value cannot be produced after initialization. The first offset + * needs to be taken under isolation as it must not move while we're + * trying to catch it. */ if (unlikely(*ofs_ptr == ~0)) { + thread_isolate(); + + head_ofs = HA_ATOMIC_LOAD(&ring->storage->head); + tail_ofs = ring_tail(ring); + if (flags & RING_WF_SEEK_NEW) { /* going to the end means looking at tail-1 */ head_ofs = tail_ofs + ring_size - 1; @@ -469,20 +473,27 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t head_ofs -= ring_size; } - /* make ctx->ofs relative to the beginning of the buffer now */ - *ofs_ptr = head_ofs; - - /* and reserve our slot here (inc readers count) */ + /* reserve our slot here (inc readers count) */ do { readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); } while ((readers > RING_MAX_READERS || !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); + + thread_release(); + + /* store this precious offset in our context, and we're done */ + *ofs_ptr = head_ofs; } /* we have the guarantee we can restart from our own head */ head_ofs = *ofs_ptr; BUG_ON(head_ofs >= ring_size); + /* the tail will continue to move but we're getting a safe value + * here that will continue to work. + */ + tail_ofs = ring_tail(ring); + /* we keep track of where we were and we don't release it before * we've protected the next place. */