]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: limit conn flow control on snd_buf
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 18 Oct 2023 15:48:11 +0000 (17:48 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 31 Jan 2024 15:28:54 +0000 (16:28 +0100)
This commit is a direct follow-up on the previous one. This time, it
deals with connection level flow control. Process is similar to stream
level : soft offset is incremented during snd_buf and real offset during
STREAM frame emission.

On MAX_DATA reception, both stream layer and QMUX is woken up if
necessary. One extra feature for conn level is the introduction of a new
QCC list to reference QCS instances. It will store instances for which
snd_buf callback has been interrupted on QCC soft offset reached. Every
stream instances is woken up on MAX_DATA reception if soft_offset is
unblocked.

include/haproxy/mux_quic-t.h
src/h3.c
src/mux_quic.c
src/qmux_trace.c

index 33f617a9a4b8593593ef49612b30e39117fc49c3..d2c7bdbccf126ffb5061f0d2deb8c946051b7c8d 100644 (file)
@@ -31,7 +31,7 @@ enum qcs_type {
 
 #define QC_CF_ERRL      0x00000001 /* fatal error detected locally, connection should be closed soon */
 #define QC_CF_ERRL_DONE 0x00000002 /* local error properly handled, connection can be released */
-#define QC_CF_BLK_MFCTL 0x00000004 /* sending blocked due to connection flow-control */
+/* unused 0x00000004 */
 #define QC_CF_CONN_FULL 0x00000008 /* no stream buffers available on connection */
 #define QC_CF_APP_SHUT  0x00000010 /* Application layer shutdown done. */
 #define QC_CF_ERR_CONN  0x00000020 /* fatal error reported by transport layer */
@@ -71,6 +71,8 @@ struct qcc {
        } rfctl;
 
        struct {
+               struct quic_fctl fc; /* stream flow control applied on sending */
+
                uint64_t offsets; /* sum of all offsets prepared */
                uint64_t sent_offsets; /* sum of all offset sent */
        } tx;
@@ -84,6 +86,7 @@ struct qcc {
 
        struct list send_retry_list; /* list of qcs eligible to send retry */
        struct list send_list; /* list of qcs ready to send (STREAM, STOP_SENDING or RESET_STREAM emission) */
+       struct list fctl_list; /* list of sending qcs blocked on conn flow control */
 
        struct wait_event wait_event;  /* To be used if we're waiting for I/Os */
 
@@ -170,6 +173,7 @@ struct qcs {
        struct list el; /* element of qcc.send_retry_list */
        struct list el_send; /* element of qcc.send_list */
        struct list el_opening; /* element of qcc.opening_list */
+       struct list el_fctl; /* element of qcc.fctl_list */
 
        struct wait_event wait_event;
        struct wait_event *subs;
index 6c2cad7dddd6774b4cf009548a84f3e9cd378b8d..ec8edfea6a02e2ba025a301379a6b404f48eeec5 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -1463,7 +1463,7 @@ static int h3_control_send(struct qcs *qcs, void *ctx)
                b_quic_enc_int(&pos, h3_settings_max_field_section_size, 0);
        }
 
-       if (qfctl_sblocked(&qcs->tx.fc)) {
+       if (qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&qcs->qcc->tx.fc)) {
                TRACE_ERROR("not enough initial credit for control stream", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
                goto err;
        }
@@ -2236,7 +2236,7 @@ static int h3_send_goaway(struct h3c *h3c)
 
        res = qcc_get_stream_txbuf(qcs);
        if (!res || b_room(res) < b_data(&pos) ||
-           qfctl_sblocked(&qcs->tx.fc)) {
+           qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&h3c->qcc->tx.fc)) {
                /* Do not try forcefully to emit GOAWAY if no space left. */
                TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs);
                goto err;
index 6e6654943e66ba6b43ca5c07f15ebf2d1998da9f..f2b4cb9708498deab8b5f4d3f531e632cbb8aa8f 100644 (file)
@@ -59,6 +59,7 @@ static void qcs_free(struct qcs *qcs)
        /* Safe to use even if already removed from the list. */
        LIST_DEL_INIT(&qcs->el_opening);
        LIST_DEL_INIT(&qcs->el_send);
+       LIST_DEL_INIT(&qcs->el_fctl);
 
        /* Release stream endpoint descriptor. */
        BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
@@ -108,6 +109,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
         */
        LIST_INIT(&qcs->el_opening);
        LIST_INIT(&qcs->el_send);
+       LIST_INIT(&qcs->el_fctl);
        qcs->start = TICK_ETERNITY;
 
        /* store transport layer stream descriptor in qcc tree */
@@ -933,6 +935,20 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs)
        return b_alloc(&qcs->tx.buf);
 }
 
+/* Wakes up every streams of <qcc> which are currently waiting for sending but
+ * are blocked on connection flow control.
+ */
+static void qcc_notify_fctl(struct qcc *qcc)
+{
+       struct qcs *qcs;
+
+       while (!LIST_ISEMPTY(&qcc->fctl_list)) {
+               qcs = LIST_ELEM(qcc->fctl_list.n, struct qcs *, el_fctl);
+               LIST_DEL_INIT(&qcs->el_fctl);
+               qcs_notify_send(qcs);
+       }
+}
+
 /* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */
 void qcc_reset_stream(struct qcs *qcs, int err)
 {
@@ -954,6 +970,15 @@ void qcc_reset_stream(struct qcs *qcs, int err)
                qcs->tx.offset = qcs->tx.sent_offset;
        }
 
+       /* Substract to conn flow control data amount prepared on stream not yet sent. */
+       if (qcs->tx.fc.off_soft > qcs->tx.fc.off_real) {
+               const int soft_blocked = qfctl_sblocked(&qcc->tx.fc);
+
+               qcc->tx.fc.off_soft -= (qcs->tx.fc.off_soft - qcs->tx.fc.off_real);
+               if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc))
+                       qcc_notify_fctl(qcc);
+       }
+
        /* Report send error to stream-endpoint layer. */
        if (qcs_sc(qcs)) {
                se_fl_set_error(qcs->sd);
@@ -987,8 +1012,10 @@ void qcc_send_stream(struct qcs *qcs, int urg, int count)
                        LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
        }
 
