]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
Add a simple SPSC atomic, expandable queue chain
authorArran Cudbard-Bell <a.cudbardb@freeradius.org>
Thu, 23 Apr 2026 13:43:01 +0000 (09:43 -0400)
committerArran Cudbard-Bell <a.cudbardb@freeradius.org>
Thu, 23 Apr 2026 17:58:17 +0000 (13:58 -0400)
15 files changed:
src/lib/io/atomic_queue.c
src/lib/io/atomic_queue.h
src/lib/io/channel.c
src/lib/io/coord.c
src/lib/io/network.c
src/lib/io/worker.c
src/tests/bin/all.mk
src/tests/util/all.mk
src/tests/util/atomic_queue_test.c
src/tests/util/atomic_ring_test.c [new file with mode: 0644]
src/tests/util/atomic_ring_test.mk [new file with mode: 0644]
src/tests/util/channel_test.c
src/tests/util/control_test.c
src/tests/util/radius1_test.c
src/tests/util/worker_test.c

index 12ceb6293da8f179a27ae753cafd6712dacd7f38..f57d32e7d6de78ee885a0b56464d6dee261b993a 100644 (file)
@@ -32,6 +32,7 @@ RCSID("$Id$")
 #include <stdint.h>
 #include <stdalign.h>
 #include <inttypes.h>
+#include <stdlib.h>
 
 #include <freeradius-devel/autoconf.h>
 #include <freeradius-devel/io/atomic_queue.h>
@@ -90,15 +91,44 @@ struct fr_atomic_queue_s {
                                                                        ///< it can end up directly after tail in memory
                                                                        ///< and share a cache line.
 
-       void                                            *chunk;         //!< The start of the talloc chunk to pass to free.
-                                                                       ///< We need to play tricks to get aligned memory
-                                                                       ///< with talloc.
+       void                                            *chunk;         //!< The start of the talloc chunk to pass to free,
+                                                                       ///< or NULL if this queue was allocated raw via
+                                                                       ///< #fr_atomic_queue_malloc.  We need to play
+                                                                       ///< tricks to get aligned memory with talloc.
 
        alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[];       //!< The entry array, also aligned
                                                                        ///< to ensure it's not in the same cache
                                                                        ///< line as tail and size.
 };
 
+/** Initialise the sequence numbers and head/tail on a fresh queue buffer
+ *
+ * Shared between the talloc and raw allocators.  The buffer must already
+ * be cache-line aligned and sized to hold `size` entries.
+ *
+ * @param[in] aq       The queue buffer to initialise.
+ * @param[in] size     Entry count, already rounded up to a power of 2.
+ */
+static void atomic_queue_init(fr_atomic_queue_t *aq, size_t size)
+{
+       size_t  i;
+
+       /*
+        *      Initialize the array.  Data is NULL, and indexes are
+        *      the array entry number.
+        */
+       for (i = 0; i < size; i++) {
+               aq->entry[i].data = NULL;
+               store(aq->entry[i].seq, (int64_t)i);
+       }
+
+       aq->size = size;
+
+       store(aq->head, 0);
+       store(aq->tail, 0);
+       atomic_thread_fence(memory_order_seq_cst);
+}
+
 /** Create fixed-size atomic queue
  *
  * @note the queue must be freed explicitly by the ctx being freed, or by using
@@ -110,10 +140,8 @@ struct fr_atomic_queue_s {
  *     - NULL on error.
  *     - fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
  */
-fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
+fr_atomic_queue_t *fr_atomic_queue_talloc(TALLOC_CTX *ctx, size_t size)
 {
-       size_t                  i;
-       int64_t                 seq;
        fr_atomic_queue_t       *aq;
        TALLOC_CTX              *chunk;
 
@@ -138,26 +166,39 @@ fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
 
        talloc_set_name_const(chunk, "fr_atomic_queue_t");
 
-       /*
-        *      Initialize the array.  Data is NULL, and indexes are
-        *      the array entry number.
-        */
-       for (i = 0; i < size; i++) {
-               seq = i;
+       atomic_queue_init(aq, size);
 
-               aq->entry[i].data = NULL;
-               store(aq->entry[i].seq, seq);
-       }
+       return aq;
+}
 
