#include <stdint.h>
#include <stdalign.h>
#include <inttypes.h>
+#include <stdlib.h>
#include <freeradius-devel/autoconf.h>
#include <freeradius-devel/io/atomic_queue.h>
///< it can end up directly after tail in memory
///< and share a cache line.
- void *chunk; //!< The start of the talloc chunk to pass to free.
- ///< We need to play tricks to get aligned memory
- ///< with talloc.
+ void *chunk; //!< The start of the talloc chunk to pass to free,
+ ///< or NULL if this queue was allocated raw via
+ ///< #fr_atomic_queue_malloc. We need to play
+ ///< tricks to get aligned memory with talloc.
alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[]; //!< The entry array, also aligned
///< to ensure it's not in the same cache
///< line as tail and size.
};
+/** Initialise the sequence numbers and head/tail on a fresh queue buffer
+ *
+ * Shared between the talloc and raw allocators. The buffer must already
+ * be cache-line aligned and sized to hold `size` entries.
+ *
+ * @param[in] aq The queue buffer to initialise.
+ * @param[in] size Entry count, already rounded up to a power of 2.
+ */
+static void atomic_queue_init(fr_atomic_queue_t *aq, size_t size)
+{
+ size_t i;
+
+ /*
+ * Initialize the array. Data is NULL, and indexes are
+ * the array entry number.
+ */
+ for (i = 0; i < size; i++) {
+ aq->entry[i].data = NULL;
+ store(aq->entry[i].seq, (int64_t)i);
+ }
+
+ aq->size = size;
+
+ store(aq->head, 0);
+ store(aq->tail, 0);
+ atomic_thread_fence(memory_order_seq_cst);
+}
+
/** Create fixed-size atomic queue
*
* @note the queue must be freed explicitly by the ctx being freed, or by using
* - NULL on error.
* - fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
*/
-fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
+fr_atomic_queue_t *fr_atomic_queue_talloc(TALLOC_CTX *ctx, size_t size)
{
- size_t i;
- int64_t seq;
fr_atomic_queue_t *aq;
TALLOC_CTX *chunk;
talloc_set_name_const(chunk, "fr_atomic_queue_t");
- /*
- * Initialize the array. Data is NULL, and indexes are
- * the array entry number.
- */
- for (i = 0; i < size; i++) {
- seq = i;
+ atomic_queue_init(aq, size);
- aq->entry[i].data = NULL;
- store(aq->entry[i].seq, seq);
- }
+ return aq;
+}
- aq->size = size;
+/** Create fixed-size atomic queue outside any talloc hierarchy
+ *
+ * Backed by `posix_memalign`; the resulting queue is released with
+ * #fr_atomic_queue_free (which detects the raw allocation via a NULL
+ * `chunk` field) or plain `free()` on the queue pointer.
+ *
+ * Intended for callers that push or pop from threads where talloc is
+ * not safe (for example, library-owned callback threads).
+ *
+ * @param[in] size The number of entries in the queue.
+ * @return
+ * - NULL on error.
+ * - fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
+ */
+fr_atomic_queue_t *fr_atomic_queue_malloc(size_t size)
+{
+ fr_atomic_queue_t *aq;
+ size_t bytes;
- /*
- * Set the head / tail indexes, and force other cores to
- * see the writes.
- */
- store(aq->head, 0);
- store(aq->tail, 0);
- atomic_thread_fence(memory_order_seq_cst);
+ if (size == 0) return NULL;
+
+ size = (size_t)fr_roundup_pow2_uint64((uint64_t)size);
+ bytes = sizeof(*aq) + (size) * sizeof(aq->entry[0]);
+
+ if (posix_memalign((void **)&aq, CACHE_LINE_SIZE, bytes) != 0) return NULL;
+
+ aq->chunk = NULL; /* sentinel: raw allocation, free with free() */
+ atomic_queue_init(aq, size);
return aq;
}
/** Free an atomic queue if it's not freed by ctx
*
* This function is needed because the atomic queue memory
- * must be cache line aligned.
+ * must be cache line aligned, and may live either in a talloc chunk
+ * or a raw `posix_memalign` allocation (`aq->chunk == NULL`).
*/
void fr_atomic_queue_free(fr_atomic_queue_t **aq)
{
if (!*aq) return;
- talloc_free((*aq)->chunk);
+ if ((*aq)->chunk) {
+ talloc_free((*aq)->chunk);
+ } else {
+ free(*aq);
+ }
*aq = NULL;
}
return aq->size;
}
+/*
+ * Segmented single-producer / single-consumer ring.
+ *
+ * Safety argument: one producer owns `head` and writes `seg->next`
+ * exactly once (release). One consumer owns `tail` and reads
+ * `tail->next` with acquire; by the release/acquire pair plus the
+ * invariant "no push into s after s->next is set" the consumer
+ * cannot advance past a segment that still has an in-flight push
+ * as long as it retries `pop(tail->q)` once more after observing
+ * `tail->next != NULL`.
+ */
+
+typedef struct fr_atomic_ring_entry_s fr_atomic_ring_entry_t;
+
+struct fr_atomic_ring_entry_s {
+ fr_atomic_queue_t *q; //!< Per-segment MPMC ring (used SPSC here).
+ _Atomic(fr_atomic_ring_entry_t *) next; //!< NULL until the producer seals this segment
+ ///< because it filled up and moved on to a
+ ///< fresh one.
+};
+
+struct fr_atomic_ring_s {
+ size_t seg_size; //!< Capacity of each segment.
+ _Atomic(fr_atomic_ring_entry_t *) head; //!< Producer end. Writer is the single producer,
+ ///< reader is also the producer - the consumer
+ ///< never loads this.
+ fr_atomic_ring_entry_t *tail; //!< Consumer end. Touched only by the consumer.
+};
+
+/** Allocate a fresh segment and its embedded queue
+ *
+ * Uses the raw (non-talloc) allocator so this function is safe to call
+ * from the producer thread even when that thread cannot safely use talloc.
+ */
+static fr_atomic_ring_entry_t *atomic_ring_entry_alloc(size_t seg_size)
+{
+ fr_atomic_ring_entry_t *s;
+
+ s = malloc(sizeof(*s));
+ if (!s) return NULL;
+
+ s->q = fr_atomic_queue_malloc(seg_size);
+ if (!s->q) {
+ free(s);
+ return NULL;
+ }
+ atomic_init(&s->next, NULL);
+
+ return s;
+}
+
+static void atomic_ring_entry_free(fr_atomic_ring_entry_t *s)
+{
+ fr_atomic_queue_free(&s->q);
+ free(s);
+}
+
+/** talloc destructor for #fr_atomic_ring_t: walk the chain and free segments */
+static int _atomic_ring_free(fr_atomic_ring_t *ring)
+{
+ fr_atomic_ring_entry_t *s = ring->tail;
+
+ while (s) {
+ fr_atomic_ring_entry_t *next = atomic_load_explicit(&s->next, memory_order_acquire);
+
+ atomic_ring_entry_free(s);
+ s = next;
+ }
+
+ return 0;
+}
+
+/** Allocate an empty SPSC ring
+ *
+ * @param[in] ctx talloc ctx that owns the ring handle (segments live
+ * outside talloc; they are freed by the ring's
+ * destructor).
+ * @param[in] seg_size Per-segment capacity. Rounded up to a power of 2.
+ * @return
+ * - NULL on error.
+ * - A ring containing one initial (empty) segment.
+ */
+fr_atomic_ring_t *fr_atomic_ring_alloc(TALLOC_CTX *ctx, size_t seg_size)
+{
+ fr_atomic_ring_t *ring;
+ fr_atomic_ring_entry_t *seg;
+
+ if (seg_size == 0) return NULL;
+
+ ring = talloc(ctx, fr_atomic_ring_t);
+ if (!ring) return NULL;
+
+ seg = atomic_ring_entry_alloc(seg_size);
+ if (!seg) {
+ talloc_free(ring);
+ return NULL;
+ }
+
+ ring->seg_size = seg_size;
+ ring->tail = seg;
+ atomic_init(&ring->head, seg);
+ talloc_set_destructor(ring, _atomic_ring_free);
+
+ return ring;
+}
+
+/** Free the ring and all remaining segments
+ *
+ * Equivalent to `talloc_free()` on the ring, but nulls the caller's
+ * handle in the style of #fr_atomic_queue_free.
+ */
+void fr_atomic_ring_free(fr_atomic_ring_t **ring_p)
+{
+ if (!*ring_p) return;
+
+ talloc_free(*ring_p);
+ *ring_p = NULL;
+}
+
+/** Push a pointer into the ring; allocate a new segment on overflow
+ *
+ * Single-producer only. Must not be called concurrently with itself.
+ *
+ * @param[in] ring Ring to push into.
+ * @param[in] data Value to push (must be non-NULL).
+ * @return
+ * - true on success.
+ * - false if both the current segment is full and a new segment
+ * could not be allocated.
+ */
+bool fr_atomic_ring_push(fr_atomic_ring_t *ring, void *data)
+{
+ fr_atomic_ring_entry_t *h;
+ fr_atomic_ring_entry_t *n;
+
+ h = atomic_load_explicit(&ring->head, memory_order_relaxed);
+
+ if (likely(fr_atomic_queue_push(h->q, data))) return true;
+
+ n = atomic_ring_entry_alloc(ring->seg_size);
+ if (unlikely(!n)) return false;
+
+ /*
+ * Publish ordering matters: the consumer only inspects `h->next`
+ * and advances past `h` once it sees a non-NULL value there.
+ * Release here pairs with acquire in fr_atomic_ring_pop.
+ */
+ atomic_store_explicit(&h->next, n, memory_order_release);
+ atomic_store_explicit(&ring->head, n, memory_order_relaxed);
+
+ return fr_atomic_queue_push(n->q, data);
+}
+
+/** Pop a pointer from the ring, advancing past drained segments
+ *
+ * Single-consumer only. Must not be called concurrently with itself.
+ *
+ * @param[in] ring Ring to pop from.
+ * @param[out] p_data Where to write the popped value on success.
+ * @return
+ * - true if a value was popped.
+ * - false if the ring is currently empty.
+ */
+bool fr_atomic_ring_pop(fr_atomic_ring_t *ring, void **p_data)
+{
+ fr_atomic_ring_entry_t *cur;
+ fr_atomic_ring_entry_t *n;
+ fr_atomic_ring_entry_t *old;
+
+ for (;;) {
+ cur = ring->tail;
+
+ if (likely(fr_atomic_queue_pop(cur->q, p_data))) return true;
+
+ /*
+ * Empty from our point of view. If the producer hasn't
+ * sealed this segment there might be pushes in our
+ * future - return and let the caller come back.
+ */
+ n = atomic_load_explicit(&cur->next, memory_order_acquire);
+ if (!n) return false;
+
+ /*
+ * Sealed. One more pop to drain anything the producer
+ * committed before sealing but after our first (empty)
+ * pop observation. Without this re-check, late commits
+ * in the (empty-observation, seal-observation) window
+ * would be stranded when we advance past `cur`.
+ */
+ if (fr_atomic_queue_pop(cur->q, p_data)) return true;
+
+ old = cur;
+ ring->tail = n;
+ atomic_ring_entry_free(old);
+ /* loop to pop from the new tail */
+ }
+}
+
#ifdef WITH_VERIFY_PTR
/** Check the talloc chunk is still valid
*
typedef struct fr_atomic_queue_s fr_atomic_queue_t;
-fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size);
+fr_atomic_queue_t *fr_atomic_queue_talloc(TALLOC_CTX *ctx, size_t size);
+fr_atomic_queue_t *fr_atomic_queue_malloc(size_t size);
void fr_atomic_queue_free(fr_atomic_queue_t **aq);
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data);
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data);
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq);
+/** Unbounded segmented single-producer / single-consumer queue
+ *
+ * Internally a linked list of fixed-size `fr_atomic_queue_t` segments.
+ * When the producer's current segment fills, a new segment is malloc'd
+ * and linked in; the consumer drains segments in order and frees them
+ * as it advances. Allocation on the producer side uses raw malloc
+ * (not talloc), so the producer thread is free to be one that cannot
+ * safely use talloc (e.g. a library-owned callback thread).
+ *
+ * Exactly one producer and one consumer. Not safe for MPMC use.
+ */
+typedef struct fr_atomic_ring_s fr_atomic_ring_t;
+
+fr_atomic_ring_t *fr_atomic_ring_alloc(TALLOC_CTX *ctx, size_t seg_size);
+void fr_atomic_ring_free(fr_atomic_ring_t **ring);
+bool fr_atomic_ring_push(fr_atomic_ring_t *ring, void *data);
+bool fr_atomic_ring_pop(fr_atomic_ring_t *ring, void **p_data);
+
#ifdef WITH_VERIFY_PTR
void fr_atomic_queue_verify(fr_atomic_queue_t *aq);
#endif
ch->end[TO_RESPONDER].direction = TO_RESPONDER;
ch->end[TO_REQUESTOR].direction = TO_REQUESTOR;
- ch->end[TO_RESPONDER].aq = fr_atomic_queue_alloc(ch, ATOMIC_QUEUE_SIZE);
+ ch->end[TO_RESPONDER].aq = fr_atomic_queue_talloc(ch, ATOMIC_QUEUE_SIZE);
if (!ch->end[TO_RESPONDER].aq) {
talloc_free(ch);
goto nomem;
}
- ch->end[TO_REQUESTOR].aq = fr_atomic_queue_alloc(ch, ATOMIC_QUEUE_SIZE);
+ ch->end[TO_REQUESTOR].aq = fr_atomic_queue_talloc(ch, ATOMIC_QUEUE_SIZE);
if (!ch->end[TO_REQUESTOR].aq) {
talloc_free(ch);
goto nomem;
};
/* Allocate atomic queue / control for receiving messages from workers */
- aq = fr_atomic_queue_alloc(coord, FR_CONTROL_MAX_MESSAGES);
+ aq = fr_atomic_queue_talloc(coord, FR_CONTROL_MAX_MESSAGES);
if (!aq) {
fr_strerror_const("Failed creating worker -> coordinator atomic queue");
fail:
}
/* Allocate atomic queue for workers sending data to coordinators */
- coord->coord_recv_aq = fr_atomic_queue_alloc(coord, FR_CONTROL_MAX_MESSAGES);
+ coord->coord_recv_aq = fr_atomic_queue_talloc(coord, FR_CONTROL_MAX_MESSAGES);
if (!coord->coord_recv_aq) {
fr_strerror_const("Failed creating worker -> coordinator data atomic queue");
goto fail;
return NULL;
}
- aq = fr_atomic_queue_alloc(cw, 1024);
- cw->worker_recv_aq = fr_atomic_queue_alloc(cw, FR_CONTROL_MAX_MESSAGES);
+ aq = fr_atomic_queue_talloc(cw, 1024);
+ cw->worker_recv_aq = fr_atomic_queue_talloc(cw, FR_CONTROL_MAX_MESSAGES);
cw->worker_recv_control = fr_control_create(cw, el, aq, 0);
cw->worker_send_rb = fr_ring_buffer_create(cw, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
cw->worker_send_ms = fr_message_set_create(cw, FR_CONTROL_MAX_MESSAGES, sizeof(fr_coord_data_t),
nr->signal_pipe[1] = -1;
if (config) nr->config = *config;
- nr->aq_control = fr_atomic_queue_alloc(nr, 1024);
+ nr->aq_control = fr_atomic_queue_talloc(nr, 1024);
if (!nr->aq_control) {
talloc_free(nr);
return NULL;
*/
memset(&worker->tracking, 0, sizeof(worker->tracking));
- worker->aq_control = fr_atomic_queue_alloc(worker, 1024);
+ worker->aq_control = fr_atomic_queue_talloc(worker, 1024);
if (!worker->aq_control) {
fr_strerror_const("Failed creating atomic queue");
fail:
FILES := \
atomic_queue_test \
+ atomic_ring_test \
control_test \
message_set_test \
radclient \
-SUBMAKEFILES := ring_buffer_test.mk message_set_test.mk atomic_queue_test.mk control_test.mk
+SUBMAKEFILES := ring_buffer_test.mk message_set_test.mk atomic_queue_test.mk atomic_ring_test.mk control_test.mk
#
# These require pthread.
argv += (optind - 1);
#endif
- aq = fr_atomic_queue_alloc(autofree, size);
+ aq = fr_atomic_queue_talloc(autofree, size);
#ifndef NDEBUG
if (debug_lvl) {
--- /dev/null
+/*
+ * atomic_ring_test.c Tests for the segmented SPSC atomic ring
+ *
+ * Version: $Id$
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+RCSID("$Id$")
+
+#include <freeradius-devel/io/atomic_queue.h>
+#include <freeradius-devel/util/debug.h>
+
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdatomic.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/**********************************************************************/
+typedef struct request_s request_t;
+void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request);
+void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request) { }
+/**********************************************************************/
+
+#define TEST(_name) do { printf(" %-48s ", _name); fflush(stdout); } while (0)
+#define OK() do { printf("ok\n"); } while (0)
+#define FAIL(fmt, ...) do { \
+ printf("FAIL\n " fmt "\n", ##__VA_ARGS__); \
+ fr_exit_now(EXIT_FAILURE); \
+} while (0)
+
+#define CHECK(cond, fmt, ...) do { \
+ if (!(cond)) FAIL(fmt, ##__VA_ARGS__); \
+} while (0)
+
+/** 1. alloc/free round-trip with nothing ever pushed */
+static void test_alloc_free(TALLOC_CTX *ctx)
+{
+ fr_atomic_ring_t *ring;
+
+ TEST("alloc/free empty ring");
+
+ ring = fr_atomic_ring_alloc(ctx, 4);
+ CHECK(ring != NULL, "fr_atomic_ring_alloc returned NULL");
+
+ fr_atomic_ring_free(&ring);
+ CHECK(ring == NULL, "fr_atomic_ring_free did not null the handle");
+ OK();
+}
+
+/** 2. push/pop within a single segment preserves FIFO */
+static void test_push_pop_single_segment(TALLOC_CTX *ctx)
+{
+ fr_atomic_ring_t *ring;
+ intptr_t i;
+ void *data;
+
+ TEST("push/pop within one segment preserves FIFO");
+
+ ring = fr_atomic_ring_alloc(ctx, 8);
+
+ for (i = 1; i <= 4; i++) {
+ CHECK(fr_atomic_ring_push(ring, (void *)i), "push %" PRIdPTR " failed", i);
+ }
+ for (i = 1; i <= 4; i++) {
+ CHECK(fr_atomic_ring_pop(ring, &data), "pop %" PRIdPTR " failed", i);
+ CHECK((intptr_t)data == i, "FIFO broken: expected %" PRIdPTR ", got %" PRIdPTR, i, (intptr_t)data);
+ }
+ CHECK(!fr_atomic_ring_pop(ring, &data), "pop from empty ring returned true");
+
+ fr_atomic_ring_free(&ring);
+ OK();
+}
+
+/** 3. push past segment capacity triggers new-segment allocation */
+static void test_grow_across_segments(TALLOC_CTX *ctx)
+{
+ fr_atomic_ring_t *ring;
+ intptr_t i;
+ void *data;
+ size_t const seg_size = 4; /* power of 2 */
+ size_t const n = seg_size * 4; /* force 4 segments */
+
+ TEST("push past segment capacity grows transparently");
+
+ ring = fr_atomic_ring_alloc(ctx, seg_size);
+
+ for (i = 1; i <= (intptr_t)n; i++) {
+ CHECK(fr_atomic_ring_push(ring, (void *)i), "push %" PRIdPTR " failed", i);
+ }
+ for (i = 1; i <= (intptr_t)n; i++) {
+ CHECK(fr_atomic_ring_pop(ring, &data), "pop %" PRIdPTR " failed", i);
+ CHECK((intptr_t)data == i, "FIFO broken across segments: expected %" PRIdPTR ", got %" PRIdPTR,
+ i, (intptr_t)data);
+ }
+ CHECK(!fr_atomic_ring_pop(ring, &data), "pop from empty multi-segment ring returned true");
+
+ fr_atomic_ring_free(&ring);
+ OK();
+}
+
+/** 4. interleaved push/pop keeps FIFO and frees drained segments */
+static void test_interleaved(TALLOC_CTX *ctx)
+{
+ fr_atomic_ring_t *ring;
+ intptr_t push_seq = 1;
+ intptr_t pop_seq = 1;
+ void *data;
+ int round;
+
+ TEST("interleaved push/pop keeps FIFO across segment advances");
+
+ ring = fr_atomic_ring_alloc(ctx, 4);
+
+ /*
+ * Push batches larger than segment capacity so we cross
+ * segment boundaries, then drain. Many rounds to exercise
+ * repeated grow-and-retire cycles.
+ */
+ for (round = 0; round < 32; round++) {
+ int batch = (round % 5) + 6; /* 6..10 */
+ int i;
+
+ for (i = 0; i < batch; i++) {
+ CHECK(fr_atomic_ring_push(ring, (void *)push_seq),
+ "push %" PRIdPTR " failed at round %d", push_seq, round);
+ push_seq++;
+ }
+ for (i = 0; i < batch; i++) {
+ CHECK(fr_atomic_ring_pop(ring, &data),
+ "pop %" PRIdPTR " failed at round %d", pop_seq, round);
+ CHECK((intptr_t)data == pop_seq,
+ "FIFO broken at round %d: expected %" PRIdPTR ", got %" PRIdPTR,
+ round, pop_seq, (intptr_t)data);
+ pop_seq++;
+ }
+ CHECK(!fr_atomic_ring_pop(ring, &data), "ring not empty after round %d", round);
+ }
+
+ fr_atomic_ring_free(&ring);
+ OK();
+}
+
+/** 5. free the ring with items still queued; must not leak/crash */
+static void test_free_nonempty(TALLOC_CTX *ctx)
+{
+ fr_atomic_ring_t *ring;
+ intptr_t i;
+
+ TEST("free releases all remaining segments");
+
+ ring = fr_atomic_ring_alloc(ctx, 4);
+
+ for (i = 1; i <= 32; i++) {
+ CHECK(fr_atomic_ring_push(ring, (void *)i), "push %" PRIdPTR " failed", i);
+ }
+
+ fr_atomic_ring_free(&ring);
+ OK();
+}
+
+
+/*
+ * Threaded stress test: one producer, one consumer, N items.
+ * Verifies that every pushed item is received in order.
+ */
+
+#define STRESS_N 200000
+
+typedef struct {
+ fr_atomic_ring_t *ring;
+ size_t n;
+} stress_arg_t;
+
+static void *stress_producer(void *arg)
+{
+ stress_arg_t *sa = arg;
+ size_t i;
+
+ for (i = 1; i <= sa->n; i++) {
+ while (!fr_atomic_ring_push(sa->ring, (void *)(uintptr_t)i)) {
+ /*
+ * push can only fail on OOM in practice; spin so
+ * we notice stalls rather than miscounting.
+ */
+ sched_yield();
+ }
+ }
+ return NULL;
+}
+
+static void *stress_consumer(void *arg)
+{
+ stress_arg_t *sa = arg;
+ size_t expect = 1;
+ void *data;
+ uintptr_t *errp;
+
+ errp = malloc(sizeof(*errp));
+ *errp = 0;
+
+ while (expect <= sa->n) {
+ if (!fr_atomic_ring_pop(sa->ring, &data)) {
+ sched_yield();
+ continue;
+ }
+ if ((uintptr_t)data != expect) {
+ *errp = expect;
+ return errp;
+ }
+ expect++;
+ }
+
+ return errp; /* 0 on success */
+}
+
+static void test_stress_two_thread(TALLOC_CTX *ctx)
+{
+ fr_atomic_ring_t *ring;
+ pthread_t prod, cons;
+ stress_arg_t sa;
+ void *cons_ret;
+ uintptr_t err;
+ int rc;
+
+ TEST("producer/consumer stress - 200k items, 4-slot segments");
+
+ ring = fr_atomic_ring_alloc(ctx, 4);
+ sa.ring = ring;
+ sa.n = STRESS_N;
+
+ rc = pthread_create(&cons, NULL, stress_consumer, &sa);
+ CHECK(rc == 0, "pthread_create(consumer) failed: %s", strerror(rc));
+ rc = pthread_create(&prod, NULL, stress_producer, &sa);
+ CHECK(rc == 0, "pthread_create(producer) failed: %s", strerror(rc));
+
+ pthread_join(prod, NULL);
+ pthread_join(cons, &cons_ret);
+
+ err = *(uintptr_t *)cons_ret;
+ free(cons_ret);
+ CHECK(err == 0, "consumer saw out-of-order item at position %" PRIuPTR, err);
+
+ /*
+ * Drain anything the producer left behind (shouldn't be any
+ * at this point since consumer drained through sa.n).
+ */
+ {
+ void *data;
+ CHECK(!fr_atomic_ring_pop(ring, &data), "ring not empty after consumer finished");
+ }
+
+ fr_atomic_ring_free(&ring);
+ OK();
+}
+
+
+int main(int argc, UNUSED char *argv[])
+{
+ TALLOC_CTX *autofree = talloc_autofree_context();
+
+ if (argc > 1) {
+ fprintf(stderr, "usage: atomic_ring_test\n");
+ return EXIT_FAILURE;
+ }
+
+ printf("atomic_ring_test:\n");
+
+ test_alloc_free(autofree);
+ test_push_pop_single_segment(autofree);
+ test_grow_across_segments(autofree);
+ test_interleaved(autofree);
+ test_free_nonempty(autofree);
+ test_stress_two_thread(autofree);
+
+ printf(" all tests passed\n");
+ return EXIT_SUCCESS;
+}
--- /dev/null
+TARGET := atomic_ring_test$(E)
+
+SOURCES := atomic_ring_test.c
+
+TGT_PREREQS := $(LIBFREERADIUS_SERVER) libfreeradius-io$(L)
+TGT_LDLIBS := $(LIBS) $(PTHREADLIBS)
kq_worker = kqueue();
fr_assert(kq_worker >= 0);
- aq_master = fr_atomic_queue_alloc(autofree, max_control_plane);
+ aq_master = fr_atomic_queue_talloc(autofree, max_control_plane);
fr_assert(aq_master != NULL);
- aq_worker = fr_atomic_queue_alloc(autofree, max_control_plane);
+ aq_worker = fr_atomic_queue_talloc(autofree, max_control_plane);
fr_assert(aq_worker != NULL);
control_master = fr_control_create(autofree, kq_master, aq_master, 1024);
aq_size = single_aq ? FR_CONTROL_MAX_MESSAGES * num_workers : FR_CONTROL_MAX_MESSAGES;
aq = talloc_array(autofree, fr_atomic_queue_t *, num_aq);
for (i = 0; i < num_aq; i++) {
- aq[i] = fr_atomic_queue_alloc(aq, aq_size);
+ aq[i] = fr_atomic_queue_talloc(aq, aq_size);
fr_assert(aq[i] != NULL);
}
kq_master = kqueue();
fr_assert(kq_master >= 0);
- aq_master = fr_atomic_queue_alloc(ctx, max_control_plane);
+ aq_master = fr_atomic_queue_talloc(ctx, max_control_plane);
fr_assert(aq_master != NULL);
control_master = fr_control_create(ctx, kq_master, aq_master, 1024);
kq_master = kqueue();
fr_assert(kq_master >= 0);
- aq_master = fr_atomic_queue_alloc(autofree, max_control_plane);
+ aq_master = fr_atomic_queue_talloc(autofree, max_control_plane);
fr_assert(aq_master != NULL);
control_master = fr_control_create(autofree, kq_master, aq_master, 1024);