]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic/h3/hq-interop: use ncbuf for bidir streams
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 13 May 2022 12:49:05 +0000 (14:49 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 13 May 2022 15:28:46 +0000 (17:28 +0200)
Add a ncbuf for data reception on qcs. Thanks to this, the MUX is able
to buffered all received frame directly into the buffer. Flow control
parameters will be used to ensure there is never an overflow.

This change will simplify Rx path with the future deletion of acked
frames tree previously used for frames out of order.

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/h3.c
src/hq_interop.c
src/mux_quic.c
src/xprt_quic.c

index 83fccb29ec74f06e630c100d6146b8e2d2f577c8..d096d1dca4d1b2e4d40838954ea3b4bac546a26f 100644 (file)
@@ -11,6 +11,7 @@
 #include <haproxy/buf-t.h>
 #include <haproxy/connection-t.h>
 #include <haproxy/list-t.h>
+#include <haproxy/ncbuf-t.h>
 #include <haproxy/quic_stream-t.h>
 #include <haproxy/conn_stream-t.h>
 
@@ -101,8 +102,9 @@ struct qcs {
 
        struct {
                struct eb_root frms; /* received frames ordered by their offsets */
-               uint64_t offset; /* the current offset of received data */
+               uint64_t offset; /* absolute current base offset of ncbuf */
                struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
+               struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
                struct buffer app_buf; /* receive buffer used by conn_stream layer */
                uint64_t msd; /* fctl bytes limit to enforce */
        } rx;
index 08ff9a35874125e3deaa09fda9a1ae80176d0b48..72e52f318733f39bdd5990ac6422932f9750751d 100644 (file)
@@ -18,13 +18,14 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type);
 void qcs_free(struct qcs *qcs);
 
 struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
+struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf);
 
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
 void qcs_notify_recv(struct qcs *qcs);
 void qcs_notify_send(struct qcs *qcs);
 
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
-             char fin, char *data, struct qcs **out_qcs, size_t *done);
+             char fin, char *data, struct qcs **out_qcs);
 int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
 int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max);
 int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs);
index 68fc1025a689234775cdb6b971e9f9bd2cafad81..488fccfabc51575525887cfc5673869ab9d3434e 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -25,6 +25,7 @@
 #include <haproxy/htx.h>
 #include <haproxy/istbuf.h>
 #include <haproxy/mux_quic.h>
+#include <haproxy/ncbuf.h>
 #include <haproxy/pool.h>
 #include <haproxy/qpack-dec.h>
 #include <haproxy/qpack-enc.h>
@@ -76,9 +77,9 @@ struct h3s {
 DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s));
 
 /* Simple function to duplicate a buffer */
-static inline struct buffer h3_b_dup(struct buffer *b)
+static inline struct buffer h3_b_dup(struct ncbuf *b)
 {
-       return b_make(b->area, b->size, b->head, b->data);
+       return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
 }
 
 /* Decode a h3 frame header made of two QUIC varints from <b> buffer.
@@ -104,7 +105,7 @@ static inline size_t h3_decode_frm_header(uint64_t *ftype, uint64_t *flen,
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
+static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
                              char fin)
 {
        struct buffer htx_buf = BUF_NULL;
@@ -119,8 +120,8 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
        int hdr_idx;
 
        /* TODO support buffer wrapping */
-       BUG_ON(b_contig_data(buf, 0) != b_data(buf));
-       if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0)
+       BUG_ON(ncb_head(buf) + len >= ncb_wrap(buf));
+       if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0)
                return -1;
 
        qc_get_buf(qcs, &htx_buf);