-       aq->size = size;
+/** Create fixed-size atomic queue outside any talloc hierarchy
+ *
+ * Backed by `posix_memalign`; the resulting queue is released with
+ * #fr_atomic_queue_free (which detects the raw allocation via a NULL
+ * `chunk` field) or plain `free()` on the queue pointer.
+ *
+ * Intended for callers that push or pop from threads where talloc is
+ * not safe (for example, library-owned callback threads).
+ *
+ * @param[in] size     The number of entries in the queue.
+ * @return
+ *     - NULL on error.
+ *     - fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
+ */
+fr_atomic_queue_t *fr_atomic_queue_malloc(size_t size)
+{
+       fr_atomic_queue_t       *aq;
+       size_t                  bytes;
 
-       /*
-        *      Set the head / tail indexes, and force other cores to
-        *      see the writes.
-        */
-       store(aq->head, 0);
-       store(aq->tail, 0);
-       atomic_thread_fence(memory_order_seq_cst);
+       if (size == 0) return NULL;
+
+       size = (size_t)fr_roundup_pow2_uint64((uint64_t)size);
+       bytes = sizeof(*aq) + (size) * sizeof(aq->entry[0]);
+
+       if (posix_memalign((void **)&aq, CACHE_LINE_SIZE, bytes) != 0) return NULL;
+
+       aq->chunk = NULL;       /* sentinel: raw allocation, free with free() */
+       atomic_queue_init(aq, size);
 
        return aq;
 }
@@ -165,13 +206,18 @@ fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
 /** Free an atomic queue if it's not freed by ctx
  *
  * This function is needed because the atomic queue memory
- * must be cache line aligned.
+ * must be cache line aligned, and may live either in a talloc chunk
+ * or a raw `posix_memalign` allocation (`aq->chunk == NULL`).
  */
 void fr_atomic_queue_free(fr_atomic_queue_t **aq)
 {
        if (!*aq) return;
 
-       talloc_free((*aq)->chunk);
+       if ((*aq)->chunk) {
+               talloc_free((*aq)->chunk);
+       } else {
+               free(*aq);
+       }
        *aq = NULL;
 }
 
@@ -373,6 +419,204 @@ size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
        return aq->size;
 }
 
