]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: ring: protect the initialization of the initial reader offset
authorWilly Tarreau <w@1wt.eu>
Wed, 28 Feb 2024 16:42:56 +0000 (17:42 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
Since we're going to remove the lock, there's no more way to prevent the
ring from being fed while we're attaching a client to it. We need to
freeze the buffer while looking at its head so that we can attach there
and have a trustable one. We could do it by setting the lock bit on the
tail offset but quite frankly we don't need to bother with that, attaching
a client is rare enough to permit a thread_isolate().

src/ring.c

index 613c8c6b918b084fff0666d8745e39ea49d9f420..963a70f31cdd9c7c402b187ccc00df0153c49696 100644 (file)
@@ -450,18 +450,22 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
        ring_area = (uint8_t *)ring->storage->area;
        ring_size = ring->storage->size;
 
-       tail_ofs = ring_tail(ring);
-       head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
-
        /* explanation for the initialization below: it would be better to do
         * this in the parsing function but this would occasionally result in
         * dropped events because we'd take a reference on the oldest message
         * and keep it while being scheduled. Thus instead let's take it the
         * first time we enter here so that we have a chance to pass many
         * existing messages before grabbing a reference to a location. This
-        * value cannot be produced after initialization.
+        * value cannot be produced after initialization. The first offset
+        * needs to be taken under isolation as it must not move while we're
+        * trying to catch it.
         */
        if (unlikely(*ofs_ptr == ~0)) {
+               thread_isolate();
+
+               head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
+               tail_ofs = ring_tail(ring);
+
                if (flags & RING_WF_SEEK_NEW) {
                        /* going to the end means looking at tail-1 */
                        head_ofs = tail_ofs + ring_size - 1;
@@ -469,20 +473,27 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
                                head_ofs -= ring_size;
                }
 
-               /* make ctx->ofs relative to the beginning of the buffer now */
-               *ofs_ptr = head_ofs;
-
-               /* and reserve our slot here (inc readers count) */
+               /* reserve our slot here (inc readers count) */
                do {
                        readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
                } while ((readers > RING_MAX_READERS ||
                          !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
+
+               thread_release();
+
+               /* store this precious offset in our context, and we're done */
+               *ofs_ptr = head_ofs;
        }
 
        /* we have the guarantee we can restart from our own head */
        head_ofs = *ofs_ptr;
        BUG_ON(head_ofs >= ring_size);
 
+       /* the tail will continue to move but we're getting a safe value
+        * here that will continue to work.
+        */
+       tail_ofs = ring_tail(ring);
+
        /* we keep track of where we were and we don't release it before
         * we've protected the next place.
         */