]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: use direct send transport API for STREAMs
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 9 Feb 2022 17:16:49 +0000 (18:16 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 4 Mar 2022 16:00:12 +0000 (17:00 +0100)
Modify the STREAM emission in qc_send. Use the new transport function
qc_send_app_pkts to directly send the list of constructed frames. This
allows to remove the tasklet wakeup on the quic_conn and should reduce
the latency.

If not all frames are send after the transport call, subscribe the MUX
on the lower layer to be able to retry. Currently there is a bug because
the transport layer does not retry to send frames in excess after a
successful sendto. This might cause the transfer to be interrupted.

include/haproxy/mux_quic-t.h
src/mux_quic.c

index 47a3947b18139afbc926d79358d373d58d88d02d..ef45d664c80b48752fca8e578c7a18d5f564883d 100644 (file)
@@ -55,6 +55,7 @@ struct qcc {
        } rx;
        struct {
                uint64_t max_data; /* Maximum number of bytes which may be sent */
+               struct list frms; /* list of frames ready to be sent */
        } tx;
 
        struct eb_root streams_by_id; /* all active streams by their ID */
index 7c52ac0136840f18d8c78a1eaa96e17cb4ebd139..24ba82933aaa8f959c3bdbb28c1d55ed95ee9094 100644 (file)
@@ -9,6 +9,7 @@
 #include <haproxy/htx.h>
 #include <haproxy/pool.h>
 #include <haproxy/ssl_sock-t.h>
+#include <haproxy/xprt_quic.h>
 
 DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
 DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
@@ -330,11 +331,11 @@ static void qc_release(struct qcc *qcc)
        }
 }
 
-static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
+static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset,
+                          struct list *frm_list)
 {
        struct quic_frame *frm;
        struct buffer *buf = &qcs->tx.xprt_buf;
-       struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
        int total = 0, to_xfer;
        unsigned char *btail;
 
@@ -370,7 +371,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint
                frm->stream.len = total;
        }
 
-       LIST_APPEND(&qel->pktns->tx.frms, &frm->list);
+       LIST_APPEND(frm_list, &frm->list);
  out:
        fprintf(stderr, "%s: total=%d fin=%d id=%llu offset=%lu\n",
                __func__, total, fin, (ull)qcs->by_id.key, offset);
@@ -380,37 +381,61 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint
        return -1;
 }
 
+/* Wrapper for send on transport layer. Send a list of frames <frms> for the
+ * connection <qcc>.
+ *
+ * Returns 0 if all data sent with success else non-zero.
+ */
+static int qc_send_frames(struct qcc *qcc, struct list *frms)
+{
+       if (!LIST_ISEMPTY(frms))
+               qc_send_app_pkts(qcc->conn->qc, frms);
+
+       /* TODO Currently, the transport layer is not complete. It might not
+        * try to send all frames even if the Tx buffer is free. In this case
+        * it is necessary to retry immediately instead of subscribing.
+        */
+       if (!LIST_ISEMPTY(frms)) {
+               fprintf(stderr, "%s: remaining frames to send\n", __func__);
+               qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
+                                          SUB_RETRY_SEND, &qcc->wait_event);
+               return 1;
+       }
+
+       return 0;
+}
+
 static int qc_send(struct qcc *qcc)
 {
        struct eb64_node *node;
-       int xprt_wake = 0;
        int ret = 0;
 
        fprintf(stderr, "%s\n", __func__);
 
-       /* TODO simple loop through all streams and check if there is frames to
-        * send
+       /* loop through all streams, construct STREAM frames if data available.
+        * TODO optimize the loop to favor streams which are not too heavy.
         */
        node = eb64_first(&qcc->streams_by_id);
        while (node) {
                struct qcs *qcs = container_of(node, struct qcs, by_id);
                struct buffer *buf = &qcs->tx.buf;
+
                if (b_data(buf)) {
                        char fin = qcs->flags & QC_SF_FIN_STREAM;
-                       ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset);
+                       ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset,
+                                            &qcc->tx.frms);
                        BUG_ON(ret < 0); /* TODO handle this properly */
 
                        if (ret > 0) {
                                qcs_notify_send(qcs);
                                if (qcs->flags & QC_SF_BLK_MROOM)
                                        qcs->flags &= ~QC_SF_BLK_MROOM;
-
-                               xprt_wake = 1;
                        }
 
                        fprintf(stderr, "%s ret=%d\n", __func__, ret);
                        qcs->tx.offset += ret;
 
+                       /* Subscribe if not all data can be send. */
                        if (b_data(buf)) {
                                qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
                                                           SUB_RETRY_SEND, &qcc->wait_event);
@@ -419,8 +444,8 @@ static int qc_send(struct qcc *qcc)
                node = eb64_next(node);
        }
 
-       if (xprt_wake)
-               tasklet_wakeup(((struct ssl_sock_ctx *)(qcc->conn->xprt_ctx))->wait_event.tasklet);
+       qc_send_frames(qcc, &qcc->tx.frms);
+       /* TODO adjust ret if not all frames are sent. */
 
        return ret;
 }
@@ -531,6 +556,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
 
        qcc->rx.max_data = lparams->initial_max_data;
        qcc->tx.max_data = 0;
+       LIST_INIT(&qcc->tx.frms);
 
        /* Client initiated streams must respect the server flow control. */
        qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;