+/*
+ *     Segmented single-producer / single-consumer ring.
+ *
+ *     Safety argument: one producer owns `head` and writes `seg->next`
+ *     exactly once (release).  One consumer owns `tail` and reads
+ *     `tail->next` with acquire; by the release/acquire pair plus the
+ *     invariant "no push into s after s->next is set" the consumer
+ *     cannot advance past a segment that still has an in-flight push
+ *     as long as it retries `pop(tail->q)` once more after observing
+ *     `tail->next != NULL`.
+ */
+
+typedef struct fr_atomic_ring_entry_s fr_atomic_ring_entry_t;
+
+struct fr_atomic_ring_entry_s {
+       fr_atomic_queue_t               *q;             //!< Per-segment MPMC ring (used SPSC here).
+       _Atomic(fr_atomic_ring_entry_t *)       next;           //!< NULL until the producer seals this segment
+                                                       ///< because it filled up and moved on to a
+                                                       ///< fresh one.
+};
+
+struct fr_atomic_ring_s {
+       size_t                          seg_size;       //!< Capacity of each segment.
+       _Atomic(fr_atomic_ring_entry_t *)       head;           //!< Producer end.  Writer is the single producer,
+                                                       ///< reader is also the producer - the consumer
+                                                       ///< never loads this.
+       fr_atomic_ring_entry_t          *tail;          //!< Consumer end.  Touched only by the consumer.
+};
+
+/** Allocate a fresh segment and its embedded queue
+ *
+ * Uses the raw (non-talloc) allocator so this function is safe to call
+ * from the producer thread even when that thread cannot safely use talloc.
+ */
+static fr_atomic_ring_entry_t *atomic_ring_entry_alloc(size_t seg_size)
+{
+       fr_atomic_ring_entry_t  *s;
+
+       s = malloc(sizeof(*s));
+       if (!s) return NULL;
+
+       s->q = fr_atomic_queue_malloc(seg_size);
+       if (!s->q) {
+               free(s);
+               return NULL;
+       }
+       atomic_init(&s->next, NULL);
+
+       return s;
+}
+
+static void atomic_ring_entry_free(fr_atomic_ring_entry_t *s)
+{
+       fr_atomic_queue_free(&s->q);
+       free(s);
+}
+
+/** talloc destructor for #fr_atomic_ring_t: walk the chain and free segments */
+static int _atomic_ring_free(fr_atomic_ring_t *ring)
+{
+       fr_atomic_ring_entry_t  *s = ring->tail;
+
+       while (s) {
+               fr_atomic_ring_entry_t *next = atomic_load_explicit(&s->next, memory_order_acquire);
+
+               atomic_ring_entry_free(s);
+               s = next;
+       }
+
+       return 0;
+}
+
+/** Allocate an empty SPSC ring
+ *
+ * @param[in] ctx      talloc ctx that owns the ring handle (segments live
+ *                     outside talloc; they are freed by the ring's
+ *                     destructor).
+ * @param[in] seg_size Per-segment capacity.  Rounded up to a power of 2.
+ * @return
+ *     - NULL on error.
+ *     - A ring containing one initial (empty) segment.
+ */
+fr_atomic_ring_t *fr_atomic_ring_alloc(TALLOC_CTX *ctx, size_t seg_size)
+{
+       fr_atomic_ring_t        *ring;
+       fr_atomic_ring_entry_t  *seg;
+
+       if (seg_size == 0) return NULL;
+
+       ring = talloc(ctx, fr_atomic_ring_t);
+       if (!ring) return NULL;
+
+       seg = atomic_ring_entry_alloc(seg_size);
+       if (!seg) {
+               talloc_free(ring);
+               return NULL;
+       }
+
+       ring->seg_size = seg_size;
+       ring->tail = seg;
+       atomic_init(&ring->head, seg);
+       talloc_set_destructor(ring, _atomic_ring_free);
+
+       return ring;
+}
+
+/** Free the ring and all remaining segments
+ *
+ * Equivalent to `talloc_free()` on the ring, but nulls the caller's
+ * handle in the style of #fr_atomic_queue_free.
+ */
+void fr_atomic_ring_free(fr_atomic_ring_t **ring_p)
+{
+       if (!*ring_p) return;
+
+       talloc_free(*ring_p);
+       *ring_p = NULL;
+}
+
+/** Push a pointer into the ring; allocate a new segment on overflow
+ *
+ * Single-producer only.  Must not be called concurrently with itself.
+ *
+ * @param[in] ring     Ring to push into.
+ * @param[in] data     Value to push (must be non-NULL).
+ * @return
+ *     - true on success.
+ *     - false if both the current segment is full and a new segment
+ *       could not be allocated.
+ */
+bool fr_atomic_ring_push(fr_atomic_ring_t *ring, void *data)
+{
+       fr_atomic_ring_entry_t  *h;
+       fr_atomic_ring_entry_t  *n;
+
+       h = atomic_load_explicit(&ring->head, memory_order_relaxed);
+
+       if (likely(fr_atomic_queue_push(h->q, data))) return true;
+
+       n = atomic_ring_entry_alloc(ring->seg_size);
+       if (unlikely(!n)) return false;
+
+       /*
+        *      Publish ordering matters: the consumer only inspects `h->next`
+        *      and advances past `h` once it sees a non-NULL value there.
+        *      Release here pairs with acquire in fr_atomic_ring_pop.
+        */
+       atomic_store_explicit(&h->next, n, memory_order_release);
+       atomic_store_explicit(&ring->head, n, memory_order_relaxed);
+
+       return fr_atomic_queue_push(n->q, data);
+}
+
+/** Pop a pointer from the ring, advancing past drained segments
+ *
+ * Single-consumer only.  Must not be called concurrently with itself.
+ *
+ * @param[in] ring     Ring to pop from.
+ * @param[out] p_data  Where to write the popped value on success.
+ * @return
+ *     - true if a value was popped.
+ *     - false if the ring is currently empty.
+ */
+bool fr_atomic_ring_pop(fr_atomic_ring_t *ring, void **p_data)
+{
+       fr_atomic_ring_entry_t  *cur;
+       fr_atomic_ring_entry_t  *n;
+       fr_atomic_ring_entry_t  *old;
+
+       for (;;) {
+               cur = ring->tail;
+
+               if (likely(fr_atomic_queue_pop(cur->q, p_data))) return true;
+
+               /*
+                *      Empty from our point of view.  If the producer hasn't
+                *      sealed this segment there might be pushes in our
+                *      future - return and let the caller come back.
+                */
+               n = atomic_load_explicit(&cur->next, memory_order_acquire);
+               if (!n) return false;
+
+               /*
+                *      Sealed.  One more pop to drain anything the producer
+                *      committed before sealing but after our first (empty)
+                *      pop observation.  Without this re-check, late commits
+                *      in the (empty-observation, seal-observation) window
+                *      would be stranded when we advance past `cur`.
+                */
+               if (fr_atomic_queue_pop(cur->q, p_data)) return true;
+
+               old = cur;
+               ring->tail = n;
+               atomic_ring_entry_free(old);
+               /* loop to pop from the new tail */
+       }
+}
+
 #ifdef WITH_VERIFY_PTR
 /** Check the talloc chunk is still valid
  *
index 20a1b1d610f082fba62a6e641917dbb3ce94f2bf..3a566defb63f15aa51287e250b83ac3320ddca85 100644 (file)
@@ -39,12 +39,31 @@ extern "C" {
 
 typedef struct fr_atomic_queue_s fr_atomic_queue_t;
 
-fr_atomic_queue_t      *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size);
+fr_atomic_queue_t      *fr_atomic_queue_talloc(TALLOC_CTX *ctx, size_t size);
+fr_atomic_queue_t      *fr_atomic_queue_malloc(size_t size);
 void                   fr_atomic_queue_free(fr_atomic_queue_t **aq);
 bool                   fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data);
 bool                   fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data);
 size_t                 fr_atomic_queue_size(fr_atomic_queue_t *aq);
 
+/** Unbounded segmented single-producer / single-consumer queue
+ *
+ * Internally a linked list of fixed-size `fr_atomic_queue_t` segments.
+ * When the producer's current segment fills, a new segment is malloc'd
+ * and linked in; the consumer drains segments in order and frees them
+ * as it advances.  Allocation on the producer side uses raw malloc
+ * (not talloc), so the producer thread is free to be one that cannot
+ * safely use talloc (e.g. a library-owned callback thread).
+ *
+ * Exactly one producer and one consumer.  Not safe for MPMC use.
+ */
+typedef struct fr_atomic_ring_s fr_atomic_ring_t;
+
+fr_atomic_ring_t       *fr_atomic_ring_alloc(TALLOC_CTX *ctx, size_t seg_size);
+void                   fr_atomic_ring_free(fr_atomic_ring_t **ring);
+bool                   fr_atomic_ring_push(fr_atomic_ring_t *ring, void *data);
+bool                   fr_atomic_ring_pop(fr_atomic_ring_t *ring, void **p_data);
+
 #ifdef WITH_VERIFY_PTR
 void                   fr_atomic_queue_verify(fr_atomic_queue_t *aq);
 #endif
