]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: ring: make the ring reader use only absolute offsets
authorWilly Tarreau <w@1wt.eu>
Thu, 23 Feb 2023 08:53:38 +0000 (09:53 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
The goal is to remove references to the buffer's head and tail in the
fast path so that we can release the lock during some reads. This means
no more comparisons with b_data() nor operations relative to b_head()
will be possible anymore. As a first step we need to have an absolute
offset in the buffer, and to use b_getblk_ofs() in the applet callbacks
to retrieve the data based on this.

src/applet.c
src/log.c
src/ring.c

index 6ab092b76bd3478ec9b327eff549154328562afe..afd76f20551216ada6c2cfd09bd5bde2e2875922 100644 (file)
@@ -726,7 +726,7 @@ end:
 }
 
 /* Atomically append a line to applet <ctx>'s output, appending a trailing 'LF'.
- * The line is read from <buf> at offset <ofs> relative to the buffer's head,
+ * The line is read from <buf> at offset <ofs> relative to the buffer's origin,
  * for <len> bytes. It returns the number of bytes consumed from the input
  * buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
  * never be able to (too large msg). The input buffer is not modified. The
@@ -743,7 +743,7 @@ ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size
        }
 
        chunk_reset(&trash);
-       b_getblk(buf, trash.area, len, ofs);
+       b_getblk_ofs(buf, trash.area, len, ofs);
        trash.data += len;
        trash.area[trash.data++] = '\n';
        if (applet_putchk(appctx, &trash) == -1)
index be44e3c2e04bcc39b689603aebdc08f9965fdfc7..d2c0f80dcd910f75d13aa821a3c0613678758d99 100644 (file)
--- a/src/log.c
+++ b/src/log.c
@@ -4347,7 +4347,7 @@ static struct applet syslog_applet = {
 
 /* Atomically append an event to applet >ctx>'s output, prepending it with its
  * size in decimal followed by a space.
- * The line is read from <buf> at offset <ofs> relative to the buffer's head,
+ * The line is read from <buf> at offset <ofs> relative to the buffer's origin,
  * for <len> bytes. It returns the number of bytes consumed from the input
  * buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
  * never be able to (too large msg). The input buffer is not modified. The
@@ -4372,7 +4372,7 @@ ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t o
                return -2;
 
        /* try to transfer it or report full */
-       trash.data += b_getblk(buf, trash.area + trash.data, len, ofs);
+       trash.data += b_getblk_ofs(buf, trash.area + trash.data, len, ofs);
        if (applet_putchk(appctx, &trash) == -1)
                return -1;
 
index 7c78b33019b3f6a9291aa9e77ae964b62f35aa47..c1e74847652ed5d0194b5f9dd91d165915f5498f 100644 (file)
@@ -343,7 +343,8 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
        uint64_t msg_len;
        ssize_t copied;
        size_t len, cnt;
-       size_t ofs;
+       size_t ofs; /* absolute offset from the buffer's origin */
+       size_t pos; /* relative position from head (0..data-1) */
        int ret;
 
        HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
@@ -362,47 +363,49 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
                HA_ATOMIC_INC(b_orig(buf) + *ofs_ptr);
        }
 
-       /* we were already there, adjust the offset to be relative to
-        * the buffer's head and remove us from the counter.
-        */
-       ofs = *ofs_ptr - b_head_ofs(buf);
-       if (*ofs_ptr < b_head_ofs(buf))
-               ofs += b_size(buf);
-
+       ofs = *ofs_ptr;
        BUG_ON(ofs >= buf->size);
-       HA_ATOMIC_DEC(b_peek(buf, ofs));
+       HA_ATOMIC_DEC(b_orig(buf) + ofs);
 
        /* in this loop, ofs always points to the counter byte that precedes
         * the message so that we can take our reference there if we have to
         * stop before the end (ret=0).
         */
        ret = 1;
-       while (ofs + 1 < b_data(buf)) {
+       while (1) {
+               /* relative position in the buffer */
+               pos = b_rel_ofs(buf, ofs);
+
+               if (pos + 1 >= b_data(buf)) {
+                       /* no more data */
+                       break;
+               }
+
                cnt = 1;
-               len = b_peek_varint(buf, ofs + cnt, &msg_len);
+               len = b_peek_varint(buf, pos + cnt, &msg_len);
                if (!len)
                        break;
                cnt += len;
-               BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
+               BUG_ON(msg_len + pos + cnt + 1 > b_data(buf));
 
                copied = msg_handler(ctx, buf, ofs + cnt, msg_len);
                if (copied == -2) {
                        /* too large a message to ever fit, let's skip it */
-                       ofs += cnt + msg_len;
-                       continue;
+                       goto skip;
                }
                else if (copied == -1) {
                        /* output full */
                        ret = 0;
                        break;
                }
-               ofs += cnt + msg_len;
+       skip:
+               ofs = b_add_ofs(buf, ofs, cnt + msg_len);
        }
 
-       HA_ATOMIC_INC(b_peek(buf, ofs));
+       HA_ATOMIC_INC(b_orig(buf) + ofs);
        if (last_ofs_ptr)
                *last_ofs_ptr = b_tail_ofs(buf);
-       *ofs_ptr = b_peek_ofs(buf, ofs);
+       *ofs_ptr = ofs;
        HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
        return ret;
 }