]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic/h3/qpack: use ncbuf for uni streams
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 13 May 2022 13:41:04 +0000 (15:41 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 13 May 2022 15:29:49 +0000 (17:29 +0200)
This commit is the equivalent for uni-streams of previous commit
  MEDIUM: mux-quic/h3/hq-interop: use ncbuf for bidir streams

All unidirectional streams data is now handle in MUX Rx ncbuf. The
obsolete buffer is not unused and will be cleared in the following
patches.

src/h3.c
src/qpack-dec.c
src/xprt_quic.c

index 488fccfabc51575525887cfc5673869ab9d3434e..9919c7a2a9b333704bc9e2ad56f0f8e7578e35f3 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -334,12 +334,12 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
  * <rxbuf> buffer. This function does not update this buffer.
  * Returns 0 if something wrong happened, 1 if not.
  */
-static int h3_parse_settings_frm(struct h3 *h3, const struct buffer *rxbuf, size_t flen)
+static int h3_parse_settings_frm(struct h3 *h3, const struct ncbuf *rxbuf, size_t flen)
 {
        uint64_t id, value;
        const unsigned char *buf, *end;
 
-       buf = (const unsigned char *)b_head(rxbuf);
+       buf = (const unsigned char *)ncb_head(rxbuf);
        end = buf + flen;
 
        while (buf <= end) {
@@ -376,20 +376,20 @@ static int h3_parse_settings_frm(struct h3 *h3, const struct buffer *rxbuf, size
  */
 static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
 {
-       struct buffer *rxbuf = &h3_uqs->qcs->rx.buf;
+       struct ncbuf *rxbuf = &h3_uqs->qcs->rx.ncbuf;
        struct h3 *h3 = ctx;
 
        h3_debug_printf(stderr, "%s STREAM ID: %lu\n", __func__,  h3_uqs->qcs->id);
-       if (!b_data(rxbuf))
+       if (!ncb_data(rxbuf, 0))
                return 1;
 
-       while (b_data(rxbuf)) {
+       while (ncb_data(rxbuf, 0)) {
                size_t hlen;
                uint64_t ftype, flen;
                struct buffer b;
 
                /* Work on a copy of <rxbuf> */
-               b = b_make(rxbuf->area, rxbuf->size, rxbuf->head, rxbuf->data);
+               b = h3_b_dup(rxbuf);
                hlen = h3_decode_frm_header(&ftype, &flen, &b);
                if (!hlen)
                        break;
@@ -399,7 +399,8 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
                if (flen > b_data(&b))
                        break;
 
-               b_del(rxbuf, hlen);
+               ncb_advance(rxbuf, hlen);
+               h3_uqs->qcs->rx.offset += hlen;
                /* From here, a frame must not be truncated */
                switch (ftype) {
                case H3_FT_CANCEL_PUSH:
@@ -423,14 +424,15 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
                        h3->err = H3_FRAME_UNEXPECTED;
                        return 0;
                }
-               b_del(rxbuf, flen);
+               ncb_advance(rxbuf, flen);
+               h3_uqs->qcs->rx.offset += flen;
        }
 
        /* Handle the case where remaining data are present in the buffer. This
         * can happen if there is an incomplete frame. In this case, subscribe
         * on the lower layer to restart receive operation.
         */
-       if (b_data(rxbuf))
+       if (ncb_data(rxbuf, 0))
                qcs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3_uqs->wait_event);
 
        return 1;
@@ -773,12 +775,19 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
 {
        uint64_t strm_type;
        struct h3 *h3 = ctx;
-       struct buffer *rxbuf = &qcs->rx.buf;
+       struct ncbuf *rxbuf = &qcs->rx.ncbuf;
+       struct buffer b;
+       size_t len = 0;
+
+       b = h3_b_dup(rxbuf);
 
        /* First octets: the uni-stream type */
-       if (!b_quic_dec_int(&strm_type, rxbuf, NULL) || strm_type > H3_UNI_STRM_TP_MAX)
+       if (!b_quic_dec_int(&strm_type, &b, &len) || strm_type > H3_UNI_STRM_TP_MAX)
                return 0;
 
+       ncb_advance(rxbuf, len);
+       qcs->rx.offset += len;
+
        /* Note that for all the uni-streams below, this is an error to receive two times the
         * same type of uni-stream (even for Push stream which is not supported at this time.
         */
index 166e1ae2aa79ca0548d30260367cbab76189ecee..30ab6a71fbe538adca190bf5d4c371dab5a98a8f 100644 (file)
@@ -27,6 +27,7 @@
 #include <haproxy/buf.h>
 #include <haproxy/chunk.h>
 #include <haproxy/h3.h>
+#include <haproxy/ncbuf.h>
 #include <haproxy/qpack-t.h>
 #include <haproxy/qpack-dec.h>
 #include <haproxy/qpack-tbl.h>
@@ -96,19 +97,19 @@ static uint64_t qpack_get_varint(const unsigned char **buf, uint64_t *len_in, in
 int qpack_decode_enc(struct h3_uqs *h3_uqs, void *ctx)
 {
        size_t len;
-       struct buffer *rxbuf;
+       struct ncbuf *rxbuf;
        unsigned char inst;
 
-       rxbuf = &h3_uqs->qcs->rx.buf;
-       len = b_data(rxbuf);
-       qpack_debug_hexdump(stderr, "[QPACK-DEC-ENC] ", b_head(rxbuf), 0, len);
+       rxbuf = &h3_uqs->qcs->rx.ncbuf;
+       len = ncb_data(rxbuf, 0);
+       qpack_debug_hexdump(stderr, "[QPACK-DEC-ENC] ", ncb_head(rxbuf), 0, len);
 
        if (!len) {
                qpack_debug_printf(stderr, "[QPACK-DEC-ENC] empty stream\n");
                return 0;
        }
 
-       inst = (unsigned char)*b_head(rxbuf) & QPACK_ENC_INST_BITMASK;
+       inst = (unsigned char)*ncb_head(rxbuf) & QPACK_ENC_INST_BITMASK;
        if (inst == QPACK_ENC_INST_DUP) {
                /* Duplicate */
        }
@@ -129,19 +130,19 @@ int qpack_decode_enc(struct h3_uqs *h3_uqs, void *ctx)
 int qpack_decode_dec(struct h3_uqs *h3_uqs, void *ctx)
 {
        size_t len;
-       struct buffer *rxbuf;
+       struct ncbuf *rxbuf;
        unsigned char inst;
 
-       rxbuf = &h3_uqs->qcs->rx.buf;
-       len = b_data(rxbuf);
-       qpack_debug_hexdump(stderr, "[QPACK-DEC-DEC] ", b_head(rxbuf), 0, len);
+       rxbuf = &h3_uqs->qcs->rx.ncbuf;
+       len = ncb_data(rxbuf, 0);
+       qpack_debug_hexdump(stderr, "[QPACK-DEC-DEC] ", ncb_head(rxbuf), 0, len);
 
        if (!len) {
                qpack_debug_printf(stderr, "[QPACK-DEC-DEC] empty stream\n");
                return 0;
        }
 
-       inst = (unsigned char)*b_head(rxbuf) & QPACK_DEC_INST_BITMASK;
+       inst = (unsigned char)*ncb_head(rxbuf) & QPACK_DEC_INST_BITMASK;
        if (inst == QPACK_DEC_INST_ICINC) {
                /* Insert count increment */
        }
index 4f3bcf76657f123a19027bb8442b65dcaf6776bf..099ce67b27e8e421b375b3e322994e85187f654d 100644 (file)
@@ -38,6 +38,7 @@
 #include <haproxy/hq_interop.h>
 #include <haproxy/log.h>
 #include <haproxy/mux_quic.h>
+#include <haproxy/ncbuf.h>
 #include <haproxy/pipe.h>
 #include <haproxy/proxy.h>
 #include <haproxy/quic_cc.h>
@@ -2170,20 +2171,6 @@ struct quic_rx_strm_frm *new_quic_rx_strm_frm(struct quic_stream *stream_frm,
        return frm;
 }
 
-/* Copy as most as possible STREAM data from <strm_frm> into <strm> stream.
- * Also update <strm_frm> frame to reflect the data which have been consumed.
- */
-static size_t qc_strm_cpy(struct buffer *buf, struct quic_stream *strm_frm)
-{
-       size_t ret;
-
-       ret = b_putblk(buf, (char *)strm_frm->data, strm_frm->len);
-       strm_frm->len -= ret;
-       strm_frm->offset.key += ret;
-
-       return ret;
-}
-
 /* Handle <strm_frm> bidirectional STREAM frame. Depending on its ID, several
  * streams may be open. The data are copied to the stream RX buffer if possible.
  * If not, the STREAM frame is stored to be treated again later.
@@ -2221,8 +2208,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
                                   struct quic_conn *qc)
 {
        struct qcs *strm;
-       struct quic_rx_strm_frm *frm;
-       size_t strm_frm_len;
+       enum ncb_ret ret;
 
        strm = qcc_get_qcs(qc->qcc, strm_frm->id);
        if (!strm) {
@@ -2246,46 +2232,22 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
                strm_frm->data += diff;
        }
 
-       strm_frm_len = strm_frm->len;
-       if (strm_frm->offset.key == strm->rx.offset) {
-               int ret;
-
-               if (!qc_get_buf(strm, &strm->rx.buf))
-                   goto store_frm;
-
-               /* qc_strm_cpy() will modify the offset, depending on the number
-                * of bytes copied.
-                */
-               ret = qc_strm_cpy(&strm->rx.buf, strm_frm);
-               /* Inform the application of the arrival of this new stream */
-               if (!strm->rx.offset && !qc->qcc->app_ops->attach_ruqs(strm, qc->qcc->ctx)) {
-                       TRACE_PROTO("Could not set an uni-stream", QUIC_EV_CONN_PSTRM, qc);
-                       return 0;
-               }
-
-               if (ret)
-                       qcs_notify_recv(strm);
+       qc_get_ncbuf(strm, &strm->rx.ncbuf);
+       if (ncb_is_null(&strm->rx.ncbuf))
+               return 0;
 
-               strm_frm->offset.key += ret;
-       }
-       /* Take this frame into an account for the stream flow control */
-       strm->rx.offset += strm_frm_len;
-       /* It all the data were provided to the application, there is no need to
-        * store any more information for it.
-        */
-       if (!strm_frm->len)
-               goto out;
+       ret = ncb_add(&strm->rx.ncbuf, strm_frm->offset.key - strm->rx.offset,
+                      (char *)strm_frm->data, strm_frm->len, NCB_ADD_COMPARE);
+       if (ret != NCB_RET_OK)
+               return 0;
 
- store_frm:
-       frm = new_quic_rx_strm_frm(strm_frm, pkt);
-       if (!frm) {
-               TRACE_PROTO("Could not alloc RX STREAM frame",
-                           QUIC_EV_CONN_PSTRM, qc);
+       /* Inform the application of the arrival of this new stream */
+       if (!strm->rx.offset && !qc->qcc->app_ops->attach_ruqs(strm, qc->qcc->ctx)) {
+               TRACE_PROTO("Could not set an uni-stream", QUIC_EV_CONN_PSTRM, qc);
                return 0;
        }
 
-       eb64_insert(&strm->rx.frms, &frm->offset_node);
-       quic_rx_packet_refinc(pkt);
+       qcs_notify_recv(strm);
 
  out:
        return 1;