]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: properly handle conn Tx buf exhaustion
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 17 Jan 2024 14:15:55 +0000 (15:15 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 31 Jan 2024 15:28:54 +0000 (16:28 +0100)
This commit is a direct follow-up on the major rearchitecture of send
buffering. This patch implements the proper handling of connection pool
buffer temporary exhaustion.

The first step is to be able to differentiate a fatal allocation error
from a temporary pool exhaustion. This is done via a new output argument
on qcc_get_stream_txbuf(). For a fatal error, application protocol layer
will schedule the immediate connection closing. For a pool exhaustion,
QCC is flagged with QC_CF_CONN_FULL and stream sending process is
interrupted. QCS instance is also registered in a new list
<qcc.buf_wait_list>.

A new connection buffer can become available when all ACKs are received
for an older buffer. This process is taken in charge by quic-conn layer.
It uses qcc_notify_buf() function to clear QC_CF_CONN_FULL and to wake
up every streams registered on buf_wait_list to resume sending process.

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/h3.c
src/hq_interop.c
src/mux_quic.c
src/quic_stream.c

index a98234253dfff9ccbc70946159cddad297fb5e86..92b8159ab0cb371702695117ed826e8dd0a19ce0 100644 (file)
@@ -32,7 +32,7 @@ enum qcs_type {
 #define QC_CF_ERRL      0x00000001 /* fatal error detected locally, connection should be closed soon */
 #define QC_CF_ERRL_DONE 0x00000002 /* local error properly handled, connection can be released */
 /* unused 0x00000004 */
-/* unused 0x00000008 */
+#define QC_CF_CONN_FULL 0x00000008 /* no stream buffers available on connection */
 #define QC_CF_APP_SHUT  0x00000010 /* Application layer shutdown done. */
 #define QC_CF_ERR_CONN  0x00000020 /* fatal error reported by transport layer */
 
@@ -84,6 +84,7 @@ struct qcc {
        struct list send_retry_list; /* list of qcs eligible to send retry */
        struct list send_list; /* list of qcs ready to send (STREAM, STOP_SENDING or RESET_STREAM emission) */
        struct list fctl_list; /* list of sending qcs blocked on conn flow control */
+       struct list buf_wait_list; /* list of qcs blocked on stream desc buf */
 
        struct wait_event wait_event;  /* To be used if we're waiting for I/Os */
 
@@ -167,6 +168,7 @@ struct qcs {
        struct list el_send; /* element of qcc.send_list */
        struct list el_opening; /* element of qcc.opening_list */
        struct list el_fctl; /* element of qcc.fctl_list */
+       struct list el_buf; /* element of qcc.buf_wait_list */
 
        struct wait_event wait_event;
        struct wait_event *subs;
index a35dddc9bfe55b5c3bac035a1bf5c52470aa4edb..e2437c351e60d689f1d8d2c45465a6e09e030f3b 100644 (file)
@@ -21,9 +21,10 @@ int qcs_is_close_remote(struct qcs *qcs);
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
 void qcs_notify_recv(struct qcs *qcs);
 void qcs_notify_send(struct qcs *qcs);
+int qcc_notify_buf(struct qcc *qcc);
 
 struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);
-struct buffer *qcc_get_stream_txbuf(struct qcs *qcs);
+struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err);
 int qcc_release_stream_txbuf(struct qcs *qcs);
 int qcc_stream_can_send(const struct qcs *qcs);
 void qcc_reset_stream(struct qcs *qcs, int err);
index 431922f01194a118e75bf2a0c133e235eaaef176..69bbd8e27ee6ddd552e3976ba35456b9f6806d11 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -1428,6 +1428,7 @@ static ssize_t h3_rcv_buf(struct qcs *qcs, struct buffer *b, int fin)
  */
 static int h3_control_send(struct qcs *qcs, void *ctx)
 {
+       int err;
        int ret;
        struct h3c *h3c = ctx;
        unsigned char data[(2 + 3) * 2 * QUIC_VARINT_MAX_SIZE]; /* enough for 3 settings */
@@ -1468,7 +1469,8 @@ static int h3_control_send(struct qcs *qcs, void *ctx)
                goto err;
        }
 
-       if (!(res = qcc_get_stream_txbuf(qcs))) {
+       if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+               /* Consider alloc failure fatal for control stream even on conn buf limit. */
                TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
                goto err;
        }
@@ -1496,6 +1498,7 @@ static int h3_control_send(struct qcs *qcs, void *ctx)
 
 static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
 {
+       int err;
        struct h3s *h3s = qcs->ctx;
        struct h3c *h3c = h3s->h3c;
        struct buffer outbuf;
@@ -1550,10 +1553,15 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
 
        list[hdr].n = ist("");
 
-       if (!(res = qcc_get_stream_txbuf(qcs))) {
-               TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
-               h3c->err = H3_INTERNAL_ERROR;
-               goto err;
+       if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+               if (err) {
+                       TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+                       h3c->err = H3_INTERNAL_ERROR;
+                       goto err;
+               }
+
+               TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+               goto end;
        }
 
        /* At least 5 bytes to store frame type + length as a varint max size */
@@ -1626,6 +1634,7 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
                        break;
        }
 
+ end:
        TRACE_LEAVE(H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
        return ret;
 
@@ -1647,6 +1656,7 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
  */
 static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
 {
+       int err;
        struct h3s *h3s = qcs->ctx;
        struct h3c *h3c = h3s->h3c;
        struct buffer headers_buf = BUF_NULL;
@@ -1708,10 +1718,15 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
        list[hdr].n = ist("");
 
  start:
-       if (!(res = qcc_get_stream_txbuf(qcs))) {
-               TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
-               h3c->err = H3_INTERNAL_ERROR;
-               goto err;
+       if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+               if (err) {
+                       TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+                       h3c->err = H3_INTERNAL_ERROR;
+                       goto err;
+               }
+
+               TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+               goto end;
        }
 
        /* At least 9 bytes to store frame type + length as a varint max size */
@@ -1815,6 +1830,7 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
 static int h3_resp_data_send(struct qcs *qcs, struct htx *htx,
                              struct buffer *buf, size_t count)
 {
+       int err;
        struct h3s *h3s = qcs->ctx;
        struct h3c *h3c = h3s->h3c;
        struct buffer outbuf;
@@ -1840,10 +1856,16 @@ static int h3_resp_data_send(struct qcs *qcs, struct htx *htx,
        if (type != HTX_BLK_DATA)
                goto end;
 
-       if (!(res = qcc_get_stream_txbuf(qcs))) {
-               TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs);
-               h3c->err = H3_INTERNAL_ERROR;
-               goto err;
+       if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+               if (err) {
+                       TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs);
+                       h3c->err = H3_INTERNAL_ERROR;
+                       goto err;
+               }
+
+               /* Connection buf limit reached, stconn will subscribe on SEND. */
+               TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+               goto end;
        }
 
        /* If HTX contains only one DATA block, try to exchange it with MUX
@@ -2040,6 +2062,7 @@ static size_t h3_snd_buf(struct qcs *qcs, struct buffer *buf, size_t count)
 
 static size_t h3_nego_ff(struct qcs *qcs, size_t count)
 {
+       int err;
        struct buffer *res;
        int hsize;
        size_t sz, ret = 0;
@@ -2047,8 +2070,13 @@ static size_t h3_nego_ff(struct qcs *qcs, size_t count)
        TRACE_ENTER(H3_EV_STRM_SEND, qcs->qcc->conn, qcs);
 
  start:
-       if (!(res = qcc_get_stream_txbuf(qcs))) {
-               qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
+       if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+               if (err) {
+                       qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
+                       goto end;
+               }
+
+               qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
                goto end;
        }
 
@@ -2224,6 +2252,7 @@ static void h3_detach(struct qcs *qcs)
  */
 static int h3_send_goaway(struct h3c *h3c)
 {
+       int err;
        struct qcs *qcs = h3c->ctrl_strm;
        struct buffer pos, *res;
        unsigned char data[3 * QUIC_VARINT_MAX_SIZE];
@@ -2243,10 +2272,10 @@ static int h3_send_goaway(struct h3c *h3c)
        b_quic_enc_int(&pos, frm_len, 0);
        b_quic_enc_int(&pos, h3c->id_goaway, 0);
 
-       res = qcc_get_stream_txbuf(qcs);
+       res = qcc_get_stream_txbuf(qcs, &err);
        if (!res || b_room(res) < b_data(&pos) ||
            qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&h3c->qcc->tx.fc)) {
-               /* Do not try forcefully to emit GOAWAY if no space left. */
+               /* Do not try forcefully to emit GOAWAY if no buffer available or not enough space left. */
                TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs);
                goto err;
        }
index 0d0e47f59c94b85d52ed30903b29b970126ba7b5..02ef12626a1149b194ea00eab314cffc94fa5b92 100644 (file)
@@ -95,6 +95,7 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
        uint32_t bsize, fsize;
        struct buffer *res = NULL;
        size_t total = 0;
+       int err;
 
        htx = htx_from_buf(buf);
 
@@ -109,10 +110,11 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
 
                switch (btype) {
                case HTX_BLK_DATA:
-                       res = qcc_get_stream_txbuf(qcs);
+                       res = qcc_get_stream_txbuf(qcs, &err);
                        if (!res) {
-                               /* TODO */
-                               ABORT_NOW();
+                               if (err)
+                                       ABORT_NOW();
+                               goto end;
                        }
 
                        if (unlikely(fsize == count &&
@@ -179,16 +181,16 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
 
 static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count)
 {
-       int ret = 0;
+       int err, ret = 0;
        struct buffer *res;
 
  start:
-       res = qcc_get_stream_txbuf(qcs);
+       res = qcc_get_stream_txbuf(qcs, &err);
        if (!res) {
+               if (err)
+                       ABORT_NOW();
                qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
                goto end;
-               /* TODO */
-               ABORT_NOW();
        }
 
        if (!b_room(res)) {
index 408e20c01aa8a020ec72b18ec2bad58bd2ebcd9f..9df45b491fef30c565c59694b1d41bace6c1f82c 100644 (file)
@@ -60,6 +60,7 @@ static void qcs_free(struct qcs *qcs)
        LIST_DEL_INIT(&qcs->el_opening);
        LIST_DEL_INIT(&qcs->el_send);
        LIST_DEL_INIT(&qcs->el_fctl);
+       LIST_DEL_INIT(&qcs->el_buf);
 
        /* Release stream endpoint descriptor. */
        BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
@@ -109,6 +110,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
        LIST_INIT(&qcs->el_opening);
        LIST_INIT(&qcs->el_send);
        LIST_INIT(&qcs->el_fctl);
+       LIST_INIT(&qcs->el_buf);
        qcs->start = TICK_ETERNITY;
 
        /* store transport layer stream descriptor in qcc tree */
@@ -496,6 +498,35 @@ void qcs_notify_send(struct qcs *qcs)
        }
 }
 
+/* Notify on a new stream-desc buffer available for <qcc> connection.
+ *
+ * Returns true if a stream was woken up. If false is returned, this indicates
+ * to the caller that it's currently unnecessary to notify for the rest of the
+ * available buffers.
+ */
+int qcc_notify_buf(struct qcc *qcc)
+{
+       struct qcs *qcs;
+       int ret = 0;
+
+       TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+
+       if (qcc->flags & QC_CF_CONN_FULL) {
+               TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn);
+               qcc->flags &= ~QC_CF_CONN_FULL;
+       }
+
+       if (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
+               qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf);
+               LIST_DEL_INIT(&qcs->el_buf);
+               qcs_notify_send(qcs);
+               ret = 1;
+       }
+
+       TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
+       return ret;
+}
+
 /* A fatal error is detected locally for <qcc> connection. It should be closed
  * with a CONNECTION_CLOSE using <err> code. Set <app> to true to indicate that
  * the code must be considered as an application level error. This function
@@ -923,22 +954,48 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs)
 
 /* Allocate if needed and retrieve <qcs> stream buffer for data emission.
  *
- * Returns buffer pointer. May be NULL on allocation failure.
+ * <err> is an output argument which is useful to differentiate the failure
+ * cause when the buffer cannot be allocated. It is set to 0 if the connection
+ * buffer limit is reached. For fatal errors, its value is non-zero.
+ *
+ * Returns buffer pointer. May be NULL on allocation failure, in which case
+ * <err> will refer to the cause.
  */
-struct buffer *qcc_get_stream_txbuf(struct qcs *qcs)
+struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
 {
        struct qcc *qcc = qcs->qcc;
        int buf_avail;
        struct buffer *out = qc_stream_buf_get(qcs->stream);
 
+       /* Stream must not try to reallocate a buffer if currently waiting for one. */
+       BUG_ON(LIST_INLIST(&qcs->el_buf));
+
+       *err = 0;
+
        if (!out) {
+               if (qcc->flags & QC_CF_CONN_FULL) {
+                       LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
+                       goto out;
+               }
+
                out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real,
                                          &buf_avail);
-               if (!out)
+               if (!out) {
+                       if (buf_avail) {
+                               TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+                               *err = 1;
+                               goto out;
+                       }
+
+                       TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+                       LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
+                       qcc->flags |= QC_CF_CONN_FULL;
                        goto out;
+               }
 
                if (!b_alloc(out)) {
                        TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+                       *err = 1;
                        goto out;
                }
        }
@@ -988,7 +1045,7 @@ int qcc_release_stream_txbuf(struct qcs *qcs)
 /* Returns true if stream layer can proceed to emission via <qcs>. */
 int qcc_stream_can_send(const struct qcs *qcs)
 {
-       return !(qcs->flags & QC_SF_BLK_MROOM);
+       return !(qcs->flags & QC_SF_BLK_MROOM) && !LIST_INLIST(&qcs->el_buf);
 }
 
 /* Wakes up every streams of <qcc> which are currently waiting for sending but
@@ -1014,6 +1071,10 @@ void qcc_reset_stream(struct qcs *qcs, int err)
        if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs))
                return;
 
+       /* TODO if QCS waiting for buffer, it could be removed from
+        * <qcc.buf_wait_list> if sending is closed now.
+        */
+
        TRACE_STATE("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs);
        qcs->flags |= QC_SF_TO_RESET;
        qcs->err = err;
@@ -2575,6 +2636,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
 
        LIST_INIT(&qcc->send_list);
        LIST_INIT(&qcc->fctl_list);
+       LIST_INIT(&qcc->buf_wait_list);
 
        qcc->wait_event.tasklet->process = qcc_io_cb;
        qcc->wait_event.tasklet->context = qcc;
@@ -2790,6 +2852,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
 
        TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
 
+       /* Stream must not be woken up if already waiting for conn buffer. */
+       BUG_ON(LIST_INLIST(&qcs->el_buf));
+
        /* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */
        BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET));
 
@@ -2849,6 +2914,9 @@ static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input,
 
        TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
 
+       /* Stream must not be woken up if already waiting for conn buffer. */
+       BUG_ON(LIST_INLIST(&qcs->el_buf));
+
        /* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */
        BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET));
 
