]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stconn/muxes: Loop on data fast-forwarding to forward at least a buffer
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 7 Nov 2023 09:44:06 +0000 (10:44 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 8 Nov 2023 20:14:07 +0000 (21:14 +0100)
In the mux-to-mux data forwarding, we now try, as far as possible to send at
least a buffer. Of course, if the consumer side is congested or if nothing
more can be received, we leave. But the idea is to retry to fast-forward
data if less than a buffer was forwarded. It is only performed for buffer
fast-forwarding, not splicing.

The idea behind this patch is to optimise the forwarding, when a first
forward was performed to complete a buffer with some existing data. In this
case, the amount of data forwarded is artificially limited because we are
using a non-empty buffer. But without this limitation, it is highly probable
that a full buffer could have been sent. And indeed, with H2 client, a
significant improvement was observed during our test.

To do so, .done_fastfwd() callback function must be able to deal with
interim forwards. Especially for the H2 mux, to remove H2_SF_NOTIFIED flags
on the H2S on the last call only. Otherwise, the H2 stream can be blocked by
itself because it is in the send_list. IOBUF_FL_INTERIM_FF iobuf flag is
used to notify the consumer it is not the last call. This flag is then
removed on the last call.

include/haproxy/stconn-t.h
src/mux_h1.c
src/mux_h2.c

index 78c77de9e80d0a5b35f20b49a912e6853f9b8921..ce06a5cce07137d91d11a9c3518363428115ae61 100644 (file)
@@ -33,6 +33,10 @@ enum iobuf_flags {
        IOBUF_FL_NO_FF            = 0x00000001, /* Fast-forwarding is not supported */
        IOBUF_FL_NO_SPLICING      = 0x00000002, /* Splicing is not supported or unusable for this stream */
        IOBUF_FL_FF_BLOCKED       = 0x00000004, /* Fast-forwarding is blocked (buffer allocation/full) */
+
+       IOBUF_FL_INTERIM_FF       = 0x00000008, /* Producer side warn it will immediately retry a fast-forward.
+                                                *  .done_fastfwd() on consumer side must take care of this flag
+                                                */
 };
 
 struct iobuf {
index 7ee61c393969270c7b5bc85783c1cb499d2e0328..b022c0d018d572c8fe76cb2f4a15b80e51c77cc2 100644 (file)
@@ -4588,6 +4588,9 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
                goto out;
        }
 
+  retry:
+       ret = 0;
+
        if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) &&  count > h1m->curr_len)
                count = h1m->curr_len;
 
@@ -4603,7 +4606,7 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
                /* Fast forwading is not supported by the consumer */
                h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD;
                TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", H1_EV_STRM_RECV, h1c->conn, h1s);
-               goto end;
+               goto out;
        }
        if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
                se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
@@ -4626,6 +4629,7 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
                        goto end;
                }
                total += ret;
+               count -= ret;
                if (!ret) {
                        TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn);
                        h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
@@ -4642,16 +4646,30 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
                }
                b_sub(sdo->iobuf.buf, sdo->iobuf.offset);
                total += ret;
+               count -= ret;
                sdo->iobuf.data += ret;
        }
 
+       /* Till now, we forwarded less than a buffer, we can immediately retry
+        * to fast-forward more data.  Instruct the consumer it is an interim
+        * fast-forward. It is of course only possible if there is still data to
+        * fast-forward (count > 0), if the previous attempt was a full success
+        * (0 > ret == try) and if we are not splicing (iobuf.buf != NULL).
+        */
+       if (ret > 0 && ret == try && count && sdo->iobuf.buf && total < b_size(sdo->iobuf.buf)) {
+               sdo->iobuf.flags |= IOBUF_FL_INTERIM_FF;
+               se_done_ff(sdo);
+               goto retry;
+       }
+
+ out:
        if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN))) {
                if (total > h1m->curr_len) {
                        h1s->flags |= H1S_F_PARSING_ERROR;
                        se_fl_set(h1s->sd, SE_FL_ERROR);
                        TRACE_ERROR("too much payload, more than announced",
                                    H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
-                       goto out;
+                       goto end;
                }
                h1m->curr_len -= total;
                if (!h1m->curr_len) {
@@ -4677,17 +4695,6 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
                }
        }
 
-       HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, total);
-       ret = total;
-       se_done_ff(sdo);
-
-       if (sdo->iobuf.pipe) {
-               se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
-       }
-
-       TRACE_DEVEL("Data fast-forwarded", H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret});
-
- out:
        if (conn_xprt_read0_pending(h1c->conn)) {
                se_fl_set(h1s->sd, SE_FL_EOS);
                TRACE_STATE("report EOS to SE", H1_EV_STRM_RECV, h1c->conn, h1s);
@@ -4711,7 +4718,19 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
                TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
        }
 
+
+       sdo->iobuf.flags &= ~IOBUF_FL_INTERIM_FF;
+       se_done_ff(sdo);
+
+       ret = total;
+       HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, total);
+
+       if (sdo->iobuf.pipe) {
+               se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+       }
+
  end:
+
        if (!(h1c->flags & H1C_F_WANT_FASTFWD)) {
                TRACE_STATE("notify the mux can't use fast-forward anymore", H1_EV_STRM_RECV, h1c->conn, h1s);
                se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD);
index 98b796b810cc3668f92abbeeb3215187aceefd8c..ab5e3cf60f75f3c634ce65cdf684ea01c56e8fb6 100644 (file)
@@ -6912,6 +6912,9 @@ static size_t h2_nego_ff(struct stconn *sc, struct buffer *input, size_t count,
        /* If we were not just woken because we wanted to send but couldn't,
         * and there's somebody else that is waiting to send, do nothing,
         * we will subscribe later and be put at the end of the list
+        *
+        * WARNING: h2_done_ff() is responsible to remove H2_SF_NOTIFIED flags
+        *          depending on iobuf flags.
         */
        if (!(h2s->flags & H2_SF_NOTIFIED) &&
            (!LIST_ISEMPTY(&h2c->send_list) || !LIST_ISEMPTY(&h2c->fctl_list))) {
@@ -6924,7 +6927,6 @@ static size_t h2_nego_ff(struct stconn *sc, struct buffer *input, size_t count,
                h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
                goto end;
        }
-       h2s->flags &= ~H2_SF_NOTIFIED;
 
        if (h2s_mws(h2s) <= 0) {
                h2s->flags |= H2_SF_BLK_SFCTL;
@@ -7081,6 +7083,9 @@ static size_t h2_done_ff(struct stconn *sc)
        sd->iobuf.offset = 0;
        sd->iobuf.data = 0;
 
+       if (!(sd->iobuf.flags & IOBUF_FL_INTERIM_FF))
+           h2s->flags &= ~H2_SF_NOTIFIED;
+
        TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
        return total;
 }