]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-quic: account stream txbuf in QCC
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 13 Aug 2024 09:57:50 +0000 (11:57 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 20 Aug 2024 15:17:17 +0000 (17:17 +0200)
A limit per connection is put on the number of buffers allocated by QUIC
MUX for emission accross all its streams. This ensures memory
consumption remains under control. This limit is simply explained as a
count of buffers which can be concurrently allocated for each
connection.

As such, quic_conn structure was used to account currently allocated
buffers. However, a quic_conn nevers allocates new stream buffers. This
is only done at QUIC MUX layer. As such, this commit moves buffer
accounting inside QCC structure. This simplifies the API, most notably
qc_stream_buf_alloc() usage.

Note that this commit inverts the accounting. Previously, it was
initially set to 0 and increment for each allocated buffer. Now, it is
set to the maximum value and decrement for each buf usage. This is
considered as clearer to use.

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
include/haproxy/quic_conn-t.h
include/haproxy/quic_stream.h
src/mux_quic.c
src/quic_conn.c
src/quic_stream.c

index ee3fbe54c0f60b7c03539ab1f228eebfb6d73243..22fd1b910538ff8d71605dcb48afbcccc51b0dcb 100644 (file)
@@ -67,6 +67,7 @@ struct qcc {
 
        struct {
                struct quic_fctl fc; /* stream flow control applied on sending */
+               int avail_bufs; /* count of available buffers for this connection */
        } tx;
 
        uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
index 1ed8ad1dc49cf0a270e8d160ba392718e4cd1cc2..28a5af7b8489aa58c3706d51c8c68d497a5950e1 100644 (file)
@@ -22,7 +22,7 @@ int qcs_is_close_remote(struct qcs *qcs);
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
 void qcs_notify_recv(struct qcs *qcs);
 void qcs_notify_send(struct qcs *qcs);
-int qcc_notify_buf(struct qcc *qcc);
+void qcc_notify_buf(struct qcc *qcc, int free_count);
 
 struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);
 struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err);
index ed5ec01c9e5e043ba3dc44eb170eb3b4633e0374..5ac3f1ff4669584e96165ff232796be1f82b7356 100644 (file)
@@ -398,7 +398,6 @@ struct quic_conn {
        struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
 
        struct eb_root streams_by_id; /* qc_stream_desc tree */
-       int stream_buf_count; /* total count of allocated stream buffers for this connection */
 
        /* MUX */
        struct qcc *qcc;
index 7788d903978a5dac4eb8444309aba797cc89f05c..c7a237969571e59dafe5d86b2ccee902112c2c1b 100644 (file)
@@ -16,7 +16,7 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing);
 
 struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
 struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
-                                   uint64_t offset, int *avail);
+                                   uint64_t offset);
 void qc_stream_buf_release(struct qc_stream_desc *stream);
 
 #endif /* USE_QUIC */
index 30882ab2ebf63c5d44556f64f45a21113ecdf62f..bdd366f4fef0631869e40628caba5123d35a3073 100644 (file)
@@ -7,6 +7,7 @@
 #include <haproxy/chunk.h>
 #include <haproxy/connection.h>
 #include <haproxy/dynbuf.h>
+#include <haproxy/global-t.h>
 #include <haproxy/h3.h>
 #include <haproxy/list.h>
 #include <haproxy/ncbuf.h>
@@ -523,34 +524,36 @@ void qcs_notify_send(struct qcs *qcs)
        }
 }
 
-/* Notify on a new stream-desc buffer available for <qcc> connection.
- *
- * Returns true if a stream was woken up. If false is returned, this indicates
- * to the caller that it's currently unnecessary to notify for the rest of the
- * available buffers.
+/* Report that <free_count> stream-desc buffer have been released for <qcc>
+ * connection.
  */
-int qcc_notify_buf(struct qcc *qcc)
+void qcc_notify_buf(struct qcc *qcc, int free_count)
 {
        struct qcs *qcs;
-       int ret = 0;
 
        TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
+       BUG_ON(qcc->tx.avail_bufs + free_count > global.tune.quic_streams_buf);
+       qcc->tx.avail_bufs += free_count;
+
        if (qcc->flags & QC_CF_CONN_FULL) {
                TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn);
                qcc->flags &= ~QC_CF_CONN_FULL;
        }
 
-       if (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
+       /* TODO a simple optimization would be to only wake up <free_count> QCS
+        * instances. But it may not work if a woken QCS is in error and does
+        * not try to allocate a buffer, leaving the unwoken QCS indefinitely
+        * in the buflist.
+        */
+       while (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
                qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf);
                LIST_DEL_INIT(&qcs->el_buf);
                tot_time_stop(&qcs->timer.buf);
                qcs_notify_send(qcs);
-               ret = 1;
        }
 
        TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
-       return ret;
 }
 
 /* A fatal error is detected locally for <qcc> connection. It should be closed
@@ -1007,7 +1010,6 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs)
 struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
 {
        struct qcc *qcc = qcs->qcc;
-       int buf_avail;
        struct buffer *out = qc_stream_buf_get(qcs->stream);
 
        /* Stream must not try to reallocate a buffer if currently waiting for one. */
