]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: add an MPSC ring buffer implementation
authorMaxime Henrion <mhenrion@haproxy.com>
Mon, 23 Feb 2026 22:19:15 +0000 (17:19 -0500)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 30 Apr 2026 13:33:07 +0000 (15:33 +0200)
This is to be used in the QUIC code, where the multiple producers are
the listener threads, and the single consumer is the datagram handler
thread. Entries are variable-length with a size header, and are kept
contiguous in the buffer, so padding is inserted at the end when an
entry would otherwise wrap around. The size field is overloaded to also
mark padding (-1) and entries that are still free or not yet ready for
reads (0).

Headers and payloads are aligned on 8 bytes. Aligning on 16 bytes might
be beneficial on some architectures to let memcpy() use 128-bit SIMD
instructions.

The head and tail offsets are 64-bit unsigned integers, making ABA
issues from integer overflow impossible on current or near-future
hardware. Reservation uses a CAS rather than FAA because of the need to
insert padding to keep entries contiguous.

Makefile
include/haproxy/mpring.h [new file with mode: 0644]
src/mpring.c [new file with mode: 0644]

index ebfaff0a42a9a28e3728d7084ee346412458b0e2..faa2b2102b67dc74a8662dca84ad165c73844a12 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -676,7 +676,8 @@ OPTIONS_OBJS += src/mux_quic.o src/h3.o src/quic_rx.o src/quic_tx.o \
                 src/quic_cc_nocc.o src/quic_cc.o src/quic_pacing.o     \
                 src/h3_stats.o src/quic_stats.o src/qpack-enc.o                \
                 src/qpack-tbl.o src/quic_cc_drs.o src/quic_fctl.o      \
-                src/quic_enc.o src/mux_quic_qstrm.o src/xprt_qstrm.o
+                src/quic_enc.o src/mux_quic_qstrm.o src/xprt_qstrm.o   \
+                src/mpring.o
 endif
 
 ifneq ($(USE_QUIC_OPENSSL_COMPAT:0=),)
