]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: haterm: Add support for 0-copy data forwarding and an option to disable it
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 2 Apr 2026 19:44:05 +0000 (21:44 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Fri, 3 Apr 2026 08:46:15 +0000 (10:46 +0200)
The support for the zero-copy data forwarding was added and enabled by
default. The command line option '-dZ' was also added to disable the
feature.

Concretely, when haterm pushes the response payload, if the zero-copy
forwarding is supported, a dedicated function is used to do so.
hstream_ff_snd() relies on se_nego_ff() to know how much data can be sent
and, at the end, on se_done_ff() to really send the data.

The hstream_add_ff_data() function was added to perform the raw copy of the
payload into the sedesc I/O buffer.

src/haterm.c
src/haterm_init.c

index 2d3aaaec2a573ce044ffcc22ac184f1eba245159..d4a2e80a404ebb6853d5c505c66152179c1d85b1 100644 (file)
@@ -5,6 +5,7 @@
 #include <haproxy/hstream-t.h>
 #include <haproxy/http_htx.h>
 #include <haproxy/http.h>
+#include <haproxy/istbuf.h>
 #include <haproxy/pool.h>
 #include <haproxy/proxy-t.h>
 #include <haproxy/sc_strm.h>
@@ -61,6 +62,7 @@ static char common_chunk_resp[RESPSIZE];
 static char *random_resp;
 static int random_resp_len = RESPSIZE;
 
+static size_t hstream_add_ff_data(struct hstream *hs, struct sedesc *sd, unsigned long long len);
 static size_t hstream_add_htx_data(struct hstream *hs, struct htx *htx, unsigned long long len);
 
 #define TRACE_SOURCE &trace_haterm
@@ -298,6 +300,55 @@ static int hstream_htx_buf_rcv(struct connection *conn, struct hstream *hs)
        goto leave;
 }
 
+static int hstream_ff_snd(struct connection *conn, struct hstream *hs)
+{ /* returns -1 when FF is reported unsupported (caller falls back to the regular send), else <ret> from the resume attempt (0 if none) */
+       size_t len;
+       unsigned int nego_flags = NEGO_FF_FL_NONE;
+       struct sedesc *sd = hs->sc->sedesc;
+       int ret = 0;
+
+       /* First try to resume a previous fast-forward if FF data are still pending */
+       if (se_have_ff_data(sd)) {
+               ret = CALL_MUX_WITH_RET(conn->mux, resume_fastfwd(hs->sc, 0));
+               if (ret > 0)
+                       sd->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED;
+       }
+
+       nego_flags |= NEGO_FF_FL_EXACT_SIZE;
+       len = se_nego_ff(sd, &BUF_NULL, hs->to_write, nego_flags);
+       if (sd->iobuf.flags & IOBUF_FL_NO_FF) {
+               TRACE_DEVEL("Fast-forwarding not supported by endpoint, disable it", HS_EV_HSTRM_RESP, hs);
+               goto abort;
+       }
+       if (sd->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
+               TRACE_DEVEL("Fast-forwarding blocked", HS_EV_HSTRM_RESP, hs);
+               goto out;
+       }
+
+       hs->to_write -= hstream_add_ff_data(hs, sd, len); /* only what was really copied is accounted */
+       if (!hs->to_write)
+               sd->iobuf.flags |= IOBUF_FL_EOI; /* nothing left to write: set EOI on the iobuf */
+
+       if (se_done_ff(sd) != 0 || !(sd->iobuf.flags & (IOBUF_FL_FF_BLOCKED|IOBUF_FL_FF_WANT_ROOM))) {
+               /* Something was forwarded or the consumer states it is not
+                * blocked anymore, don't reclaim more room (nothing to do here) */
+       }
+
+       if (se_have_ff_data(sd)) {
+               TRACE_DEVEL("data not fully sent, wait", HS_EV_HSTRM_SEND, hs);
+               conn->mux->subscribe(hs->sc, SUB_RETRY_SEND, &hs->sc->wait_event);
+       }
+       else if (hs->to_write) {
+               TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+       }
+  out:
+       return ret;
+
+  abort:
+       return -1;
+}
+
 /* Send HTX data prepared for <hs> haterm stream from <conn> connection */
 static int hstream_htx_buf_snd(struct connection *conn, struct hstream *hs)
 {
@@ -449,6 +500,44 @@ err:
        goto leave;
 }
 