@@ -200,12 +201,12 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
+static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
                           char fin)
 {
        struct buffer *appbuf;
        struct htx *htx = NULL;
-       size_t contig = 0, htx_sent = 0;
+       size_t htx_sent = 0;
        int htx_space;
        char *head;
 
@@ -213,12 +214,12 @@ static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
        BUG_ON(!appbuf);
        htx = htx_from_buf(appbuf);
 
-       if (len > b_data(buf)) {
-               len = b_data(buf);
+       if (len > ncb_data(buf, 0)) {
+               len = ncb_data(buf, 0);
                fin = 0;
        }
 
-       head = b_head(buf);
+       head = ncb_head(buf);
  retry:
        htx_space = htx_free_data_space(htx);
        if (!htx_space) {
@@ -231,10 +232,10 @@ static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
                fin = 0;
        }
 
-       contig = b_contig_data(buf, contig);
-       if (len > contig) {
-               htx_sent = htx_add_data(htx, ist2(b_head(buf), contig));
-               head = b_orig(buf);
+       if (head + len > ncb_wrap(buf)) {
+               size_t contig = ncb_wrap(buf) - head;
+               htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig));
+               head = ncb_orig(buf);
                len -= contig;
                goto retry;
        }
@@ -256,15 +257,15 @@ static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
  */
 static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
 {
-       struct buffer *rxbuf = &qcs->rx.buf;
+       struct ncbuf *rxbuf = &qcs->rx.ncbuf;
        struct h3s *h3s = qcs->ctx;
        ssize_t ret;
 
        h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id);
-       if (!b_data(rxbuf))
+       if (!ncb_data(rxbuf, 0))
                return 0;
 
-       while (b_data(rxbuf) && !(qcs->flags & QC_SF_DEM_FULL)) {
+       while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) {
                uint64_t ftype, flen;
                struct buffer b;
                char last_stream_frame = 0;
@@ -279,16 +280,17 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
                        h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
                                        __func__, ftype, flen);
 
-                       b_del(rxbuf, hlen);
+                       ncb_advance(rxbuf, hlen);
                        h3s->demux_frame_type = ftype;
                        h3s->demux_frame_len = flen;
+                       qcs->rx.offset += hlen;
                }
 
                flen = h3s->demux_frame_len;
                ftype = h3s->demux_frame_type;
-               if (flen > b_data(&b) && !b_full(rxbuf))
+               if (flen > b_data(&b) && !ncb_is_full(rxbuf))
                        break;
-               last_stream_frame = (fin && flen == b_data(rxbuf));
+               last_stream_frame = (fin && flen == ncb_total_data(rxbuf));
 
                switch (ftype) {
                case H3_FT_DATA:
@@ -303,20 +305,21 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
                        break;
                case H3_FT_PUSH_PROMISE:
                        /* Not supported */
-                       ret = MIN(b_data(rxbuf), flen);
+                       ret = MIN(ncb_data(rxbuf, 0), flen);
                        break;
                default:
                        /* draft-ietf-quic-http34 9. Extensions to HTTP/3
                         * unknown frame types MUST be ignored
                         */
                        h3_debug_printf(stderr, "ignore unknown frame type 0x%lx\n", ftype);
-                       ret = MIN(b_data(rxbuf), flen);
+                       ret = MIN(ncb_data(rxbuf, 0), flen);
                }
 
                if (ret) {
-                       b_del(rxbuf, ret);
+                       ncb_advance(rxbuf, ret);
                        BUG_ON(h3s->demux_frame_len < ret);
                        h3s->demux_frame_len -= ret;
+                       qcs->rx.offset += ret;
                }
        }
 
@@ -386,7 +389,7 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
                struct buffer b;
 
                /* Work on a copy of <rxbuf> */
-               b = h3_b_dup(rxbuf);
+               b = b_make(rxbuf->area, rxbuf->size, rxbuf->head, rxbuf->data);
                hlen = h3_decode_frm_header(&ftype, &flen, &b);
                if (!hlen)
                        break;
index dcaba8ef94ada928b6193d03066e6f4cbf06f6a0..db4387e8947892b56424cbd25a2257586f992cf1 100644 (file)
@@ -7,19 +7,20 @@
 #include <haproxy/htx.h>
 #include <haproxy/http.h>
 #include <haproxy/mux_quic.h>
+#include <haproxy/ncbuf.h>
 
 static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
 {
-       struct buffer *rxbuf = &qcs->rx.buf;
+       struct ncbuf *rxbuf = &qcs->rx.ncbuf;
        struct htx *htx;
        struct htx_sl *sl;
        struct conn_stream *cs;
        struct buffer htx_buf = BUF_NULL;
        struct ist path;
-       char *ptr = b_head(rxbuf);
-       char *end = b_wrap(rxbuf);
-       size_t size = b_size(rxbuf);
-       size_t data = b_data(rxbuf);
+       char *ptr = ncb_head(rxbuf);
+       char *end = ncb_wrap(rxbuf);
+       size_t size = ncb_size(rxbuf);
+       size_t data = ncb_data(rxbuf, 0);
 
        b_alloc(&htx_buf);
        htx = htx_from_buf(&htx_buf);
@@ -76,7 +77,8 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
                return -1;
 
 
-       b_del(rxbuf, b_data(rxbuf));
+       qcs->rx.offset += ncb_data(rxbuf, 0);
+       ncb_advance(rxbuf, ncb_data(rxbuf, 0));
        b_free(&htx_buf);
 
        if (fin)
index bc8910c1e1fbc0bdc5f6b78fa35c01d7a7f2b948..022b1dbf53ad646f058348e0b8eba637d0f99404 100644 (file)
@@ -8,6 +8,7 @@
 #include <haproxy/dynbuf.h>
 #include <haproxy/htx.h>
 #include <haproxy/list.h>
+#include <haproxy/ncbuf.h>
 #include <haproxy/pool.h>
 #include <haproxy/quic_stream.h>
 #include <haproxy/sink.h>
@@ -153,6 +154,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
                                                      qcc->rfctl.msd_bidi_l;
 
        qcs->rx.buf = BUF_NULL;
+       qcs->rx.ncbuf = NCBUF_NULL;
        qcs->rx.app_buf = BUF_NULL;
        qcs->rx.offset = 0;
        qcs->rx.frms = EB_ROOT_UNIQUE;
@@ -184,6 +186,16 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
        return NULL;
 }
 