-       if (count)
+       if (count) {
+               qfctl_sinc(&qcc->tx.fc, count);
                qfctl_sinc(&qcs->tx.fc, count);
+       }
 
        TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
 }
@@ -1210,17 +1237,19 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
  */
 int qcc_recv_max_data(struct qcc *qcc, uint64_t max)
 {
+       int unblock_soft = 0, unblock_real = 0;
+
        TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
 
        TRACE_PROTO("receiving MAX_DATA", QMUX_EV_QCC_RECV, qcc->conn);
-       if (qcc->rfctl.md < max) {
-               qcc->rfctl.md = max;
+       if (qfctl_set_max(&qcc->tx.fc, max, &unblock_soft, &unblock_real)) {
                TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn);
 
-               if (qcc->flags & QC_CF_BLK_MFCTL) {
-                       qcc->flags &= ~QC_CF_BLK_MFCTL;
+               if (unblock_real)
                        tasklet_wakeup(qcc->wait_event.tasklet);
-               }
+
+               if (unblock_soft)
+                       qcc_notify_fctl(qcc);
        }
 
        TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
@@ -1570,11 +1599,11 @@ static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in)
                to_xfer = qcs->tx.fc.limit - qcs->tx.offset;
        }
 
-       BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);
+       BUG_ON_HOT(qcc->tx.offsets > qcc->tx.fc.limit);
        /* do not overcome flow control limit on connection */
-       if (qcc->tx.offsets + to_xfer > qcc->rfctl.md) {
+       if (qcc->tx.offsets + to_xfer > qcc->tx.fc.limit) {
                TRACE_DATA("do not exceed conn flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs);
-               to_xfer = qcc->rfctl.md - qcc->tx.offsets;
+               to_xfer = qcc->tx.fc.limit - qcc->tx.offsets;
        }
 
        if (!left && !to_xfer)
@@ -1631,7 +1660,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
        BUG_ON((!total && qcs->tx.sent_offset > qcs->tx.offset) ||
               (total && qcs->tx.sent_offset >= qcs->tx.offset));
        BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset);
-       BUG_ON(qcc->tx.sent_offsets + total > qcc->rfctl.md);
+       BUG_ON(qcc->tx.sent_offsets + total > qcc->tx.fc.limit);
 
        TRACE_PROTO("sending STREAM frame", QMUX_EV_QCS_SEND, qcc->conn, qcs);
        frm = qc_frm_alloc(QUIC_FT_STREAM_8);
@@ -1732,17 +1761,19 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
 
        diff = offset + data - qcs->tx.sent_offset;
        if (diff) {
+               struct quic_fctl *fc_conn = &qcc->tx.fc;
                struct quic_fctl *fc_strm = &qcs->tx.fc;
 
                /* Ensure real offset never exceeds soft value. */
+               BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft);
                BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
 
                /* increase offset sum on connection */
                qcc->tx.sent_offsets += diff;
-               BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md);
-               if (qcc->tx.sent_offsets == qcc->rfctl.md) {
-                       qcc->flags |= QC_CF_BLK_MFCTL;
-                       TRACE_STATE("connection flow-control reached", QMUX_EV_QCS_SEND, qcc->conn);
+               BUG_ON_HOT(qcc->tx.sent_offsets > fc_conn->limit);
+               if (qfctl_rinc(fc_conn, diff)) {
+                       TRACE_STATE("connection flow-control reached",
+                                   QMUX_EV_QCS_SEND, qcc->conn);
                }
 
                /* increase offset on stream */
@@ -2102,7 +2133,7 @@ static int qcc_io_send(struct qcc *qcc)
                        continue;
                }
 
