]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-quic: simplify decode_qcs API
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 3 Jun 2022 14:40:34 +0000 (16:40 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 7 Jun 2022 16:15:47 +0000 (18:15 +0200)
Slightly modify decode_qcs function used by transcoders. The MUX now
gives a buffer instance on which each transcoder is free to work on it.
At the return of the function, the MUX removes consume data from its own
buffer.

This reduces the number of invocation to qcs_consume at the end of a
full demuxing process. The API is also cleaner with the transcoders not
responsible of calling it with the risk of having the input buffer
freed if empty.

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/h3.c
src/hq_interop.c
src/mux_quic.c

index b32d4f9668b85b76197cf78524ab02f05987e6a3..9a69b76387cc4166f94adc0c76b218c89a07bf7b 100644 (file)
@@ -102,6 +102,9 @@ struct qcc {
 #define QC_SF_DEM_FULL          0x00000020  /* demux blocked on request channel buffer full */
 #define QC_SF_READ_ABORTED      0x00000040  /* stream rejected by app layer */
 
+/* Maximum size of stream Rx buffer. */
+#define QC_S_RX_BUF_SZ   (global.tune.bufsize - NCB_RESERVED_SZ)
+
 struct qcs {
        struct qcc *qcc;
        struct sedesc *sd;
@@ -137,7 +140,7 @@ struct qcs {
 struct qcc_app_ops {
        int (*init)(struct qcc *qcc);
        int (*attach)(struct qcs *qcs, void *conn_ctx);
-       int (*decode_qcs)(struct qcs *qcs, int fin);
+       int (*decode_qcs)(struct qcs *qcs, struct buffer *b, int fin);
        size_t (*snd_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags);
        void (*detach)(struct qcs *qcs);
        int (*finalize)(void *ctx);
index 3c713a3a6d7286c0ef5ec701e133c2d52a30cc40..0846ca7b79b0d58705ff32ff26c39e8037da1863 100644 (file)
@@ -23,7 +23,6 @@ struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf);
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
 void qcs_notify_recv(struct qcs *qcs);
 void qcs_notify_send(struct qcs *qcs);
-void qcs_consume(struct qcs *qcs, uint64_t bytes);
 
 void qcc_emit_cc_app(struct qcc *qcc, int err);
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
index b9a9fed3615866200d261cd32ea0dc35b72d3332..655bf26a0633c35dcc9bfc3a655f106e644357dd 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -26,7 +26,6 @@
 #include <haproxy/intops.h>
 #include <haproxy/istbuf.h>
 #include <haproxy/mux_quic.h>
-#include <haproxy/ncbuf.h>
 #include <haproxy/pool.h>
 #include <haproxy/qpack-dec.h>
 #include <haproxy/qpack-enc.h>
@@ -144,22 +143,15 @@ struct h3s {
 
 DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s));
 
-/* Simple function to duplicate a buffer */
-static inline struct buffer h3_b_dup(const struct ncbuf *b)
-{
-       return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
-}
-
-/* Initialize an uni-stream <qcs> by reading its type from <rxbuf>.
+/* Initialize an uni-stream <qcs> by reading its type from <b>.
  *
  * Returns 0 on success else non-zero.
  */
 static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs,
-                              struct ncbuf *rxbuf)
+                              struct buffer *b)
 {
        /* decode unidirectional stream type */
        struct h3s *h3s = qcs->ctx;
-       struct buffer b;
        uint64_t type;
        size_t len = 0, ret;
 
@@ -168,8 +160,7 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs,
        BUG_ON_HOT(!quic_stream_is_uni(qcs->id) ||
                   h3s->flags & H3_SF_UNI_INIT);
 
-       b = h3_b_dup(rxbuf);
-       ret = b_quic_dec_int(&type, &b, &len);
+       ret = b_quic_dec_int(&type, b, &len);
        if (!ret) {
                ABORT_NOW();
        }
@@ -220,7 +211,6 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs,
        };
 
        h3s->flags |= H3_SF_UNI_INIT;
-       qcs_consume(qcs, len);
 
        TRACE_LEAVE(H3_EV_H3S_NEW, qcs->qcc->conn, qcs);
        return 0;
@@ -231,7 +221,7 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs,
  *
  * Returns 0 on success else non-zero.
  */
-static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf)
+static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct buffer *b)
 {
        struct h3s *h3s = qcs->ctx;
 
@@ -263,14 +253,13 @@ static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf)
  * consumed.
  */
 static inline size_t h3_decode_frm_header(uint64_t *ftype, uint64_t *flen,
-                                          struct ncbuf *rxbuf)
+                                          struct buffer *b)
 {
        size_t hlen;
-       struct buffer b = h3_b_dup(rxbuf);
 
        hlen = 0;
-       if (!b_quic_dec_int(ftype, &b, &hlen) ||
-           !b_quic_dec_int(flen, &b, &hlen)) {
+       if (!b_quic_dec_int(ftype, b, &hlen) ||
+           !b_quic_dec_int(flen, b, &hlen)) {
                return 0;
        }
 
@@ -333,8 +322,8 @@ static int h3_is_frame_valid(struct h3c *h3c, struct qcs *qcs, uint64_t ftype)
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
-                             char fin)
+static int h3_headers_to_htx(struct qcs *qcs, const struct buffer *buf,
+                             uint64_t len, char fin)
 {
        struct buffer htx_buf = BUF_NULL;
        struct buffer *tmp = get_trash_chunk();
@@ -350,8 +339,8 @@ static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
        TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_HDR, qcs->qcc->conn, qcs);
 
        /* TODO support buffer wrapping */
-       BUG_ON(ncb_head(buf) + len >= ncb_wrap(buf));
-       if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0)
+       BUG_ON(b_head(buf) + len >= b_wrap(buf));
+       if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0)
                return -1;
 
        qc_get_buf(qcs, &htx_buf);
