]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: Add consumer-side fast-forwarding support
authorChristopher Faulet <cfaulet@haproxy.com>
Fri, 27 Oct 2023 13:48:13 +0000 (15:48 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Fri, 24 Nov 2023 06:42:43 +0000 (07:42 +0100)
The QUIC multiplexer now implements callbacks to consume fast-forwarded
data. It relies on the H3 stack to acquire the buffer and format the frame.

include/haproxy/mux_quic-t.h
src/h3.c
src/mux_quic.c

index 4fd99932e26e1d8e36fbef18fbf57ac18ed49b33..f293d96d51e15ae2870dfc9f37e1628f5df986e4 100644 (file)
@@ -189,6 +189,8 @@ struct qcc_app_ops {
        int (*attach)(struct qcs *qcs, void *conn_ctx);
        ssize_t (*decode_qcs)(struct qcs *qcs, struct buffer *b, int fin);
        size_t (*snd_buf)(struct qcs *qcs, struct htx *htx, size_t count);
+       size_t (*nego_ff)(struct qcs *qcs, size_t count);
+       size_t (*done_ff)(struct qcs *qcs);
        int (*close)(struct qcs *qcs, enum qcc_app_ops_close_side side);
        void (*detach)(struct qcs *qcs);
        int (*finalize)(void *ctx);
index ae46b353fe5cd277ae4cccc8142cc80c3db558fc..6ae6ee2119d74f735f3de02b140c2342d11cd365 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -1836,6 +1836,69 @@ static size_t h3_snd_buf(struct qcs *qcs, struct htx *htx, size_t count)
        return total;
 }
 
+static size_t h3_nego_ff(struct qcs *qcs, size_t count)
+{
+       struct buffer *res;
+       int hsize;
+       size_t sz, ret = 0;
+
+       h3_debug_printf(stderr, "%s\n", __func__);
+
+       /* FIXME: no check on ALLOC ? */
+       res = mux_get_buf(qcs);
+
+       /* h3 DATA headers : 1-byte frame type + varint frame length */
+       hsize = 1 + QUIC_VARINT_MAX_SIZE;
+       while (1) {
+               if (b_contig_space(res) >= hsize || !b_space_wraps(res))
+                       break;
+               b_slow_realign(res, trash.area, b_data(res));
+       }
+
+       /* Not enough room for headers and at least one data byte, block the
+        * stream. It is expected that the stream connector layer will subscribe
+        * on SEND.
+        */
+       if (b_contig_space(res) <= hsize) {
+               qcs->flags |= QC_SF_BLK_MROOM;
+               qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+               goto end;
+       }
+
+       /* Cannot forward more than available room in output buffer */
+       sz = b_contig_space(res) - hsize;
+       if (count > sz)
+               count = sz;
+
+       qcs->sd->iobuf.buf = res;
+       qcs->sd->iobuf.offset = hsize;
+       qcs->sd->iobuf.data = 0;
+
+       ret = count;
+  end:
+       return ret;
+}
+
+static size_t h3_done_ff(struct qcs *qcs)
+{
+       size_t total = qcs->sd->iobuf.data;
+
+       h3_debug_printf(stderr, "%s\n", __func__);
+
+       if (qcs->sd->iobuf.data) {
+               b_sub(qcs->sd->iobuf.buf, qcs->sd->iobuf.data);
+               b_putchr(qcs->sd->iobuf.buf, 0x00); /* h3 frame type = DATA */
+               b_quic_enc_int(qcs->sd->iobuf.buf, qcs->sd->iobuf.data, QUIC_VARINT_MAX_SIZE); /* h3 frame length */
+               b_add(qcs->sd->iobuf.buf, qcs->sd->iobuf.data);
+       }
+
+       qcs->sd->iobuf.buf = NULL;
+       qcs->sd->iobuf.offset = 0;
+       qcs->sd->iobuf.data = 0;
+
+       return total;
+}
+
 /* Notify about a closure on <qcs> stream requested by the remote peer.
  *
  * Stream channel <side> is explained relative to our endpoint : WR for
@@ -2132,6 +2195,8 @@ const struct qcc_app_ops h3_ops = {
        .attach      = h3_attach,
        .decode_qcs  = h3_decode_qcs,
        .snd_buf     = h3_snd_buf,
+       .nego_ff     = h3_nego_ff,
+       .done_ff     = h3_done_ff,
        .close       = h3_close,
        .detach      = h3_detach,
        .finalize    = h3_finalize,
index 397cfb5c433a85c67510964224a5c16a884b8c9c..c22668c8349b7a0bb36f87cb21ee94c68dcd5af1 100644 (file)
@@ -21,6 +21,7 @@
 #include <haproxy/stconn.h>
 #include <haproxy/time.h>
 #include <haproxy/trace.h>
+#include <haproxy/xref.h>
 
 DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
 DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
@@ -2807,6 +2808,85 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
        return ret;
 }
 
+
+static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
+{
+       struct qcs *qcs = __sc_mux_strm(sc);
+       size_t ret = 0;
+
+       TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+
+       /* stream layer has been detached so no transfer must occur after. */
+       BUG_ON_HOT(qcs->flags & QC_SF_DETACH);
+
+       if (!qcs->qcc->app_ops->nego_ff || !qcs->qcc->app_ops->done_ff) {
+               /* Fast forwading is not supported by the QUIC application layer */
+               qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
+               goto end;
+       }
+
+       /* Alawys disable splicing */
+       qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
+
+       ret = qcs->qcc->app_ops->nego_ff(qcs, count);
+       if (!ret)
+               goto end;
+
+       /* forward remaining input data */
+       if (b_data(input)) {
+               size_t xfer = ret;
+
+               if (xfer > b_data(input))
+                       xfer = b_data(input);
+               b_add(qcs->sd->iobuf.buf, qcs->sd->iobuf.offset);
+               qcs->sd->iobuf.data = b_xfer(qcs->sd->iobuf.buf, input, xfer);
+               b_sub(qcs->sd->iobuf.buf, qcs->sd->iobuf.offset);
+
+               /* Cannot forward more data, wait for room */
+               if (b_data(input))
+                       goto end;
+       }
+       ret -= qcs->sd->iobuf.data;
+
+ end:
+       TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+       return ret;
+}
+
+static size_t qmux_done_ff(struct stconn *sc)
+{
+       struct qcs *qcs = __sc_mux_strm(sc);
+       struct qcc *qcc = qcs->qcc;
+       struct sedesc *sd = qcs->sd;
+       size_t total = 0;
+
+       TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+
+       if (sd->iobuf.flags & IOBUF_FL_EOI)
+               qcs->flags |= QC_SF_FIN_STREAM;
+
+       if (!(qcs->flags & QC_SF_FIN_STREAM) && !sd->iobuf.data)
+               goto end;
+
+       total = qcs->qcc->app_ops->done_ff(qcs);
+
+       qcc_send_stream(qcs, 0);
+       if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
+               tasklet_wakeup(qcc->wait_event.tasklet);
+
+  end:
+       if (!b_data(&qcs->tx.buf))
+               b_free(&qcs->tx.buf);
+
+       TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+       return total;
+}
+
+static int qmux_resume_ff(struct stconn *sc, unsigned int flags)
+{
+       return 0;
+}
+
 /* Called from the upper layer, to subscribe <es> to events <event_type>. The
  * event subscriber <es> is not allowed to change from a previous call as long
  * as at least one event is still subscribed. The <event_type> must only be a
@@ -2927,6 +3007,9 @@ static const struct mux_ops qmux_ops = {
        .detach      = qmux_strm_detach,
        .rcv_buf     = qmux_strm_rcv_buf,
        .snd_buf     = qmux_strm_snd_buf,
+       .nego_fastfwd = qmux_nego_ff,
+       .done_fastfwd = qmux_done_ff,
+       .resume_fastfwd = qmux_resume_ff,
        .subscribe   = qmux_strm_subscribe,
        .unsubscribe = qmux_strm_unsubscribe,
        .wake        = qmux_wake,