]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-quic: store QCS Rx buf in a single-entry tree
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 24 Feb 2025 15:28:50 +0000 (16:28 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 7 Mar 2025 11:06:26 +0000 (12:06 +0100)
Convert QCS rx buffer pointer to a tree container. Additionnaly, offset
field of qc_stream_rxbuf is thus transformed into a node tree.

For now, only a single Rx buffer is stored at most in QCS tree. Multiple
Rx buffers will be implemented in a future patch to improve QUIC clients
upload throughput.

include/haproxy/mux_quic-t.h
src/mux_quic.c

index 567e0402539ef3a575bd9de33d4846c28a310a11..d87e2fecde6648857032a7dab946324d85426aa9 100644 (file)
@@ -134,11 +134,13 @@ enum qcs_state {
        QC_SS_CLO,      /* closed */
 } __attribute__((packed));
 
-/* STREAM receive buffer. Can handle out-of-order storage. */
+/* STREAM receive buffer. Can handle out-of-order storage.
+ * Can be used as a tree node to allocate multiple entries ordered by offsets.
+ */
 struct qc_stream_rxbuf {
-       struct ncbuf ncb; /* data storage with support for out of order offset */
-       uint64_t off;     /* base offset of current buffer */
-       uint64_t off_end; /* first offset directly outside of current buffer */
+       struct eb64_node off_node; /* base offset of current buffer, node for QCS rx.bufs */
+       struct ncbuf ncb;          /* data storage with support for out of order offset */
+       uint64_t off_end;          /* first offset directly outside of current buffer */
 };
 
 struct qcs {
@@ -151,7 +153,7 @@ struct qcs {
        struct {
                uint64_t offset; /* read offset */
                uint64_t offset_max; /* maximum absolute offset received */
-               struct qc_stream_rxbuf *buf; /* single receive buffer */
+               struct eb_root bufs; /* receive buffers tree ordered by offset */
                struct buffer app_buf; /* receive buffer used by stconn layer */
                uint64_t msd; /* current max-stream-data limit to enforce */
                uint64_t msd_init; /* initial max-stream-data */
index 859880f50438df257050400b9f24b5bddab72445..7639a6665b66716e1b948318f65c9cc5676392bb 100644 (file)
@@ -51,24 +51,21 @@ static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
        struct ncbuf *ncbuf;
        struct buffer buf;
 
-       if (!rxbuf)
-               return;
-
        ncbuf = &rxbuf->ncb;
-
        if (!ncb_is_null(ncbuf)) {
                buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
                b_free(&buf);
                offer_buffers(NULL, 1);
        }
+       rxbuf->ncb = NCBUF_NULL;
 
        /* Reset DEM_FULL as buffer is released. This ensures mux is not woken
         * up from rcv_buf stream callback when demux was previously blocked.
         */
        qcs->flags &= ~QC_SF_DEM_FULL;
 
+       eb64_delete(&rxbuf->off_node);
        pool_free(pool_head_qc_stream_rxbuf, rxbuf);
-       qcs->rx.buf = NULL;
 }
 
 /* Free <qcs> instance. This function is reserved for internal usage : it must
@@ -78,6 +75,7 @@ static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
 static void qcs_free(struct qcs *qcs)
 {
        struct qcc *qcc = qcs->qcc;
+       struct qc_stream_rxbuf *b;
 
        TRACE_ENTER(QMUX_EV_QCS_END, qcc->conn, qcs);
        TRACE_STATE("releasing QUIC stream", QMUX_EV_QCS_END, qcc->conn, qcs);
@@ -105,7 +103,11 @@ static void qcs_free(struct qcs *qcs)
        }
 
        /* Free Rx buffer. */
-       qcs_free_rxbuf(qcs, qcs->rx.buf);
+       while (!eb_is_empty(&qcs->rx.bufs)) {
+               b = container_of(eb64_first(&qcs->rx.bufs),
+                                struct qc_stream_rxbuf, off_node);
+               qcs_free_rxbuf(qcs, b);
+       }
 
        /* Remove qcs from qcc tree. */
        eb64_delete(&qcs->by_id);
@@ -160,7 +162,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
                qfctl_init(&qcs->tx.fc, 0);
        }
 
-       qcs->rx.buf = NULL;
+       qcs->rx.bufs = EB_ROOT_UNIQUE;
        qcs->rx.app_buf = BUF_NULL;
        qcs->rx.offset = qcs->rx.offset_max = 0;
 
@@ -1094,10 +1096,22 @@ static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
        }
 }
 
