]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: quic: refactor MUX send notification
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 25 Sep 2024 15:55:10 +0000 (17:55 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 1 Oct 2024 14:19:25 +0000 (16:19 +0200)
For STREAM emission, MUX QUIC generates one or several frames and emit
them via qc_send_mux(). Lower layer may use them as-is, or split them to
lower chunk to fit in a QUIC packet. It is then responsible to notify
the MUX to report the amount of data sent.

Previously, this was done via a direct call from quic_conn to MUX using
qcc_streams_sent_done(). Modify this to have a better isolation accross
layers. Define a send callback handled by the qc_stream_desc instance.
This allows the MUX to register each QCS instance individually to the
renamved qmux_ctrl_send() which replaces qcc_streams_sent_done().

At quic_conn layer, qc_stream_desc_send() can be used now. This is a
wrapper to qc_stream_desc layer to invoke the send callback if
registered.

This mechanism of qc_stream_desc callback should be extended later to
implement other notifications accross the QUIC stack.

include/haproxy/mux_quic.h
include/haproxy/quic_stream-t.h
include/haproxy/quic_stream.h
src/mux_quic.c
src/quic_stream.c
src/quic_tx.c

index fd6c09024b2e7fd90a91523cde56b1e4588f6587..cc5eb1e96a51c4f75ccade0e1f341150f597f784 100644 (file)
@@ -40,7 +40,6 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
 int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max);
 int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size);
 int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err);
-void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset);
 
 /* Bit shift to get the stream sub ID for internal use which is obtained
  * shifting the stream IDs by this value, knowing that the
index 1a5cf47dfde5d6c447e31b824883b9710e9450d9..8e0dcfd48437b6c3b6218ea0596eb59dfbe0667b 100644 (file)
@@ -46,7 +46,8 @@ struct qc_stream_desc {
 
        int flags; /* QC_SD_FL_* values */
 
-       void *ctx; /* MUX specific context */
+       void (*notify_send)(struct qc_stream_desc *, uint64_t offset, uint64_t len);
+       void *ctx; /* notify context */
 };
 
 #endif /* USE_QUIC */
index 11c4878365e1534aa1a28d4733313f1238832713..fcd843010804eb49ecbc986ffc3ba52e872e42aa 100644 (file)
@@ -20,5 +20,22 @@ struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
 struct buffer *qc_stream_buf_realloc(struct qc_stream_desc *stream);
 void qc_stream_buf_release(struct qc_stream_desc *stream);
 
+/* Reports emission of STREAM frame starting at <offset> and of length <len>,
+ * related to <stream> data storage.
+ */
+static inline void qc_stream_desc_send(struct qc_stream_desc *stream,
+                                       uint64_t offset, uint64_t len)
+{
+       if (stream->notify_send)
+               stream->notify_send(stream, len, offset);
+}
+
+/* Subscribe for send notification on <stream>. */
+static inline void qc_stream_desc_sub_send(struct qc_stream_desc *stream,
+                                           void (*cb)(struct qc_stream_desc *s, uint64_t offset, uint64_t len))
+{
+       stream->notify_send = cb;
+}
+
 #endif /* USE_QUIC */
 #endif /* _HAPROXY_QUIC_STREAM_H_ */
index 630e98bbc6ecccf603077b527747cb943ba1b341..dc80c88fd6824805da9d7a36c1ffaea62c5e4098 100644 (file)
@@ -32,6 +32,8 @@
 DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
 DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
 
+static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
+
 static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
 {
        struct buffer buf;
@@ -179,6 +181,8 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
                        TRACE_ERROR("qc_stream_desc alloc failure", QMUX_EV_QCS_NEW, qcc->conn, qcs);
                        goto err;
                }
+
+               qc_stream_desc_sub_send(qcs->stream, qmux_ctrl_send);
        }
 
        if (qcc->app_ops->attach && qcc->app_ops->attach(qcs, qcc->ctx)) {
@@ -525,6 +529,100 @@ void qcs_notify_send(struct qcs *qcs)
        }
 }
 