+static size_t hstream_add_ff_data(struct hstream *hs, struct sedesc *sd, unsigned long long len)
+{ /* copies at most <len> payload bytes from a pre-built response into sd->iobuf.buf; returns what b_putist() reports */
+       size_t ret;
+       char *data_ptr;
+       unsigned int offset;
+       char *buffer;
+       size_t buffer_len;
+       int modulo;
+
+       TRACE_ENTER(HS_EV_HSTRM_ADD_DATA, hs);
+       b_add(sd->iobuf.buf, sd->iobuf.offset); /* account for the iobuf offset during the copy; undone by b_sub() below */
+
+       if (hs->req_random) { /* source is the random response buffer */
+               buffer = random_resp;
+               buffer_len = random_resp_len;
+               modulo = random_resp_len;
+       }
+       else { /* source is the common response, addressed modulo one line */
+               buffer = common_response;
+               buffer_len = sizeof(common_response);
+               modulo = HS_COMMON_RESPONSE_LINE_SZ;
+       }
+
+       offset = (hs->req_size - len) % modulo; /* NOTE(review): derived from <len> (the negotiated size), not hs->to_write - confirm intended */
+       data_ptr = buffer + offset;
+       if (len > (unsigned long long)(buffer_len - offset)) /* never read past the end of the source buffer */
+               len = (unsigned long long)(buffer_len - offset);
+
+       ret = b_putist(sd->iobuf.buf, ist2(data_ptr, len));
+       if (!ret)
+               TRACE_STATE("unable to fast-forward payload", HS_EV_HSTRM_ADD_DATA, hs);
+
+       b_sub(sd->iobuf.buf, sd->iobuf.offset); /* restore the buffer accounting changed by b_add() above */
+       sd->iobuf.data += ret;
+       TRACE_LEAVE(HS_EV_HSTRM_ADD_DATA, hs);
+       return ret;
+}
+
 /* Add data to HTX response buffer from pre-built responses */
 static size_t hstream_add_htx_data(struct hstream *hs, struct htx *htx, unsigned long long len)
 {
@@ -758,6 +847,14 @@ static inline int hstream_must_drain(struct hstream *hs)
        return ret;
 }
 
+static inline int hstream_is_fastfwd_supported(struct hstream *hs)
+{ /* non-zero only when all the conditions below hold */
+       return (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) && /* not disabled globally (set by -dZ) */
+               sc_ep_test(hs->sc, SE_FL_MAY_FASTFWD_CONS) && /* endpoint carries SE_FL_MAY_FASTFWD_CONS */
+               !(hs->sc->sedesc->iobuf.flags & IOBUF_FL_NO_FF) && /* endpoint has not reported FF as unsupported */
+               !hs->req_chunked && hs->to_write); /* req_chunked unset and data left to write */
+}
+
 /* haterm stream processing task */
 static struct task *process_hstream(struct task *t, void *context, unsigned int state)
 {
@@ -854,6 +951,7 @@ static struct task *process_hstream(struct task *t, void *context, unsigned int
        else {
                struct buffer *buf;
                struct htx *htx;
+               int ret = 0;
 
                /* HTX RX part */
                if (hstream_must_drain(hs)) {
@@ -875,7 +973,19 @@ static struct task *process_hstream(struct task *t, void *context, unsigned int
                if (!hstream_sl_hdrs_htx_buf_snd(hs, conn))
                        goto err;
 
-               /* HTX TX part */
+               /* TX part */
+               if (hstream_is_fastfwd_supported(hs)) {
+                       if (!htx_is_empty(htxbuf(&hs->res)))
+                               goto flush_res_buf;
+                       if (!hs->to_write && !se_have_ff_data(hs->sc->sedesc))
+                               goto out;
+
+                       ret = hstream_ff_snd(conn, hs);
+                       if (ret >= 0)
+                               goto send_done;
+                       /* fallback to regular send */
+               }
+
                if (!hs->to_write && htx_is_empty(htxbuf(&hs->res)))
                        goto out;
 
@@ -891,8 +1001,11 @@ static struct task *process_hstream(struct task *t, void *context, unsigned int
                if (hs->to_write <= 0)
                        htx->flags |= HTX_FL_EOM;
                htx_to_buf(htx, &hs->res);
+
+ flush_res_buf:
                hstream_htx_buf_snd(conn, hs);
 
+ send_done:
                if (hs->req_body && hs->req_after_res && !hs->to_write) {
                        /* Response sending has just complete. The body will be drained upon
                         * next wakeup.
@@ -904,7 +1017,7 @@ static struct task *process_hstream(struct task *t, void *context, unsigned int
        }
 
  out:
-       if (!hs->to_write && !hs->req_body && htx_is_empty(htxbuf(&hs->res))) {
+       if (!hs->to_write && !hs->req_body && htx_is_empty(htxbuf(&hs->res)) && !se_have_ff_data(hs->sc->sedesc)) {
                TRACE_DEVEL("shutting down stream", HS_EV_HSTRM_SEND, hs);
                CALL_MUX_NO_RET(conn->mux, shut(hs->sc, SE_SHW_SILENT|SE_SHW_NORMAL, NULL));
        }
index 2a7cb3897e65145d8e1438058f51df27862e1b75..94e8a96a1fefcf0bf06d47d6be49c784b82f02eb 100644 (file)
@@ -26,6 +26,7 @@ static void haterm_usage(char *name)
                "        -c <curves> : ECSDA curves (ex: \"P-256\", \"P-384\"...)\n"
                "        -v : shows version\n"
                "        -d : enable the traces for all http protocols\n"
+               "        -dZ : disable zero-copy forwarding\n"
                "        --" QUIC_BIND_LONG_OPT " <opts> : append options to QUIC \"bind\" lines\n"
                "        --" TCP_BIND_LONG_OPT " <opts> : append options to TCP \"bind\" lines\n"
                , name);
@@ -242,6 +243,9 @@ void haproxy_init_args(int argc, char **argv)
                                else
                                        haterm_usage(progname);
                        }
+                       else if (*opt == 'd' && *(opt+1) == 'Z') {
+                               global.tune.no_zero_copy_fwd |= NO_ZERO_COPY_FWD;
+                       }
                        else if (*opt == 'd') {
                                /* empty option */
                                if (*(opt + 1))