]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-h1: Add fast-forwarding support
authorChristopher Faulet <cfaulet@haproxy.com>
Fri, 29 Sep 2023 12:25:40 +0000 (14:25 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 17 Oct 2023 16:51:13 +0000 (18:51 +0200)
The H1 multiplexer now implements callbacks function to produce and consume
fast-forwarded data.

include/haproxy/mux_h1-t.h
src/mux_h1.c

index ea06ff25150c0f1dd0f1c7d71cf20a268f2d51ff..2f49a495eb0aaf609e53fef91cf7151b6762a30e 100644 (file)
@@ -50,8 +50,9 @@
 #define H1C_F_UPG_H2C        0x00010000 /* set if an upgrade to h2 should be done */
 #define H1C_F_CO_MSG_MORE    0x00020000 /* set if CO_SFL_MSG_MORE must be set when calling xprt->snd_buf() */
 #define H1C_F_CO_STREAMER    0x00040000 /* set if CO_SFL_STREAMER must be set when calling xprt->snd_buf() */
+#define H1C_F_CANT_FASTFWD   0x00080000 /* Fast-forwarding is not supported (exclusive with WANT_FASTFWD) */
 
-/* 0x00040000 - 0x40000000 unused */
+/* 0x00100000 - 0x40000000 unused */
 #define H1C_F_IS_BACK        0x80000000 /* Set on outgoing connection */
 
 
@@ -70,7 +71,7 @@ static forceinline char *h1c_show_flags(char *buf, size_t len, const char *delim
        _(H1C_F_EOS, _(H1C_F_ERR_PENDING, _(H1C_F_ERROR,
        _(H1C_F_SILENT_SHUT, _(H1C_F_ABRT_PENDING, _(H1C_F_ABRTED,
        _(H1C_F_WANT_FASTFWD, _(H1C_F_WAIT_NEXT_REQ, _(H1C_F_UPG_H2C, _(H1C_F_CO_MSG_MORE,
-       _(H1C_F_CO_STREAMER, _(H1C_F_IS_BACK)))))))))))))))));
+       _(H1C_F_CO_STREAMER, _(H1C_F_CANT_FASTFWD, _(H1C_F_IS_BACK))))))))))))))))));
        /* epilogue */
        _(~0U);
        return buf;
index 86e368b40f11af59240b7a40d80a6ca77ee2bd01..d098c1b29eb9ea2c7046b5acc00efed3ff386643 100644 (file)
 #include <haproxy/istbuf.h>
 #include <haproxy/log.h>
 #include <haproxy/mux_h1-t.h>
-#include <haproxy/pipe-t.h>
+#include <haproxy/pipe.h>
 #include <haproxy/proxy.h>
 #include <haproxy/session-t.h>
 #include <haproxy/stats.h>
 #include <haproxy/stconn.h>
 #include <haproxy/stream.h>
 #include <haproxy/trace.h>
