]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: respect peer bidirectional stream data limit
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 7 Mar 2022 14:47:02 +0000 (15:47 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 23 Mar 2022 09:05:29 +0000 (10:05 +0100)
Implement the flow-control max-streams-data limit on emission. We ensure
that we never push more than the offset limit set by the peer. When the
limit is reached, the stream is marked as blocked with a new flag
QC_SF_BLK_SFCTL to disable emission.

Currently, this is only implemented for bidirectional streams. It's
required to unify the sending for unidirectional streams via
qcs_push_frame from the H3 layer to respect the flow-control limit for
them.

include/haproxy/mux_quic-t.h
src/mux_quic.c

index a5d4d2d23adf1aaf6fc051a67cf0561e9c3a9e6c..c3aac186071743edf24c9751d66f40772efc26a7 100644 (file)
@@ -51,6 +51,8 @@ struct qcc {
 
        /* flow-control fields set by the peer which we must respect. */
        struct {
+               uint64_t msd_bidi_l; /* initial max-stream-data for peer local streams */
+               uint64_t msd_bidi_r; /* initial max-stream-data for peer remote streams */
        } rfctl;
 
        struct {
@@ -78,6 +80,7 @@ struct qcc {
 #define QC_SF_FIN_STREAM        0x00000002  /* FIN bit must be set for last frame of the stream */
 #define QC_SF_BLK_MROOM         0x00000004  /* app layer is blocked waiting for room in the qcs.tx.buf */
 #define QC_SF_DETACH            0x00000008  /* cs is detached but there is remaining data to send */
+#define QC_SF_BLK_SFCTL         0x00000010  /* stream blocked due to stream flow control limit */
 
 struct qcs {
        struct qcc *qcc;
@@ -97,6 +100,7 @@ struct qcs {
                uint64_t ack_offset; /* last acked ordered byte offset */
                struct buffer buf; /* transmit buffer before sending via xprt */
                struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */
+               uint64_t msd; /* fctl bytes limit to respect on emission */
        } tx;
 
        struct eb64_node by_id; /* place in qcc's streams_by_id */
index e239856702dbcba515166f8b2946c7413d1282c2..eda63470d6fe8d45185efe01533c025a90802870 100644 (file)
@@ -33,6 +33,11 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
        eb64_insert(&qcc->streams_by_id, &qcs->by_id);
        qcc->strms[type].nb_streams++;
 
+       /* If stream is local, use peer remote-limit, or else the opposite. */
+       /* TODO use uni limit for unidirectional streams */
+       qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r :
+                                                     qcc->rfctl.msd_bidi_l;
+
        qcs->rx.buf = BUF_NULL;
        qcs->rx.app_buf = BUF_NULL;
        qcs->rx.offset = 0;
@@ -355,6 +360,8 @@ static void qc_release(struct qcc *qcc)
  * <payload> to <out> buffer. The STREAM frame payload points to the <out>
  * buffer. The frame is then pushed to <frm_list>. If <fin> is set, and the
  * <payload> buf is emptied after transfer, FIN bit is set on the STREAM frame.
+ * Transfer is automatically adjusted to not exceed the stream flow-control
+ * limit.
  *
  * Returns the total bytes of newly transferred data or a negative error code.
  */
@@ -390,6 +397,12 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
        head = qcs->tx.sent_offset - qcs->tx.ack_offset;
        left = qcs->tx.offset - qcs->tx.sent_offset;
        to_xfer = QUIC_MIN(b_data(payload), b_room(out));
+
+       BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
+       /* do not exceed flow control limit */
+       if (qcs->tx.offset + to_xfer > qcs->tx.msd)
+               to_xfer = qcs->tx.msd - qcs->tx.offset;
+
        if (!left && !to_xfer)
                goto out;
 
@@ -450,6 +463,9 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
 
        /* increase offset on stream */
        qcs->tx.sent_offset += diff;
+       BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
+       if (qcs->tx.sent_offset == qcs->tx.msd)
+               qcs->flags |= QC_SF_BLK_SFCTL;
 }
 
 /* Wrapper for send on transport layer. Send a list of frames <frms> for the
@@ -552,6 +568,11 @@ static int qc_send(struct qcc *qcc)
                        continue;
                }
 
+               if (qcs->flags & QC_SF_BLK_SFCTL) {
+                       node = eb64_next(node);
+                       continue;
+               }
+
                if (b_data(buf) || b_data(out)) {
                        int ret;
                        char fin = qcs->flags & QC_SF_FIN_STREAM;
@@ -703,7 +724,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
                    struct session *sess, struct buffer *input)
 {
        struct qcc *qcc;
-       struct quic_transport_params *lparams;
+       struct quic_transport_params *lparams, *rparams;
 
        qcc = pool_alloc(pool_head_qcc);
        if (!qcc)
@@ -752,6 +773,10 @@ static int qc_init(struct connection *conn, struct proxy *prx,
        qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi;
        qcc->lfctl.cl_bidi_r = 0;
 
+       rparams = &conn->qc->tx.params;
+       qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
+       qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
+
        qcc->wait_event.tasklet = tasklet_new();
        if (!qcc->wait_event.tasklet)
                goto fail_no_tasklet;