]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
Add more notes on how the atomic queues work
authorArran Cudbard-Bell <a.cudbardb@freeradius.org>
Sun, 28 Sep 2025 19:32:15 +0000 (15:32 -0400)
committerArran Cudbard-Bell <a.cudbardb@freeradius.org>
Sun, 28 Sep 2025 19:32:15 +0000 (15:32 -0400)
Replace modulo with bitwise and.

src/lib/io/atomic_queue.c

index f008df000188eb7593b8a7c48eabf62e78df7d1e..75f31d4eb46306a28e7242999772e9c1c348f201 100644 (file)
  * @brief Thread-safe queues.
  * @file io/atomic_queue.c
  *
+ * This is an implementation of a bounded MPMC ring buffer with per-slot
+ * sequence numbers, described by Dmitry Vyukov.
+ *
  * @copyright 2016 Alan DeKok (aland@freeradius.org)
  * @copyright 2016 Alister Winfield
  */
+
 RCSID("$Id$")
 
 #include <stdint.h>
@@ -32,6 +36,7 @@ RCSID("$Id$")
 #include <freeradius-devel/autoconf.h>
 #include <freeradius-devel/io/atomic_queue.h>
 #include <freeradius-devel/util/talloc.h>
+#include <freeradius-devel/util/math.h>
 
 /*
  *     Some macros to make our life easier.
@@ -65,14 +70,23 @@ typedef struct CC_HINT(packed, aligned(CACHE_LINE_SIZE)) {
  *
  */
 struct fr_atomic_queue_s {
-       alignas(CACHE_LINE_SIZE) atomic_int64_t         head;           //!< Head, aligned bytes to ensure
-                                                                       ///< it's in a different cache line to tail
-                                                                       ///< to reduce memory contention.
-       atomic_int64_t                                  tail;
+       alignas(CACHE_LINE_SIZE) atomic_int64_t         head;           //!< Position of the producer.
+                                                                       ///< Cache aligned bytes to ensure it's in a
+                                                                       ///< different cache line to tail to reduce
+                                                                       ///< memory contention.
+
+       alignas(CACHE_LINE_SIZE) atomic_int64_t         tail;           //!< Position of the consumer.
+                                                                       ///< Cache aligned bytes to ensure it's in a
+                                                                       ///< different cache line to tail to reduce
+                                                                       ///< memory contention.
+                                                                       ///< Reads may still need to occur from size
+                                                                       ///< whilst the producer is writing to tail.
 
-       size_t                                          size;
+       size_t                                          size;           //!< The length of the queue.  This is static.
 
-       void                                            *chunk;         //!< To pass to free. The non-aligned address.
+       void                                            *chunk;         //!< The start of the talloc chunk to pass to free.
+                                                                       ///< We need to play tricks to get aligned memory
+                                                                       ///< with talloc.
 
        alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[];       //!< The entry array, also aligned
                                                                        ///< to ensure it's not in the same cache
@@ -99,6 +113,11 @@ fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
 
        if (size == 0) return NULL;
 
+       /*
+        *      Roundup to the next power of 2 so we don't need modulo.
+        */
+       size = (size_t)fr_roundup_pow2_uint64((uint64_t)size);
+
        /*
         *      Allocate a contiguous blob for the header and queue.
         *      This helps with memory locality.
@@ -127,7 +146,7 @@ fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
        aq->size = size;
 
        /*
-        *      Set the head / tail indexes, and force other CPUs to
+        *      Set the head / tail indexes, and force other cores to
         *      see the writes.
         */
        store(aq->head, 0);
@@ -165,6 +184,28 @@ bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
 
        if (!data) return false;
 
+       /*
+        *      Here we're essentially racing with other producers
+        *      to find the current head of the queue.
+        *
+        *      1. Load the current head (which may be incremented
+        *         by another producer before we enter the loop).
+        *      2. Find the head entry, which is head modulo the
+        *         queue size (keeps head looping through the queue).
+        *      3. Read the sequence number of the entry.
+        *         The sequence numbers are initialised to the index
+        *         of the entries in the queue.  Each pass of the
+        *         producer increments the sequence number by one.
+        *      4.
+        *         a. If the sequence number is equal to the head,
+        *         then we can use the entry. Increment the head
+        *         so other producers know we've used it.
+        *         b. If it's greater than head, the producer has
+        *         already written to this entry, so we need to re-load
+        *         the head and race other producers again.
+        *         c. If it's less than the head, the entry has not yet
+        *         been consumed, and the queue is full.
+        */
        head = load(aq->head);
 
        /*
@@ -173,12 +214,21 @@ bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
        for (;;) {
                int64_t seq, diff;
 
-               entry = &aq->entry[ head % aq->size ];
+               /*
+                *      Alloc function guarantees size is a power
+                *      of 2, so we can use this hack to avoid
+                *      modulo.
+                */
+               entry = &aq->entry[head & (aq->size - 1)];
                seq = acquire(entry->seq);
                diff = (seq - head);
 
                /*
-                *      head is larger than the current entry, the queue is full.
+                *      head is larger than the current entry, the
+                *      queue is full.
+                *      The consumer will set entry seq to entry +
+                *      queue size, marking it as free for the
+                *      producer to use.
                 */
                if (diff < 0) {
 #if 0
@@ -188,7 +238,8 @@ bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
                }
 
                /*
-                *      Someone else has already written to this entry.  Get the new head pointer, and continue.
+                *      Someone else has already written to this entry
+                *      we lost the race, try again.
                 */
                if (diff > 0) {
                        head = load(aq->head);
@@ -196,10 +247,17 @@ bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
                }
 
                /*
-                *      We have the possibility that we can write to
-                *      this entry.  Try it.  If the write succeeds,
-                *      we're done.  If the write fails, re-load the
-                *      current head entry, and continue.
+                *      See if we can increment the head value
+                *      (and check it's still at its old value).
+                *
+                *      This means no two producers can have the same
+                *      entry in the queue, because they can't exit
+                *      the loop until they've incremented the head
+                *      successfully.
+                *
+                *      When we fail, we don't increment head before
+                *      trying again, because we need to detect queue
+                *      full conditions.
                 */
                if (cas_incr(aq->head, head)) {
                        break;
@@ -212,6 +270,23 @@ bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
         *      other CPUs.
         */
        entry->data = data;
+
+       /*
+        *      Technically head can overflow.  Practically, with a
+        *      3GHz CPU, doing nothing but incrementing head
+        *      uncontended it'd take about 100 years for this to
+        *      happen.  But hey, maybe someone invents an optical
+        *      CPU with a significantly higher clock speed, it's ok
+        *      for us to exit every 9 quintillion packets.
+        */
+#ifdef __clang_analyzer__
+       if (unlikely((head + 1) == INT64_MAX)) exit(1);
+#endif
+
+       /*
+        *      Mark up the entry as written to.  Any other producer
+        *      attempting to write will see (diff > 0) and retry.
+        */
        store(entry->seq, head + 1);
        return true;
 }
@@ -237,23 +312,35 @@ bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
        for (;;) {
                int64_t diff;
 
-               entry = &aq->entry[ tail % aq->size ];
+               entry = &aq->entry[tail % aq->size];
                seq = acquire(entry->seq);
 
                diff = (seq - (tail + 1));
 
                /*
-                *      tail is smaller than the current entry, the queue is empty.
+                *      Tail is smaller than the current entry,
+                *      the queue is empty.
+                *
+                *      Tail should now be equal to the head.
                 */
                if (diff < 0) {
                        return false;
                }
 
+               /*
+                *      Tail is now ahead of us.
+                *      Something else has consumed it.
+                *      We lost the race with another consumer.
+                */
                if (diff > 0) {
                        tail = load(aq->tail);
                        continue;
                }
 
+               /*
+                *      Same deal as push.
+                *      After this point we own the entry.
+                */
                if (cas_incr(aq->tail, tail)) {
                        break;
                }
@@ -267,10 +354,10 @@ bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
 
        /*
         *      Set the current entry to past the end of the queue.
-        *      i.e. it's unused.
+        *      This is equal to what head will be on its next pass
+        *      through the queue.  This marks the entry as free.
         */
-       seq = tail + aq->size;
-       store(entry->seq, seq);
+       store(entry->seq, tail + aq->size);
 
        return true;
 }