-               if (!(qcc->flags & QC_CF_BLK_MFCTL) &&
+               if (!qfctl_rblocked(&qcc->tx.fc) &&
                    !qfctl_rblocked(&qcs->tx.fc)) {
                        if ((ret = qcs_send(qcs, &frms)) < 0) {
                                /* Temporarily remove QCS from send-list. */
@@ -2128,7 +2159,7 @@ static int qcc_io_send(struct qcc *qcc)
        /* Retry sending until no frame to send, data rejected or connection
         * flow-control limit reached.
         */
-       while (qcc_send_frames(qcc, &frms) == 0 && !(qcc->flags & QC_CF_BLK_MFCTL)) {
+       while (qcc_send_frames(qcc, &frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
                /* Reloop over <qcc.send_list>. Useful for streams which have
                 * fulfilled their qc_stream_desc buf and have now release it.
                 */
@@ -2167,7 +2198,7 @@ static int qcc_io_send(struct qcc *qcc)
                        LIST_APPEND(&qcc->send_list, &qcs->el_send);
                }
 
-               if (!(qcc->flags & QC_CF_BLK_MFCTL))
+               if (!qfctl_rblocked(&qcc->tx.fc))
                        tasklet_wakeup(qcc->wait_event.tasklet);
        }
 
@@ -2575,7 +2606,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
        qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0;
 
        rparams = &conn->handle.qc->tx.params;
-       qcc->rfctl.md = rparams->initial_max_data;
+       qfctl_init(&qcc->tx.fc, rparams->initial_max_data);
        qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
        qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
        qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
@@ -2600,6 +2631,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
        }
 
        LIST_INIT(&qcc->send_list);
+       LIST_INIT(&qcc->fctl_list);
 
        qcc->wait_event.tasklet->process = qcc_io_cb;
        qcc->wait_event.tasklet->context = qcc;
@@ -2828,6 +2860,17 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
                goto end;
        }
 
+       if (qfctl_sblocked(&qcs->qcc->tx.fc)) {
+               TRACE_DEVEL("leaving on connection flow control",
+                           QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+               if (!LIST_INLIST(&qcs->el_fctl)) {
+                       TRACE_DEVEL("append to fctl-list",
+                                   QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+                       LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl);
+               }
+               goto end;
+       }
+
        if (qfctl_sblocked(&qcs->tx.fc)) {
                TRACE_DEVEL("leaving on flow-control reached",
                            QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
@@ -2887,6 +2930,16 @@ static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input,
                goto end;
        }
 
+       if (qfctl_sblocked(&qcs->qcc->tx.fc)) {
+               TRACE_DEVEL("leaving on connection flow control", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+               if (!LIST_INLIST(&qcs->el_fctl)) {
+                       TRACE_DEVEL("append to fctl-list", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+                       LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl);
+               }
+               qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+               goto end;
+       }
+
        if (qfctl_sblocked(&qcs->tx.fc)) {
                TRACE_DEVEL("leaving on flow-control reached", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
                qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
index 298f4a35bdb5ae03aaeb1d79d6b5f7577b7ce98f..992b940d46bf3f916b79ab7a38455dc6f177463e 100644 (file)
@@ -77,7 +77,7 @@ static void qmux_trace(enum trace_level level, uint64_t mask,
                        chunk_appendf(&trace_buf, " qc=%p", qcc->conn->handle.qc);
 
                chunk_appendf(&trace_buf, " md=%llu/%llu/%llu",
-                             (ullong)qcc->rfctl.md, (ullong)qcc->tx.offsets, (ullong)qcc->tx.sent_offsets);
+                             (ullong)qcc->tx.fc.limit, (ullong)qcc->tx.offsets, (ullong)qcc->tx.sent_offsets);
 
                if (qcs) {
                        chunk_appendf(&trace_buf, " qcs=%p .id=%llu .st=%s",