+/* Returns the current Rx buffer instance for <qcs> stream. */
+static struct qc_stream_rxbuf *qcs_get_curr_rxbuf(struct qcs *qcs)
+{
+       struct eb64_node *node;
+       struct qc_stream_rxbuf *buf;
+
+       node = eb64_first(&qcs->rx.bufs);
+       buf = container_of(node, struct qc_stream_rxbuf, off_node);
+       return buf;
+}
+
 /* Returns the amount of data readable at <qcs> stream current offset. */
 static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs)
 {
-       return qcs->rx.buf ? ncb_data(&qcs->rx.buf->ncb, 0) : 0;
+       struct qc_stream_rxbuf *b = qcs_get_curr_rxbuf(qcs);
+       return b ? ncb_data(&b->ncb, 0) : 0;
 }
 
 /* Remove <bytes> from <qcs> Rx buffer. Flow-control for received offsets may
@@ -1107,18 +1121,19 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
 {
        struct qcc *qcc = qcs->qcc;
        struct quic_frame *frm;
-       struct ncbuf *ncbuf = &qcs->rx.buf->ncb;
+       struct qc_stream_rxbuf *rxbuf;
        enum ncb_ret ret;
 
        TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
 
-       ret = ncb_advance(ncbuf, bytes);
+       rxbuf = qcs_get_curr_rxbuf(qcs);
+       ret = ncb_advance(&rxbuf->ncb, bytes);
        if (ret) {
                ABORT_NOW(); /* should not happens because removal only in data */
        }
 
-       if (ncb_is_empty(ncbuf))
-               qcs_free_rxbuf(qcs, qcs->rx.buf);
+       if (ncb_is_empty(&rxbuf->ncb))
+               qcs_free_rxbuf(qcs, rxbuf);
 
        qcs->rx.offset += bytes;
        /* Not necessary to emit a MAX_STREAM_DATA if all data received. */
@@ -1170,6 +1185,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
  */
 static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
 {
+       struct qc_stream_rxbuf *rxbuf;
        struct buffer b;
        ssize_t ret;
        int fin = 0;
@@ -1177,7 +1193,8 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
 
        TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
 
-       b = qcs_b_dup(qcs->rx.buf);
+       rxbuf = qcs_get_curr_rxbuf(qcs);
+       b = qcs_b_dup(rxbuf);
 
        /* Signal FIN to application if STREAM FIN received with all data. */
        if (qcs_is_close_remote(qcs))
@@ -1607,7 +1624,7 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
        }
 
        if (len) {
-               if (!qcs->rx.buf) {
+               if (eb_is_empty(&qcs->rx.bufs)) {
                        struct qc_stream_rxbuf *buf;
                        buf = pool_alloc(pool_head_qc_stream_rxbuf);
                        if (!buf) {
@@ -1617,13 +1634,15 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
                        }
 
                        buf->ncb = NCBUF_NULL;
-                       buf->off = qcs->rx.offset;
-                       buf->off_end = buf->off + qmux_stream_rx_bufsz();
-                       qcs->rx.buf = buf;
+                       buf->off_node.key = qcs->rx.offset;
+                       buf->off_end = qcs->rx.offset + qmux_stream_rx_bufsz();
+                       eb64_insert(&qcs->rx.bufs, &buf->off_node);
+
                        ncbuf = &buf->ncb;
                }
                else {
-                       ncbuf = &qcs->rx.buf->ncb;
+                       struct qc_stream_rxbuf *buf = qcs_get_curr_rxbuf(qcs);
+                       ncbuf = &buf->ncb;
                }
 
                if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
@@ -1789,6 +1808,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
 int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size)
 {
        struct qcs *qcs;
+       struct qc_stream_rxbuf *b;
        int prev_glitches = qcc->glitches;
 
        TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
@@ -1845,7 +1865,11 @@ int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t f
         */
        qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET;
        qcs_close_remote(qcs);
-       qcs_free_rxbuf(qcs, qcs->rx.buf);
+       while (!eb_is_empty(&qcs->rx.bufs)) {
+               b = container_of(eb64_first(&qcs->rx.bufs),
+                                struct qc_stream_rxbuf, off_node);
+               qcs_free_rxbuf(qcs, b);
+       }
 
  out:
        if (qcc->glitches != prev_glitches)