* @brief Thread-safe queues.
* @file io/atomic_queue.c
*
+ * This is an implementation of a bounded MPMC ring buffer with per-slot
+ * sequence numbers, described by Dmitry Vyukov.
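+ *
+ * A sketch of the per-slot lifecycle for a queue of size N
+ * (slot i starts with seq == i):
+ *
+ * - seq == head: the producer may claim the slot, and
+ * publishes it by setting seq = head + 1.
+ * - seq == tail + 1: the consumer may claim the slot, and
+ * frees it by setting seq = tail + N.
+ * - seq == tail + N: equal to head on its next pass over
+ * the ring, so the slot reads as free again.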
+ *
* @copyright 2016 Alan DeKok (aland@freeradius.org)
* @copyright 2016 Alister Winfield
*/
+
RCSID("$Id$")
#include <stdint.h>
#include <freeradius-devel/autoconf.h>
#include <freeradius-devel/io/atomic_queue.h>
#include <freeradius-devel/util/talloc.h>
+#include <freeradius-devel/util/math.h>
/*
* Some macros to make our life easier.
*
*/
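+/*
+ * Roughly, in terms of C11 stdatomic (a sketch only; see the
+ * real definitions for the exact memory orders used):
+ *
+ * load(x) atomic_load_explicit(&(x), memory_order_relaxed)
+ * acquire(x) atomic_load_explicit(&(x), memory_order_acquire)
+ * store(x, v) atomic_store_explicit(&(x), (v), memory_order_release)
+ * cas_incr(x, v) atomic_compare_exchange_weak_explicit(&(x), &(v), (v) + 1, ...)
+ *
+ * Note that compare_exchange writes the observed value back
+ * into (v) when it fails, so in this sketch a failed
+ * cas_incr() refreshes our local copy of head / tail.
+ */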
struct fr_atomic_queue_s {
- alignas(CACHE_LINE_SIZE) atomic_int64_t head; //!< Head, aligned bytes to ensure
- ///< it's in a different cache line to tail
- ///< to reduce memory contention.
- atomic_int64_t tail;
+ alignas(CACHE_LINE_SIZE) atomic_int64_t head; //!< Position of the producer.
+ ///< Cache-line aligned to ensure it's in a
+ ///< different cache line to tail to reduce
+ ///< memory contention.
+
+ alignas(CACHE_LINE_SIZE) atomic_int64_t tail; //!< Position of the consumer.
+ ///< Cache-line aligned to ensure it's in a
+ ///< different cache line to head to reduce
+ ///< memory contention.
+ ///< Reads of size may still occur whilst the
+ ///< consumer is writing to tail, as the two
+ ///< can share a cache line.
- size_t size;
+ size_t size; //!< The length of the queue. Rounded up to a
+ ///< power of 2 at allocation time, then fixed.
- void *chunk; //!< To pass to free. The non-aligned address.
+ void *chunk; //!< The start of the talloc chunk to pass to free.
+ ///< We need to play tricks to get aligned memory
+ ///< with talloc.
alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[]; //!< The entry array, also aligned
///< to ensure it's not in the same cache
if (size == 0) return NULL;
+ /*
+ * Round up to the next power of 2 so we don't need modulo.
+ */
+ size = (size_t)fr_roundup_pow2_uint64((uint64_t)size);
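+
+ /*
+ * e.g. a requested size of 10 becomes 16, and the
+ * "head & (size - 1)" in push/pop below is then
+ * equivalent to "head % size": 19 & 15 == 19 % 16 == 3.
+ */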
+
/*
* Allocate a contiguous blob for the header and queue.
* This helps with memory locality.
aq->size = size;
/*
- * Set the head / tail indexes, and force other CPUs to
+ * Set the head / tail indexes, and force other cores to
* see the writes.
*/
store(aq->head, 0);
if (!data) return false;
+ /*
+ * Here we're essentially racing with other producers
+ * to find the current head of the queue.
+ *
+ * 1. Load the current head (which may be incremented
+ * by another producer before we enter the loop).
+ * 2. Find the head entry, which is head modulo the
+ * queue size (keeps head looping through the queue).
+ * 3. Read the sequence number of the entry.
+ * The sequence numbers are initialised to the index
+ * of the entries in the queue. Each pass of the
+ * producer increments the sequence number by one.
+ * 4.
+ * a. If the sequence number is equal to the head,
+ * then we can use the entry. Increment the head
+ * so other producers know we've used it.
+ * b. If it's greater than head, another producer
+ * has already written to this entry, so we need
+ * to re-load the head and race other producers
+ * again.
+ * c. If it's less than the head, the entry has not yet
+ * been consumed, and the queue is full.
+ */
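+
+ /*
+ * As a worked example, take size 4 and head 5: the
+ * candidate slot is 5 & 3 == 1.
+ *
+ * seq == 5: free (the consumer set 1 + 4 on its last
+ * pass); claim it, cas head from 5 to 6.
+ * seq == 6: another producer just claimed it; re-load
+ * head and retry.
+ * seq == 2: the entry pushed at head == 1 is still
+ * unconsumed; the queue is full.
+ */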
head = load(aq->head);
/*
for (;;) {
int64_t seq, diff;
- entry = &aq->entry[ head % aq->size ];
+ /*
+ * The alloc function guarantees size is a
+ * power of 2, so we can use this hack to
+ * avoid modulo.
+ */
+ entry = &aq->entry[head & (aq->size - 1)];
seq = acquire(entry->seq);
diff = (seq - head);
/*
- * head is larger than the current entry, the queue is full.
+ * head is larger than the entry's sequence
+ * number: the entry hasn't been consumed yet,
+ * so the queue is full.
+ * When the consumer eventually pops the entry
+ * it will set its seq to tail + queue size,
+ * marking it as free for the producer to use.
*/
if (diff < 0) {
#if 0
}
/*
- * Someone else has already written to this entry. Get the new head pointer, and continue.
+ * Someone else has already written to this
+ * entry; we lost the race, so try again.
*/
if (diff > 0) {
head = load(aq->head);
+ continue;
}
/*
- * We have the possibility that we can write to
- * this entry. Try it. If the write succeeds,
- * we're done. If the write fails, re-load the
- * current head entry, and continue.
+ * Try to increment the head value; this only
+ * succeeds if head still holds the value we
+ * loaded.
+ *
+ * This means no two producers can have the same
+ * entry in the queue, because they can't exit
+ * the loop until they've incremented the head
+ * successfully.
+ *
+ * When we fail, we don't increment head before
+ * trying again, because we need to detect queue
+ * full conditions.
*/
if (cas_incr(aq->head, head)) {
break;
* other CPUs.
*/
entry->data = data;
+
+ /*
+ * Technically head can overflow. Practically, with a
+ * 3GHz CPU, doing nothing but incrementing head
+ * uncontended it'd take about 100 years for this to
+ * happen. But hey, maybe someone invents an optical
+ * CPU with a significantly higher clock speed, it's ok
+ * for us to exit every 9 quintillion packets.
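+ * (For the record: 2^63 increments at 3e9 per second is
+ * ~3.1e9 seconds, or roughly 97 years.)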
+ */
+#ifdef __clang_analyzer__
+ if (unlikely((head + 1) == INT64_MAX)) exit(1);
+#endif
+
+ /*
+ * Mark the entry as written to, making the data
+ * visible to the consumer. Any other producer
+ * attempting to write will see (diff > 0) and retry.
+ */
store(entry->seq, head + 1);
return true;
}
for (;;) {
int64_t diff;
- entry = &aq->entry[ tail % aq->size ];
+ /*
+ * As in push: size is a power of 2, so mask
+ * instead of modulo.
+ */
+ entry = &aq->entry[tail & (aq->size - 1)];
seq = acquire(entry->seq);
diff = (seq - (tail + 1));
/*
- * tail is smaller than the current entry, the queue is empty.
+ * The entry's seq is behind tail + 1: nothing
+ * has been pushed here on this pass, so the
+ * queue is empty.
+ *
+ * Tail should now be equal to the head.
*/
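+ /*
+ * e.g. size 4, tail 5, slot 5 & 3 == 1: if our pop at
+ * tail == 1 freed the slot last pass, seq == 1 + 4 == 5,
+ * so diff == 5 - 6 < 0. A push at head == 5 would have
+ * bumped seq to 6 and made diff == 0.
+ */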
if (diff < 0) {
return false;
}
+ /*
+ * Tail is now ahead of us: another consumer
+ * has already claimed this entry. We lost the
+ * race; re-load tail and try again.
+ */
if (diff > 0) {
tail = load(aq->tail);
continue;
}
+ /*
+ * Same deal as push.
+ * After this point we own the entry.
+ */
if (cas_incr(aq->tail, tail)) {
break;
}
/*
* Set the current entry to past the end of the queue.
- * i.e. it's unused.
+ * This is equal to what head will be on its next pass
+ * through the queue. This marks the entry as free.
*/
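+ /*
+ * e.g. size 4, tail 5: seq becomes 9. The producer next
+ * lands on this slot when head == 9, sees seq == head,
+ * and claims it.
+ */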
- seq = tail + aq->size;
- store(entry->seq, seq);
+ store(entry->seq, tail + aq->size);
return true;
}