+/* Returns total number of bytes not already sent to quic-conn layer. */
+static uint64_t qcs_prep_bytes(const struct qcs *qcs)
+{
+       struct buffer *out = qc_stream_buf_get(qcs->stream);
+       uint64_t diff, base_off;
+
+       if (!out)
+               return 0;
+
+       /* if ack_offset < buf_offset, it points to an older buffer. */
+       base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
+       diff = qcs->tx.fc.off_real - base_off;
+       return b_data(out) - diff;
+}
+
+/* Used as a callback for qc_stream_desc layer to notify about emission of a
+ * STREAM frame of <data> length starting at <offset>.
+ */
+static void qmux_ctrl_send(struct qc_stream_desc *stream, uint64_t data, uint64_t offset)
+{
+       struct qcs *qcs = stream->ctx;
+       struct qcc *qcc = qcs->qcc;
+       uint64_t diff;
+
+       TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+       /* Real off MUST always be the greatest offset sent. */
+       BUG_ON(offset > qcs->tx.fc.off_real);
+
+       /* check if the STREAM frame has already been notified. It can happen
+        * for retransmission.
+        */
+       if (offset + data < qcs->tx.fc.off_real) {
+               TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+               goto out;
+       }
+
+       qcs_idle_open(qcs);
+
+       diff = offset + data - qcs->tx.fc.off_real;
+       if (diff) {
+               struct quic_fctl *fc_conn = &qcc->tx.fc;
+               struct quic_fctl *fc_strm = &qcs->tx.fc;
+
+               /* Ensure real offset never exceeds soft value. */
+               BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft);
+               BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
+
+               /* increase offset sum on connection */
+               if (qfctl_rinc(fc_conn, diff)) {
+                       TRACE_STATE("connection flow-control reached",
+                                   QMUX_EV_QCS_SEND, qcc->conn);
+               }
+
+               /* increase offset on stream */
+               if (qfctl_rinc(fc_strm, diff)) {
+                       TRACE_STATE("stream flow-control reached",
+                                   QMUX_EV_QCS_SEND, qcc->conn, qcs);
+               }
+               /* Release buffer if everything sent and buf is full or stream is waiting for room. */
+               if (!qcs_prep_bytes(qcs) &&
+                   (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) {
+                       qc_stream_buf_release(qcs->stream);
+                       qcs->flags &= ~QC_SF_BLK_MROOM;
+                       qcs_notify_send(qcs);
+               }
+
+               /* Add measurement for send rate. This is done at the MUX layer
+                * to account only for STREAM frames without retransmission.
+                */
+               increment_send_rate(diff, 0);
+       }
+
+       if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) {
+               /* Remove stream from send_list if all was sent. */
+               LIST_DEL_INIT(&qcs->el_send);
+               TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+               if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
+                       /* Close stream locally. */
+                       qcs_close_local(qcs);
+
+                       if (qcs->flags & QC_SF_FIN_STREAM) {
+                               qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN;
+                               /* Reset flag to not emit multiple FIN STREAM frames. */
+                               qcs->flags &= ~QC_SF_FIN_STREAM;
+                       }
+               }
+       }
+
+ out:
+       TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+}
+
 /* Returns true if <qcc> buffer window does not have room for a new buffer. */
 static inline int qcc_bufwnd_full(const struct qcc *qcc)
 {
@@ -1123,21 +1221,6 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs)
        return out && b_size(out) ? out : NULL;
 }
 