diff --git a/include/haproxy/mpring.h b/include/haproxy/mpring.h
new file mode 100644 (file)
index 0000000..52ca31d
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * MPSC byte ring buffer with variable sized entries.
+ */
+
+#ifndef _MPRING_H
+#define _MPRING_H
+
+#include <sys/types.h>
+
+#include <haproxy/compiler.h>
+
+struct mpring {
+       size_t capacity;
+       size_t mask;
+       uint8_t *buffer;
+       uint64_t head THREAD_ALIGNED();
+       uint64_t tail THREAD_ALIGNED();
+};
+
+/* Initialize the ring buffer. The size MUST be a power of 2, and bigger than
+ * the value of the MPRING_PAYLOAD_ALIGN macro in mpring.c (currently set to 8).
+ */
+void mpring_init(struct mpring *ring, void *buffer, size_t size);
+
+/* Reserve bytes in the buffer. Returns NULL in case of failure, and otherwise
+ * a pointer to the buffer with enough space to write <len> bytes.
+ */
+void *mpring_write_reserve(struct mpring *ring, size_t len);
+
+/* Commit data to the buffer after it was written to the pointer given by
+ * mpring_write_reserve(). The <ptr> and <len> parameters MUST be identical to
+ * the ones returned by and passed to mpring_write_reserve(), respectively.
+ */
+void mpring_write_commit(struct mpring *ring, void *ptr, size_t len);
+
+/* Convenience shorthand for when we only need to write one contiguous set of
+ * bytes to the buffer. Returns 0 in case of failure, and a non-zero value
+ * otherwise.
+ */
+int mpring_write(struct mpring *ring, const void *data, size_t len);
+
+/* Get the next entry to be read. Returns NULL if there is no data to be read,
+ * otherwise returns a pointer to that data and set the size of the entry in the
+ * <len> pointer.
+ */
+void *mpring_read_begin(struct mpring *ring, size_t *len);
+
+/* Indicate that we are done reading an entry, and that the space can be reused
+ * for new entries. This MUST be called after we are done reading an entry. The
+ * <len> parameter MUST be equal to the length given by mpring_read_begin().
+ */
+void mpring_read_end(struct mpring *ring, size_t len);
+
+#endif /* _MPRING_H */
diff --git a/src/mpring.c b/src/mpring.c
new file mode 100644 (file)
index 0000000..cbbee94
--- /dev/null
@@ -0,0 +1,166 @@
+#include <haproxy/atomic.h>
+#include <haproxy/mpring.h>
+#include <haproxy/bug.h>
+#include <haproxy/compiler.h>
+
+#include <stdint.h>
+#include <string.h>
+
+/* 16 bytes would be more wasteful but would allow 128-bit SIMD/NEON memcpy() */
+#define MPRING_PAYLOAD_ALIGN   8
+
+#define MPRING_HDR_PADDING     (-1)    /* Denotes padding space at the end of the buffer */
+#define MPRING_HDR_BUSY                0       /* No data or it is still being written */
+
+struct mpring_record {
+       /* The length or one of the two magic values above */
+       int64_t header;
+} ALIGNED(MPRING_PAYLOAD_ALIGN);
+
+/* What we call the stride is the total amount of bytes we need to store an
+ * entry, including the record header, and the padding bytes necessary to
+ * maintain proper alignment.
+ */
+#define MPRING_STRIDE_LEN(len) \
+       (sizeof(struct mpring_record) + ((len + MPRING_PAYLOAD_ALIGN - 1) & ~(MPRING_PAYLOAD_ALIGN - 1)))
+
+void mpring_init(struct mpring *ring, void *buffer, size_t size)
+{
+       /* The size of the buffer must be a power of 2 */
+       BUG_ON((size & (size - 1)) != 0);
+
+       /* And must also be bigger than the payload alignment */
+       BUG_ON(size < MPRING_PAYLOAD_ALIGN);
+
+       ring->buffer = buffer;
+       /* We have to zero the buffer to ensure that all records are marked
+        * as BUSY even if we have not written there yet.
+        */
+       memset(ring->buffer, 0, size);
+
+       ring->capacity = size;
+       ring->mask = size - 1;
+
+       ring->head = ring->tail = 0;
+}
+
+void *mpring_write_reserve(struct mpring *ring, size_t len)
+{
+       struct mpring_record *record;
+       uint64_t head, tail;
+       size_t stride, offset, padding, need;
+
+       /* Align writes to the buffer. This is both useful in order to guarantee
+        * that SIMD/NEON optimized memcpy() implementations can be used, but
+        * also required to ensure we always have space at the end of the buffer
+        * for a header to mark padding.
+        */
+       stride = MPRING_STRIDE_LEN(len);
+
+       head = _HA_ATOMIC_LOAD(&ring->head);
+       do {
+               offset = head & ring->mask;
+
+               /* Check if we have enough contiguous space */
+               padding = 0;
+               if (offset + stride > ring->capacity) {
+                       padding = ring->capacity - offset;
+               }
+
+               need = stride + padding;
+
+               tail = HA_ATOMIC_LOAD(&ring->tail);
+               if (ring->capacity < head - tail + need) {
+                       /* Not enough room */
+                       return NULL;
+               }
+       } while (!_HA_ATOMIC_CAS(&ring->head, &head, head + need));
+
+       /* Burn the rest of the buffer */
+       if (padding > 0) {
+               record = (struct mpring_record *)(ring->buffer + offset);
+               HA_ATOMIC_STORE(&record->header, MPRING_HDR_PADDING);
+
+               offset = 0;
+       }
+
+       record = (struct mpring_record *)(ring->buffer + offset);
+       _HA_ATOMIC_STORE(&record->header, MPRING_HDR_BUSY);
+
+       return record + 1;
+}
+
+void mpring_write_commit(struct mpring *ring, void *ptr, size_t len)
+{
+       struct mpring_record *record;
+
+       record = (struct mpring_record *)ptr - 1;
+       HA_ATOMIC_STORE(&record->header, len);
+}
+
+int mpring_write(struct mpring *ring, const void *data, size_t len)
+{
+       void *ptr;
+
+       ptr = mpring_write_reserve(ring, len);
+       if (!ptr)
+               return 0;
+
+       memcpy(ptr, data, len);
+
+       mpring_write_commit(ring, ptr, len);
+       return 1;
+}
+
+void *mpring_read_begin(struct mpring *ring, size_t *len)
+{
+       struct mpring_record *record;
+       uint64_t tail;
+       int64_t size;
+       size_t offset, skip;
+
+       tail = ring->tail;
+
+again:
+       offset = tail & ring->mask;
+       record = (struct mpring_record *)(ring->buffer + offset);
+       size = HA_ATOMIC_LOAD(&record->header);
+
+       if (size == MPRING_HDR_BUSY)
+               return NULL;    /* No more data to read */
+
+       if (size == MPRING_HDR_PADDING) {
+               /* Reset to 0 for next wrap-around */
+               _HA_ATOMIC_STORE(&record->header, MPRING_HDR_BUSY);
+
+               /* Skip over the padding */
+               skip = ring->capacity - offset;
+               tail += skip;
+               _HA_ATOMIC_STORE(&ring->tail, tail);
+               /* Try again with new tail */
+               goto again;
+       }
+
+       *len = size;
+       return record + 1;
+}
+
+void mpring_read_end(struct mpring *ring, size_t len)
+{
+       struct mpring_record *record;
+       uint64_t tail;
+       size_t offset, stride;
+
+       tail = _HA_ATOMIC_LOAD(&ring->tail);
+       offset = tail & ring->mask;
+       record = (struct mpring_record *)(ring->buffer + offset);
+
+       stride = MPRING_STRIDE_LEN(len);
+
+       /* Reset to 0 so all records are set to mpring_HDR_BUSY when
+        * producers wrap around and reuse this memory later.
+        */
+       memset(record, 0, stride);
+
+       HA_ATOMIC_STORE(&ring->tail, tail + stride);
+}