]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-h2: Add consumer-side fast-forwarding support
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 3 Aug 2023 16:18:45 +0000 (18:18 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 17 Oct 2023 16:51:13 +0000 (18:51 +0200)
The H2 multiplexer now implements callbacks to consume fast-forwarded
data. It is the most usful case: A H2 client getting data from a H1
server. It is also the easiest case to implement. The producer side is
trickier because of multiplexing. It is not obvious this case would be
improved with data fast-forwarding.

src/mux_h2.c

index a8f7792e94d61b7d084f7bd54f8e034a58a0a1dd..35b440e9f517159d4b3e39eb98f6b6bd3a1c20fb 100644 (file)
@@ -32,6 +32,7 @@
 #include <haproxy/stconn.h>
 #include <haproxy/stream.h>
 #include <haproxy/trace.h>
+#include <haproxy/xref.h>
 
 
 /* dummy streams returned for closed, error, refused, idle and states */
@@ -6871,6 +6872,192 @@ static size_t h2_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, in
        return total;
 }
 
+static size_t h2_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
+{
+       struct h2s *h2s = __sc_mux_strm(sc);
+       struct h2c *h2c = h2s->h2c;
+       struct buffer *mbuf;
+       size_t sz , ret = 0;
+
+       TRACE_ENTER(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
+
+       /* If we were not just woken because we wanted to send but couldn't,
+        * and there's somebody else that is waiting to send, do nothing,
+        * we will subscribe later and be put at the end of the list
+        */
+       if (!(h2s->flags & H2_SF_NOTIFIED) &&
+           (!LIST_ISEMPTY(&h2c->send_list) || !LIST_ISEMPTY(&h2c->fctl_list))) {
+               if (LIST_INLIST(&h2s->list))
+                       TRACE_DEVEL("stream already waiting, leaving", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2s->h2c->conn, h2s);
+               else {
+                       TRACE_DEVEL("other streams already waiting, going to the queue and leaving", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2s->h2c->conn, h2s);
+                       h2s->h2c->flags |= H2_CF_WAIT_INLIST;
+               }
+               h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+               goto end;
+       }
+       h2s->flags &= ~H2_SF_NOTIFIED;
+
+       if (h2s_mws(h2s) <= 0) {
+               h2s->flags |= H2_SF_BLK_SFCTL;
+               if (LIST_INLIST(&h2s->list))
+                       LIST_DEL_INIT(&h2s->list);
+               LIST_APPEND(&h2c->blocked_list, &h2s->list);
+               h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+               TRACE_STATE("stream window <=0, flow-controlled", H2_EV_H2S_SEND|H2_EV_H2S_FCTL, h2c->conn, h2s);
+               goto end;
+       }
+       if (h2c->mws <= 0) {
+               h2s->flags |= H2_SF_BLK_MFCTL;
+               h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+               TRACE_STATE("connection window <=0, stream flow-controlled", H2_EV_H2S_SEND|H2_EV_H2C_FCTL, h2c->conn, h2s);
+               goto end;
+       }
+
+       sz = count;
+       if (sz > h2s_mws(h2s))
+               sz = h2s_mws(h2s);
+       if (h2c->mfs && sz > h2c->mfs)
+               sz = h2c->mfs; // >0
+       if (sz > h2c->mws)
+               sz = h2c->mws;
+
+       if (count > sz)
+               count = sz;
+
+       mbuf = br_tail(h2c->mbuf);
+ retry:
+       if (!h2_get_buf(h2c, mbuf)) {
+               h2c->flags |= H2_CF_MUX_MALLOC;
+               h2s->flags |= H2_SF_BLK_MROOM;
+               h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+               TRACE_STATE("waiting for room in output buffer", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2c->conn, h2s);
+               goto end;
+       }
+
+       if (b_room(mbuf) < sz && b_room(mbuf) < b_size(mbuf) / 4) {
+               if ((mbuf = br_tail_add(h2c->mbuf)) != NULL)
+                       goto retry;
+               h2c->flags |= H2_CF_MUX_MFULL;
+               h2s->flags |= H2_SF_BLK_MROOM;
+               h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+               TRACE_STATE("too large data present in output buffer, waiting for emptiness", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2c->conn, h2s);
+               goto end;
+       }
+
+       while (1) {
+               if (b_contig_space(mbuf) >= 9 || !b_space_wraps(mbuf))
+                       break;
+               b_slow_realign(mbuf, trash.area, b_data(mbuf));
+       }
+
+       if (b_contig_space(mbuf) <= 9) {
+               if ((mbuf = br_tail_add(h2c->mbuf)) != NULL)
+                       goto retry;
+               h2c->flags |= H2_CF_MUX_MFULL;
+               h2s->flags |= H2_SF_BLK_MROOM;
+               h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+               TRACE_STATE("output buffer full", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2c->conn, h2s);
+               goto end;
+       }
+
+       /* Cannot forward more than available room in output buffer */
+       sz = b_contig_space(mbuf) - 9;
+       if (count > sz)
+               count = sz;
+
+       /* len: 0x000000 (fill later), type: 0(DATA), flags: none=0 */
+       memcpy(b_tail(mbuf), "\x00\x00\x00\x00\x00", 5);
+       write_n32(b_tail(mbuf) + 5, h2s->id); // 4 bytes
+
+       h2s->sd->iobuf.buf = mbuf;
+       h2s->sd->iobuf.offset = 9;
+       h2s->sd->iobuf.data = 0;
+
+       /* forward remaining input data */
+       if (b_data(input)) {
+               size_t xfer = count;
+
+               if (xfer > b_data(input))
+                       xfer = b_data(input);
+               b_add(mbuf, 9);
+               h2s->sd->iobuf.data = b_xfer(mbuf, input, xfer);
+               b_sub(mbuf, 9);
+
+               /* Cannot forward more data, wait for room */
+               if (b_data(input))
+                       goto end;
+       }
+
+       ret = count - h2s->sd->iobuf.data;
+ end:
+       TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
+       return ret;
+}
+
+static void h2_done_ff(struct stconn *sc)
+{
+       struct h2s *h2s = __sc_mux_strm(sc);
+       struct h2c *h2c = h2s->h2c;
+       struct sedesc *sd = h2s->sd;
+       struct buffer *mbuf;
+       char *head;
+
+       TRACE_ENTER(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
+
+       mbuf = sd->iobuf.buf;
+       if (!mbuf)
+               goto end;
+       head = b_peek(mbuf, b_data(mbuf) - sd->iobuf.data);
+
+       /* FIXME: Must be handled with a flag. It is just a temporary hack */
+       {
+               struct xref *peer;
+               struct sedesc *sdo;
+
+               peer = xref_get_peer_and_lock(&h2s->sd->xref);
+               if (!peer)
+                       goto end;
+
+               sdo = container_of(peer, struct sedesc, xref);
+               xref_unlock(&h2s->sd->xref, peer);
+
+               if (se_fl_test(sdo, SE_FL_EOI))
+                       h2s->flags &= ~H2_SF_MORE_HTX_DATA;
+       }
+
+       if (!(sd->iobuf.flags & IOBUF_FL_FF_BLOCKED) &&
+           !(h2s->flags & H2_SF_BLK_SFCTL) &&
+           !(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))) {
+               /* Ok we managed to send something, leave the send_list if we were still there */
+               h2_remove_from_list(h2s);
+       }
+
+       if (!sd->iobuf.data)
+               goto end;
+
+       /* Perform a synchronous send but in all cases, consider
+        * everything was already sent from the SC point of view.
+        */
+       h2_set_frame_size(head, sd->iobuf.data);
+       b_add(mbuf, 9);
+       h2s->sws -= sd->iobuf.data;
+       h2c->mws -= sd->iobuf.data;
+       h2_process(h2c);
+
+ end:
+       sd->iobuf.buf = NULL;
+       sd->iobuf.offset = 0;
+       sd->iobuf.data = 0;
+
+       TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
+}
+
+static int h2_resume_ff(struct stconn *sc, unsigned int flags)
+{
+       return 0;
+}
+
 /* appends some info about stream <h2s> to buffer <msg>, or does nothing if
  * <h2s> is NULL. Returns non-zero if the stream is considered suspicious. May
  * emit multiple lines, each new one being prefixed with <pfx>, if <pfx> is not
@@ -7185,6 +7372,9 @@ static const struct mux_ops h2_ops = {
        .wake = h2_wake,
        .snd_buf = h2_snd_buf,
        .rcv_buf = h2_rcv_buf,
+       .init_fastfwd = h2_init_ff,
+       .done_fastfwd = h2_done_ff,
+       .resume_fastfwd = h2_resume_ff,
        .subscribe = h2_subscribe,
        .unsubscribe = h2_unsubscribe,
        .attach = h2_attach,