-/* Returns total number of bytes not already sent to quic-conn layer. */
-static uint64_t qcs_prep_bytes(const struct qcs *qcs)
-{
-       struct buffer *out = qc_stream_buf_get(qcs->stream);
-       uint64_t diff, base_off;
-
-       if (!out)
-               return 0;
-
-       /* if ack_offset < buf_offset, it points to an older buffer. */
-       base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
-       diff = qcs->tx.fc.off_real - base_off;
-       return b_data(out) - diff;
-}
-
 /* Try to realign <out> buffer for <qcs> stream. This is done only if there is
  * no data waiting for ACK.
  *
@@ -1942,85 +2025,6 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
        return -1;
 }
 
-/* This function must be called by the upper layer to inform about the sending
- * of a STREAM frame for <qcs> instance. The frame is of <data> length and on
- * <offset>.
- */
-void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
-{
-       struct qcc *qcc = qcs->qcc;
-       uint64_t diff;
-
-       TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
-
-       /* Real off MUST always be the greatest offset sent. */
-       BUG_ON(offset > qcs->tx.fc.off_real);
-
-       /* check if the STREAM frame has already been notified. It can happen
-        * for retransmission.
-        */
-       if (offset + data < qcs->tx.fc.off_real) {
-               TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs);
-               goto out;
-       }
-
-       qcs_idle_open(qcs);
-
-       diff = offset + data - qcs->tx.fc.off_real;
-       if (diff) {
-               struct quic_fctl *fc_conn = &qcc->tx.fc;
-               struct quic_fctl *fc_strm = &qcs->tx.fc;
-
-               /* Ensure real offset never exceeds soft value. */
-               BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft);
-               BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
-
-               /* increase offset sum on connection */
-               if (qfctl_rinc(fc_conn, diff)) {
-                       TRACE_STATE("connection flow-control reached",
-                                   QMUX_EV_QCS_SEND, qcc->conn);
-               }
-
-               /* increase offset on stream */
-               if (qfctl_rinc(fc_strm, diff)) {
-                       TRACE_STATE("stream flow-control reached",
-                                   QMUX_EV_QCS_SEND, qcc->conn, qcs);
-               }
-               /* Release buffer if everything sent and buf is full or stream is waiting for room. */
-               if (!qcs_prep_bytes(qcs) &&
-                   (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) {
-                       qc_stream_buf_release(qcs->stream);
-                       qcs->flags &= ~QC_SF_BLK_MROOM;
-                       qcs_notify_send(qcs);
-               }
-
-               /* Add measurement for send rate. This is done at the MUX layer
-                * to account only for STREAM frames without retransmission.
-                */
-               increment_send_rate(diff, 0);
-       }
-
-       if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) {
-               /* Remove stream from send_list if all was sent. */
-               LIST_DEL_INIT(&qcs->el_send);
-               TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
-
-               if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
-                       /* Close stream locally. */
-                       qcs_close_local(qcs);
-
-                       if (qcs->flags & QC_SF_FIN_STREAM) {
-                               qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN;
-                               /* Reset flag to not emit multiple FIN STREAM frames. */
-                               qcs->flags &= ~QC_SF_FIN_STREAM;
-                       }
-               }
-       }
-
- out:
-       TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
-}
-
 /* Returns true if subscribe set, false otherwise. */
 static int qcc_subscribe_send(struct qcc *qcc)
 {
index 91f657d4766f674fd4194b7775eb455eb409b00d..f6adc2e439149a196a56a8a57f175b70df951c2c 100644 (file)
@@ -91,6 +91,7 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void
        stream->ack_offset = 0;
        stream->flags = 0;
        stream->ctx = ctx;
+       stream->notify_send = NULL;
 
        return stream;
 }
index 8ad3d515e6d5c8494ba76b6a3e9059b05b487e4f..1420e103b039f2527401fa444bafa4d5fc26362b 100644 (file)
@@ -23,6 +23,7 @@
 #include <haproxy/quic_retransmit.h>
 #include <haproxy/quic_retry.h>
 #include <haproxy/quic_sock.h>
+#include <haproxy/quic_stream.h>
 #include <haproxy/quic_tls.h>
 #include <haproxy/quic_trace.h>
 #include <haproxy/ssl_sock-t.h>
@@ -1664,9 +1665,9 @@ static int qc_build_frms(struct list *outlist, struct list *inlist,
 
                                /* Do not notify MUX on retransmission. */
                                if (qc->flags & QUIC_FL_CONN_TX_MUX_CONTEXT) {
-                                       qcc_streams_sent_done(cf->stream.stream->ctx,
-                                                             cf->stream.len,
-                                                             cf->stream.offset.key);
+                                       qc_stream_desc_send(cf->stream.stream,
+                                                           cf->stream.offset.key,
+                                                           cf->stream.len);
                                }
                        }
                        else {
@@ -1711,14 +1712,14 @@ static int qc_build_frms(struct list *outlist, struct list *inlist,
 
                                /* Do not notify MUX on retransmission. */
                                if (qc->flags & QUIC_FL_CONN_TX_MUX_CONTEXT) {
-                                       qcc_streams_sent_done(new_cf->stream.stream->ctx,
-                                                             new_cf->stream.len,
-                                                             new_cf->stream.offset.key);
+                                       qc_stream_desc_send(new_cf->stream.stream,
+                                                           new_cf->stream.offset.key,
+                                                           new_cf->stream.len);
                                }
                        }
 
                        /* TODO the MUX is notified about the frame sending via
-                        * previous qcc_streams_sent_done call. However, the
+                        * previous qc_stream_desc_send call. However, the
                         * sending can fail later, for example if the sendto
                         * system call returns an error. As the MUX has been
                         * notified, the transport layer is responsible to