From: Amaury Denoyelle Date: Mon, 16 May 2022 12:38:25 +0000 (+0200) Subject: MINOR: mux-quic: implement MAX_STREAM_DATA emission X-Git-Tag: v2.6-dev11~55 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a977355aa1f9636d0f8fe2af811915d33c99260e;p=thirdparty%2Fhaproxy.git MINOR: mux-quic: implement MAX_STREAM_DATA emission Send MAX_STREAM_DATA frames when at least half of the allocated flow-control has been demuxed, frame and cleared. This is necessary to support QUIC STREAM with received data greater than a buffer. Transcoders must use the new function qcc_consume_qcs() to empty the QCS buffer. This will allow to monitor current flow-control level and generate a MAX_STREAM_DATA frame if required. This frame will be emitted via qc_io_cb(). --- diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 1135ee2af2..7a9d09a01a 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -105,7 +105,8 @@ struct qcs { uint64_t offset; /* absolute current base offset of ncbuf */ struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */ struct buffer app_buf; /* receive buffer used by conn_stream layer */ - uint64_t msd; /* fctl bytes limit to enforce */ + uint64_t msd; /* current max-stream-data limit to enforce */ + uint64_t msd_init; /* initial max-stream-data */ } rx; struct { uint64_t offset; /* last offset of data ready to be sent */ diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 55c6b00663..87d1b9987d 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -23,6 +23,7 @@ struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf); int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); +void qcs_consume(struct qcs *qcs, uint64_t bytes); int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, char fin, char *data); diff --git a/src/h3.c b/src/h3.c index 5307134936..d61b8e3e74 100644 --- a/src/h3.c +++ b/src/h3.c @@ -288,10 +288,9 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx) h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n", __func__, ftype, flen); - ncb_advance(rxbuf, hlen); h3s->demux_frame_type = ftype; h3s->demux_frame_len = flen; - qcs->rx.offset += hlen; + qcs_consume(qcs, hlen); } flen = h3s->demux_frame_len; @@ -327,10 +326,9 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx) } if (ret) { - ncb_advance(rxbuf, ret); BUG_ON(h3s->demux_frame_len < ret); h3s->demux_frame_len -= ret; - qcs->rx.offset += ret; + qcs_consume(qcs, ret); } } @@ -410,8 +408,7 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx) if (flen > b_data(&b)) break; - ncb_advance(rxbuf, hlen); - h3_uqs->qcs->rx.offset += hlen; + qcs_consume(h3_uqs->qcs, hlen); /* From here, a frame must not be truncated */ switch (ftype) { case H3_FT_CANCEL_PUSH: @@ -435,8 +432,7 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx) h3->err = H3_FRAME_UNEXPECTED; return 0; } - ncb_advance(rxbuf, flen); - h3_uqs->qcs->rx.offset += flen; + qcs_consume(h3_uqs->qcs, flen); } /* Handle the case where remaining data are present in the buffer. This @@ -796,8 +792,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx) if (!b_quic_dec_int(&strm_type, &b, &len) || strm_type > H3_UNI_STRM_TP_MAX) return 0; - ncb_advance(rxbuf, len); - qcs->rx.offset += len; + qcs_consume(qcs, len); /* Note that for all the uni-streams below, this is an error to receive two times the * same type of uni-stream (even for Push stream which is not supported at this time. diff --git a/src/hq_interop.c b/src/hq_interop.c index db4387e894..a47f5d1792 100644 --- a/src/hq_interop.c +++ b/src/hq_interop.c @@ -76,9 +76,7 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx) if (!cs) return -1; - - qcs->rx.offset += ncb_data(rxbuf, 0); - ncb_advance(rxbuf, ncb_data(rxbuf, 0)); + qcs_consume(qcs, ncb_data(rxbuf, 0)); b_free(&htx_buf); if (fin) diff --git a/src/mux_quic.c b/src/mux_quic.c index 54bec78f2c..2d34a7adf8 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -160,6 +160,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) /* TODO use uni limit for unidirectional streams */ qcs->rx.msd = quic_stream_is_local(qcc, id) ? qcc->lfctl.msd_bidi_l : qcc->lfctl.msd_bidi_r; + qcs->rx.msd_init = qcs->rx.msd; qcs->tx.buf = BUF_NULL; qcs->tx.offset = 0; @@ -283,6 +284,38 @@ void qcs_notify_send(struct qcs *qcs) } } +/* Remove from Rx buffer. This must be called by transcoders + * after STREAM parsing. Flow-control for received offsets may be allocated for + * the peer if needed. + */ +void qcs_consume(struct qcs *qcs, uint64_t bytes) +{ + struct qcc *qcc = qcs->qcc; + struct quic_frame *frm; + enum ncb_ret ret; + + ret = ncb_advance(&qcs->rx.ncbuf, bytes); + if (ret) { + ABORT_NOW(); /* should not happens because removal only in data */ + } + + qcs->rx.offset += bytes; + if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) { + frm = pool_zalloc(pool_head_quic_frame); + BUG_ON(!frm); /* TODO handle this properly */ + + qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init; + + LIST_INIT(&frm->reflist); + frm->type = QUIC_FT_MAX_STREAM_DATA; + frm->max_stream_data.id = qcs->id; + frm->max_stream_data.max_stream_data = qcs->rx.msd; + + LIST_APPEND(&qcc->lfctl.frms, &frm->list); + tasklet_wakeup(qcc->wait_event.tasklet); + } +} + /* Retrieve as an ebtree node the stream with as ID, possibly allocates * several streams, depending on the already open ones. * Return this node if succeeded, NULL if not. @@ -449,9 +482,6 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, return 1; } - /* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */ - BUG_ON(offset + len == qcs->rx.msd); - if (fin) qcs->flags |= QC_SF_FIN_RECV;