From: Amaury Denoyelle Date: Fri, 3 Jun 2022 14:40:34 +0000 (+0200) Subject: MINOR: mux-quic: simplify decode_qcs API X-Git-Tag: v2.7-dev1~94 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=62eef85961f4a2a241e0b24ef540cc91f156b842;p=thirdparty%2Fhaproxy.git MINOR: mux-quic: simplify decode_qcs API Slightly modify decode_qcs function used by transcoders. The MUX now gives a buffer instance on which each transcoder is free to work on it. At the return of the function, the MUX removes consume data from its own buffer. This reduces the number of invocation to qcs_consume at the end of a full demuxing process. The API is also cleaner with the transcoders not responsible of calling it with the risk of having the input buffer freed if empty. --- diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index b32d4f9668..9a69b76387 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -102,6 +102,9 @@ struct qcc { #define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */ #define QC_SF_READ_ABORTED 0x00000040 /* stream rejected by app layer */ +/* Maximum size of stream Rx buffer. */ +#define QC_S_RX_BUF_SZ (global.tune.bufsize - NCB_RESERVED_SZ) + struct qcs { struct qcc *qcc; struct sedesc *sd; @@ -137,7 +140,7 @@ struct qcs { struct qcc_app_ops { int (*init)(struct qcc *qcc); int (*attach)(struct qcs *qcs, void *conn_ctx); - int (*decode_qcs)(struct qcs *qcs, int fin); + int (*decode_qcs)(struct qcs *qcs, struct buffer *b, int fin); size_t (*snd_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags); void (*detach)(struct qcs *qcs); int (*finalize)(void *ctx); diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 3c713a3a6d..0846ca7b79 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -23,7 +23,6 @@ struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf); int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); -void qcs_consume(struct qcs *qcs, uint64_t bytes); void qcc_emit_cc_app(struct qcc *qcc, int err); int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, diff --git a/src/h3.c b/src/h3.c index b9a9fed361..655bf26a06 100644 --- a/src/h3.c +++ b/src/h3.c @@ -26,7 +26,6 @@ #include #include #include -#include #include #include #include @@ -144,22 +143,15 @@ struct h3s { DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s)); -/* Simple function to duplicate a buffer */ -static inline struct buffer h3_b_dup(const struct ncbuf *b) -{ - return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0)); -} - -/* Initialize an uni-stream by reading its type from . +/* Initialize an uni-stream by reading its type from . * * Returns 0 on success else non-zero. */ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, - struct ncbuf *rxbuf) + struct buffer *b) { /* decode unidirectional stream type */ struct h3s *h3s = qcs->ctx; - struct buffer b; uint64_t type; size_t len = 0, ret; @@ -168,8 +160,7 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, BUG_ON_HOT(!quic_stream_is_uni(qcs->id) || h3s->flags & H3_SF_UNI_INIT); - b = h3_b_dup(rxbuf); - ret = b_quic_dec_int(&type, &b, &len); + ret = b_quic_dec_int(&type, b, &len); if (!ret) { ABORT_NOW(); } @@ -220,7 +211,6 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, }; h3s->flags |= H3_SF_UNI_INIT; - qcs_consume(qcs, len); TRACE_LEAVE(H3_EV_H3S_NEW, qcs->qcc->conn, qcs); return 0; @@ -231,7 +221,7 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, * * Returns 0 on success else non-zero. */ -static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf) +static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct buffer *b) { struct h3s *h3s = qcs->ctx; @@ -263,14 +253,13 @@ static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf) * consumed. */ static inline size_t h3_decode_frm_header(uint64_t *ftype, uint64_t *flen, - struct ncbuf *rxbuf) + struct buffer *b) { size_t hlen; - struct buffer b = h3_b_dup(rxbuf); hlen = 0; - if (!b_quic_dec_int(ftype, &b, &hlen) || - !b_quic_dec_int(flen, &b, &hlen)) { + if (!b_quic_dec_int(ftype, b, &hlen) || + !b_quic_dec_int(flen, b, &hlen)) { return 0; } @@ -333,8 +322,8 @@ static int h3_is_frame_valid(struct h3c *h3c, struct qcs *qcs, uint64_t ftype) * * Returns the number of bytes handled or a negative error code. */ -static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, - char fin) +static int h3_headers_to_htx(struct qcs *qcs, const struct buffer *buf, + uint64_t len, char fin) { struct buffer htx_buf = BUF_NULL; struct buffer *tmp = get_trash_chunk(); @@ -350,8 +339,8 @@ static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_HDR, qcs->qcc->conn, qcs); /* TODO support buffer wrapping */ - BUG_ON(ncb_head(buf) + len >= ncb_wrap(buf)); - if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0) + BUG_ON(b_head(buf) + len >= b_wrap(buf)); + if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0) return -1; qc_get_buf(qcs, &htx_buf); @@ -431,8 +420,8 @@ static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, * * Returns the number of bytes handled or a negative error code. */ -static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, - char fin) +static int h3_data_to_htx(struct qcs *qcs, const struct buffer *buf, + uint64_t len, char fin) { struct buffer *appbuf; struct htx *htx = NULL; @@ -446,12 +435,12 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, BUG_ON(!appbuf); htx = htx_from_buf(appbuf); - if (len > ncb_data(buf, 0)) { - len = ncb_data(buf, 0); + if (len > b_data(buf)) { + len = b_data(buf); fin = 0; } - head = ncb_head(buf); + head = b_head(buf); retry: htx_space = htx_free_data_space(htx); if (!htx_space) { @@ -464,16 +453,16 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, fin = 0; } - if (head + len > ncb_wrap(buf)) { - size_t contig = ncb_wrap(buf) - head; - htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig)); + if (head + len > b_wrap(buf)) { + size_t contig = b_wrap(buf) - head; + htx_sent = htx_add_data(htx, ist2(b_head(buf), contig)); if (htx_sent < contig) { qcs->flags |= QC_SF_DEM_FULL; goto out; } len -= contig; - head = ncb_orig(buf); + head = b_orig(buf); goto retry; } @@ -493,11 +482,11 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, return htx_sent; } -/* Parse a SETTINGS frame of length of payload . +/* Parse a SETTINGS frame of length of payload . * * Returns the number of bytes handled or a negative error code. */ -static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, +static size_t h3_parse_settings_frm(struct h3c *h3c, const struct buffer *buf, size_t len) { struct buffer b; @@ -507,8 +496,11 @@ static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_SETTINGS, h3c->qcc->conn); - b = h3_b_dup(rxbuf); - b_set_data(&b, len); + /* Work on a copy of . */ + b = b_make(b_orig(buf), b_size(buf), b_head_ofs(buf), b_data(buf)); + + /* TODO handle incomplete SETTINGS frame */ + BUG_ON(len < b_data(&b)); while (b_data(&b)) { if (!b_quic_dec_int(&id, &b, &ret) || !b_quic_dec_int(&value, &b, &ret)) { @@ -576,36 +568,35 @@ static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, * * Returns 0 on success else non-zero. */ -static int h3_decode_qcs(struct qcs *qcs, int fin) +static int h3_decode_qcs(struct qcs *qcs, struct buffer *b, int fin) { - struct ncbuf *rxbuf = &qcs->rx.ncbuf; struct h3s *h3s = qcs->ctx; struct h3c *h3c = h3s->h3c; ssize_t ret; h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id); - if (!ncb_data(rxbuf, 0)) + if (!b_data(b)) return 0; if (quic_stream_is_uni(qcs->id) && !(h3s->flags & H3_SF_UNI_INIT)) { - if (h3_init_uni_stream(h3c, qcs, rxbuf)) + if (h3_init_uni_stream(h3c, qcs, b)) return 1; } if (quic_stream_is_uni(qcs->id) && (h3s->flags & H3_SF_UNI_NO_H3)) { /* For non-h3 STREAM, parse it and return immediately. */ - if (h3_parse_uni_stream_no_h3(qcs, rxbuf)) + if (h3_parse_uni_stream_no_h3(qcs, b)) return 1; return 0; } - while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) { + while (b_data(b) && !(qcs->flags & QC_SF_DEM_FULL)) { uint64_t ftype, flen; char last_stream_frame = 0; /* Work on a copy of */ if (!h3s->demux_frame_len) { - size_t hlen = h3_decode_frm_header(&ftype, &flen, rxbuf); + size_t hlen = h3_decode_frm_header(&ftype, &flen, b); if (!hlen) break; @@ -620,8 +611,7 @@ static int h3_decode_qcs(struct qcs *qcs, int fin) return 1; } - qcs_consume(qcs, hlen); - if (!ncb_data(rxbuf, 0)) + if (!b_data(b)) break; } @@ -631,31 +621,31 @@ static int h3_decode_qcs(struct qcs *qcs, int fin) /* Do not demux incomplete frames except H3 DATA which can be * fragmented in multiple HTX blocks. */ - if (flen > ncb_data(rxbuf, 0) && ftype != H3_FT_DATA) { + if (flen > b_data(b) && ftype != H3_FT_DATA) { /* Reject frames bigger than bufsize. * * TODO HEADERS should in complement be limited with H3 * SETTINGS_MAX_FIELD_SECTION_SIZE parameter to prevent * excessive decompressed size. */ - if (flen > ncb_size(rxbuf)) { + if (flen > QC_S_RX_BUF_SZ) { qcc_emit_cc_app(qcs->qcc, H3_EXCESSIVE_LOAD); return 1; } break; } - last_stream_frame = (fin && flen == ncb_total_data(rxbuf)); + last_stream_frame = (fin && flen == b_data(b)); h3_inc_frame_type_cnt(h3c->prx_counters, ftype); switch (ftype) { case H3_FT_DATA: - ret = h3_data_to_htx(qcs, rxbuf, flen, last_stream_frame); + ret = h3_data_to_htx(qcs, b, flen, last_stream_frame); /* TODO handle error reporting. Stream closure required. */ if (ret < 0) { ABORT_NOW(); } break; case H3_FT_HEADERS: - ret = h3_headers_to_htx(qcs, rxbuf, flen, last_stream_frame); + ret = h3_headers_to_htx(qcs, b, flen, last_stream_frame); /* TODO handle error reporting. Stream closure required. */ if (ret < 0) { ABORT_NOW(); } break; @@ -667,7 +657,7 @@ static int h3_decode_qcs(struct qcs *qcs, int fin) ret = flen; break; case H3_FT_SETTINGS: - ret = h3_parse_settings_frm(qcs->qcc->ctx, rxbuf, flen); + ret = h3_parse_settings_frm(qcs->qcc->ctx, b, flen); if (ret < 0) { qcc_emit_cc_app(qcs->qcc, h3c->err); return 1; @@ -688,7 +678,7 @@ static int h3_decode_qcs(struct qcs *qcs, int fin) if (ret) { BUG_ON(h3s->demux_frame_len < ret); h3s->demux_frame_len -= ret; - qcs_consume(qcs, ret); + b_del(b, ret); } } diff --git a/src/hq_interop.c b/src/hq_interop.c index f5c0e79cdd..4b4b5222d0 100644 --- a/src/hq_interop.c +++ b/src/hq_interop.c @@ -7,20 +7,18 @@ #include #include #include -#include -static int hq_interop_decode_qcs(struct qcs *qcs, int fin) +static int hq_interop_decode_qcs(struct qcs *qcs, struct buffer *b, int fin) { - struct ncbuf *rxbuf = &qcs->rx.ncbuf; struct htx *htx; struct htx_sl *sl; struct stconn *sc; struct buffer htx_buf = BUF_NULL; struct ist path; - char *ptr = ncb_head(rxbuf); - char *end = ncb_wrap(rxbuf); - size_t size = ncb_size(rxbuf); - size_t data = ncb_data(rxbuf, 0); + char *ptr = b_head(b); + char *end = b_wrap(b); + size_t size = b_size(b); + size_t data = b_data(b); b_alloc(&htx_buf); htx = htx_from_buf(&htx_buf); @@ -76,7 +74,7 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin) if (!sc) return 1; - qcs_consume(qcs, ncb_data(rxbuf, 0)); + b_reset(b); b_free(&htx_buf); if (fin) diff --git a/src/mux_quic.c b/src/mux_quic.c index 84237c3e6e..28a1896a58 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -286,57 +286,6 @@ void qcs_notify_send(struct qcs *qcs) } } -/* Remove from Rx buffer. This must be called by transcoders - * after STREAM parsing. Flow-control for received offsets may be allocated for - * the peer if needed. - */ -void qcs_consume(struct qcs *qcs, uint64_t bytes) -{ - struct qcc *qcc = qcs->qcc; - struct quic_frame *frm; - struct ncbuf *buf = &qcs->rx.ncbuf; - enum ncb_ret ret; - - ret = ncb_advance(buf, bytes); - if (ret) { - ABORT_NOW(); /* should not happens because removal only in data */ - } - - if (ncb_is_empty(buf)) - qc_free_ncbuf(qcs, buf); - - qcs->rx.offset += bytes; - if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) { - frm = pool_zalloc(pool_head_quic_frame); - BUG_ON(!frm); /* TODO handle this properly */ - - qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init; - - LIST_INIT(&frm->reflist); - frm->type = QUIC_FT_MAX_STREAM_DATA; - frm->max_stream_data.id = qcs->id; - frm->max_stream_data.max_stream_data = qcs->rx.msd; - - LIST_APPEND(&qcc->lfctl.frms, &frm->list); - tasklet_wakeup(qcc->wait_event.tasklet); - } - - qcc->lfctl.offsets_consume += bytes; - if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) { - frm = pool_zalloc(pool_head_quic_frame); - BUG_ON(!frm); /* TODO handle this properly */ - - qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init; - - LIST_INIT(&frm->reflist); - frm->type = QUIC_FT_MAX_DATA; - frm->max_data.max_data = qcc->lfctl.md; - - LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list); - tasklet_wakeup(qcs->qcc->wait_event.tasklet); - } -} - /* Retrieve as an ebtree node the stream with as ID, possibly allocates * several streams, depending on the already open ones. * Return this node if succeeded, NULL if not. @@ -425,6 +374,63 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) return NULL; } +/* Simple function to duplicate a buffer */ +static inline struct buffer qcs_b_dup(const struct ncbuf *b) +{ + return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0)); +} + +/* Remove from Rx buffer. This must be called by transcoders + * after STREAM parsing. Flow-control for received offsets may be allocated for + * the peer if needed. + */ +static void qcs_consume(struct qcs *qcs, uint64_t bytes) +{ + struct qcc *qcc = qcs->qcc; + struct quic_frame *frm; + struct ncbuf *buf = &qcs->rx.ncbuf; + enum ncb_ret ret; + + ret = ncb_advance(buf, bytes); + if (ret) { + ABORT_NOW(); /* should not happens because removal only in data */ + } + + if (ncb_is_empty(buf)) + qc_free_ncbuf(qcs, buf); + + qcs->rx.offset += bytes; + if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) { + frm = pool_zalloc(pool_head_quic_frame); + BUG_ON(!frm); /* TODO handle this properly */ + + qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init; + + LIST_INIT(&frm->reflist); + frm->type = QUIC_FT_MAX_STREAM_DATA; + frm->max_stream_data.id = qcs->id; + frm->max_stream_data.max_stream_data = qcs->rx.msd; + + LIST_APPEND(&qcc->lfctl.frms, &frm->list); + tasklet_wakeup(qcc->wait_event.tasklet); + } + + qcc->lfctl.offsets_consume += bytes; + if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) { + frm = pool_zalloc(pool_head_quic_frame); + BUG_ON(!frm); /* TODO handle this properly */ + + qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init; + + LIST_INIT(&frm->reflist); + frm->type = QUIC_FT_MAX_DATA; + frm->max_data.max_data = qcc->lfctl.md; + + LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list); + tasklet_wakeup(qcs->qcc->wait_event.tasklet); + } +} + /* Decode the content of STREAM frames already received on the stream instance * . * @@ -432,14 +438,27 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) */ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) { + struct buffer b; + size_t data, done; + int ret; + TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs); - if (qcc->app_ops->decode_qcs(qcs, qcs->flags & QC_SF_FIN_RECV)) { + b = qcs_b_dup(&qcs->rx.ncbuf); + data = b_data(&b); + + ret = qcc->app_ops->decode_qcs(qcs, &b, qcs->flags & QC_SF_FIN_RECV); + if (ret) { TRACE_DEVEL("leaving on decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs); return 1; } - qcs_notify_recv(qcs); + BUG_ON_HOT(data < b_data(&b)); + done = data - b_data(&b); + if (done) { + qcs_consume(qcs, done); + qcs_notify_recv(qcs); + } TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);