@@ -1022,21 +1024,22 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
                        goto out;
                }
 
-               out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real,
-                                         &buf_avail);
-               if (!out) {
-                       if (buf_avail) {
-                               TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
-                               *err = 1;
-                               goto out;
-                       }
-
+               if (!qcc->tx.avail_bufs) {
                        TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
                        LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
                        tot_time_start(&qcs->timer.buf);
                        qcc->flags |= QC_CF_CONN_FULL;
                        goto out;
                }
+
+               out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real);
+               if (!out) {
+                       TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+                       *err = 1;
+                       goto out;
+               }
+
+               --qcc->tx.avail_bufs;
        }
 
  out:
@@ -2703,6 +2706,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
        qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
        qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
 
+       qcc->tx.avail_bufs = global.tune.quic_streams_buf;
+
        if (conn_is_back(conn)) {
                qcc->next_bidi_l    = 0x00;
                qcc->largest_bidi_r = 0x01;
index 6516696967d6c9fba113d57eead6fd83537a9701..a07ffd2f4472768833a31da5e6cd2117f1087756 100644 (file)
@@ -1181,7 +1181,6 @@ struct quic_conn *qc_new_conn(const struct quic_version *qv, int ipv4,
        quic_cc_path_init(qc->path, ipv4, server ? l->bind_conf->max_cwnd : 0,
                          cc_algo ? cc_algo : default_quic_cc_algo, qc);
 
-       qc->stream_buf_count = 0;
        memcpy(&qc->local_addr, local_addr, sizeof(qc->local_addr));
        memcpy(&qc->peer_addr, peer_addr, sizeof qc->peer_addr);
 
index a21391346bde74bf7dd97759e72ddf6aacb7ae62..b45bac7bb8c2699109a4b43d44d10cc541bc974a 100644 (file)
@@ -41,15 +41,9 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
        *stream_buf = NULL;
 
        /* notify MUX about available buffers. */
-       --qc->stream_buf_count;
        if (qc->mux_state == QC_MUX_READY) {
-               /* notify MUX about available buffers.
-                *
-                * TODO several streams may be woken up even if a single buffer
-                * is available for now.
-                */
-               while (qcc_notify_buf(qc->qcc))
-                       ;
+               /* notify MUX about available buffers. */
+               qcc_notify_buf(qc->qcc, 1);
        }
 }
 
@@ -222,15 +216,9 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
        if (free_count) {
                offer_buffers(NULL, free_count);
 
-               qc->stream_buf_count -= free_count;
                if (qc->mux_state == QC_MUX_READY) {
-                       /* notify MUX about available buffers.
-                        *
-                        * TODO several streams may be woken up even if a single buffer
-                        * is available for now.
-                        */
-                       while (qcc_notify_buf(qc->qcc))
-                               ;
+                       /* notify MUX about available buffers. */
+                       qcc_notify_buf(qc->qcc, free_count);
                }
        }
 
@@ -265,45 +253,30 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
        return &stream->buf->buf;
 }
 
-/* Returns the count of available buffer left for <qc>. */
-static int qc_stream_buf_avail(struct quic_conn *qc)
-{
-       BUG_ON(qc->stream_buf_count > global.tune.quic_streams_buf);
-       return global.tune.quic_streams_buf - qc->stream_buf_count;
-}
-
-/* Allocate a new current buffer for <stream>. The buffer limit count for the
- * connection is checked first. This function is not allowed if current buffer
- * is not NULL prior to this call. The new buffer represents stream payload at
- * offset <offset>.
+/* Allocate a new current buffer for <stream>. This function is not allowed if
+ * current buffer is not NULL prior to this call. The new buffer represents
+ * stream payload at offset <offset>.
  *
- * Returns the buffer or NULL on error. Caller may check <avail> to ensure if
- * the connection buffer limit was reached or a fatal error was encountered.
+ * Returns the buffer or NULL on error.
  */
 struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
-                                   uint64_t offset, int *avail)
+                                   uint64_t offset)
 {
-       struct quic_conn *qc = stream->qc;
-
        /* current buffer must be released first before allocate a new one. */
        BUG_ON(stream->buf);
 
-       *avail = qc_stream_buf_avail(qc);
-       if (!*avail)
-               return NULL;
-
        stream->buf_offset = offset;
        stream->buf = pool_alloc(pool_head_quic_stream_buf);
        if (!stream->buf)
                return NULL;
 
+       stream->buf->buf = BUF_NULL;
        if (!b_alloc(&stream->buf->buf, DB_MUX_TX)) {
                pool_free(pool_head_quic_stream_buf, stream->buf);
                stream->buf = NULL;
                return NULL;
        }
 
-       ++qc->stream_buf_count;
        LIST_APPEND(&stream->buf_list, &stream->buf->list);
 
        return &stream->buf->buf;