]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-pt: Add fast-forwarding support
authorChristopher Faulet <cfaulet@haproxy.com>
Fri, 6 Oct 2023 13:32:47 +0000 (15:32 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 17 Oct 2023 16:51:13 +0000 (18:51 +0200)
The PT multiplexer now implements callbacks function to produce and consume
fast-forwarded data. Only splicing is support because the mux-pt does not
use its own buffers.

src/mux_pt.c

index 40dba18c9068e78d48219886cdb961b698d6c5d8..4b50fcdd6f5467da5bcc46bd3a2105a6b5d77045 100644 (file)
 #include <haproxy/api.h>
 #include <haproxy/buf.h>
 #include <haproxy/connection.h>
-#include <haproxy/pipe-t.h>
+#include <haproxy/pipe.h>
 #include <haproxy/stconn.h>
 #include <haproxy/stream.h>
 #include <haproxy/task.h>
 #include <haproxy/trace.h>
+#include <haproxy/xref.h>
 
 struct mux_pt_ctx {
        struct sedesc *sd;
@@ -322,6 +323,8 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
        }
        conn->ctx = ctx;
        se_fl_set(ctx->sd, SE_FL_RCV_MORE);
+       if (global.tune.options & GTUNE_USE_SPLICE)
+               se_fl_set(ctx->sd, SE_FL_MAY_FASTFWD);
 
        TRACE_LEAVE(PT_EV_CONN_NEW, conn);
        return 0;
@@ -562,6 +565,195 @@ static size_t mux_pt_snd_buf(struct stconn *sc, struct buffer *buf, size_t count
        return ret;
 }
 
+static inline struct sedesc *mux_pt_opposite_sd(struct mux_pt_ctx *ctx)
+{
+       struct xref *peer;
+       struct sedesc *sdo;
+
+       peer = xref_get_peer_and_lock(&ctx->sd->xref);
+       if (!peer)
+               return NULL;
+
+       sdo = container_of(peer, struct sedesc, xref);
+       xref_unlock(&ctx->sd->xref, peer);
+       return sdo;
+}
+
+static size_t mux_pt_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
+{
+       struct connection *conn = __sc_conn(sc);
+       struct mux_pt_ctx *ctx = conn->ctx;
+       size_t ret = 0;
+
+       TRACE_ENTER(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){count});
+
+       /* Use kernel splicing if it is supported by the sender and if there
+        * are no input data _AND_ no output data.
+        *
+        * TODO: It may be good to add a flag to send obuf data first if any,
+        *       and then data in pipe, or the opposite. For now, it is not
+        *       supported to mix data.
+        */
+       if (!b_data(input) && may_splice) {
+               if (conn->xprt->snd_pipe && (ctx->sd->iobuf.pipe || (pipes_used < global.maxpipes && (ctx->sd->iobuf.pipe = get_pipe())))) {
+                       ctx->sd->iobuf.offset = 0;
+                       ctx->sd->iobuf.data = 0;
+                       ret = count;
+                       goto out;
+               }
+               ctx->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
+               TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", PT_EV_TX_DATA, conn, sc);
+       }
+
+       /* No buffer case */
+
+  out:
+       TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){ret});
+       return ret;
+}
+
+static void mux_pt_done_ff(struct stconn *sc)
+{
+       struct connection *conn = __sc_conn(sc);
+       struct mux_pt_ctx *ctx = conn->ctx;
+       struct sedesc *sd = ctx->sd;
+       size_t total = 0;
+
+       TRACE_ENTER(PT_EV_TX_DATA, conn, sc);
+
+       if (sd->iobuf.pipe) {
+               total = conn->xprt->snd_pipe(conn, conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
+               if (!sd->iobuf.pipe->data) {
+                       put_pipe(sd->iobuf.pipe);
+                       sd->iobuf.pipe = NULL;
+               }
+       }
+       else {
+               BUG_ON(sd->iobuf.buf);
+       }
+
+  out:
+       if (conn->flags & CO_FL_ERROR) {
+               if (conn_xprt_read0_pending(conn))
+                       se_fl_set(ctx->sd, SE_FL_EOS);
+               se_fl_set_error(ctx->sd);
+               TRACE_DEVEL("error on connection", PT_EV_TX_DATA|PT_EV_CONN_ERR, conn, sc);
+       }
+
+       TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total});
+}
+
+static int mux_pt_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
+{
+       struct connection *conn = __sc_conn(sc);
+       struct mux_pt_ctx *ctx = conn->ctx;
+       struct sedesc *sdo = NULL;
+       size_t total = 0, try = 0;
+        int ret = 0;
+
+       TRACE_ENTER(PT_EV_RX_DATA, conn, sc, 0, (size_t[]){count});
+
+       se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+       conn->flags &= ~CO_FL_WAIT_ROOM;
+       sdo = mux_pt_opposite_sd(ctx);
+       if (!sdo) {
+               TRACE_STATE("Opposite endpoint not available yet", PT_EV_RX_DATA, conn, sc);
+               goto out;
+       }
+
+       try = se_init_ff(sdo, &BUF_NULL, count, conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING));
+       if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
+               /* Fast forwading is not supported by the consumer */
+               se_fl_clr(ctx->sd, SE_FL_MAY_FASTFWD);
+               TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", PT_EV_RX_DATA, conn, sc);
+               goto end;
+       }
+       if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
+               se_fl_set(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+               TRACE_STATE("waiting for more room", PT_EV_RX_DATA|PT_EV_STRM_ERR, conn, sc);
+               goto out;
+       }
+
+       total += sdo->iobuf.data;
+
+       if (sdo->iobuf.pipe) {
+               /* Here, not data was xferred */
+               ret = conn->xprt->rcv_pipe(conn, conn->xprt_ctx, sdo->iobuf.pipe, try);
+               if (ret < 0) {
+                       TRACE_ERROR("Error when trying to fast-forward data, disable it and abort",
+                                   PT_EV_RX_DATA|PT_EV_STRM_ERR|PT_EV_CONN_ERR, conn, sc);
+                       se_fl_clr(ctx->sd, SE_FL_MAY_FASTFWD);
+                       BUG_ON(sdo->iobuf.pipe->data);
+                       put_pipe(sdo->iobuf.pipe);
+                       sdo->iobuf.pipe = NULL;
+                       goto end;
+               }
+               total += ret;
+       }
+       else {
+               BUG_ON(sdo->iobuf.buf);
+               ret = -1; /* abort splicing for now and fallback to buffer mode */
+               goto end;
+       }
+
+       ret = total;
+       se_done_ff(sdo);
+
+       if (sdo->iobuf.pipe) {
+               se_fl_set(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+       }
+
+       TRACE_DEVEL("Data fast-forwarded", PT_EV_RX_DATA, conn, sc, 0, (size_t[]){ret});
+
+
+ out:
+       if (conn->flags & CO_FL_ERROR) {
+               if (conn_xprt_read0_pending(conn))
+                       se_fl_set(ctx->sd, SE_FL_EOS);
+               se_fl_set(ctx->sd, SE_FL_ERROR);
+               TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, conn, sc);
+       }
+       else if (conn_xprt_read0_pending(conn))  {
+               se_fl_set(ctx->sd, (SE_FL_EOS|SE_FL_EOI));
+               TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, conn, sc);
+       }
+  end:
+       TRACE_LEAVE(PT_EV_RX_DATA, conn, sc, 0, (size_t[]){ret});
+       return ret;
+}
+
+static int mux_pt_resume_fastfwd(struct stconn *sc, unsigned int flags)
+{
+       struct connection *conn = __sc_conn(sc);
+       struct mux_pt_ctx *ctx = conn->ctx;
+       struct sedesc *sd = ctx->sd;
+       size_t total = 0;
+
+       TRACE_ENTER(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){flags});
+
+       if (sd->iobuf.pipe) {
+               total = conn->xprt->snd_pipe(conn, conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
+               if (!sd->iobuf.pipe->data) {
+                       put_pipe(sd->iobuf.pipe);
+                       sd->iobuf.pipe = NULL;
+               }
+       }
+       else {
+               BUG_ON(sd->iobuf.buf);
+       }
+
+  out:
+       if (conn->flags & CO_FL_ERROR) {
+               if (conn_xprt_read0_pending(conn))
+                       se_fl_set(ctx->sd, SE_FL_EOS);
+               se_fl_set_error(ctx->sd);
+               TRACE_DEVEL("error on connection", PT_EV_TX_DATA|PT_EV_CONN_ERR, conn, sc);
+       }
+
+       TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total});
+       return total;
+}
+
 /* Called from the upper layer, to subscribe <es> to events <event_type>. The
  * event subscriber <es> is not allowed to change from a previous call as long
  * as at least one event is still subscribed. The <event_type> must only be a
@@ -608,6 +800,10 @@ const struct mux_ops mux_tcp_ops = {
        .wake = mux_pt_wake,
        .rcv_buf = mux_pt_rcv_buf,
        .snd_buf = mux_pt_snd_buf,
+       .init_fastfwd = mux_pt_init_ff,
+       .done_fastfwd = mux_pt_done_ff,
+       .fastfwd = mux_pt_fastfwd,
+       .resume_fastfwd = mux_pt_resume_fastfwd,
        .subscribe = mux_pt_subscribe,
        .unsubscribe = mux_pt_unsubscribe,
        .attach = mux_pt_attach,
@@ -629,6 +825,10 @@ const struct mux_ops mux_pt_ops = {
        .wake = mux_pt_wake,
        .rcv_buf = mux_pt_rcv_buf,
        .snd_buf = mux_pt_snd_buf,
+       .init_fastfwd = mux_pt_init_ff,
+       .done_fastfwd = mux_pt_done_ff,
+       .fastfwd = mux_pt_fastfwd,
+       .resume_fastfwd = mux_pt_resume_fastfwd,
        .subscribe = mux_pt_subscribe,
        .unsubscribe = mux_pt_unsubscribe,
        .attach = mux_pt_attach,