index d6f3e31b83f774bebcfa7d3d2cb46109cc6bb828..6082b1d335e42aaa1273197c0c084aff3bc5bedd 100644 (file)
@@ -195,13 +195,13 @@ fr_channel_t *fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_con
        ch->end[TO_RESPONDER].direction = TO_RESPONDER;
        ch->end[TO_REQUESTOR].direction = TO_REQUESTOR;
 
-       ch->end[TO_RESPONDER].aq = fr_atomic_queue_alloc(ch, ATOMIC_QUEUE_SIZE);
+       ch->end[TO_RESPONDER].aq = fr_atomic_queue_talloc(ch, ATOMIC_QUEUE_SIZE);
        if (!ch->end[TO_RESPONDER].aq) {
                talloc_free(ch);
                goto nomem;
        }
 
-       ch->end[TO_REQUESTOR].aq = fr_atomic_queue_alloc(ch, ATOMIC_QUEUE_SIZE);
+       ch->end[TO_REQUESTOR].aq = fr_atomic_queue_talloc(ch, ATOMIC_QUEUE_SIZE);
        if (!ch->end[TO_REQUESTOR].aq) {
                talloc_free(ch);
                goto nomem;
index 185169b17dd661f116d60479bf7f36b666af85f4..fdbd6a4f007de249e202c3bb878d5bdadfc04aba 100644 (file)
@@ -340,7 +340,7 @@ static fr_coord_t *fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coor
        };
 
        /* Allocate atomic queue / control for receiving messages from workers */
-       aq = fr_atomic_queue_alloc(coord, FR_CONTROL_MAX_MESSAGES);
+       aq = fr_atomic_queue_talloc(coord, FR_CONTROL_MAX_MESSAGES);
        if (!aq) {
                fr_strerror_const("Failed creating worker -> coordinator atomic queue");
        fail:
@@ -354,7 +354,7 @@ static fr_coord_t *fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coor
        }
 
        /* Allocate atomic queue for workers sending data to coordinators */
