]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: ring: change the ring reader to use the new vector-based API now
authorWilly Tarreau <w@1wt.eu>
Tue, 27 Feb 2024 06:58:26 +0000 (07:58 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
The code now looks cleaner and more easily shows what still needs to be
addressed. There are not that many changes in practice, these are mostly
mechanical, essentially hiding the buffer from the callers.

include/haproxy/applet.h
include/haproxy/log.h
include/haproxy/ring.h
src/applet.c
src/log.c
src/ring.c

index 1c88c128d439483e5a5c8d1c31e6bbf0d56604d8..b1ac7df7a84962dd04e5c407986d69c78537bc25 100644 (file)
@@ -58,7 +58,7 @@ size_t appctx_raw_snd_buf(struct appctx *appctx, struct buffer *buf, size_t coun
 size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags);
 
 int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags);
-ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size_t len);
+ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len);
 
 static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc)
 {
index 591be3b4b535ac4437b6f5c206f90874c103f2af..c61089661f83d52b2c3e942d8caf4d7d93aead1e 100644 (file)
@@ -87,7 +87,7 @@ void app_log(struct list *loggers, struct buffer *tag, int level, const char *fo
  */
 int add_to_logformat_list(char *start, char *end, int type, struct list *list_format, char **err);
 
-ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t ofs, size_t len);
+ssize_t syslog_applet_append_event(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len);
 
 /*
  * Parse the log_format string and fill a linked list.
index 9e8970164ac69861ece4216a3994521849194847..057693c58a35f7d7c86871591de6a7d6e668f281 100644 (file)
@@ -42,7 +42,7 @@ void cli_io_release_show_ring(struct appctx *appctx);
 
 size_t ring_max_payload(const struct ring *ring);
 int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
-                          ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len));
+                          ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len));
 
 /* returns the ring storage's area */
 static inline void *ring_area(const struct ring *ring)
index afd76f20551216ada6c2cfd09bd5bde2e2875922..59838648f0705d30c1ad1055247c99cdb6a38ead 100644 (file)
@@ -24,6 +24,7 @@
 #include <haproxy/stream.h>
 #include <haproxy/task.h>
 #include <haproxy/trace.h>
+#include <haproxy/vecpair.h>
 #include <haproxy/xref.h>
 
 unsigned int nb_applets = 0;
@@ -725,15 +726,15 @@ end:
        return ret;
 }
 
-/* Atomically append a line to applet <ctx>'s output, appending a trailing 'LF'.
- * The line is read from <buf> at offset <ofs> relative to the buffer's origin,
- * for <len> bytes. It returns the number of bytes consumed from the input
- * buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
- * never be able to (too large msg). The input buffer is not modified. The
- * caller is responsible for making sure that there are at least ofs+len bytes
- * in the input buffer.
+/* Atomically append a line to applet <ctx>'s output, appending a trailing LF.
+ * The line is read from vectors <v1> and <v2> at offset <ofs> relative to the
+ * area's origin, for <len> bytes. It returns the number of bytes consumed from
+ * the input vectors on success, -1 if it temporarily cannot (buffer full), -2
+ * if it will never be able to (too large msg). The vectors are not modified.
+ * The caller is responsible for making sure that there are at least ofs+len
+ * bytes in the input vectors.
  */
-ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size_t len)
+ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)
 {
        struct appctx *appctx = ctx;
 
@@ -743,7 +744,7 @@ ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size
        }
 
        chunk_reset(&trash);
-       b_getblk_ofs(buf, trash.area, len, ofs);
+       vp_peek_ofs(v1, v2, ofs, trash.area, len);
        trash.data += len;
        trash.area[trash.data++] = '\n';
        if (applet_putchk(appctx, &trash) == -1)
index d2c0f80dcd910f75d13aa821a3c0613678758d99..a25a5cfd963b09f25796022793a6e55abed2a0c0 100644 (file)
--- a/src/log.c
+++ b/src/log.c
@@ -45,6 +45,7 @@
 #include <haproxy/time.h>
 #include <haproxy/hash.h>
 #include <haproxy/tools.h>
+#include <haproxy/vecpair.h>
 
 /* global recv logs counter */
 int cum_log_messages;
@@ -4346,15 +4347,14 @@ static struct applet syslog_applet = {
 };
 
 /* Atomically append an event to applet >ctx>'s output, prepending it with its
- * size in decimal followed by a space.
- * The line is read from <buf> at offset <ofs> relative to the buffer's origin,
- * for <len> bytes. It returns the number of bytes consumed from the input
- * buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
- * never be able to (too large msg). The input buffer is not modified. The
- * caller is responsible for making sure that there are at least ofs+len bytes
- * in the input buffer.
+ * size in decimal followed by a space. The line is read from vectors <v1> and
+ * <v2> at offset <ofs> relative to the area's origin, for <len> bytes. It
+ * returns the number of bytes consumed from the input vectors on success, -1
+ * if it temporarily cannot (buffer full), -2 if it will never be able to (too
+ * large msg). The input vectors are not modified. The caller is responsible for
+ * making sure that there are at least ofs+len bytes in the input buffer.
  */
-ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t ofs, size_t len)
+ssize_t syslog_applet_append_event(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)
 {
        struct appctx *appctx = ctx;
        char *p;
@@ -4372,7 +4372,7 @@ ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t o
                return -2;
 
        /* try to transfer it or report full */
-       trash.data += b_getblk_ofs(buf, trash.area + trash.data, len, ofs);
+       trash.data += vp_peek_ofs(v1, v2, ofs, trash.area, len);
        if (applet_putchk(appctx, &trash) == -1)
                return -1;
 
index ca9a6c85311ddcc1ad5da68e19fb4f5407897c3b..442e98aef887c248093a0c4f0f1b95dfe296d732 100644 (file)
@@ -325,14 +325,11 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
        HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
        if (ofs != ~0) {
                /* reader was still attached */
-               if (ofs < ring_head(ring))
-                       ofs += ring_size(ring) - ring_head(ring);
-               else
-                       ofs -= ring_head(ring);
+               char *area = ring_area(ring);
 
                BUG_ON(ofs >= ring_size(ring));
                LIST_DEL_INIT(&appctx->wait_entry);
-               HA_ATOMIC_DEC(b_peek(&ring->storage->buf, ofs));
+               HA_ATOMIC_DEC(area + ofs);
        }
        HA_ATOMIC_DEC(&ring->readers_count);
        HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
@@ -374,18 +371,26 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
  * if it needs to pause, 1 once finished.
  */
 int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
-                          ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len))
+                          ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
 {
        struct buffer *buf = &ring->storage->buf;
+       size_t head_ofs, tail_ofs;
+       size_t ring_size;
+       char *ring_area;
+       struct ist v1, v2;
        uint64_t msg_len;
-       ssize_t copied;
        size_t len, cnt;
-       size_t ofs; /* absolute offset from the buffer's origin */
-       size_t pos; /* relative position from head (0..data-1) */
+       ssize_t copied;
        int ret;
 
+       ring_area = b_orig(buf);
+       ring_size = b_size(buf);
+
        HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
 
+       head_ofs = b_head_ofs(buf);
+       tail_ofs = b_tail_ofs(buf);
+
        /* explanation for the initialization below: it would be better to do
         * this in the parsing function but this would occasionally result in
         * dropped events because we'd take a reference on the oldest message
@@ -395,37 +400,49 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
         * value cannot be produced after initialization.
         */
        if (unlikely(*ofs_ptr == ~0)) {
-               /* going to the end means looking at tail-1 */
-               *ofs_ptr = b_peek_ofs(buf, (flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0);
-               HA_ATOMIC_INC(b_orig(buf) + *ofs_ptr);
+               if (flags & RING_WF_SEEK_NEW) {
+                       /* going to the end means looking at tail-1 */
+                       head_ofs = tail_ofs + ring_size - 1;
+                       if (head_ofs >= ring_size)
+                               head_ofs -= ring_size;
+               }
+
+               /* make ctx->ofs relative to the beginning of the buffer now */
+               *ofs_ptr = head_ofs;
+
+               /* and reserve our slot here */
+               HA_ATOMIC_INC(ring_area + head_ofs);
        }
 
-       ofs = *ofs_ptr;
-       BUG_ON(ofs >= buf->size);
-       HA_ATOMIC_DEC(b_orig(buf) + ofs);
+       /* we have the guarantee we can restart from our own head */
+       head_ofs = *ofs_ptr;
+       BUG_ON(head_ofs >= ring_size);
+
+       HA_ATOMIC_DEC(ring_area + head_ofs);
 
-       /* in this loop, ofs always points to the counter byte that precedes
+       /* in this loop, head_ofs always points to the counter byte that precedes
         * the message so that we can take our reference there if we have to
-        * stop before the end (ret=0).
+        * stop before the end (ret=0). The reference is relative to the ring's
+        * origin, while pos is relative to the ring's head.
         */
        ret = 1;
-       while (1) {
-               /* relative position in the buffer */
-               pos = b_rel_ofs(buf, ofs);
+       vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
 
-               if (pos + 1 >= b_data(buf)) {
+       while (1) {
+               if (vp_size(v1, v2) <= 1) {
                        /* no more data */
                        break;
                }
 
                cnt = 1;
-               len = b_peek_varint(buf, pos + cnt, &msg_len);
+               len = vp_peek_varint_ofs(v1, v2, cnt, &msg_len);
                if (!len)
                        break;
                cnt += len;
-               BUG_ON(msg_len + pos + cnt + 1 > b_data(buf));
 
-               copied = msg_handler(ctx, buf, ofs + cnt, msg_len);
+               BUG_ON(msg_len + cnt + 1 > vp_size(v1, v2));
+
+               copied = msg_handler(ctx, v1, v2, cnt, msg_len);
                if (copied == -2) {
                        /* too large a message to ever fit, let's skip it */
                        goto skip;
@@ -436,13 +453,15 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
                        break;
                }
        skip:
-               ofs = b_add_ofs(buf, ofs, cnt + msg_len);
+               vp_skip(&v1, &v2, cnt + msg_len);
        }
 
-       HA_ATOMIC_INC(b_orig(buf) + ofs);
+       vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
+
+       HA_ATOMIC_INC(ring_area + head_ofs);
        if (last_ofs_ptr)
-               *last_ofs_ptr = b_tail_ofs(buf);
-       *ofs_ptr = ofs;
+               *last_ofs_ptr = tail_ofs;
+       *ofs_ptr = head_ofs;
        HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
        return ret;
 }