]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: ring: make the offset relative to the head/tail instead of absolute
authorWilly Tarreau <w@1wt.eu>
Wed, 22 Feb 2023 13:50:14 +0000 (14:50 +0100)
committerWilly Tarreau <w@1wt.eu>
Fri, 24 Feb 2023 08:26:30 +0000 (09:26 +0100)
The ring's offset currently contains a perpetually growing custor which
is the number of bytes written from the start. It's used by readers to
know where to (re)start reading from. It was made absolute because both
the head and the tail can change during writes and we needed a fixed
position to know where the reader was attached. But this is complicated,
error-prone, and limits the ability to reduce the lock's coverage. In
fact what is needed is to know where the reader is currently waiting, if
at all. And this location is exactly where it stored its count, so the
absolute position in the buffer (the seek offset from the first storage
byte) does represent exactly this, as it doesn't move (we don't realign
the buffer), and is stable regardless of how head/tail changes with writes.

This patch modifies this so that the application code now uses this
representation instead. The most noticeable change is the initialization,
where we've kept ~0 as a marker to go to the end, and it's now set to
the tail offset instead of trying to resolve the current write offset
against the current ring's position.

The offset was also used at the end of the consuming loop, to detect
if a new write had happened between the lock being released and taken
again, so as to wake the consumer(s) up again. For this we used to
take a copy of the ring->ofs before unlocking and comparing with the
new value read in the next lock. Since it's not possible to write past
the current reader's location, there's no risk of complete rollover, so
it's sufficient to check if the tail has changed.

Note that the change also has an impact on the haring consumer which
needs to adapt as well. But that's good in fact because it will rely
on one less variable, and will use offsets relative to the buffer's
head, and the change remains backward-compatible.

dev/haring/haring.c
src/dns.c
src/ring.c
src/sink.c

index 53352cbd165cbea4e9d0c9ede7f68ea965f2834d..4b09c9e096d2df387f5fe59a9d0dc2206093bfe5 100644 (file)
@@ -123,20 +123,14 @@ int dump_ring(struct ring *ring, size_t ofs, int flags)
                ofs = 0;
 
                /* going to the end means looking at tail-1 */
-               if (flags & RING_WF_SEEK_NEW)
-                       ofs += b_data(&buf) - 1;
+               ofs = (flags & RING_WF_SEEK_NEW) ? buf.data - 1 : 0;
 
                //HA_ATOMIC_INC(b_peek(&buf, ofs));
-               ofs += ring->ofs;
        }
 
        while (1) {
                //HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
 
-               /* we were already there, adjust the offset to be relative to
-                * the buffer's head and remove us from the counter.
-                */
-               ofs -= ring->ofs;
                if (ofs >= buf.size) {
                        fprintf(stderr, "FATAL error at %d\n", __LINE__);
                        return 1;
@@ -203,7 +197,6 @@ int dump_ring(struct ring *ring, size_t ofs, int flags)
                }
 
                //HA_ATOMIC_INC(b_peek(&buf, ofs));
-               ofs += ring->ofs;
                //HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
 
                if (!(flags & RING_WF_WAIT_MODE))
index dc84242e3fc914583aabfb1e5e32291c30c5084c..003f8b9cfc18841603c51e08a95927281c404c77 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
@@ -330,13 +330,11 @@ static void dns_resolve_send(struct dgram_conn *dgram)
        if (unlikely(ofs == ~0)) {
                ofs = 0;
                HA_ATOMIC_INC(b_peek(buf, ofs));
-               ofs += ring->ofs;
        }
 
        /* we were already there, adjust the offset to be relative to
         * the buffer's head and remove us from the counter.
         */
-       ofs -= ring->ofs;
        BUG_ON(ofs >= buf->size);
        HA_ATOMIC_DEC(b_peek(buf, ofs));
 
@@ -380,7 +378,6 @@ static void dns_resolve_send(struct dgram_conn *dgram)
 out:
 
        HA_ATOMIC_INC(b_peek(buf, ofs));
-       ofs += ring->ofs;
        ns->dgram->ofs_req = ofs;
        HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);
        HA_SPIN_UNLOCK(DNS_LOCK, &dgram->lock);
