From: Amaury Denoyelle Date: Mon, 7 Mar 2022 14:47:02 +0000 (+0100) Subject: MEDIUM: mux-quic: respect peer bidirectional stream data limit X-Git-Tag: v2.6-dev4~19 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6ea781919a520258ca24da277fc942e068d0a752;p=thirdparty%2Fhaproxy.git MEDIUM: mux-quic: respect peer bidirectional stream data limit Implement the flow-control max-streams-data limit on emission. We ensure that we never push more than the offset limit set by the peer. When the limit is reached, the stream is marked as blocked with a new flag QC_SF_BLK_SFCTL to disable emission. Currently, this is only implemented for bidirectional streams. It's required to unify the sending for unidirectional streams via qcs_push_frame from the H3 layer to respect the flow-control limit for them. --- diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index a5d4d2d23a..c3aac18607 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -51,6 +51,8 @@ struct qcc { /* flow-control fields set by the peer which we must respect. */ struct { + uint64_t msd_bidi_l; /* initial max-stream-data for peer local streams */ + uint64_t msd_bidi_r; /* initial max-stream-data for peer remote streams */ } rfctl; struct { @@ -78,6 +80,7 @@ struct qcc { #define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */ #define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */ #define QC_SF_DETACH 0x00000008 /* cs is detached but there is remaining data to send */ +#define QC_SF_BLK_SFCTL 0x00000010 /* stream blocked due to stream flow control limit */ struct qcs { struct qcc *qcc; @@ -97,6 +100,7 @@ struct qcs { uint64_t ack_offset; /* last acked ordered byte offset */ struct buffer buf; /* transmit buffer before sending via xprt */ struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */ + uint64_t msd; /* fctl bytes limit to respect on emission */ } tx; struct eb64_node by_id; /* place in qcc's streams_by_id */ diff --git a/src/mux_quic.c b/src/mux_quic.c index e239856702..eda63470d6 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -33,6 +33,11 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) eb64_insert(&qcc->streams_by_id, &qcs->by_id); qcc->strms[type].nb_streams++; + /* If stream is local, use peer remote-limit, or else the opposite. */ + /* TODO use uni limit for unidirectional streams */ + qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r : + qcc->rfctl.msd_bidi_l; + qcs->rx.buf = BUF_NULL; qcs->rx.app_buf = BUF_NULL; qcs->rx.offset = 0; @@ -355,6 +360,8 @@ static void qc_release(struct qcc *qcc) * to buffer. The STREAM frame payload points to the * buffer. The frame is then pushed to . If is set, and the * buf is emptied after transfer, FIN bit is set on the STREAM frame. + * Transfer is automatically adjusted to not exceed the stream flow-control + * limit. * * Returns the total bytes of newly transferred data or a negative error code. */ @@ -390,6 +397,12 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out, head = qcs->tx.sent_offset - qcs->tx.ack_offset; left = qcs->tx.offset - qcs->tx.sent_offset; to_xfer = QUIC_MIN(b_data(payload), b_room(out)); + + BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd); + /* do not exceed flow control limit */ + if (qcs->tx.offset + to_xfer > qcs->tx.msd) + to_xfer = qcs->tx.msd - qcs->tx.offset; + if (!left && !to_xfer) goto out; @@ -450,6 +463,9 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) /* increase offset on stream */ qcs->tx.sent_offset += diff; + BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd); + if (qcs->tx.sent_offset == qcs->tx.msd) + qcs->flags |= QC_SF_BLK_SFCTL; } /* Wrapper for send on transport layer. Send a list of frames for the @@ -552,6 +568,11 @@ static int qc_send(struct qcc *qcc) continue; } + if (qcs->flags & QC_SF_BLK_SFCTL) { + node = eb64_next(node); + continue; + } + if (b_data(buf) || b_data(out)) { int ret; char fin = qcs->flags & QC_SF_FIN_STREAM; @@ -703,7 +724,7 @@ static int qc_init(struct connection *conn, struct proxy *prx, struct session *sess, struct buffer *input) { struct qcc *qcc; - struct quic_transport_params *lparams; + struct quic_transport_params *lparams, *rparams; qcc = pool_alloc(pool_head_qcc); if (!qcc) @@ -752,6 +773,10 @@ static int qc_init(struct connection *conn, struct proxy *prx, qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi; qcc->lfctl.cl_bidi_r = 0; + rparams = &conn->qc->tx.params; + qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local; + qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; + qcc->wait_event.tasklet = tasklet_new(); if (!qcc->wait_event.tasklet) goto fail_no_tasklet;