]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: sink: move the generic ring forwarder code use ring_dispatch_messages()
authorWilly Tarreau <w@1wt.eu>
Tue, 27 Feb 2024 16:01:45 +0000 (17:01 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
Now the code is much simpler than the ring forwarding function almost does
not need any knowledge of the structure of the ring anymore.

src/sink.c

index dcbe61e38ef0ebac8356e6dd5637ec8d9b8b887b..4d7cb9c0e3ff4468d1a269c693d844108071181f 100644 (file)
@@ -355,10 +355,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
        struct sink_forward_target *sft = appctx->svcctx;
        struct sink *sink = sft->sink;
        struct ring *ring = sink->ctx.ring;
-       struct buffer *buf = &ring->buf;
-       uint64_t msg_len;
-       size_t len, cnt, ofs, last_ofs;
-       ssize_t copied;
+       size_t ofs, last_ofs;
        int ret = 0;
 
        if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
@@ -389,68 +386,13 @@ static void sink_forward_io_handler(struct appctx *appctx)
        LIST_DEL_INIT(&appctx->wait_entry);
        HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
 
-       HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
-
-       /* explanation for the initialization below: it would be better to do
-        * this in the parsing function but this would occasionally result in
-        * dropped events because we'd take a reference on the oldest message
-        * and keep it while being scheduled. Thus instead let's take it the
-        * first time we enter here so that we have a chance to pass many
-        * existing messages before grabbing a reference to a location. This
-        * value cannot be produced after initialization.
-        */
-       if (unlikely(sft->ofs == ~0)) {
-               sft->ofs = b_peek_ofs(buf, 0);
-               HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
-       }
-
-       /* we were already there, adjust the offset to be relative to
-        * the buffer's head and remove us from the counter.
-        */
-       ofs = sft->ofs - b_head_ofs(buf);
-       if (sft->ofs < b_head_ofs(buf))
-               ofs += b_size(buf);
-       BUG_ON(ofs >= buf->size);
-       HA_ATOMIC_DEC(b_peek(buf, ofs));
-
-       /* in this loop, ofs always points to the counter byte that precedes
-        * the message so that we can take our reference there if we have to
-        * stop before the end (ret=0).
-        */
-       ret = 1;
-       while (ofs + 1 < b_data(buf)) {
-               cnt = 1;
-               len = b_peek_varint(buf, ofs + cnt, &msg_len);
-               if (!len)
-                       break;
-               cnt += len;
-               BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
-
-               copied = applet_append_line(appctx, buf, ofs + cnt, msg_len);
-               if (copied == -2) {
-                       /* too large a message to ever fit, let's skip it */
-                       ofs += cnt + msg_len;
-                       continue;
-               }
-               else if (copied == -1) {
-                       /* output full */
-                       ret = 0;
-                       break;
-               }
-               ofs += cnt + msg_len;
-       }
-
-       HA_ATOMIC_INC(b_peek(buf, ofs));
-       last_ofs = b_tail_ofs(buf);
-       sft->ofs = b_peek_ofs(buf, ofs);
-
-       HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
+       ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, applet_append_line);
 
        if (ret) {
                /* let's be woken up once new data arrive */
                HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
                LIST_APPEND(&ring->waiters, &appctx->wait_entry);
-               ofs = b_tail_ofs(buf);
+               ofs = b_tail_ofs(&ring->buf);
                HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
                if (ofs != last_ofs) {
                        /* more data was added into the ring between the