@@ -431,8 +420,8 @@ static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
-                          char fin)
+static int h3_data_to_htx(struct qcs *qcs, const struct buffer *buf,
+                          uint64_t len, char fin)
 {
        struct buffer *appbuf;
        struct htx *htx = NULL;
@@ -446,12 +435,12 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
        BUG_ON(!appbuf);
        htx = htx_from_buf(appbuf);
 
-       if (len > ncb_data(buf, 0)) {
-               len = ncb_data(buf, 0);
+       if (len > b_data(buf)) {
+               len = b_data(buf);
                fin = 0;
        }
 
-       head = ncb_head(buf);
+       head = b_head(buf);
  retry:
        htx_space = htx_free_data_space(htx);
        if (!htx_space) {
@@ -464,16 +453,16 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
                fin = 0;
        }
 
-       if (head + len > ncb_wrap(buf)) {
-               size_t contig = ncb_wrap(buf) - head;
-               htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig));
+       if (head + len > b_wrap(buf)) {
+               size_t contig = b_wrap(buf) - head;
+               htx_sent = htx_add_data(htx, ist2(b_head(buf), contig));
                if (htx_sent < contig) {
                        qcs->flags |= QC_SF_DEM_FULL;
                        goto out;
                }
 
                len -= contig;
-               head = ncb_orig(buf);
+               head = b_orig(buf);
                goto retry;
        }
 
@@ -493,11 +482,11 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
        return htx_sent;
 }
 
