]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-quic: define rxbuf wrapper
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 24 Feb 2025 15:22:22 +0000 (16:22 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 7 Mar 2025 11:06:26 +0000 (12:06 +0100)
Define a new type qc_stream_rxbuf. This is used as a wrapper around QCS
Rx buffer with encapsulation of the ncbuf storage. It is allocated via a
new pool. Several functions are adapted to be able to deal with
qc_stream_rxbuf as a wrapper instead of the previous plain ncbuf
instance.

No functional change should happen with this patch. For now, only a
single qc_stream_rxbuf can be instantiated per QCS. However, this new
type will be useful to implement multiple Rx buffer storage in a future
commit.

include/haproxy/mux_quic-t.h
src/mux_quic.c

index c92ed3644a4314499040fa1e9cd45f6e2fce050d..567e0402539ef3a575bd9de33d4846c28a310a11 100644 (file)
@@ -134,6 +134,13 @@ enum qcs_state {
        QC_SS_CLO,      /* closed */
 } __attribute__((packed));
 
+/* STREAM receive buffer. Can handle out-of-order storage. */
+struct qc_stream_rxbuf {
+       struct ncbuf ncb; /* data storage with support for out of order offset */
+       uint64_t off;     /* base offset of current buffer */
+       uint64_t off_end; /* first offset directly outside of current buffer */
+};
+
 struct qcs {
        struct qcc *qcc;
        struct sedesc *sd;
@@ -142,9 +149,9 @@ struct qcs {
        void *ctx;           /* app-ops context */
 
        struct {
-               uint64_t offset; /* absolute current base offset of ncbuf */
+               uint64_t offset; /* read offset */
                uint64_t offset_max; /* maximum absolute offset received */
-               struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
+               struct qc_stream_rxbuf *buf; /* single receive buffer */
                struct buffer app_buf; /* receive buffer used by stconn layer */
                uint64_t msd; /* current max-stream-data limit to enforce */
                uint64_t msd_init; /* initial max-stream-data */
index ba6b446e58425ad8c180cef3fd0ef939a50afccb..859880f50438df257050400b9f24b5bddab72445 100644 (file)
@@ -34,6 +34,7 @@
 
 DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
 DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
+DECLARE_STATIC_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", sizeof(struct qc_stream_rxbuf));
 
 static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
 static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
@@ -44,23 +45,30 @@ static int qcc_is_pacing_active(const struct connection *conn)
        return !(quic_tune.options & QUIC_TUNE_NO_PACING);
 }
 
-static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
+/* Free <rxbuf> instance and its inner data storage attached to <qcs> stream. */
+static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
 {
+       struct ncbuf *ncbuf;
        struct buffer buf;
 
-       if (ncb_is_null(ncbuf))
+       if (!rxbuf)
                return;
 
-       buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
-       b_free(&buf);
-       offer_buffers(NULL, 1);
+       ncbuf = &rxbuf->ncb;
 
-       *ncbuf = NCBUF_NULL;
+       if (!ncb_is_null(ncbuf)) {
+               buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
+               b_free(&buf);
+               offer_buffers(NULL, 1);
+       }
 
        /* Reset DEM_FULL as buffer is released. This ensures mux is not woken
         * up from rcv_buf stream callback when demux was previously blocked.
         */
        qcs->flags &= ~QC_SF_DEM_FULL;
+
+       pool_free(pool_head_qc_stream_rxbuf, rxbuf);
+       qcs->rx.buf = NULL;
 }
 
 /* Free <qcs> instance. This function is reserved for internal usage : it must
@@ -97,7 +105,7 @@ static void qcs_free(struct qcs *qcs)
        }
 
        /* Free Rx buffer. */
-       qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
+       qcs_free_rxbuf(qcs, qcs->rx.buf);
 
        /* Remove qcs from qcc tree. */
        eb64_delete(&qcs->by_id);
@@ -152,7 +160,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
                qfctl_init(&qcs->tx.fc, 0);
        }
 
-       qcs->rx.ncbuf = NCBUF_NULL;
+       qcs->rx.buf = NULL;
        qcs->rx.app_buf = BUF_NULL;
        qcs->rx.offset = qcs->rx.offset_max = 0;
 
@@ -1074,10 +1082,22 @@ int qcc_get_qcs(struct qcc *qcc, uint64_t id, int receive_only, int send_only,
        return 1;
 }
 
-/* Simple function to duplicate a buffer */
-static inline struct buffer qcs_b_dup(const struct ncbuf *b)
+/* Convert <b> out-of-order storage into a contiguous buffer. */
+static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
+{
+       if (b) {
+               const struct ncbuf *ncb = &b->ncb;
+               return b_make(ncb_orig(ncb), ncb->size, ncb->head, ncb_data(ncb, 0));
+       }
+       else {
+               return BUF_NULL;
+       }
+}
+
+/* Returns the amount of data readable at <qcs> stream current offset. */
+static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs)
 {
-       return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
+       return qcs->rx.buf ? ncb_data(&qcs->rx.buf->ncb, 0) : 0;
 }
 
 /* Remove <bytes> from <qcs> Rx buffer. Flow-control for received offsets may
@@ -1087,18 +1107,18 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
 {
        struct qcc *qcc = qcs->qcc;
        struct quic_frame *frm;
-       struct ncbuf *buf = &qcs->rx.ncbuf;
+       struct ncbuf *ncbuf = &qcs->rx.buf->ncb;
        enum ncb_ret ret;
 
        TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
 
-       ret = ncb_advance(buf, bytes);
+       ret = ncb_advance(ncbuf, bytes);
        if (ret) {
                ABORT_NOW(); /* should not happens because removal only in data */
        }
 
