From: Christopher Faulet Date: Fri, 27 Oct 2023 13:48:13 +0000 (+0200) Subject: MEDIUM: mux-quic: Add consumer-side fast-forwarding support X-Git-Tag: v2.9-dev11~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1bcc0f8892069ad94201a04599ca8af87ba7300b;p=thirdparty%2Fhaproxy.git MEDIUM: mux-quic: Add consumer-side fast-forwarding support The QUIC multiplexer now implements callbacks to consume fast-forwarded data. It relies on the H3 stack to acquire the buffer and format the frame. --- diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 4fd99932e2..f293d96d51 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -189,6 +189,8 @@ struct qcc_app_ops { int (*attach)(struct qcs *qcs, void *conn_ctx); ssize_t (*decode_qcs)(struct qcs *qcs, struct buffer *b, int fin); size_t (*snd_buf)(struct qcs *qcs, struct htx *htx, size_t count); + size_t (*nego_ff)(struct qcs *qcs, size_t count); + size_t (*done_ff)(struct qcs *qcs); int (*close)(struct qcs *qcs, enum qcc_app_ops_close_side side); void (*detach)(struct qcs *qcs); int (*finalize)(void *ctx); diff --git a/src/h3.c b/src/h3.c index ae46b353fe..6ae6ee2119 100644 --- a/src/h3.c +++ b/src/h3.c @@ -1836,6 +1836,69 @@ static size_t h3_snd_buf(struct qcs *qcs, struct htx *htx, size_t count) return total; } +static size_t h3_nego_ff(struct qcs *qcs, size_t count) +{ + struct buffer *res; + int hsize; + size_t sz, ret = 0; + + h3_debug_printf(stderr, "%s\n", __func__); + + /* FIXME: no check on ALLOC ? */ + res = mux_get_buf(qcs); + + /* h3 DATA headers : 1-byte frame type + varint frame length */ + hsize = 1 + QUIC_VARINT_MAX_SIZE; + while (1) { + if (b_contig_space(res) >= hsize || !b_space_wraps(res)) + break; + b_slow_realign(res, trash.area, b_data(res)); + } + + /* Not enough room for headers and at least one data byte, block the + * stream. It is expected that the stream connector layer will subscribe + * on SEND. + */ + if (b_contig_space(res) <= hsize) { + qcs->flags |= QC_SF_BLK_MROOM; + qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; + goto end; + } + + /* Cannot forward more than available room in output buffer */ + sz = b_contig_space(res) - hsize; + if (count > sz) + count = sz; + + qcs->sd->iobuf.buf = res; + qcs->sd->iobuf.offset = hsize; + qcs->sd->iobuf.data = 0; + + ret = count; + end: + return ret; +} + +static size_t h3_done_ff(struct qcs *qcs) +{ + size_t total = qcs->sd->iobuf.data; + + h3_debug_printf(stderr, "%s\n", __func__); + + if (qcs->sd->iobuf.data) { + b_sub(qcs->sd->iobuf.buf, qcs->sd->iobuf.data); + b_putchr(qcs->sd->iobuf.buf, 0x00); /* h3 frame type = DATA */ + b_quic_enc_int(qcs->sd->iobuf.buf, qcs->sd->iobuf.data, QUIC_VARINT_MAX_SIZE); /* h3 frame length */ + b_add(qcs->sd->iobuf.buf, qcs->sd->iobuf.data); + } + + qcs->sd->iobuf.buf = NULL; + qcs->sd->iobuf.offset = 0; + qcs->sd->iobuf.data = 0; + + return total; +} + /* Notify about a closure on stream requested by the remote peer. * * Stream channel is explained relative to our endpoint : WR for @@ -2132,6 +2195,8 @@ const struct qcc_app_ops h3_ops = { .attach = h3_attach, .decode_qcs = h3_decode_qcs, .snd_buf = h3_snd_buf, + .nego_ff = h3_nego_ff, + .done_ff = h3_done_ff, .close = h3_close, .detach = h3_detach, .finalize = h3_finalize, diff --git a/src/mux_quic.c b/src/mux_quic.c index 397cfb5c43..c22668c834 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -21,6 +21,7 @@ #include #include #include +#include DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc)); DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs)); @@ -2807,6 +2808,85 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, return ret; } + +static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice) +{ + struct qcs *qcs = __sc_mux_strm(sc); + size_t ret = 0; + + TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + + /* stream layer has been detached so no transfer must occur after. */ + BUG_ON_HOT(qcs->flags & QC_SF_DETACH); + + if (!qcs->qcc->app_ops->nego_ff || !qcs->qcc->app_ops->done_ff) { + /* Fast forwading is not supported by the QUIC application layer */ + qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF; + goto end; + } + + /* Alawys disable splicing */ + qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING; + + ret = qcs->qcc->app_ops->nego_ff(qcs, count); + if (!ret) + goto end; + + /* forward remaining input data */ + if (b_data(input)) { + size_t xfer = ret; + + if (xfer > b_data(input)) + xfer = b_data(input); + b_add(qcs->sd->iobuf.buf, qcs->sd->iobuf.offset); + qcs->sd->iobuf.data = b_xfer(qcs->sd->iobuf.buf, input, xfer); + b_sub(qcs->sd->iobuf.buf, qcs->sd->iobuf.offset); + + /* Cannot forward more data, wait for room */ + if (b_data(input)) + goto end; + } + ret -= qcs->sd->iobuf.data; + + end: + TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + return ret; +} + +static size_t qmux_done_ff(struct stconn *sc) +{ + struct qcs *qcs = __sc_mux_strm(sc); + struct qcc *qcc = qcs->qcc; + struct sedesc *sd = qcs->sd; + size_t total = 0; + + TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + + if (sd->iobuf.flags & IOBUF_FL_EOI) + qcs->flags |= QC_SF_FIN_STREAM; + + if (!(qcs->flags & QC_SF_FIN_STREAM) && !sd->iobuf.data) + goto end; + + total = qcs->qcc->app_ops->done_ff(qcs); + + qcc_send_stream(qcs, 0); + if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) + tasklet_wakeup(qcc->wait_event.tasklet); + + end: + if (!b_data(&qcs->tx.buf)) + b_free(&qcs->tx.buf); + + TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + return total; +} + +static int qmux_resume_ff(struct stconn *sc, unsigned int flags) +{ + return 0; +} + /* Called from the upper layer, to subscribe to events . The * event subscriber is not allowed to change from a previous call as long * as at least one event is still subscribed. The must only be a @@ -2927,6 +3007,9 @@ static const struct mux_ops qmux_ops = { .detach = qmux_strm_detach, .rcv_buf = qmux_strm_rcv_buf, .snd_buf = qmux_strm_snd_buf, + .nego_fastfwd = qmux_nego_ff, + .done_fastfwd = qmux_done_ff, + .resume_fastfwd = qmux_resume_ff, .subscribe = qmux_strm_subscribe, .unsubscribe = qmux_strm_unsubscribe, .wake = qmux_wake,