]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-quic: implement MAX_STREAM_DATA emission
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 16 May 2022 12:38:25 +0000 (14:38 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 18 May 2022 14:25:07 +0000 (16:25 +0200)
Send MAX_STREAM_DATA frames when at least half of the allocated
flow-control has been demuxed, frame and cleared. This is necessary to
support QUIC STREAM with received data greater than a buffer.

Transcoders must use the new function qcc_consume_qcs() to empty the QCS
buffer. This will allow to monitor current flow-control level and
generate a MAX_STREAM_DATA frame if required. This frame will be emitted
via qc_io_cb().

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/h3.c
src/hq_interop.c
src/mux_quic.c

index 1135ee2af274bbcbe2bb2d9ec74c8b53693b6033..7a9d09a01a018acfeb389993a81bfa03eabe09b4 100644 (file)
@@ -105,7 +105,8 @@ struct qcs {
                uint64_t offset; /* absolute current base offset of ncbuf */
                struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
                struct buffer app_buf; /* receive buffer used by conn_stream layer */
-               uint64_t msd; /* fctl bytes limit to enforce */
+               uint64_t msd; /* current max-stream-data limit to enforce */
+               uint64_t msd_init; /* initial max-stream-data */
        } rx;
        struct {
                uint64_t offset; /* last offset of data ready to be sent */
index 55c6b00663f8b5a2a8df9192e57715c71fa89eec..87d1b9987dc81dbec1430cb672a8f43a05fdcfcf 100644 (file)
@@ -23,6 +23,7 @@ struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf);
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
 void qcs_notify_recv(struct qcs *qcs);
 void qcs_notify_send(struct qcs *qcs);
+void qcs_consume(struct qcs *qcs, uint64_t bytes);
 
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
              char fin, char *data);
index 5307134936a7cbf8ca91578c8028a3f03aec2e0a..d61b8e3e74450726f91b74c906f38f963ae7bc9e 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -288,10 +288,9 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
                        h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
                                        __func__, ftype, flen);
 
-                       ncb_advance(rxbuf, hlen);
                        h3s->demux_frame_type = ftype;
                        h3s->demux_frame_len = flen;
-                       qcs->rx.offset += hlen;
+                       qcs_consume(qcs, hlen);
                }
 
                flen = h3s->demux_frame_len;
@@ -327,10 +326,9 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
                }
 
                if (ret) {
-                       ncb_advance(rxbuf, ret);
                        BUG_ON(h3s->demux_frame_len < ret);
                        h3s->demux_frame_len -= ret;
-                       qcs->rx.offset += ret;
+                       qcs_consume(qcs, ret);
                }
        }
 
@@ -410,8 +408,7 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
                if (flen > b_data(&b))
                        break;
 
-               ncb_advance(rxbuf, hlen);
-               h3_uqs->qcs->rx.offset += hlen;
+               qcs_consume(h3_uqs->qcs, hlen);
                /* From here, a frame must not be truncated */
                switch (ftype) {
                case H3_FT_CANCEL_PUSH:
@@ -435,8 +432,7 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
                        h3->err = H3_FRAME_UNEXPECTED;
                        return 0;
                }
-               ncb_advance(rxbuf, flen);
-               h3_uqs->qcs->rx.offset += flen;
+               qcs_consume(h3_uqs->qcs, flen);
        }
 
        /* Handle the case where remaining data are present in the buffer. This
@@ -796,8 +792,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
        if (!b_quic_dec_int(&strm_type, &b, &len) || strm_type > H3_UNI_STRM_TP_MAX)
                return 0;
 
-       ncb_advance(rxbuf, len);
-       qcs->rx.offset += len;
+       qcs_consume(qcs, len);
 
        /* Note that for all the uni-streams below, this is an error to receive two times the
         * same type of uni-stream (even for Push stream which is not supported at this time.
index db4387e8947892b56424cbd25a2257586f992cf1..a47f5d17922591ae936555d23758aedbe034c0d1 100644 (file)
@@ -76,9 +76,7 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
        if (!cs)
                return -1;
 
-
-       qcs->rx.offset += ncb_data(rxbuf, 0);
-       ncb_advance(rxbuf, ncb_data(rxbuf, 0));
+       qcs_consume(qcs, ncb_data(rxbuf, 0));
        b_free(&htx_buf);
 
        if (fin)
index 54bec78f2c398e75769b9c4bdcba3e27873919dd..2d34a7adf82604677d934078c1760fba668f8f16 100644 (file)
@@ -160,6 +160,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
        /* TODO use uni limit for unidirectional streams */
        qcs->rx.msd = quic_stream_is_local(qcc, id) ? qcc->lfctl.msd_bidi_l :
                                                      qcc->lfctl.msd_bidi_r;
+       qcs->rx.msd_init = qcs->rx.msd;
 
        qcs->tx.buf = BUF_NULL;
        qcs->tx.offset = 0;
@@ -283,6 +284,38 @@ void qcs_notify_send(struct qcs *qcs)
        }
 }
 
+/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
+ * after STREAM parsing. Flow-control for received offsets may be allocated for
+ * the peer if needed.
+ */
+void qcs_consume(struct qcs *qcs, uint64_t bytes)
+{
+       struct qcc *qcc = qcs->qcc;
+       struct quic_frame *frm;
+       enum ncb_ret ret;
+
+       ret = ncb_advance(&qcs->rx.ncbuf, bytes);
+       if (ret) {
+               ABORT_NOW(); /* should not happens because removal only in data */
+       }
+
+       qcs->rx.offset += bytes;
+       if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
+               frm = pool_zalloc(pool_head_quic_frame);
+               BUG_ON(!frm); /* TODO handle this properly */
+
+               qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
+
+               LIST_INIT(&frm->reflist);
+               frm->type = QUIC_FT_MAX_STREAM_DATA;
+               frm->max_stream_data.id = qcs->id;
+               frm->max_stream_data.max_stream_data = qcs->rx.msd;
+
+               LIST_APPEND(&qcc->lfctl.frms, &frm->list);
+               tasklet_wakeup(qcc->wait_event.tasklet);
+       }
+}
+
 /* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
  * several streams, depending on the already open ones.
  * Return this node if succeeded, NULL if not.
@@ -449,9 +482,6 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
                return 1;
        }
 
-       /* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */
-       BUG_ON(offset + len == qcs->rx.msd);
-
        if (fin)
                qcs->flags |= QC_SF_FIN_RECV;