-       if (ncb_is_empty(buf))
-               qcs_free_ncbuf(qcs, buf);
+       if (ncb_is_empty(ncbuf))
+               qcs_free_rxbuf(qcs, qcs->rx.buf);
 
        qcs->rx.offset += bytes;
        /* Not necessary to emit a MAX_STREAM_DATA if all data received. */
@@ -1157,7 +1177,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
 
        TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
 
-       b = qcs_b_dup(&qcs->rx.ncbuf);
+       b = qcs_b_dup(qcs->rx.buf);
 
        /* Signal FIN to application if STREAM FIN received with all data. */
        if (qcs_is_close_remote(qcs))
@@ -1500,6 +1520,7 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
              char fin, char *data)
 {
        const int fin_standalone = (!len && fin);
+       struct ncbuf *ncbuf;
        struct qcs *qcs;
        enum ncb_ret ret;
 
@@ -1586,13 +1607,32 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
        }
 
        if (len) {
-               if (!qcs_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) {
+               if (!qcs->rx.buf) {
+                       struct qc_stream_rxbuf *buf;
+                       buf = pool_alloc(pool_head_qc_stream_rxbuf);
+                       if (!buf) {
+                               TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
+                               qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
+                               goto err;
+                       }
+
+                       buf->ncb = NCBUF_NULL;
+                       buf->off = qcs->rx.offset;
+                       buf->off_end = buf->off + qmux_stream_rx_bufsz();
+                       qcs->rx.buf = buf;
+                       ncbuf = &buf->ncb;
+               }
+               else {
+                       ncbuf = &qcs->rx.buf->ncb;
+               }
+
+               if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
                        TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
                        qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
                        goto err;
                }
 
-               ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
+               ret = ncb_add(ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
                switch (ret) {
                case NCB_RET_OK:
                        break;
@@ -1624,11 +1664,11 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
                qcs->flags |= QC_SF_SIZE_KNOWN;
 
        if (qcs->flags & QC_SF_SIZE_KNOWN &&
-           qcs->rx.offset_max == qcs->rx.offset + ncb_data(&qcs->rx.ncbuf, 0)) {
+           qcs->rx.offset_max == qcs->rx.offset + qcs_rx_avail_data(qcs)) {
                qcs_close_remote(qcs);
        }
 
-       if ((ncb_data(&qcs->rx.ncbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) ||
+       if ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) ||
            unlikely(fin_standalone && qcs_is_close_remote(qcs))) {
                qcc_decode_qcs(qcc, qcs);
                LIST_DEL_INIT(&qcs->el_recv);
@@ -1805,7 +1845,7 @@ int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t f
         */
        qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET;
        qcs_close_remote(qcs);
-       qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
+       qcs_free_rxbuf(qcs, qcs->rx.buf);
 
  out:
        if (qcc->glitches != prev_glitches)
@@ -3345,7 +3385,7 @@ static size_t qmux_strm_rcv_buf(struct stconn *sc, struct buffer *buf,
                /* Ensure DEM_FULL is only set if there is available data to
                 * ensure we never do unnecessary wakeup here.
                 */
-               BUG_ON(!ncb_data(&qcs->rx.ncbuf, 0));
+               BUG_ON(!qcs_rx_avail_data(qcs));
 
                qcs->flags &= ~QC_SF_DEM_FULL;
                if (!(qcc->flags & QC_CF_ERRL)) {