@@ -498,7 +495,6 @@ static void dns_session_io_handler(struct appctx *appctx)
                ofs = 0;
 
                HA_ATOMIC_INC(b_peek(buf, ofs));
-               ofs += ring->ofs;
        }
 
        /* in this loop, ofs always points to the counter byte that precedes
@@ -509,7 +505,6 @@ static void dns_session_io_handler(struct appctx *appctx)
                /* we were already there, adjust the offset to be relative to
                 * the buffer's head and remove us from the counter.
                 */
-               ofs -= ring->ofs;
                BUG_ON(ofs >= buf->size);
                HA_ATOMIC_DEC(b_peek(buf, ofs));
 
@@ -637,7 +632,6 @@ static void dns_session_io_handler(struct appctx *appctx)
                }
 
                HA_ATOMIC_INC(b_peek(buf, ofs));
-               ofs += ring->ofs;
                ds->ofs = ofs;
        }
        HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);
@@ -1129,13 +1123,11 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
        if (unlikely(ofs == ~0)) {
                ofs = 0;
                HA_ATOMIC_INC(b_peek(buf, ofs));
-               ofs += ring->ofs;
        }
 
        /* we were already there, adjust the offset to be relative to
         * the buffer's head and remove us from the counter.
         */
-       ofs -= ring->ofs;
        BUG_ON(ofs >= buf->size);
        HA_ATOMIC_DEC(b_peek(buf, ofs));
 
@@ -1224,7 +1216,6 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
        }
 
        HA_ATOMIC_INC(b_peek(buf, ofs));
-       ofs += ring->ofs;
        dss->ofs_req = ofs;
        HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);
 
index ae7438b3670d945bedab685069b551c0fa59bda5..b24669458f87a4586e579f277b5c8d7ea0511339 100644 (file)
@@ -31,7 +31,7 @@
 /* context used to dump the contents of a ring via "show events" or "show errors" */
 struct show_ring_ctx {
        struct ring *ring; /* ring to be dumped */
-       size_t ofs;        /* offset to restart from, ~0 = end */
+       size_t ofs;        /* storage offset to restart from; ~0=oldest */
        uint flags;        /* set of RING_WF_* */
 };
 
@@ -278,8 +278,9 @@ int ring_attach(struct ring *ring)
        return 1;
 }
 