+#include <haproxy/xref.h>
 
 /* H1 connection descriptor */
 struct h1c {
@@ -1906,7 +1907,8 @@ static size_t h1_process_demux(struct h1c *h1c, struct buffer *buf, size_t count
        }
 
        /* Here h1s_sc(h1s) is always defined */
-       if ((!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) &&
+       if (!(h1c->flags & H1C_F_CANT_FASTFWD) &&
+           (!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) &&
            (h1m->state == H1_MSG_DATA || h1m->state == H1_MSG_TUNNEL)) {
                TRACE_STATE("notify the mux can use fast-forward", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
                se_fl_set(h1s->sd, SE_FL_MAY_FASTFWD);
@@ -1917,9 +1919,6 @@ static size_t h1_process_demux(struct h1c *h1c, struct buffer *buf, size_t count
                h1c->flags &= ~H1C_F_WANT_FASTFWD;
        }
 
-       se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD);
-       h1c->flags &= ~H1C_F_WANT_FASTFWD;
-
        /* Set EOI on stream connector in DONE state iff:
         *  - it is a response
         *  - it is a request but no a protocol upgrade nor a CONNECT
@@ -4377,6 +4376,393 @@ static size_t h1_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, in
        return total;
 }
 
+static inline struct sedesc *h1s_opposite_sd(struct h1s *h1s)
+{
+       struct xref *peer;
+       struct sedesc *sdo;
+
+       peer = xref_get_peer_and_lock(&h1s->sd->xref);
+       if (!peer)
+               return NULL;
+
+       sdo = container_of(peer, struct sedesc, xref);
+       xref_unlock(&h1s->sd->xref, peer);
+       return sdo;
+}
+
+static size_t h1_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
+{
+       struct h1s *h1s = __sc_mux_strm(sc);
+       struct h1c *h1c = h1s->h1c;
+       struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
+       size_t ret = 0;
+
+       TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){count});
+
+       /* TODO: add check on curr_len if CLEN */
+
+       if (h1m->flags & H1_MF_CHNK) {
+               if (h1m->curr_len) {
+                       BUG_ON(h1m->state != H1_MSG_DATA);
+                       if (count > h1m->curr_len)
+                               count = h1m->curr_len;
+               }
+               else {
+                       BUG_ON(h1m->state != H1_MSG_CHUNK_CRLF && h1m->state != H1_MSG_CHUNK_SIZE);
+                       if (!h1_make_chunk(h1s, h1m, count))
+                               goto out;
+                       h1m->curr_len = count;
+               }
+       }
+
+       /* Use kernel splicing if it is supported by the sender and if there
+        * are no input data _AND_ no output data.
+        *
+        * TODO: It may be good to add a flag to send obuf data first if any,
+        *       and then data in pipe, or the opposite. For now, it is not
+        *       supported to mix data.
+        */
+       if (!b_data(input) && !b_data(&h1c->obuf) && may_splice) {
+               if (h1c->conn->xprt->snd_pipe && (h1s->sd->iobuf.pipe || (pipes_used < global.maxpipes && (h1s->sd->iobuf.pipe = get_pipe())))) {
+                       h1s->sd->iobuf.offset = 0;
+                       h1s->sd->iobuf.data = 0;
+                       ret = count;
+                       goto out;
+               }
+               h1s->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
+               TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", H1_EV_STRM_SEND, h1c->conn, h1s);
+       }
+
+       if (!h1_get_buf(h1c, &h1c->obuf)) {
+               h1c->flags |= H1C_F_OUT_ALLOC;
+               TRACE_STATE("waiting for opposite h1c obuf allocation", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s);
+               goto out;
+       }
+
+       if (b_space_wraps(&h1c->obuf))
+               b_slow_realign(&h1c->obuf, trash.area, b_data(&h1c->obuf));
+
+       h1s->sd->iobuf.buf = &h1c->obuf;
+       h1s->sd->iobuf.offset = 0;
+       h1s->sd->iobuf.data = 0;
+
+       /* Cannot forward more than available room in output buffer */
+       if (count > b_room(&h1c->obuf))
+               count = b_room(&h1c->obuf);
+
+       if (!count) {
+               h1c->flags |= H1C_F_OUT_FULL;
+               h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+               TRACE_STATE("output buffer full", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s);
+               goto out;
+       }
+
+       /* forward remaining input data */
+       if (b_data(input)) {
+               size_t xfer = count;
+
+               if (xfer > b_data(input))
+                       xfer = b_data(input);
+               h1s->sd->iobuf.data = b_xfer(&h1c->obuf, input, xfer);
+
+               /* Cannot forward more data, wait for room */
+               if (b_data(input))
+                       goto out;
+       }
+
+       ret = count - h1s->sd->iobuf.data;
+
+ out:
+       TRACE_LEAVE(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){ret});
+       return ret;
+}
+
+static void h1_done_ff(struct stconn *sc)
+{
+       struct h1s *h1s = __sc_mux_strm(sc);
+       struct h1c *h1c = h1s->h1c;
+       struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
+       struct sedesc *sd = h1s->sd;
+       size_t total = 0;
+
+       TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s);
+
+
+       if (sd->iobuf.pipe) {
+               total = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
+               if (total > 0)
+                       HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, total);
+               if (!sd->iobuf.pipe->data) {
+                       put_pipe(sd->iobuf.pipe);
+                       sd->iobuf.pipe = NULL;
+               }
+       }
+       else {
+               if (b_room(&h1c->obuf) == sd->iobuf.offset)
+                       h1c->flags |= H1C_F_OUT_FULL;
+
+               total = sd->iobuf.data;
+               sd->iobuf.buf = NULL;
+               sd->iobuf.offset = 0;
+               sd->iobuf.data = 0;
+
+               /* Perform a synchronous send but in all cases, consider
+                * everything was already sent from the SC point of view.
+                */
+               h1_send(h1c);
+       }
+
+       if (h1m->curr_len)
+               h1m->curr_len -= total;
+
+       if (!h1m->curr_len && (h1m->flags & H1_MF_CLEN))
+               h1m->state = H1_MSG_DONE;
+       else if (!h1m->curr_len && (h1m->flags & H1_MF_CHNK)) {
+               if (h1m->state == H1_MSG_DATA)
+                       h1m->state = H1_MSG_CHUNK_CRLF;
+       }
+
+       HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, total);
+
+ out:
+       // TODO: should we call h1_process() instead ?
+       if (h1c->conn->flags & CO_FL_ERROR) {
+               h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERR_PENDING;
+               if (h1c->flags & H1C_F_EOS)
+                       h1c->flags |= H1C_F_ERROR;
+               else if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
+                       /* EOS not seen, so subscribe for reads to be able to
+                        * catch the error on the reading path. It is especially
+                        * important if EOI was reached.
+                        */
+                       h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+               }
+               se_fl_set_error(h1s->sd);
+               TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+       }
+
+       TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){total});
+}
+
+static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
+{
+       struct h1s *h1s = __sc_mux_strm(sc);
+       struct h1c *h1c = h1s->h1c;
+       struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
+       struct sedesc *sdo = NULL;
+       size_t total = 0, try = 0;
+       int ret = 0;
+
+       TRACE_ENTER(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){count});
+
+       if (h1m->state != H1_MSG_DATA && h1m->state != H1_MSG_TUNNEL) {
+               h1c->flags &= ~H1C_F_WANT_FASTFWD;
+               TRACE_STATE("Cannot fast-forwad data now !(msg_data|msg_tunnel)", H1_EV_STRM_RECV, h1c->conn, h1s);
+               goto end;
+       }
+
+       se_fl_clr(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+       h1c->conn->flags &= ~CO_FL_WAIT_ROOM;
+       h1c->flags |= H1C_F_WANT_FASTFWD;
+
+       if (h1c->flags & (H1C_F_EOS|H1C_F_ERROR)) {
+               h1c->flags &= ~H1C_F_WANT_FASTFWD;
+               TRACE_DEVEL("leaving on (EOS|ERROR)", H1_EV_STRM_RECV, h1c->conn, h1s);
+               goto end;
+       }
+
+       sdo = h1s_opposite_sd(h1s);
+       if (!sdo) {
+               TRACE_STATE("Opposite endpoint not available yet", H1_EV_STRM_RECV, h1c->conn, h1s);
+               goto out;
+       }
+
+       if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) &&  count > h1m->curr_len)
+               count = h1m->curr_len;
+
+       try = se_init_ff(sdo, &h1c->ibuf, count, h1c->conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING));
+       if (b_room(&h1c->ibuf) && (h1c->flags & H1C_F_IN_FULL)) {
+               h1c->flags &= ~H1C_F_IN_FULL;
+               TRACE_STATE("h1c ibuf not full anymore", H1_EV_STRM_RECV|H1_EV_H1C_BLK);
+       }
+
+       if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
+               /* Fast forwading is not supported by the consumer */
+               h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD;
+               TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", H1_EV_STRM_RECV, h1c->conn, h1s);
+               goto end;
+       }
+       if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
+               se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+               TRACE_STATE("waiting for more room", H1_EV_STRM_RECV|H1_EV_H1S_ERR, h1c->conn, h1s);
+               goto out;
+       }
+
+       total += sdo->iobuf.data;
+       if (sdo->iobuf.pipe) {
+               /* Here, not data was xferred */
+               ret = h1c->conn->xprt->rcv_pipe(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.pipe, try);
+               if (ret < 0) {
+                       h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD;
+                       TRACE_ERROR("Error when trying to fast-forward data, disable it and abort",
+                                   H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+                       BUG_ON(sdo->iobuf.pipe->data);
+                       put_pipe(sdo->iobuf.pipe);
+                       sdo->iobuf.pipe = NULL;
+                       goto end;
+               }
+               total += ret;
+               if (!ret) {
+                       TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn);
+                       h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+               }
+               HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_in, ret);
+       }
+       else {
+               b_add(sdo->iobuf.buf, sdo->iobuf.offset);
+               ret = h1c->conn->xprt->rcv_buf(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.buf, try, flags);
+               if (ret < try) {
+                       TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn);
+                       h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+               }
+               b_sub(sdo->iobuf.buf, sdo->iobuf.offset);
+               total += ret;
+               sdo->iobuf.data += ret;
+       }
+
+       if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN))) {
+               if (total > h1m->curr_len) {
+                       h1s->flags |= H1S_F_PARSING_ERROR;
+                       se_fl_set(h1s->sd, SE_FL_ERROR);
+                       TRACE_ERROR("too much payload, more than announced",
+                                   H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+                       goto out;
+               }
+               h1m->curr_len -= total;
+               if (!h1m->curr_len) {
+                       if (h1m->flags & H1_MF_CLEN) {
+                               h1m->state = H1_MSG_DONE;
+                               se_fl_set(h1s->sd, SE_FL_EOI); /* TODO: this line is tricky and must be evaluated first
+                                                               *       Its purpose is to avoid to set CO_SFL_MSG_MORE on the
+                                                               *       next calls to ->complete_fastfwd().
+                                                               */
+                       }
+                       else
+                               h1m->state = H1_MSG_CHUNK_CRLF;
+                       h1c->flags &= ~H1C_F_WANT_FASTFWD;
+
+                       if (!(h1c->flags & H1C_F_IS_BACK)) {
+                               /* The request was fully received. It means the H1S now
+                                * expect data from the opposite side
+                                */
+                               se_expect_data(h1s->sd);
+                       }
+
+                       TRACE_STATE("payload fully received", H1_EV_STRM_RECV, h1c->conn, h1s);
+               }
+       }
+
+       HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, total);
+       ret = total;
+       se_done_ff(sdo);
+
+       if (sdo->iobuf.pipe) {
+               se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+       }
+
+       TRACE_DEVEL("Data fast-forwarded", H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret});
+
+ out:
+       if (conn_xprt_read0_pending(h1c->conn)) {
+               se_fl_set(h1s->sd, SE_FL_EOS);
+               TRACE_STATE("report EOS to SE", H1_EV_STRM_RECV, h1c->conn, h1s);
+               if (h1m->state >= H1_MSG_DONE || !(h1m->flags & H1_MF_XFER_LEN)) {
+                       /* DONE or TUNNEL or SHUTR without XFER_LEN, set
+                        * EOI on the stream connector */
+                       se_fl_set(h1s->sd, SE_FL_EOI);
+                       TRACE_STATE("report EOI to SE", H1_EV_STRM_RECV, h1c->conn, h1s);
+               }
+               else {
+                       se_fl_set(h1s->sd, SE_FL_ERROR);
+                       h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR;
+                       TRACE_ERROR("message aborted, set error on SC", H1_EV_STRM_RECV|H1_EV_H1S_ERR, h1c->conn, h1s);
+               }
+               h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_EOS;
+               TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, h1c->conn, h1s);
+       }
+       if (h1c->conn->flags & CO_FL_ERROR) {
+               se_fl_set(h1s->sd, SE_FL_ERROR);
+               h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR;
+               TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+       }
+
+ end:
+       if (!(h1c->flags & H1C_F_WANT_FASTFWD)) {
+               TRACE_STATE("notify the mux can't use fast-forward anymore", H1_EV_STRM_RECV, h1c->conn, h1s);
+               se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD);
+               if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
+                       TRACE_STATE("restart receiving data, subscribing", H1_EV_STRM_RECV, h1c->conn, h1s);
+                       h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+               }
+       }
+
+       TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret});
+       return ret;
+}
+
+static int h1_resume_fastfwd(struct stconn *sc, unsigned int flags)
+{
+       struct h1s *h1s = __sc_mux_strm(sc);
+       struct h1c *h1c = h1s->h1c;
+       struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
+       struct sedesc *sd = h1s->sd;
+       int ret = 0;
+
+       TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){flags});
+
+       if (sd->iobuf.pipe) {
+               ret = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
+               if (ret > 0)
+                       HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, ret);
+
+               if (!sd->iobuf.pipe->data) {
+                       put_pipe(sd->iobuf.pipe);
+                       sd->iobuf.pipe = NULL;
+               }
+       }
+
+       h1m->curr_len -= ret;
+
+       if (!h1m->curr_len && (h1m->flags & H1_MF_CLEN))
+               h1m->state = H1_MSG_DONE;
+       else if (!h1m->curr_len && (h1m->flags & H1_MF_CHNK)) {
+               if (h1m->state == H1_MSG_DATA)
+                       h1m->state = H1_MSG_CHUNK_CRLF;
+       }
+
+       HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, ret);
+
+ out:
+       // TODO: should we call h1_process() instead ?
+       if (h1c->conn->flags & CO_FL_ERROR) {
+               h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERR_PENDING;
+               if (h1c->flags & H1C_F_EOS)
+                       h1c->flags |= H1C_F_ERROR;
+               else if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
+                       /* EOS not seen, so subscribe for reads to be able to
+                        * catch the error on the reading path. It is especially
+                        * important if EOI was reached.
+                        */
+                       h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+               }
+               se_fl_set_error(h1s->sd);
+               TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+       }
+
+       TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret});
+       return ret;
+}
+
 static int h1_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output)
 {
        const struct h1c *h1c = conn->ctx;
@@ -4792,6 +5178,10 @@ static const struct mux_ops mux_http_ops = {
        .used_streams = h1_used_streams,
        .rcv_buf     = h1_rcv_buf,
        .snd_buf     = h1_snd_buf,
+       .init_fastfwd = h1_init_ff,
+       .done_fastfwd = h1_done_ff,
+       .fastfwd     = h1_fastfwd,
+       .resume_fastfwd = h1_resume_fastfwd,
        .subscribe   = h1_subscribe,
        .unsubscribe = h1_unsubscribe,
        .shutr       = h1_shutr,
@@ -4815,6 +5205,10 @@ static const struct mux_ops mux_h1_ops = {
        .used_streams = h1_used_streams,
        .rcv_buf     = h1_rcv_buf,
        .snd_buf     = h1_snd_buf,
+       .init_fastfwd = h1_init_ff,
+       .done_fastfwd = h1_done_ff,
+       .fastfwd     = h1_fastfwd,
+       .resume_fastfwd = h1_resume_fastfwd,
        .subscribe   = h1_subscribe,
        .unsubscribe = h1_unsubscribe,
        .shutr       = h1_shutr,