]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: quic: limit total stream buffers per connection
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 15 Apr 2022 15:30:49 +0000 (17:30 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 21 Apr 2022 10:04:04 +0000 (12:04 +0200)
MUX streams can now allocate multiple buffers for sending. quic-conn is
responsible to limit the total count of allowed allocated buffers. A
counter is stored in the new field <stream_buf_count>.

For the moment, the value is hardcoded to 30.

On stream buffer allocation failure, the qcc MUX is flagged with
QC_CF_CONN_FULL. The MUX is then woken up as soon as a buffer is freed,
most notably on ACK reception.

include/haproxy/mux_quic-t.h
include/haproxy/quic_stream.h
include/haproxy/xprt_quic-t.h
src/mux_quic.c
src/quic_stream.c
src/xprt_quic.c

index 0c8ce600881f68b3dae6d3a3d8fd8039461c0717..44fb4d62746122824df86e4d3a1cdbe2ec1fc1af 100644 (file)
@@ -26,6 +26,7 @@ enum qcs_type {
 };
 
 #define QC_CF_BLK_MFCTL 0x00000001 /* sending blocked due to connection flow-control */
+#define QC_CF_CONN_FULL 0x00000002 /* no stream buffers available on connection */
 
 struct qcc {
        struct connection *conn;
index 0550f4f0c3181ce9c6370fd84d25383103387259..6d9359d0fb47d2e63c87840777632fa97d9a6b88 100644 (file)
@@ -14,6 +14,7 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len
 void qc_stream_desc_free(struct qc_stream_desc *stream);
 
 struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
+int qc_stream_buf_avail(struct quic_conn *qc);
 struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
                                    uint64_t offset);
 void qc_stream_buf_release(struct qc_stream_desc *stream);
index ab714ea509c425d272546b542318d67d9fd4c779..6c4336cc47c0d8db66c7889861e43329cdab8a2e 100644 (file)
@@ -750,6 +750,7 @@ struct quic_conn {
        struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
 
        struct eb_root streams_by_id; /* qc_stream_desc tree */
+       int stream_buf_count; /* total count of allocated stream buffers for this connection */
 
        /* MUX */
        struct qcc *qcc;
index d0fd0b2a7a0b27e83adf9b9690ce8e5abe3db0d1..b969d3a4bda2099750e493fc7d7eaa45352e4d1c 100644 (file)
@@ -722,7 +722,14 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
 
        if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
                qc_stream_buf_release(qcs->stream);
-               tasklet_wakeup(qcc->wait_event.tasklet);
+
+               /* reschedule send if buffers available */
+               if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
+                       tasklet_wakeup(qcc->wait_event.tasklet);
+               }
+               else {
+                       qcc->flags |= QC_CF_CONN_FULL;
+               }
        }
 }
 
@@ -884,14 +891,15 @@ static int qc_send(struct qcc *qcc)
                        continue;
                }
 
-               if (!out) {
-                       struct connection *conn = qcc->conn;
+               if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
+                       node = eb64_next(node);
+                       continue;
+               }
 
-                       out = qc_stream_buf_alloc(qcs->stream,
-                                                 qcs->tx.offset);
+               if (!out) {
+                       out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
                        if (!out) {
-                               conn->xprt->subscribe(conn, conn->xprt_ctx,
-                                                     SUB_RETRY_SEND, &qcc->wait_event);
+                               qcc->flags |= QC_CF_CONN_FULL;
                                node = eb64_next(node);
                                continue;
                        }
index 0e58366e21ec5c7a57d271375c1014922541117c..2dd9b1c4ce7cd9415e9679e113feef43e977d67e 100644 (file)
@@ -75,11 +75,11 @@ void qc_stream_desc_release(struct qc_stream_desc *stream)
  * Returns the count of byte removed from stream. Do not forget to check if
  * <stream> is NULL after invocation.
  */
-int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset,
-                       size_t len)
+int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len)
 {
        struct qc_stream_desc *s = *stream;
        struct qc_stream_buf *stream_buf;
+       struct quic_conn *qc = s->qc;
        struct buffer *buf;
        size_t diff;
 
@@ -115,6 +115,15 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset,
        pool_free(pool_head_quic_conn_stream_buf, stream_buf);
        offer_buffers(NULL, 1);
 
+       /* notify MUX about available buffers. */
+       --qc->stream_buf_count;
+       if (qc->mux_state == QC_MUX_READY) {
+               if (qc->qcc->flags & QC_CF_CONN_FULL) {
+                       qc->qcc->flags &= ~QC_CF_CONN_FULL;
+                       tasklet_wakeup(qc->qcc->wait_event.tasklet);
+               }
+       }
+
        /* Free stream instance if already released and no buffers left. */
        if (s->release && LIST_ISEMPTY(&s->buf_list)) {
                qc_stream_desc_free(s);
@@ -131,6 +140,7 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset,
 void qc_stream_desc_free(struct qc_stream_desc *stream)
 {
        struct qc_stream_buf *buf, *buf_back;
+       struct quic_conn *qc = stream->qc;
        struct eb64_node *frm_node;
        unsigned int free_count = 0;
 
@@ -148,9 +158,19 @@ void qc_stream_desc_free(struct qc_stream_desc *stream)
                }
        }
 
-       if (free_count)
+       if (free_count) {
                offer_buffers(NULL, free_count);
 
+               qc->stream_buf_count -= free_count;
+               if (qc->mux_state == QC_MUX_READY) {
+                       /* notify MUX about available buffers. */
+                       if (qc->qcc->flags & QC_CF_CONN_FULL) {
+                               qc->qcc->flags &= ~QC_CF_CONN_FULL;
+                               tasklet_wakeup(qc->qcc->wait_event.tasklet);
+                       }
+               }
+       }
+
        /* qc_stream_desc might be freed before having received all its ACKs.
         * This is the case if some frames were retransmitted.
         */
@@ -183,18 +203,35 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
        return &stream->buf->buf;
 }
 
-/* Allocate a new current buffer for <stream>. This function is not allowed if
- * current buffer is not NULL prior to this call. The new buffer represents
- * stream payload at offset <offset>.
+/* Check if a new stream buffer can be allocated for the connection <qc>.
+ * Returns a boolean.
+ */
+int qc_stream_buf_avail(struct quic_conn *qc)
+{
+       /* TODO use a global tune settings for max */
+       return qc->stream_buf_count < 30;
+}
+
+/* Allocate a new current buffer for <stream>. The buffer limit count for the
+ * connection is checked first. This function is not allowed if current buffer
+ * is not NULL prior to this call. The new buffer represents stream payload at
+ * offset <offset>.
  *
  * Returns the buffer or NULL.
  */
 struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
                                    uint64_t offset)
 {
+       struct quic_conn *qc = stream->qc;
+
        /* current buffer must be released first before allocate a new one. */
        BUG_ON(stream->buf);
 
+       if (!qc_stream_buf_avail(qc))
+               return NULL;
+
+       ++qc->stream_buf_count;
+
        stream->buf_offset = offset;
        stream->buf = pool_alloc(pool_head_quic_conn_stream_buf);
        if (!stream->buf)
index 543fa2c23882e836a68a68fb3548fb8ee9f51fbd..21abd855794be7070f4dc58e0224de2a04b2f38e 100644 (file)
@@ -4057,6 +4057,7 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4,
        MT_LIST_INIT(&qc->accept_list);
 
        qc->streams_by_id = EB_ROOT_UNIQUE;
+       qc->stream_buf_count = 0;
 
        TRACE_LEAVE(QUIC_EV_CONN_INIT, qc);