-       coord->coord_recv_aq = fr_atomic_queue_alloc(coord, FR_CONTROL_MAX_MESSAGES);
+       coord->coord_recv_aq = fr_atomic_queue_talloc(coord, FR_CONTROL_MAX_MESSAGES);
        if (!coord->coord_recv_aq) {
                fr_strerror_const("Failed creating worker -> coordinator data atomic queue");
                goto fail;
@@ -679,8 +679,8 @@ fr_coord_worker_t *fr_coord_attach(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coor
                return NULL;
        }
 
-       aq = fr_atomic_queue_alloc(cw, 1024);
-       cw->worker_recv_aq = fr_atomic_queue_alloc(cw, FR_CONTROL_MAX_MESSAGES);
+       aq = fr_atomic_queue_talloc(cw, 1024);
+       cw->worker_recv_aq = fr_atomic_queue_talloc(cw, FR_CONTROL_MAX_MESSAGES);
        cw->worker_recv_control = fr_control_create(cw, el, aq, 0);
        cw->worker_send_rb = fr_ring_buffer_create(cw, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
        cw->worker_send_ms = fr_message_set_create(cw, FR_CONTROL_MAX_MESSAGES, sizeof(fr_coord_data_t),
index ee549e34c1f29d9761ef7bfbb97b1cfb70b2b1e8..10abdd7b075988fbd9c92956c6c620c3628dcdb3 100644 (file)
@@ -1948,7 +1948,7 @@ fr_network_t *fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const
        nr->signal_pipe[1] = -1;
        if (config) nr->config = *config;
 
-       nr->aq_control = fr_atomic_queue_alloc(nr, 1024);
+       nr->aq_control = fr_atomic_queue_talloc(nr, 1024);
        if (!nr->aq_control) {
                talloc_free(nr);
                return NULL;
index b57f38a51836ae1a1da21a9308b38160a164b5e4..30f27099061ef8b093a67e5d9f037d4c22bdb3cf 100644 (file)
@@ -1399,7 +1399,7 @@ nomem:
         */
        memset(&worker->tracking, 0, sizeof(worker->tracking));
 
-       worker->aq_control = fr_atomic_queue_alloc(worker, 1024);
+       worker->aq_control = fr_atomic_queue_talloc(worker, 1024);
        if (!worker->aq_control) {
                fr_strerror_const("Failed creating atomic queue");
        fail:
index 4fa365b0a13ba97d6ebb7411c17e671f7204db5b..a585c8c8346e2d972a7c00d88068f97503e73ee0 100644 (file)
@@ -2,6 +2,7 @@ TEST    := test.bin
 
 FILES  := \
        atomic_queue_test       \
+       atomic_ring_test        \
        control_test            \
        message_set_test        \
        radclient               \
index e7a73d26d939bc98f0a6c3cc586c0e945dd6e70c..016220362274d85f9e29943cd6a2dafdb9e95f6b 100644 (file)
@@ -1,4 +1,4 @@
-SUBMAKEFILES := ring_buffer_test.mk message_set_test.mk atomic_queue_test.mk control_test.mk
+SUBMAKEFILES := ring_buffer_test.mk message_set_test.mk atomic_queue_test.mk atomic_ring_test.mk control_test.mk
 
 #
 #  These require pthread.
index 1325db7eb7977919432cf2e47f0a279a550b6249..286be521a2ad42fb17e8ab5d8d31f418a287489f 100644 (file)
@@ -84,7 +84,7 @@ int main(int argc, char *argv[])
        argv += (optind - 1);
 #endif
 
-       aq = fr_atomic_queue_alloc(autofree, size);
+       aq = fr_atomic_queue_talloc(autofree, size);
 
 #ifndef NDEBUG
        if (debug_lvl) {
diff --git a/src/tests/util/atomic_ring_test.c b/src/tests/util/atomic_ring_test.c
new file mode 100644 (file)
index 0000000..bc35823
--- /dev/null
@@ -0,0 +1,293 @@
+/*
+ * atomic_ring_test.c  Tests for the segmented SPSC atomic ring
+ *
+ * Version:    $Id$
+ *
+ *   This program is free software; you can redistribute it and/or modify
+ *   it under the terms of the GNU General Public License as published by
+ *   the Free Software Foundation; either version 2 of the License, or
+ *   (at your option) any later version.
+ *
+ *   This program is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with this program; if not, write to the Free Software
+ *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+RCSID("$Id$")
+
+#include <freeradius-devel/io/atomic_queue.h>
+#include <freeradius-devel/util/debug.h>
+
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdatomic.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/**********************************************************************/
+typedef struct request_s request_t;
+void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request);
+void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request) { }
+/**********************************************************************/
+
+#define TEST(_name) do { printf("  %-48s ", _name); fflush(stdout); } while (0)
+#define OK()         do { printf("ok\n"); } while (0)
+#define FAIL(fmt, ...) do { \
+       printf("FAIL\n    " fmt "\n", ##__VA_ARGS__); \
+       fr_exit_now(EXIT_FAILURE); \
+} while (0)
+
+#define CHECK(cond, fmt, ...) do { \
+       if (!(cond)) FAIL(fmt, ##__VA_ARGS__); \
+} while (0)
+
+/** 1. alloc/free round-trip with nothing ever pushed */
+static void test_alloc_free(TALLOC_CTX *ctx)
+{
+       fr_atomic_ring_t        *ring;
+
+       TEST("alloc/free empty ring");
+
+       ring = fr_atomic_ring_alloc(ctx, 4);
+       CHECK(ring != NULL, "fr_atomic_ring_alloc returned NULL");
+
+       fr_atomic_ring_free(&ring);
+       CHECK(ring == NULL, "fr_atomic_ring_free did not null the handle");
+       OK();
+}
+
+/** 2. push/pop within a single segment preserves FIFO */
+static void test_push_pop_single_segment(TALLOC_CTX *ctx)
+{
+       fr_atomic_ring_t        *ring;
+       intptr_t                i;
+       void                    *data;
+
+       TEST("push/pop within one segment preserves FIFO");
+
+       ring = fr_atomic_ring_alloc(ctx, 8);
+
+       for (i = 1; i <= 4; i++) {
+               CHECK(fr_atomic_ring_push(ring, (void *)i), "push %" PRIdPTR " failed", i);
+       }
+       for (i = 1; i <= 4; i++) {
+               CHECK(fr_atomic_ring_pop(ring, &data), "pop %" PRIdPTR " failed", i);
+               CHECK((intptr_t)data == i, "FIFO broken: expected %" PRIdPTR ", got %" PRIdPTR, i, (intptr_t)data);
+       }
+       CHECK(!fr_atomic_ring_pop(ring, &data), "pop from empty ring returned true");
+
+       fr_atomic_ring_free(&ring);
+       OK();
+}
+
+/** 3. push past segment capacity triggers new-segment allocation */
+static void test_grow_across_segments(TALLOC_CTX *ctx)
+{
+       fr_atomic_ring_t        *ring;
+       intptr_t                i;
+       void                    *data;
+       size_t const            seg_size = 4;           /* power of 2 */
+       size_t const            n = seg_size * 4;       /* force 4 segments */
+
+       TEST("push past segment capacity grows transparently");
+
+       ring = fr_atomic_ring_alloc(ctx, seg_size);
+
+       for (i = 1; i <= (intptr_t)n; i++) {
+               CHECK(fr_atomic_ring_push(ring, (void *)i), "push %" PRIdPTR " failed", i);
+       }
+       for (i = 1; i <= (intptr_t)n; i++) {
+               CHECK(fr_atomic_ring_pop(ring, &data), "pop %" PRIdPTR " failed", i);
+               CHECK((intptr_t)data == i, "FIFO broken across segments: expected %" PRIdPTR ", got %" PRIdPTR,
+                     i, (intptr_t)data);
+       }
+       CHECK(!fr_atomic_ring_pop(ring, &data), "pop from empty multi-segment ring returned true");
+
+       fr_atomic_ring_free(&ring);
+       OK();
+}
+
+/** 4. interleaved push/pop keeps FIFO and frees drained segments */
+static void test_interleaved(TALLOC_CTX *ctx)
+{
+       fr_atomic_ring_t        *ring;
+       intptr_t                push_seq = 1;
+       intptr_t                pop_seq = 1;
+       void                    *data;
+       int                     round;
+
+       TEST("interleaved push/pop keeps FIFO across segment advances");
+
+       ring = fr_atomic_ring_alloc(ctx, 4);
+
+       /*
+        *      Push batches larger than segment capacity so we cross
+        *      segment boundaries, then drain.  Many rounds to exercise
+        *      repeated grow-and-retire cycles.
+        */
+       for (round = 0; round < 32; round++) {
+               int batch = (round % 5) + 6;    /* 6..10 */
+               int i;
+
+               for (i = 0; i < batch; i++) {
+                       CHECK(fr_atomic_ring_push(ring, (void *)push_seq),
+                             "push %" PRIdPTR " failed at round %d", push_seq, round);
+                       push_seq++;
+               }
+               for (i = 0; i < batch; i++) {
+                       CHECK(fr_atomic_ring_pop(ring, &data),
+                             "pop %" PRIdPTR " failed at round %d", pop_seq, round);
+                       CHECK((intptr_t)data == pop_seq,
+                             "FIFO broken at round %d: expected %" PRIdPTR ", got %" PRIdPTR,
+                             round, pop_seq, (intptr_t)data);
+                       pop_seq++;
+               }
+               CHECK(!fr_atomic_ring_pop(ring, &data), "ring not empty after round %d", round);
+       }
+
+       fr_atomic_ring_free(&ring);
+       OK();
+}
+
+/** 5. free the ring with items still queued; must not leak/crash */
+static void test_free_nonempty(TALLOC_CTX *ctx)
+{
+       fr_atomic_ring_t        *ring;
+       intptr_t                i;
+
+       TEST("free releases all remaining segments");
+
+       ring = fr_atomic_ring_alloc(ctx, 4);
+
+       for (i = 1; i <= 32; i++) {
+               CHECK(fr_atomic_ring_push(ring, (void *)i), "push %" PRIdPTR " failed", i);
+       }
+
+       fr_atomic_ring_free(&ring);
+       OK();
+}
+
+
+/*
+ *     Threaded stress test: one producer, one consumer, N items.
+ *     Verifies that every pushed item is received in order.
+ */
+
+#define STRESS_N       200000
+
+typedef struct {
+       fr_atomic_ring_t        *ring;
+       size_t                  n;
+} stress_arg_t;
+
+static void *stress_producer(void *arg)
+{
+       stress_arg_t    *sa = arg;
+       size_t          i;
+
+       for (i = 1; i <= sa->n; i++) {
+               while (!fr_atomic_ring_push(sa->ring, (void *)(uintptr_t)i)) {
+                       /*
+                        *      push can only fail on OOM in practice; spin so
+                        *      we notice stalls rather than miscounting.
+                        */
+                       sched_yield();
+               }
+       }
+       return NULL;
+}
+
+static void *stress_consumer(void *arg)
+{
+       stress_arg_t    *sa = arg;
+       size_t          expect = 1;
+       void            *data;
+       uintptr_t       *errp;
+
+       errp = malloc(sizeof(*errp));
+       *errp = 0;
+
+       while (expect <= sa->n) {
+               if (!fr_atomic_ring_pop(sa->ring, &data)) {
+                       sched_yield();
+                       continue;
+               }
+               if ((uintptr_t)data != expect) {
+                       *errp = expect;
+                       return errp;
+               }
+               expect++;
+       }
+
+       return errp;    /* 0 on success */
+}
+
+static void test_stress_two_thread(TALLOC_CTX *ctx)
+{
+       fr_atomic_ring_t        *ring;
+       pthread_t               prod, cons;
+       stress_arg_t            sa;
+       void                    *cons_ret;
+       uintptr_t               err;
+       int                     rc;
+
+       TEST("producer/consumer stress - 200k items, 4-slot segments");
+
+       ring = fr_atomic_ring_alloc(ctx, 4);
+       sa.ring = ring;
+       sa.n = STRESS_N;
+
+       rc = pthread_create(&cons, NULL, stress_consumer, &sa);
+       CHECK(rc == 0, "pthread_create(consumer) failed: %s", strerror(rc));
+       rc = pthread_create(&prod, NULL, stress_producer, &sa);
+       CHECK(rc == 0, "pthread_create(producer) failed: %s", strerror(rc));
+
+       pthread_join(prod, NULL);
+       pthread_join(cons, &cons_ret);
+
+       err = *(uintptr_t *)cons_ret;
+       free(cons_ret);
+       CHECK(err == 0, "consumer saw out-of-order item at position %" PRIuPTR, err);
+
+       /*
+        *      Drain anything the producer left behind (shouldn't be any
+        *      at this point since consumer drained through sa.n).
+        */
+       {
+               void *data;
+               CHECK(!fr_atomic_ring_pop(ring, &data), "ring not empty after consumer finished");
+       }
+
+       fr_atomic_ring_free(&ring);
+       OK();
+}
+
+
+int main(int argc, UNUSED char *argv[])
+{
+       TALLOC_CTX      *autofree = talloc_autofree_context();
+
+       if (argc > 1) {
+               fprintf(stderr, "usage: atomic_ring_test\n");
+               return EXIT_FAILURE;
+       }
+
+       printf("atomic_ring_test:\n");
+
+       test_alloc_free(autofree);
+       test_push_pop_single_segment(autofree);
+       test_grow_across_segments(autofree);
+       test_interleaved(autofree);
+       test_free_nonempty(autofree);
+       test_stress_two_thread(autofree);
+
+       printf("  all tests passed\n");
+       return EXIT_SUCCESS;
+}
diff --git a/src/tests/util/atomic_ring_test.mk b/src/tests/util/atomic_ring_test.mk
new file mode 100644 (file)
index 0000000..18cbc7a
--- /dev/null
@@ -0,0 +1,6 @@
+TARGET                 := atomic_ring_test$(E)
+
+SOURCES                := atomic_ring_test.c
+
+TGT_PREREQS    := $(LIBFREERADIUS_SERVER) libfreeradius-io$(L)
+TGT_LDLIBS     := $(LIBS) $(PTHREADLIBS)
index 1eeac385cefd9396515ad0b28a03804fd6597090..e3a16c84b4f856b9b9698794f33c34a92225e2d9 100644 (file)
@@ -533,10 +533,10 @@ int main(int argc, char *argv[])
        kq_worker = kqueue();
        fr_assert(kq_worker >= 0);
 
-       aq_master = fr_atomic_queue_alloc(autofree, max_control_plane);
+       aq_master = fr_atomic_queue_talloc(autofree, max_control_plane);
        fr_assert(aq_master != NULL);
 
-       aq_worker = fr_atomic_queue_alloc(autofree, max_control_plane);
+       aq_worker = fr_atomic_queue_talloc(autofree, max_control_plane);
        fr_assert(aq_worker != NULL);
 
        control_master = fr_control_create(autofree, kq_master, aq_master, 1024);
index ec016b71088a9227871f7a05f0089a2daf98acf3..0de76c1c6bd755e4d6724dd04baa428d129d3676 100644 (file)
@@ -230,7 +230,7 @@ int main(int argc, char *argv[])
        aq_size = single_aq ? FR_CONTROL_MAX_MESSAGES * num_workers : FR_CONTROL_MAX_MESSAGES;
        aq = talloc_array(autofree, fr_atomic_queue_t *, num_aq);
        for (i = 0; i < num_aq; i++) {
-               aq[i] = fr_atomic_queue_alloc(aq, aq_size);
+               aq[i] = fr_atomic_queue_talloc(aq, aq_size);
                fr_assert(aq[i] != NULL);
        }
 
index 3c2c6a5c5c16782519381bc226f376e366c3bfc5..39f00c599f001f78294237f7b4cc611bbeb41527 100644 (file)
@@ -231,7 +231,7 @@ static void master_process(TALLOC_CTX *ctx)
        kq_master = kqueue();
        fr_assert(kq_master >= 0);
 
-       aq_master = fr_atomic_queue_alloc(ctx, max_control_plane);
+       aq_master = fr_atomic_queue_talloc(ctx, max_control_plane);
        fr_assert(aq_master != NULL);
 
        control_master = fr_control_create(ctx, kq_master, aq_master, 1024);
index e139cac2fd6aab63bc5b08806b73f2037d4cbcf5..87fcf50cf4406f538236303caf6880a6feae1dd1 100644 (file)
@@ -533,7 +533,7 @@ int main(int argc, char *argv[])
        kq_master = kqueue();
        fr_assert(kq_master >= 0);
 
-       aq_master = fr_atomic_queue_alloc(autofree, max_control_plane);
+       aq_master = fr_atomic_queue_talloc(autofree, max_control_plane);
        fr_assert(aq_master != NULL);
 
        control_master = fr_control_create(autofree, kq_master, aq_master, 1024);