From: Christopher Faulet
Date: Fri, 6 Oct 2023 13:32:47 +0000 (+0200)
Subject: MEDIUM: mux-pt: Add fast-forwarding support
X-Git-Tag: v2.9-dev8~20
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=ec22d3102d44a2ead71321584a65ed6d3f029e8a;p=thirdparty%2Fhaproxy.git

MEDIUM: mux-pt: Add fast-forwarding support

The PT multiplexer now implements callback functions to produce and
consume fast-forwarded data. Only splicing is supported because the
mux-pt does not use its own buffers.
---

diff --git a/src/mux_pt.c b/src/mux_pt.c
index 40dba18c90..4b50fcdd6f 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -13,11 +13,12 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
 #include
+#include
 
 struct mux_pt_ctx {
 	struct sedesc *sd;
@@ -322,6 +323,8 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
 	}
 	conn->ctx = ctx;
 	se_fl_set(ctx->sd, SE_FL_RCV_MORE);
+	if (global.tune.options & GTUNE_USE_SPLICE)
+		se_fl_set(ctx->sd, SE_FL_MAY_FASTFWD);
 
 	TRACE_LEAVE(PT_EV_CONN_NEW, conn);
 	return 0;
@@ -562,6 +565,195 @@ static size_t mux_pt_snd_buf(struct stconn *sc, struct buffer *buf, size_t count
 	return ret;
 }
 
+static inline struct sedesc *mux_pt_opposite_sd(struct mux_pt_ctx *ctx)
+{
+	struct xref *peer;
+	struct sedesc *sdo;
+
+	peer = xref_get_peer_and_lock(&ctx->sd->xref);
+	if (!peer)
+		return NULL;
+
+	sdo = container_of(peer, struct sedesc, xref);
+	xref_unlock(&ctx->sd->xref, peer);
+	return sdo;
+}
+
+static size_t mux_pt_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
+{
+	struct connection *conn = __sc_conn(sc);
+	struct mux_pt_ctx *ctx = conn->ctx;
+	size_t ret = 0;
+
+	TRACE_ENTER(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){count});
+
+	/* Use kernel splicing if it is supported by the sender and if there
+	 * are no input data _AND_ no output data.
+	 *
+	 * TODO: It may be good to add a flag to send obuf data first if any,
+	 *       and then data in pipe, or the opposite. For now, it is not
+	 *       supported to mix data.
+	 */
+	if (!b_data(input) && may_splice) {
+		if (conn->xprt->snd_pipe && (ctx->sd->iobuf.pipe || (pipes_used < global.maxpipes && (ctx->sd->iobuf.pipe = get_pipe())))) {
+			ctx->sd->iobuf.offset = 0;
+			ctx->sd->iobuf.data = 0;
+			ret = count;
+			goto out;
+		}
+		ctx->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
+		TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", PT_EV_TX_DATA, conn, sc);
+	}
+
+	/* No buffer case */
+
+  out:
+	TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){ret});
+	return ret;
+}
+
+static void mux_pt_done_ff(struct stconn *sc)
+{
+	struct connection *conn = __sc_conn(sc);
+	struct mux_pt_ctx *ctx = conn->ctx;
+	struct sedesc *sd = ctx->sd;
+	size_t total = 0;
+
+	TRACE_ENTER(PT_EV_TX_DATA, conn, sc);
+
+	if (sd->iobuf.pipe) {
+		total = conn->xprt->snd_pipe(conn, conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
+		if (!sd->iobuf.pipe->data) {
+			put_pipe(sd->iobuf.pipe);
+			sd->iobuf.pipe = NULL;
+		}
+	}
+	else {
+		BUG_ON(sd->iobuf.buf);
+	}
+
+  out:
+	if (conn->flags & CO_FL_ERROR) {
+		if (conn_xprt_read0_pending(conn))
+			se_fl_set(ctx->sd, SE_FL_EOS);
+		se_fl_set_error(ctx->sd);
+		TRACE_DEVEL("error on connection", PT_EV_TX_DATA|PT_EV_CONN_ERR, conn, sc);
+	}
+
+	TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total});
+}
+
+static int mux_pt_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
+{
+	struct connection *conn = __sc_conn(sc);
+	struct mux_pt_ctx *ctx = conn->ctx;
+	struct sedesc *sdo = NULL;
+	size_t total = 0, try = 0;
+	int ret = 0;
+
+	TRACE_ENTER(PT_EV_RX_DATA, conn, sc, 0, (size_t[]){count});
+
+	se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+	conn->flags &= ~CO_FL_WAIT_ROOM;
+	sdo = mux_pt_opposite_sd(ctx);
+	if (!sdo) {
+		TRACE_STATE("Opposite endpoint not available yet", PT_EV_RX_DATA, conn, sc);
+		goto out;
+	}
+
+	try = se_init_ff(sdo, &BUF_NULL, count, conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING));
+	if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
+		/* Fast-forwarding is not supported by the consumer */
+		se_fl_clr(ctx->sd, SE_FL_MAY_FASTFWD);
+		TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", PT_EV_RX_DATA, conn, sc);
+		goto end;
+	}
+	if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
+		se_fl_set(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+		TRACE_STATE("waiting for more room", PT_EV_RX_DATA|PT_EV_STRM_ERR, conn, sc);
+		goto out;
+	}
+
+	total += sdo->iobuf.data;
+
+	if (sdo->iobuf.pipe) {
+		/* Here, no data was xferred yet */
+		ret = conn->xprt->rcv_pipe(conn, conn->xprt_ctx, sdo->iobuf.pipe, try);
+		if (ret < 0) {
+			TRACE_ERROR("Error when trying to fast-forward data, disable it and abort",
+				    PT_EV_RX_DATA|PT_EV_STRM_ERR|PT_EV_CONN_ERR, conn, sc);
+			se_fl_clr(ctx->sd, SE_FL_MAY_FASTFWD);
+			BUG_ON(sdo->iobuf.pipe->data);
+			put_pipe(sdo->iobuf.pipe);
+			sdo->iobuf.pipe = NULL;
+			goto end;
+		}
+		total += ret;
+	}
+	else {
+		BUG_ON(sdo->iobuf.buf);
+		ret = -1; /* abort splicing for now and fallback to buffer mode */
+		goto end;
+	}
+
+	ret = total;
+	se_done_ff(sdo);
+
+	if (sdo->iobuf.pipe) {
+		se_fl_set(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+	}
+
+	TRACE_DEVEL("Data fast-forwarded", PT_EV_RX_DATA, conn, sc, 0, (size_t[]){ret});
+
+  out:
+	if (conn->flags & CO_FL_ERROR) {
+		if (conn_xprt_read0_pending(conn))
+			se_fl_set(ctx->sd, SE_FL_EOS);
+		se_fl_set(ctx->sd, SE_FL_ERROR);
+		TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, conn, sc);
+	}
+	else if (conn_xprt_read0_pending(conn)) {
+		se_fl_set(ctx->sd,
+			  (SE_FL_EOS|SE_FL_EOI));
+		TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, conn, sc);
+	}
+  end:
+	TRACE_LEAVE(PT_EV_RX_DATA, conn, sc, 0, (size_t[]){ret});
+	return ret;
+}
+
+static int mux_pt_resume_fastfwd(struct stconn *sc, unsigned int flags)
+{
+	struct connection *conn = __sc_conn(sc);
+	struct mux_pt_ctx *ctx = conn->ctx;
+	struct sedesc *sd = ctx->sd;
+	size_t total = 0;
+
+	TRACE_ENTER(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){flags});
+
+	if (sd->iobuf.pipe) {
+		total = conn->xprt->snd_pipe(conn, conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
+		if (!sd->iobuf.pipe->data) {
+			put_pipe(sd->iobuf.pipe);
+			sd->iobuf.pipe = NULL;
+		}
+	}
+	else {
+		BUG_ON(sd->iobuf.buf);
+	}
+
+  out:
+	if (conn->flags & CO_FL_ERROR) {
+		if (conn_xprt_read0_pending(conn))
+			se_fl_set(ctx->sd, SE_FL_EOS);
+		se_fl_set_error(ctx->sd);
+		TRACE_DEVEL("error on connection", PT_EV_TX_DATA|PT_EV_CONN_ERR, conn, sc);
+	}
+
+	TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total});
+	return total;
+}
+
 /* Called from the upper layer, to subscribe <es> to events <event_type>. The
  * event subscriber <es> is not allowed to change from a previous call as long
  * as at least one event is still subscribed. The <event_type> must only be a
@@ -608,6 +800,10 @@ const struct mux_ops mux_tcp_ops = {
 	.wake = mux_pt_wake,
 	.rcv_buf = mux_pt_rcv_buf,
 	.snd_buf = mux_pt_snd_buf,
+	.init_fastfwd = mux_pt_init_ff,
+	.done_fastfwd = mux_pt_done_ff,
+	.fastfwd = mux_pt_fastfwd,
+	.resume_fastfwd = mux_pt_resume_fastfwd,
 	.subscribe = mux_pt_subscribe,
 	.unsubscribe = mux_pt_unsubscribe,
 	.attach = mux_pt_attach,
@@ -629,6 +825,10 @@ const struct mux_ops mux_pt_ops = {
 	.wake = mux_pt_wake,
 	.rcv_buf = mux_pt_rcv_buf,
 	.snd_buf = mux_pt_snd_buf,
+	.init_fastfwd = mux_pt_init_ff,
+	.done_fastfwd = mux_pt_done_ff,
+	.fastfwd = mux_pt_fastfwd,
+	.resume_fastfwd = mux_pt_resume_fastfwd,
 	.subscribe = mux_pt_subscribe,
 	.unsubscribe = mux_pt_unsubscribe,
 	.attach = mux_pt_attach,
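
A note on the mechanism for readers of this patch: the fast-forwarding above
is built on kernel splicing. mux_pt_fastfwd() asks the consumer side for
storage with se_init_ff(), which here yields a pipe; conn->xprt->rcv_pipe()
fills that pipe from the source socket, se_done_ff() lets the consumer drain
it through conn->xprt->snd_pipe(), and mux_pt_resume_fastfwd() retries the
drain when the pipe could not be emptied. Because mux-pt has no buffer of its
own to stage data in, the pipe is the only intermediate storage available,
hence "only splicing is supported". The sketch below shows the underlying
zero-copy primitive with plain splice(2); the helper name and loop are
illustrative only, not HAProxy code, and assume Linux with non-blocking
sockets:

/* Illustrative sketch only (not HAProxy code): forward up to <count> bytes
 * from src_fd to dst_fd through an anonymous pipe, without copying the
 * payload through a userspace buffer. */
#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>

static ssize_t splice_forward(int src_fd, int dst_fd, int pipefd[2], size_t count)
{
	ssize_t in, out, total = 0;

	/* producer side: fill the pipe from the source socket; this is the
	 * role played by conn->xprt->rcv_pipe() in the patch */
	in = splice(src_fd, NULL, pipefd[1], NULL, count,
	            SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
	if (in <= 0)
		return in; /* 0 = read0, -1 = error or EAGAIN */

	/* consumer side: drain the pipe into the destination socket; this is
	 * the role played by conn->xprt->snd_pipe() in the patch */
	while (total < in) {
		out = splice(pipefd[0], NULL, dst_fd, NULL, in - total,
		             SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
		if (out <= 0)
			break; /* bytes stay in the pipe; retry later, as
			        * mux_pt_resume_fastfwd() does */
		total += out;
	}
	return total;
}

When the destination is not ready, the remaining bytes simply sit in the
pipe; that is the state tracked by sd->iobuf.pipe->data in the patch.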
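
mux_pt_opposite_sd() deserves a word as well: it reaches the stream-endpoint
descriptor on the other side of the stream by following the xref peer
pointer, then converts that embedded member pointer back to its enclosing
struct sedesc with container_of(). A minimal self-contained illustration of
that idiom, using simplified stand-in types rather than the real HAProxy
definitions:

/* Illustrative sketch only: recover the structure that embeds a member,
 * given a pointer to the member, as mux_pt_opposite_sd() does above. */
#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct xref_sketch { struct xref_sketch *peer; };  /* stand-in for struct xref */
struct sedesc_sketch {                             /* stand-in for struct sedesc */
	int id;
	struct xref_sketch xref;
};

int main(void)
{
	struct sedesc_sketch a = { .id = 1 }, b = { .id = 2 };

	/* cross-reference the two descriptors through their embedded members */
	a.xref.peer = &b.xref;
	b.xref.peer = &a.xref;

	/* from a's peer pointer, get back to the enclosing descriptor (b) */
	struct sedesc_sketch *opposite =
		container_of(a.xref.peer, struct sedesc_sketch, xref);
	printf("opposite of %d is %d\n", a.id, opposite->id); /* -> 1 / 2 */
	return 0;
}

In the real code, xref_get_peer_and_lock() and xref_unlock() additionally
guard the lookup so the peer cannot be detached by another thread while it
is being dereferenced.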
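
Finally, the dense pipe-acquisition condition in mux_pt_init_ff() reads:
reuse the pipe already attached to the endpoint if there is one, otherwise
take a new one only while the process-wide budget (global.maxpipes) is not
exhausted; when neither works, set IOBUF_FL_NO_SPLICING so callers fall back
to regular buffer copies. A rough sketch of such budget-bounded allocation,
with hypothetical names (the real get_pipe()/put_pipe() in src/pipe.c also
recycle pipes through a pool, and thread safety is omitted here):

/* Illustrative sketch only (hypothetical names): pipe allocation bounded
 * by a global budget, in the spirit of the pipes_used < global.maxpipes
 * guard around get_pipe() in the patch. */
#include <stdlib.h>
#include <unistd.h>

struct pipe_sketch {
	int cons;  /* read side, drained by snd_pipe()-like code */
	int prod;  /* write side, filled by rcv_pipe()-like code */
	int data;  /* bytes currently sitting in the pipe */
};

static int pipes_used_sketch;
static const int maxpipes_sketch = 1024;

static struct pipe_sketch *get_pipe_sketch(void)
{
	struct pipe_sketch *p;
	int fds[2];

	if (pipes_used_sketch >= maxpipes_sketch)
		return NULL; /* budget exhausted: caller falls back to buffers */
	p = malloc(sizeof(*p));
	if (!p || pipe(fds) < 0) {
		free(p); /* free(NULL) is a no-op if malloc failed */
		return NULL;
	}
	p->cons = fds[0];
	p->prod = fds[1];
	p->data = 0;
	pipes_used_sketch++;
	return p;
}

static void put_pipe_sketch(struct pipe_sketch *p)
{
	close(p->prod);
	close(p->cons);
	free(p);
	pipes_used_sketch--;
}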