]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: implement QMux send
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 27 Mar 2026 13:41:40 +0000 (14:41 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 2 Apr 2026 12:02:04 +0000 (14:02 +0200)
This patchs implement mux-quic reception for the new QMux protocol. This
is performed via the new function qcc_qstrm_send_frames(). Its interface
is similar to the QUIC equivalent : it takes a list of frames and
encodes them in a buffer before sending it via snd_buf.

Contrary to QUIC, a check on CO_FL_ERROR flag is performed prior to
every qcc_qstrm_send_frames() invokation to interrupt emission. This is
necessary as the transport layer may set it during snd_buf. This is not
the case currently for quic_conn layer, but maybe a similar mechanism
should be implemented as well for QUIC in the future.

include/haproxy/mux_quic_qstrm.h
src/mux_quic.c
src/mux_quic_qstrm.c

index 3e537d416cfaa2a6d1fe6e3e4de9e48b911f9785..6b81f3e65dfae5ce1e32ecabb0073d9d57c1d69a 100644 (file)
@@ -5,4 +5,6 @@
 
 int qcc_qstrm_recv(struct qcc *qcc);
 
+int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms);
+
 #endif /* _HAPROXY_MUX_QUIC_QSTRM_H */
index 1335dfe58a511f3572461e0c0c7b8e68a7c81467..806d2e4205be9e63a1113df6974c35a5bfa7a550 100644 (file)
@@ -2567,24 +2567,13 @@ static int qcc_subscribe_send(struct qcc *qcc)
        return 1;
 }
 
-/* Wrapper for send on transport layer. Send a list of frames <frms> for the
- * connection <qcc>.
- *
- * Returns 0 if all data sent with success. On fatal error, a negative error
- * code is returned. A positive 1 is used if emission should be paced.
- */
-static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
+static int qcc_quic_send_frames(struct qcc *qcc, struct list *frms, int stream)
 {
        enum quic_tx_err ret;
        struct quic_pacer *pacer = NULL;
 
        TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
 
-       if (LIST_ISEMPTY(frms)) {
-               TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
-               return -1;
-       }
-
        if (stream && qcc_is_pacing_active(qcc->conn))
                pacer = &qcc->tx.pacer;
 
@@ -2612,6 +2601,23 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
        return -1;
 }
 
+/* Wrapper for send on transport layer. Send a list of frames <frms> for the
+ * connection <qcc>.
+ *
+ * Returns 0 if all data sent with success. On fatal error, a negative error
+ * code is returned. A positive 1 is used if emission should be paced.
+ */
+static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
+{
+       if (LIST_ISEMPTY(frms)) {
+               TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
+               return -1;
+       }
+
+       return conn_is_quic(qcc->conn) ? qcc_quic_send_frames(qcc, frms, stream) :
+                                        qcc_qstrm_send_frames(qcc, frms);
+}
+
 /* Emit a RESET_STREAM on <qcs>.
  *
  * Returns 0 if the frame has been successfully sent else non-zero.
@@ -3072,6 +3078,12 @@ static int qcc_io_send(struct qcc *qcc)
         * flow-control limit reached.
         */
        while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
+               /* TODO should this check also be performed for QUIC ? */
+               if (!conn_is_quic(qcc->conn) && (qcc->conn->flags & CO_FL_ERROR)) {
+                       TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
+                       goto out;
+               }
+
                window_conn = qfctl_rcap(&qcc->tx.fc);
                resent = 0;
 
@@ -3083,7 +3095,8 @@ static int qcc_io_send(struct qcc *qcc)
                         * new qc_stream_desc should be present in send_list as
                         * long as transport layer can handle all data.
                         */
-                       BUG_ON(qcs->tx.stream->buf && !qfctl_rblocked(&qcs->tx.fc));
+                       BUG_ON((!conn_is_quic(qcc->conn) || qcs->tx.stream->buf) &&
+                              !qfctl_rblocked(&qcs->tx.fc));
 
                        /* Total sent bytes must not exceed connection window. */
                        BUG_ON(resent > window_conn);
index 25bd37ea6efa31b1d10932003a2eea87917939bd..12f54c3960f39605f070b10094e6a9ac9d535c0e 100644 (file)
@@ -147,3 +147,88 @@ int qcc_qstrm_recv(struct qcc *qcc)
  err:
        return -1;
 }
+
+/* Sends <frms> list of frames for <qcc> connection.
+ *
+ * Returns 0 if all data are emitted or a positive value if sending should be
+ * retry later. A negative error code is used for a fatal failure.
+ */
+int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms)
+{
+       struct connection *conn = qcc->conn;
+       struct quic_frame *frm, *frm_old;
+       struct quic_frame *split_frm, *orig_frm;
+       unsigned char *pos, *old, *end;
+       size_t ret;
+
+       TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
+       list_for_each_entry_safe(frm, frm_old, frms, list) {
+ loop:
+               split_frm = NULL;
+               b_reset(&trash);
+               old = pos = (unsigned char *)b_orig(&trash);
+               end = (unsigned char *)b_wrap(&trash);
+
+               BUG_ON(!frm);
+               TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QMUX_EV_QCC_SEND, qcc->conn, 0, 0, 0,
+                            "frm type %02llx", (ullong)frm->type);
+
+               if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) {
+                       size_t flen, split_size;
+
+                       flen = quic_strm_frm_fillbuf(end - pos, frm, &split_size);
+                       if (!flen)
+                               continue;
+
+                       if (split_size) {
+                               split_frm = quic_strm_frm_split(frm, split_size);
+                               if (!split_frm) {
+                                       ABORT_NOW();
+                                       continue;
+                               }
+
+                               orig_frm = frm;
+                               frm = split_frm;
+                       }
+               }
+
+               qc_build_frm(frm, &pos, end, NULL);
+               BUG_ON(pos - old > global.tune.bufsize);
+               BUG_ON(pos == old);
+               b_add(&trash, pos - old);
+
+               ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &trash, b_data(&trash), NULL, 0, 0);
+               if (!ret) {
+                       TRACE_DEVEL("snd_buf interrupted", QMUX_EV_QCC_SEND, qcc->conn);
+                       if (split_frm)
+                               LIST_INSERT(frms, &split_frm->list);
+                       break;
+               }
+
+               if (ret != b_data(&trash)) {
+                       /* TODO */
+                       ABORT_NOW();
+               }
+
+               if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F)
+                       /* TODO notify MUX */
+
+               LIST_DEL_INIT(&frm->list);
+               if (split_frm) {
+                       frm = orig_frm;
+                       goto loop;
+               }
+       }
+
+       if (conn->flags & CO_FL_ERROR) {
+               /* TODO */
+               //ABORT_NOW();
+       }
+       else if (!LIST_ISEMPTY(frms) && !(qcc->wait_event.events & SUB_RETRY_SEND)) {
+               conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event);
+               return 1;
+       }
+
+       TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
+       return 0;
+}