-/* detach an appctx from a ring. The appctx is expected to be waiting at
- * offset <ofs>. Nothing is done if <ring> is NULL.
+/* detach an appctx from a ring. The appctx is expected to be waiting at offset
+ * <ofs> relative to the beginning of the storage, or ~0 if not waiting yet.
+ * Nothing is done if <ring> is NULL.
  */
 void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
 {
@@ -289,7 +290,11 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
        HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
        if (ofs != ~0) {
                /* reader was still attached */
-               ofs -= ring->ofs;
+               if (ofs < b_head_ofs(&ring->buf))
+                       ofs += b_size(&ring->buf) - b_head_ofs(&ring->buf);
+               else
+                       ofs -= b_head_ofs(&ring->buf);
+
                BUG_ON(ofs >= b_size(&ring->buf));
                LIST_DEL_INIT(&appctx->wait_entry);
                HA_ATOMIC_DEC(b_peek(&ring->buf, ofs));
@@ -340,7 +345,7 @@ int cli_io_handler_show_ring(struct appctx *appctx)
        struct stconn *sc = appctx_sc(appctx);
        struct ring *ring = ctx->ring;
        struct buffer *buf = &ring->buf;
-       size_t ofs = ctx->ofs;
+       size_t ofs;
        size_t last_ofs;
        uint64_t msg_len;
        size_t len, cnt;
@@ -363,21 +368,19 @@ int cli_io_handler_show_ring(struct appctx *appctx)
         * existing messages before grabbing a reference to a location. This
         * value cannot be produced after initialization.
         */
-       if (unlikely(ofs == ~0)) {
-               ofs = 0;
-
+       if (unlikely(ctx->ofs == ~0)) {
                /* going to the end means looking at tail-1 */
-               if (ctx->flags & RING_WF_SEEK_NEW)
-                       ofs += b_data(buf) - 1;
-
-               HA_ATOMIC_INC(b_peek(buf, ofs));
-               ofs += ring->ofs;
+               ctx->ofs = b_peek_ofs(buf, (ctx->flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0);
+               HA_ATOMIC_INC(b_orig(buf) + ctx->ofs);
        }
 
        /* we were already there, adjust the offset to be relative to
         * the buffer's head and remove us from the counter.
         */
-       ofs -= ring->ofs;
+       ofs = ctx->ofs - b_head_ofs(buf);
+       if (ctx->ofs < b_head_ofs(buf))
+               ofs += b_size(buf);
+
        BUG_ON(ofs >= buf->size);
        HA_ATOMIC_DEC(b_peek(buf, ofs));
 
@@ -413,9 +416,8 @@ int cli_io_handler_show_ring(struct appctx *appctx)
        }
 
        HA_ATOMIC_INC(b_peek(buf, ofs));
-       ofs += ring->ofs;
-       last_ofs = ring->ofs;
-       ctx->ofs = ofs;
+       last_ofs = b_tail_ofs(buf);
+       ctx->ofs = b_peek_ofs(buf, ofs);
        HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
 
        if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
@@ -426,7 +428,7 @@ int cli_io_handler_show_ring(struct appctx *appctx)
                        /* let's be woken up once new data arrive */
                        HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
                        LIST_APPEND(&ring->waiters, &appctx->wait_entry);
-                       ofs = ring->ofs;
+                       ofs = b_tail_ofs(&ring->buf);
                        HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
                        if (ofs != last_ofs) {
                                /* more data was added into the ring between the
index 44ad629cdf66570896cab298f01336d3d40b53b8..c656b8d545a897166346d48575f1ad041ee0841b 100644 (file)
@@ -362,9 +362,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
         */
        if (unlikely(ofs == ~0)) {
                ofs = 0;
-
                HA_ATOMIC_INC(b_peek(buf, ofs));
-               ofs += ring->ofs;
        }
 
        /* in this loop, ofs always points to the counter byte that precedes
@@ -375,7 +373,6 @@ static void sink_forward_io_handler(struct appctx *appctx)
                /* we were already there, adjust the offset to be relative to
                 * the buffer's head and remove us from the counter.
                 */
-               ofs -= ring->ofs;
                BUG_ON(ofs >= buf->size);
                HA_ATOMIC_DEC(b_peek(buf, ofs));
 
@@ -407,9 +404,8 @@ static void sink_forward_io_handler(struct appctx *appctx)
                }
 
                HA_ATOMIC_INC(b_peek(buf, ofs));
-               ofs += ring->ofs;
+               last_ofs = b_tail_ofs(buf);
                sft->ofs = ofs;
-               last_ofs = ring->ofs;
        }
        HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
 
@@ -417,7 +413,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
                /* let's be woken up once new data arrive */
                HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
                LIST_APPEND(&ring->waiters, &appctx->wait_entry);
-               ofs = ring->ofs;
+               ofs = b_tail_ofs(buf);
                HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
                if (ofs != last_ofs) {
                        /* more data was added into the ring between the
@@ -502,9 +498,7 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
         */
        if (unlikely(ofs == ~0)) {
                ofs = 0;
-
                HA_ATOMIC_INC(b_peek(buf, ofs));
-               ofs += ring->ofs;
        }
 
        /* in this loop, ofs always points to the counter byte that precedes
@@ -515,7 +509,6 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
                /* we were already there, adjust the offset to be relative to
                 * the buffer's head and remove us from the counter.
                 */
-               ofs -= ring->ofs;
                BUG_ON(ofs >= buf->size);
                HA_ATOMIC_DEC(b_peek(buf, ofs));
 
@@ -551,7 +544,6 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
                }
 
                HA_ATOMIC_INC(b_peek(buf, ofs));
-               ofs += ring->ofs;
                sft->ofs = ofs;
        }
        HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);