-/* Parse a SETTINGS frame of length <len> of payload <rxbuf>.
+/* Parse a SETTINGS frame of length <len> of payload <buf>.
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf,
+static size_t h3_parse_settings_frm(struct h3c *h3c, const struct buffer *buf,
                                     size_t len)
 {
        struct buffer b;
@@ -507,8 +496,11 @@ static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf,
 
        TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_SETTINGS, h3c->qcc->conn);
 
-       b = h3_b_dup(rxbuf);
-       b_set_data(&b, len);
+       /* Work on a copy of <buf>. */
+       b = b_make(b_orig(buf), b_size(buf), b_head_ofs(buf), b_data(buf));
+
+       /* TODO handle incomplete SETTINGS frame */
+       BUG_ON(len < b_data(&b));
 
        while (b_data(&b)) {
                if (!b_quic_dec_int(&id, &b, &ret) || !b_quic_dec_int(&value, &b, &ret)) {
@@ -576,36 +568,35 @@ static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf,
  *
  * Returns 0 on success else non-zero.
  */
-static int h3_decode_qcs(struct qcs *qcs, int fin)
+static int h3_decode_qcs(struct qcs *qcs, struct buffer *b, int fin)
 {
-       struct ncbuf *rxbuf = &qcs->rx.ncbuf;
        struct h3s *h3s = qcs->ctx;
        struct h3c *h3c = h3s->h3c;
        ssize_t ret;
 
        h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id);
-       if (!ncb_data(rxbuf, 0))
+       if (!b_data(b))
                return 0;
 
        if (quic_stream_is_uni(qcs->id) && !(h3s->flags & H3_SF_UNI_INIT)) {
-               if (h3_init_uni_stream(h3c, qcs, rxbuf))
+               if (h3_init_uni_stream(h3c, qcs, b))
                        return 1;
        }
 
        if (quic_stream_is_uni(qcs->id) && (h3s->flags & H3_SF_UNI_NO_H3)) {
                /* For non-h3 STREAM, parse it and return immediately. */
-               if (h3_parse_uni_stream_no_h3(qcs, rxbuf))
+               if (h3_parse_uni_stream_no_h3(qcs, b))
                        return 1;
                return 0;
        }
 
-       while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) {
+       while (b_data(b) && !(qcs->flags & QC_SF_DEM_FULL)) {
                uint64_t ftype, flen;
                char last_stream_frame = 0;
 
                /* Work on a copy of <rxbuf> */
                if (!h3s->demux_frame_len) {
-                       size_t hlen = h3_decode_frm_header(&ftype, &flen, rxbuf);
+                       size_t hlen = h3_decode_frm_header(&ftype, &flen, b);
                        if (!hlen)
                                break;
 
@@ -620,8 +611,7 @@ static int h3_decode_qcs(struct qcs *qcs, int fin)
                                return 1;
                        }
 
-                       qcs_consume(qcs, hlen);
-                       if (!ncb_data(rxbuf, 0))
+                       if (!b_data(b))
                                break;
                }
 
@@ -631,31 +621,31 @@ static int h3_decode_qcs(struct qcs *qcs, int fin)
                /* Do not demux incomplete frames except H3 DATA which can be
                 * fragmented in multiple HTX blocks.
                 */
-               if (flen > ncb_data(rxbuf, 0) && ftype != H3_FT_DATA) {
+               if (flen > b_data(b) && ftype != H3_FT_DATA) {
                        /* Reject frames bigger than bufsize.
                         *
                         * TODO HEADERS should in complement be limited with H3
                         * SETTINGS_MAX_FIELD_SECTION_SIZE parameter to prevent
                         * excessive decompressed size.
                         */
-                       if (flen > ncb_size(rxbuf)) {
+                       if (flen > QC_S_RX_BUF_SZ) {
                                qcc_emit_cc_app(qcs->qcc, H3_EXCESSIVE_LOAD);
                                return 1;
                        }
                        break;
                }
 
-               last_stream_frame = (fin && flen == ncb_total_data(rxbuf));
+               last_stream_frame = (fin && flen == b_data(b));
 
                h3_inc_frame_type_cnt(h3c->prx_counters, ftype);
                switch (ftype) {
                case H3_FT_DATA:
-                       ret = h3_data_to_htx(qcs, rxbuf, flen, last_stream_frame);
+                       ret = h3_data_to_htx(qcs, b, flen, last_stream_frame);
                        /* TODO handle error reporting. Stream closure required. */
                        if (ret < 0) { ABORT_NOW(); }
                        break;
                case H3_FT_HEADERS:
-                       ret = h3_headers_to_htx(qcs, rxbuf, flen, last_stream_frame);
+                       ret = h3_headers_to_htx(qcs, b, flen, last_stream_frame);
                        /* TODO handle error reporting. Stream closure required. */
                        if (ret < 0) { ABORT_NOW(); }
                        break;
@@ -667,7 +657,7 @@ static int h3_decode_qcs(struct qcs *qcs, int fin)
                        ret = flen;
                        break;
                case H3_FT_SETTINGS:
-                       ret = h3_parse_settings_frm(qcs->qcc->ctx, rxbuf, flen);
+                       ret = h3_parse_settings_frm(qcs->qcc->ctx, b, flen);
                        if (ret < 0) {
                                qcc_emit_cc_app(qcs->qcc, h3c->err);
                                return 1;
@@ -688,7 +678,7 @@ static int h3_decode_qcs(struct qcs *qcs, int fin)
                if (ret) {
                        BUG_ON(h3s->demux_frame_len < ret);
                        h3s->demux_frame_len -= ret;
-                       qcs_consume(qcs, ret);
+                       b_del(b, ret);
                }
        }
 
index f5c0e79cdd45da4582b1840f1425393b0424a29f..4b4b5222d0a49df52a7d0a97f76db97b64b6ee5c 100644 (file)
@@ -7,20 +7,18 @@
 #include <haproxy/htx.h>
 #include <haproxy/http.h>
 #include <haproxy/mux_quic.h>
-#include <haproxy/ncbuf.h>
 
-static int hq_interop_decode_qcs(struct qcs *qcs, int fin)
+static int hq_interop_decode_qcs(struct qcs *qcs, struct buffer *b, int fin)
 {
-       struct ncbuf *rxbuf = &qcs->rx.ncbuf;
        struct htx *htx;
        struct htx_sl *sl;
        struct stconn *sc;
        struct buffer htx_buf = BUF_NULL;
        struct ist path;
-       char *ptr = ncb_head(rxbuf);
-       char *end = ncb_wrap(rxbuf);
-       size_t size = ncb_size(rxbuf);
-       size_t data = ncb_data(rxbuf, 0);
+       char *ptr = b_head(b);
+       char *end = b_wrap(b);
+       size_t size = b_size(b);
+       size_t data = b_data(b);
 
        b_alloc(&htx_buf);
        htx = htx_from_buf(&htx_buf);
@@ -76,7 +74,7 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin)
        if (!sc)
                return 1;
 
-       qcs_consume(qcs, ncb_data(rxbuf, 0));
+       b_reset(b);
        b_free(&htx_buf);
 
        if (fin)
index 84237c3e6e4e517d96df86d6f3ab7a7e61138da3..28a1896a588ed1f40a9b9c2bd3ad31e0d6612d09 100644 (file)
@@ -286,57 +286,6 @@ void qcs_notify_send(struct qcs *qcs)
        }
 }
 
-/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
- * after STREAM parsing. Flow-control for received offsets may be allocated for
- * the peer if needed.
- */
-void qcs_consume(struct qcs *qcs, uint64_t bytes)
-{
-       struct qcc *qcc = qcs->qcc;
-       struct quic_frame *frm;
-       struct ncbuf *buf = &qcs->rx.ncbuf;
-       enum ncb_ret ret;
-
-       ret = ncb_advance(buf, bytes);
-       if (ret) {
-               ABORT_NOW(); /* should not happens because removal only in data */
-       }
-
-       if (ncb_is_empty(buf))
-               qc_free_ncbuf(qcs, buf);
-
-       qcs->rx.offset += bytes;
-       if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
-               frm = pool_zalloc(pool_head_quic_frame);
-               BUG_ON(!frm); /* TODO handle this properly */
-
-               qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
-
-               LIST_INIT(&frm->reflist);
-               frm->type = QUIC_FT_MAX_STREAM_DATA;
-               frm->max_stream_data.id = qcs->id;
-               frm->max_stream_data.max_stream_data = qcs->rx.msd;
-
-               LIST_APPEND(&qcc->lfctl.frms, &frm->list);
-               tasklet_wakeup(qcc->wait_event.tasklet);
-       }
-
-       qcc->lfctl.offsets_consume += bytes;
-       if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) {
-               frm = pool_zalloc(pool_head_quic_frame);
-               BUG_ON(!frm); /* TODO handle this properly */
-
-               qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init;
-
-               LIST_INIT(&frm->reflist);
-               frm->type = QUIC_FT_MAX_DATA;
-               frm->max_data.max_data = qcc->lfctl.md;
-
-               LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
-               tasklet_wakeup(qcs->qcc->wait_event.tasklet);
-       }
-}
-
 /* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
  * several streams, depending on the already open ones.
  * Return this node if succeeded, NULL if not.
@@ -425,6 +374,63 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
        return NULL;
 }
 
+/* Simple function to duplicate a buffer */
+static inline struct buffer qcs_b_dup(const struct ncbuf *b)
+{
+       return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
+}
+
+/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
+ * after STREAM parsing. Flow-control for received offsets may be allocated for
+ * the peer if needed.
+ */
+static void qcs_consume(struct qcs *qcs, uint64_t bytes)
+{
+       struct qcc *qcc = qcs->qcc;
+       struct quic_frame *frm;
+       struct ncbuf *buf = &qcs->rx.ncbuf;
+       enum ncb_ret ret;
+
+       ret = ncb_advance(buf, bytes);
+       if (ret) {
+               ABORT_NOW(); /* should not happens because removal only in data */
+       }
+
+       if (ncb_is_empty(buf))
+               qc_free_ncbuf(qcs, buf);
+
+       qcs->rx.offset += bytes;
+       if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
+               frm = pool_zalloc(pool_head_quic_frame);
+               BUG_ON(!frm); /* TODO handle this properly */
+
+               qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
+
+               LIST_INIT(&frm->reflist);
+               frm->type = QUIC_FT_MAX_STREAM_DATA;
+               frm->max_stream_data.id = qcs->id;
+               frm->max_stream_data.max_stream_data = qcs->rx.msd;
+
+               LIST_APPEND(&qcc->lfctl.frms, &frm->list);
+               tasklet_wakeup(qcc->wait_event.tasklet);
+       }
+
+       qcc->lfctl.offsets_consume += bytes;
+       if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) {
+               frm = pool_zalloc(pool_head_quic_frame);
+               BUG_ON(!frm); /* TODO handle this properly */
+
+               qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init;
+
+               LIST_INIT(&frm->reflist);
+               frm->type = QUIC_FT_MAX_DATA;
+               frm->max_data.max_data = qcc->lfctl.md;
+
+               LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
+               tasklet_wakeup(qcs->qcc->wait_event.tasklet);
+       }
+}
+
 /* Decode the content of STREAM frames already received on the stream instance
  * <qcs>.
  *
@@ -432,14 +438,27 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
  */
 static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
 {
+       struct buffer b;
+       size_t data, done;
+       int ret;
+
        TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
 
-       if (qcc->app_ops->decode_qcs(qcs, qcs->flags & QC_SF_FIN_RECV)) {
+       b = qcs_b_dup(&qcs->rx.ncbuf);
+       data = b_data(&b);
+
+       ret = qcc->app_ops->decode_qcs(qcs, &b, qcs->flags & QC_SF_FIN_RECV);
+       if (ret) {
                TRACE_DEVEL("leaving on decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
                return 1;
        }
 
-       qcs_notify_recv(qcs);
+       BUG_ON_HOT(data < b_data(&b));
+       done = data - b_data(&b);
+       if (done) {
+               qcs_consume(qcs, done);
+               qcs_notify_recv(qcs);
+       }
 
        TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);