index aa8895ae79eb4d1d4db4414925465d26f6a12d0b..e153660db20d36f2edc97d3cbbddf20c451c0ca4 100644 (file)
@@ -6,7 +6,7 @@
 #include <haproxy/buf.h>
 #include <haproxy/dynbuf.h>
 #include <haproxy/list.h>
-#include <haproxy/mux_quic-t.h>
+#include <haproxy/mux_quic.h>
 #include <haproxy/pool.h>
 #include <haproxy/quic_conn.h>
 #include <haproxy/task.h>
@@ -37,7 +37,13 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
        /* notify MUX about available buffers. */
        --qc->stream_buf_count;
        if (qc->mux_state == QC_MUX_READY) {
-               /* TODO notify MUX for available buffer. */
+               /* notify MUX about available buffers.
+                *
+                * TODO several streams may be woken up even if a single buffer
+                * is available for now.
+                */
+               while (qcc_notify_buf(qc->qcc))
+                       ;
        }
 }
 
@@ -199,7 +205,13 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
 
                qc->stream_buf_count -= free_count;
                if (qc->mux_state == QC_MUX_READY) {
-                       /* TODO notify MUX for available buffer. */
+                       /* notify MUX about available buffers.
+                        *
+                        * TODO several streams may be woken up even if a single buffer
+                        * is available for now.
+                        */
+                       while (qcc_notify_buf(qc->qcc))
+                               ;
                }
        }