+static void qc_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
+{
+       struct buffer buf;
+
+       buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
+       b_free(&buf);
+
+       *ncbuf = NCBUF_NULL;
+}
+
 /* Free a qcs. This function must only be done to remove a stream on allocation
  * error or connection shutdown. Else use qcs_destroy which handle all the
  * QUIC connection mechanism.
@@ -191,6 +203,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
 void qcs_free(struct qcs *qcs)
 {
        b_free(&qcs->rx.buf);
+       qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
        b_free(&qcs->tx.buf);
 
        BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams);
@@ -215,6 +228,21 @@ struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr)
        return buf;
 }
 
+struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
+{
+       struct buffer buf = BUF_NULL;
+
+       if (ncb_is_null(ncbuf)) {
+               b_alloc(&buf);
+               BUG_ON(b_is_null(&buf));
+
+               *ncbuf = ncb_make(buf.area, buf.size, 0);
+               ncb_init(ncbuf, 0);
+       }
+
+       return ncbuf;
+}
+
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
 {
        struct qcc *qcc = qcs->qcc;
@@ -344,22 +372,17 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
  * <out_qcs>. In case of success, the caller can immediatly call qcc_decode_qcs
  * to process the frame content.
  *
- * Returns a code indicating how the frame was handled.
- * - 0: frame received completely and can be dropped.
- * - 1: frame not received but can be dropped.
- * - 2: frame cannot be handled, either partially or not at all. <done>
- *   indicated the number of bytes handled. The rest should be buffered.
+ * Returns 0 on success else non-zero.
  */
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
-             char fin, char *data, struct qcs **out_qcs, size_t *done)
+             char fin, char *data, struct qcs **out_qcs)
 {
        struct qcs *qcs;
-       size_t total, diff;
+       enum ncb_ret ret;
 
        TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
 
        *out_qcs = NULL;
-       *done = 0;
 
        qcs = qcc_get_qcs(qcc, id);
        if (!qcs) {
@@ -375,44 +398,46 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
 
        *out_qcs = qcs;
 
-       if (offset > qcs->rx.offset)
-               return 2;
-
        if (offset + len <= qcs->rx.offset) {
                TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
                return 0;
        }
 
-       /* Last frame already handled for this stream. */
-       BUG_ON(qcs->flags & QC_SF_FIN_RECV);
+       /* TODO if last frame already received, stream size must not change.
+        * Else send FINAL_SIZE_ERROR.
+        */
+
        /* TODO initial max-stream-data overflow. Implement FLOW_CONTROL_ERROR emission. */
        BUG_ON(offset + len > qcs->rx.msd);
 
-       if (!qc_get_buf(qcs, &qcs->rx.buf) || b_full(&qcs->rx.buf)) {
+       if (!qc_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) {
                /* TODO should mark qcs as full */
-               return 2;
+               ABORT_NOW();
+               return 1;
        }
 
        TRACE_DEVEL("newly received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
-       diff = qcs->rx.offset - offset;
-
-       len -= diff;
-       data += diff;
-
-       /* TODO handle STREAM frames larger than RX buffer. */
-       BUG_ON(len > b_size(&qcs->rx.buf));
+       if (offset < qcs->rx.offset) {
+               len -= qcs->rx.offset - offset;
+               offset = qcs->rx.offset;
+       }
 
-       total = b_putblk(&qcs->rx.buf, data, len);
-       qcs->rx.offset += total;
-       *done = total;
+       ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
+       if (ret != NCB_RET_OK) {
+               if (ret == NCB_RET_DATA_REJ) {
+                       /* TODO generate PROTOCOL_VIOLATION error */
+                       TRACE_DEVEL("leaving on data rejected", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
+                                   qcc->conn, qcs);
+               }
+               else if (ret == NCB_RET_GAP_SIZE) {
+                       TRACE_DEVEL("cannot bufferize frame due to gap size limit", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
+                                   qcc->conn, qcs);
+               }
+               return 1;
+       }
 
        /* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */
-       BUG_ON(qcs->rx.offset == qcs->rx.msd);
-
-       if (total < len) {
-               TRACE_DEVEL("leaving on partially received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
-               return 2;
-       }
+       BUG_ON(offset + len == qcs->rx.msd);
 
        if (fin)
                qcs->flags |= QC_SF_FIN_RECV;
index d4f3ef0bf3e2a4aa3587f905a2e88e55c0bc4d41..4f3bcf76657f123a19027bb8442b65dcaf6776bf 100644 (file)
@@ -2194,105 +2194,19 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
                                    struct quic_stream *strm_frm,
                                    struct quic_conn *qc)
 {
-       struct quic_rx_strm_frm *frm;
-       struct eb64_node *frm_node;
        struct qcs *qcs = NULL;
-       size_t done, buf_was_full;
        int ret;
 
        ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
                       strm_frm->offset.key, strm_frm->fin,
-                      (char *)strm_frm->data, &qcs, &done);
+                      (char *)strm_frm->data, &qcs);
 
-       /* invalid frame */
-       if (ret == 1)
+       /* frame rejected - packet must not be acknowledeged */
+       if (ret)
                return 0;
 
-       /* already fully received offset */
-       if (ret == 0 && done == 0)
-               return 1;
-
-       /* frame not handled (partially or completely) must be buffered */
-       if (ret == 2) {
-               frm = new_quic_rx_strm_frm(strm_frm, pkt);
-               if (!frm) {
-                       TRACE_PROTO("Could not alloc RX STREAM frame",
-                                   QUIC_EV_CONN_PSTRM, qc);
-                       return 0;
-               }
-
-               /* frame partially handled by the MUX */
-               if (done) {
-                       BUG_ON(done >= frm->len); /* must never happen */
-                       frm->len -= done;
-                       frm->data += done;
-                       frm->offset_node.key += done;
-               }
-
-               eb64_insert(&qcs->rx.frms, &frm->offset_node);
-               quic_rx_packet_refinc(pkt);
-
-               /* interrupt only if frame was not received at all. */
-               if (!done)
-                       return 1;
-       }
-
-       /* Decode the data if buffer is already full as it's not possible to
-        * dequeue a frame in this condition.
-        */
-       if (b_full(&qcs->rx.buf))
+       if (qcs)
                qcc_decode_qcs(qc->qcc, qcs);
-
- retry:
-       /* Frame received (partially or not) by the mux.
-        * If there is buffered frame for next offset, it may be possible to
-        * receive them now.
-        */
-       frm_node = eb64_first(&qcs->rx.frms);
-       while (frm_node) {
-               frm = eb64_entry(frm_node,
-                                struct quic_rx_strm_frm, offset_node);
-
-               ret = qcc_recv(qc->qcc, qcs->id, frm->len,
-                              frm->offset_node.key, frm->fin,
-                              (char *)frm->data, &qcs, &done);
-
-               BUG_ON(ret == 1); /* must never happen for buffered frames */
-
-               /* interrupt the parsing if the frame cannot be handled
-                * entirely for the moment only.
-                */
-               if (ret == 2) {
-                       if (done) {
-                               BUG_ON(done >= frm->len); /* must never happen */
-                               frm->len -= done;
-                               frm->data += done;
-
-                               eb64_delete(&frm->offset_node);
-                               frm->offset_node.key += done;
-                               eb64_insert(&qcs->rx.frms, &frm->offset_node);
-                       }
-                       break;
-               }
-
-               /* Remove a newly received frame or an invalid one. */
-               frm_node = eb64_next(frm_node);
-               eb64_delete(&frm->offset_node);
-               quic_rx_packet_refdec(frm->pkt);
-               pool_free(pool_head_quic_rx_strm_frm, frm);
-       }
-
-       buf_was_full = b_full(&qcs->rx.buf);
-       /* Decode the received data. */
-       qcc_decode_qcs(qc->qcc, qcs);
-
-       /* Buffer was full so the reception was stopped. Now the buffer has
-        * space available thanks to qcc_decode_qcs(). We can now retry to
-        * handle more data.
-        */
-       if (buf_was_full && !b_full(&qcs->rx.buf))
-               goto retry;
-
        return 1;
 }