git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: mux-quic: implement QUIC-TCP specific ops 20250214-quic-on-streams
author Amaury Denoyelle <adenoyelle@haproxy.com>
Wed, 19 Feb 2025 14:02:19 +0000 (15:02 +0100)
committer Amaury Denoyelle <adenoyelle@haproxy.com>
Wed, 23 Apr 2025 09:16:05 +0000 (11:16 +0200)
include/haproxy/mux_quic-t.h
src/mux_quic.c

index 96b26e3ce9830bd1eb8cfe40168767fa9ea0455f..528543bf8a7b3c882483e942a3ae2923ed9923ba 100644 (file)
@@ -240,6 +240,7 @@ struct qcc_app_ops {
 #define QC_CF_ERR_CONN  0x00000020 /* fatal error reported by transport layer */
 #define QC_CF_WAIT_HS   0x00000040 /* MUX init before QUIC handshake completed (0-RTT) */
 #define QC_CF_QOS       0x00000080
+#define QC_CF_QSTP_SENT 0x00000100 /* QS transport parameters frame already emitted */
 
 /* This function is used to report flags in debugging tools. Please reflect
  * below any single-bit flag addition above in the same order via the
index 15ce50cb1ee578e4151e58e6017616253f094d7c..4c60c5e1b62f8194a4c69bf83179746c5fe4d07e 100644 (file)
@@ -38,6 +38,7 @@ DECLARE_STATIC_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", sizeof(struct
 
 static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
 static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
+static void qmux_qos_ctrl_send(struct qcs *qcs, uint64_t data, uint64_t offset); /* forward declaration, defined further below */
 
 int qmux_is_quic(const struct qcc *qcc)
 {
@@ -2350,7 +2351,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
                goto err;
        }
 
-       frm->stream.stream = qcs->stream;
+       frm->stream.stream = qmux_is_quic(qcc) ? qcs->stream : (struct qc_stream_desc *)qcs;
        frm->stream.id = qcs->id;
        frm->stream.offset = 0;
        frm->stream.dup = 0;
@@ -2519,7 +2520,8 @@ static int _qcc_qos_send_frames(struct qcc *qcc, struct list *frms, int stream)
                }
 
                if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_E) {
-                       /* TODO notify MUX */
+                       qmux_qos_ctrl_send((struct qcs *)frm->stream.stream,
+                                          frm->stream.len, frm->stream.offset);
                }
 
                LIST_DEL_INIT(&frm->list);
@@ -4181,3 +4183,487 @@ static struct mux_proto_list mux_proto_quic =
   { .token = IST("quic"), .mode = PROTO_MODE_HTTP, .side = PROTO_SIDE_FE, .mux = &qmux_ops };
 
 INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_quic);
+
+/* Read data from the transport layer of <qcc> connection into the trash
+ * buffer and handle the frame found at its head. Supported frames are
+ * QS transport parameters, STREAM and RESET_STREAM. If nothing could be
+ * read and no read0 is pending, the MUX subscribes for reception.
+ *
+ * NOTE: a single frame is parsed per invocation, any data following it in
+ * the trash buffer is not inspected.
+ *
+ * Returns the non-zero value reported by the frame parser if a frame was
+ * handled, 0 if no data was read, or -1 if the received data could not be
+ * parsed or carried an unsupported frame type. In the latter case the
+ * connection is flagged on error.
+ */
+static int qcc_qos_recv(struct qcc *qcc)
+{
+       struct connection *conn = qcc->conn;
+       struct quic_frame frm;
+       const unsigned char *pos, *end;
+       int ret;
+
+       TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
+
+       chunk_reset(&trash);
+       ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, 0);
+       BUG_ON(ret < 0);
+
+       if (ret) {
+               b_add(&trash, ret);
+
+               pos = (unsigned char *)b_head(&trash);
+               end = (unsigned char *)b_tail(&trash);
+               ret = qc_parse_frm(&frm, NULL, &pos, end, NULL);
+               if (!ret) {
+                       /* Content comes from the peer: a parsing failure must
+                        * not abort the whole process.
+                        */
+                       TRACE_ERROR("frame parsing failure", QMUX_EV_QCC_RECV, qcc->conn);
+                       goto err;
+               }
+
+               if (frm.type == QUIC_FT_QS_TP) {
+                       struct qf_qs_tp *qs_tp_frm = &frm.qs_tp;
+
+                       /* Debug dump of the received transport parameters,
+                        * interleaved with the application of flow-control
+                        * limits. Stream count limits are only displayed.
+                        */
+                       fprintf(stderr, "got qs_transport_parameters frame\n");
+                       fprintf(stderr, "  max_idle_timeout=%llu\n", (ullong)qs_tp_frm->tps.max_idle_timeout);
+                       fprintf(stderr, "  initial_max_data=%llu\n", (ullong)qs_tp_frm->tps.initial_max_data);
+                       qfctl_set_max(&qcc->tx.fc, qs_tp_frm->tps.initial_max_data, NULL, NULL);
+                       fprintf(stderr, "  initial_max_stream_data_bidi_local=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_local);
+                       qcc->rfctl.msd_bidi_l = qs_tp_frm->tps.initial_max_stream_data_bidi_local;
+                       fprintf(stderr, "  initial_max_stream_data_bidi_remote=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_remote);
+                       qcc->rfctl.msd_bidi_r = qs_tp_frm->tps.initial_max_stream_data_bidi_remote;
+                       fprintf(stderr, "  initial_max_stream_data_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_uni);
+                       qcc->rfctl.msd_uni_l = qs_tp_frm->tps.initial_max_stream_data_uni;
+                       fprintf(stderr, "  initial_max_streams_bidi=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_bidi);
+                       fprintf(stderr, "  initial_max_streams_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_uni);
+               }
+               else if (frm.type >= QUIC_FT_STREAM_8 &&
+                        frm.type <= QUIC_FT_STREAM_F) {
+                       struct qf_stream *strm_frm = &frm.stream;
+
+                       qcc_recv(qcc, strm_frm->id, strm_frm->len, strm_frm->offset,
+                                (frm.type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (char *)strm_frm->data);
+               }
+               else if (frm.type == QUIC_FT_RESET_STREAM) {
+                       struct qf_reset_stream *rst_frm = &frm.reset_stream;
+                       qcc_recv_reset_stream(qcc, rst_frm->id, rst_frm->app_error_code, rst_frm->final_size);
+               }
+               else {
+                       /* Frame type is chosen by the peer: reject it instead
+                        * of aborting the process.
+                        */
+                       TRACE_ERROR("unsupported frame type", QMUX_EV_QCC_RECV, qcc->conn);
+                       goto err;
+               }
+
+       }
+       else {
+               BUG_ON(!trash.size);
+               if (!conn_xprt_read0_pending(qcc->conn)) {
+                       conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV,
+                                             &qcc->wait_event);
+               }
+       }
+
+       TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
+       return ret;
+
+ err:
+       /* Flag the connection on error so that the failure is not silently
+        * ignored by the I/O handler.
+        */
+       conn->flags |= CO_FL_ERROR;
+       TRACE_DEVEL("leaving on error", QMUX_EV_QCC_RECV, qcc->conn);
+       return -1;
+}
+
+/* Reception stage of the I/O handler for <qcc> QoS connection. Nothing is
+ * done if a local error was already detected. Otherwise, handshake
+ * completion is checked if still awaited, data are read from the transport
+ * layer unless the MUX is already subscribed for reception, and every stream
+ * present in the recv_list is decoded then removed from it.
+ *
+ * Always returns 0.
+ */
+static int qcc_qos_io_recv(struct qcc *qcc)
+{
+       TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
+
+       if (!(qcc->flags & QC_CF_ERRL)) {
+               if (!(qcc->wait_event.events & SUB_RETRY_RECV) && (qcc->flags & QC_CF_WAIT_HS))
+                       qcc_wait_for_hs(qcc);
+
+               /* Subscription status is evaluated again after the call above. */
+               if (!(qcc->wait_event.events & SUB_RETRY_RECV))
+                       qcc_qos_recv(qcc);
+
+               while (!LIST_ISEMPTY(&qcc->recv_list)) {
+                       struct qcs *rx_qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv);
+
+                       /* A local unidirectional stream must never be found in recv_list. */
+                       BUG_ON(quic_stream_is_uni(rx_qcs->id) && quic_stream_is_local(qcc, rx_qcs->id));
+                       qcc_decode_qcs(qcc, rx_qcs);
+                       LIST_DEL_INIT(&rx_qcs->el_recv);
+               }
+       }
+       else {
+               TRACE_DATA("connection on error", QMUX_EV_QCC_RECV, qcc->conn);
+       }
+
+       TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
+       return 0;
+}
+
+/* Notify the MUX about the emission of a STREAM frame of <data> length
+ * starting at <offset> for <qcs> stream. It takes the same <data>/<offset>
+ * arguments as qmux_ctrl_send() but receives the QCS instance directly
+ * instead of a qc_stream_desc.
+ *
+ * Connection and stream flow-control real offsets are incremented by the
+ * amount of newly emitted bytes, which are also removed from the stream QoS
+ * buffer. Once nothing remains to be sent, the stream is removed from the
+ * send_list and closed locally if FIN or detach was requested.
+ */
+static void qmux_qos_ctrl_send(struct qcs *qcs, uint64_t data, uint64_t offset)
+{
+       struct qcc *qcc = qcs->qcc;
+       uint64_t diff;
+
+       TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+       /* Real off MUST always be the greatest offset sent. */
+       BUG_ON(offset > qcs->tx.fc.off_real);
+
+       /* Check if the STREAM frame has already been notified. An empty FIN
+        * frame must not be considered retransmitted.
+        */
+       if (data && offset + data <= qcs->tx.fc.off_real) {
+               TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+               goto out;
+       }
+
+       /* An empty STREAM frame is only used to notify FIN. A retransmitted
+        * empty FIN cannot be notified as QCS will be unsubscribed first.
+        */
+       BUG_ON(!data && !(qcs->flags & QC_SF_FIN_STREAM));
+
+       qcs_idle_open(qcs);
+
+       /* Amount of bytes emitted beyond the current stream real offset. */
+       diff = offset + data - qcs->tx.fc.off_real;
+       if (diff) {
+               struct quic_fctl *fc_conn = &qcc->tx.fc;
+               struct quic_fctl *fc_strm = &qcs->tx.fc;
+
+               /* Ensure real offset never exceeds soft value. */
+               BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft);
+               BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
+
+               /* increase offset sum on connection */
+               if (qfctl_rinc(fc_conn, diff)) {
+                       TRACE_STATE("connection flow-control reached",
+                                   QMUX_EV_QCS_SEND, qcc->conn);
+               }
+
+               /* increase offset on stream */
+               if (qfctl_rinc(fc_strm, diff)) {
+                       TRACE_STATE("stream flow-control reached",
+                                   QMUX_EV_QCS_SEND, qcc->conn, qcs);
+               }
+
+               /* Drop the newly emitted bytes from the stream QoS buffer. */
+               b_del(&qcs->qos_buf, diff);
+               /* If nothing is left to send and the stream was blocked
+                * waiting for room, clear the blocking flag and notify it.
+                */
+               if (!qcs_prep_bytes(qcs) && qcs->flags & QC_SF_BLK_MROOM) {
+                       qcs->flags &= ~QC_SF_BLK_MROOM;
+                       qcs_notify_send(qcs);
+               }
+
+               /* Add measurement for send rate. This is done at the MUX layer
+                * to account only for STREAM frames without retransmission.
+                */
+               increment_send_rate(diff, 0);
+       }
+
+       if (!qcs_prep_bytes(qcs)) {
+               /* Remove stream from send_list if all was sent. */
+               LIST_DEL_INIT(&qcs->el_send);
+               TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+               if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
+                       /* Close stream locally. */
+                       qcs_close_local(qcs);
+
+                       if (qcs->flags & QC_SF_FIN_STREAM) {
+                               /* Reset flag to not emit multiple FIN STREAM frames. */
+                               qcs->flags &= ~QC_SF_FIN_STREAM;
+                       }
+
+                       if (qcs_is_completed(qcs)) {
+                               /* el_send was detached from send_list above,
+                                * it is reused to attach the stream to purg_list.
+                                */
+                               TRACE_STATE("add stream in purg_list", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+                               LIST_APPEND(&qcc->purg_list, &qcs->el_send);
+                       }
+               }
+       }
+
+ out:
+       TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+}
+
+/* Allocate a QS transport parameters frame and emit it on <qcc> connection
+ * through qcc_send_frames().
+ *
+ * NOTE(review): when emission fails, the frame is still attached to the
+ * local list on return; whether it must be released here depends on
+ * qcc_send_frames() ownership rules - to be confirmed.
+ *
+ * Returns 0 on success, -1 on allocation or emission failure.
+ */
+static int qcc_qos_send_tp(struct qcc *qcc)
+{
+       struct list tp_frms = LIST_HEAD_INIT(tp_frms);
+       struct quic_frame *tp_frm;
+
+       TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
+
+       tp_frm = qc_frm_alloc(QUIC_FT_QS_TP);
+       if (tp_frm) {
+               LIST_APPEND(&tp_frms, &tp_frm->list);
+               if (!qcc_send_frames(qcc, &tp_frms, 0)) {
+                       TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
+                       return 0;
+               }
+
+               TRACE_DEVEL("QoS frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
+       }
+       else {
+               TRACE_ERROR("frame alloc failure", QMUX_EV_QCC_SEND, qcc->conn);
+       }
+
+       TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
+       return -1;
+}
+
+/* Emission stage of the I/O handler for <qcc> QoS connection.
+ *
+ * The QS transport parameters frame is emitted first if not already done.
+ * Then, unless an error is detected on the connection, the application layer
+ * is initialized if needed and pending flow-control frames,
+ * STOP_SENDING/RESET_STREAM frames and finally STREAM frames are emitted.
+ *
+ * On exit, streams whose frame encoding failed are re-inserted at the end of
+ * the send_list, and an error reported by the transport layer is reflected
+ * on the MUX via QC_CF_ERR_CONN.
+ *
+ * Returns the value reported by qcc_build_frms() if STREAM frames were
+ * encoded during this call, 0 otherwise.
+ */
+static int qcc_qos_io_send(struct qcc *qcc)
+{
+       struct list *frms = &qcc->tx.frms;
+       /* Temporary list for QCS on error. */
+       struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
+       struct qcs *qcs, *qcs_tmp;
+       uint64_t window_conn __maybe_unused = qfctl_rcap(&qcc->tx.fc);
+       int ret __maybe_unused = 0, total = 0;
+
+       TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
+
+       if (!(qcc->flags & QC_CF_QSTP_SENT)) {
+               /* On failure, leave through the common exit path so that a
+                * transport error is reported on the MUX and the trace
+                * enter/leave pair stays balanced.
+                */
+               if (qcc_qos_send_tp(qcc))
+                       goto out;
+               qcc->flags |= QC_CF_QSTP_SENT;
+       }
+
+       /* Check for transport error. */
+       if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
+               TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
+               goto out;
+       }
+
+       /* Check for locally detected connection error. */
+       if (qcc->flags & QC_CF_ERRL) {
+               /* Prepare a CONNECTION_CLOSE if not already done. */
+               if (!(qcc->flags & QC_CF_ERRL_DONE)) {
+                       TRACE_DATA("report a connection error", QMUX_EV_QCC_SEND|QMUX_EV_QCC_ERR, qcc->conn);
+                       quic_set_connection_close(qcc->conn->handle.qc, qcc->err);
+                       qcc->flags |= QC_CF_ERRL_DONE;
+               }
+               goto out;
+       }
+
+       if (qcc->app_st < QCC_APP_ST_INIT) {
+               if (qcc_app_init(qcc))
+                       goto out;
+       }
+
+       /* Emission is not possible anymore once the write side is shut. */
+       if (qcc->conn->flags & CO_FL_SOCK_WR_SH) {
+               qcc->conn->flags |= CO_FL_ERROR;
+               TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
+               goto out;
+       }
+
+       if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
+               if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
+                       TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
+                       goto out;
+               }
+       }
+
+       if (qcc_emit_rs_ss(qcc)) {
+               TRACE_DEVEL("emission interrupted on STOP_SENDING/RESET_STREAM send error", QMUX_EV_QCC_SEND, qcc->conn);
+               goto out;
+       }
+
+       /* Encode new STREAM frames if list has been previously cleared. */
+       if (LIST_ISEMPTY(frms) && !LIST_ISEMPTY(&qcc->send_list)) {
+               total = qcc_build_frms(qcc, &qcs_failed);
+               if (LIST_ISEMPTY(frms))
+                       goto out;
+       }
+
+       ret = qcc_send_frames(qcc, frms, 1);
+
+ out:
+       /* Re-insert on-error QCS at the end of the send-list. */
+       if (!LIST_ISEMPTY(&qcs_failed)) {
+               list_for_each_entry_safe(qcs, qcs_tmp, &qcs_failed, el_send) {
+                       LIST_DEL_INIT(&qcs->el_send);
+                       LIST_APPEND(&qcc->send_list, &qcs->el_send);
+               }
+
+               if (!qfctl_rblocked(&qcc->tx.fc))
+                       tasklet_wakeup(qcc->wait_event.tasklet);
+       }
+
+       if (qcc->conn->flags & CO_FL_ERROR && !(qcc->flags & QC_CF_ERR_CONN)) {
+               TRACE_ERROR("error reported by transport layer",
+                           QMUX_EV_QCC_SEND, qcc->conn);
+               qcc->flags |= QC_CF_ERR_CONN;
+       }
+
+       TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
+       return total;
+}
+
+/* Tasklet handler of a QoS connection, <ctx> being the QCC instance. It runs
+ * a reception stage, an emission stage unless the MUX is subscribed for
+ * sending, then a second reception stage. The connection is then processed:
+ * if reported as dead it is shut down and released, else its timeout is
+ * refreshed.
+ *
+ * Always returns NULL.
+ */
+struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status)
+{
+       struct qcc *qcc = ctx;
+
+       TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+
+       qcc_qos_io_recv(qcc);
+
+       if (!(qcc->wait_event.events & SUB_RETRY_SEND))
+               qcc_qos_io_send(qcc);
+
+       qcc_qos_io_recv(qcc);
+
+       if (!qcc_io_process(qcc)) {
+               /* Connection is kept. */
+               qcc_refresh_timeout(qcc);
+
+               TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
+               return NULL;
+       }
+
+       TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn);
+       qcc_shutdown(qcc);
+       qcc_release(qcc);
+
+       /* No argument passed to the trace here, <qcc> has just been released. */
+       TRACE_LEAVE(QMUX_EV_QCC_WAKE);
+
+       return NULL;
+}
+
+/* Initialize a QoS MUX instance for connection <conn>. A QCC is allocated and
+ * set as connection context, flow-control limits are set to hardcoded values,
+ * the I/O tasklet and the timeout task are allocated, and HTTP/3 is installed
+ * as application layer. <prx> provides the timeout values depending on the
+ * connection side. <sess> is used to update the frontend session counter.
+ * <input> is not used by this function.
+ *
+ * Returns 0 on success. On error, the QCC is released if it was allocated and
+ * -1 is returned.
+ */
+static int qmux_qos_init(struct connection *conn, struct proxy *prx,
+                         struct session *sess, struct buffer *input)
+{
+       struct qcc *qcc;
+
+       TRACE_ENTER(QMUX_EV_QCC_NEW);
+
+       qcc = pool_alloc(pool_head_qcc);
+       if (!qcc) {
+               TRACE_ERROR("alloc failure", QMUX_EV_QCC_NEW);
+               goto err;
+       }
+
+       _qcc_init(qcc);
+       conn->ctx = qcc;
+       qcc->nb_hreq = qcc->nb_sc = 0;
+       /* Mark this connection as QoS. */
+       qcc->flags = QC_CF_QOS;
+       qcc->app_st = QCC_APP_ST_NULL;
+       qcc->glitches = 0;
+       qcc->err = quic_err_transport(QC_ERR_NO_ERROR);
+
+       /* Hardcoded initial TP values. Is this really necessary? */
+       qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = 16384;
+       qcc->lfctl.ms_uni = 3;
+       qcc->lfctl.msd_bidi_l = 16384;
+       qcc->lfctl.msd_bidi_r = 16384;
+       qcc->lfctl.msd_uni_r = 16384;
+       qcc->lfctl.cl_bidi_r = 0;
+
+       qcc->lfctl.md = qcc->lfctl.md_init = 16384;
+       qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0;
+
+       /* Connection emission limit starts at 0. */
+       qfctl_init(&qcc->tx.fc, 0);
+
+       qcc->tx.buf_in_flight = 0;
+
+       /* Initial stream IDs depend on the connection side: on a backend
+        * connection local streams use 0x00 (bidi) and 0x02 (uni) while
+        * remote ones use 0x01 and 0x03. This is reversed on a frontend
+        * connection.
+        */
+       if (conn_is_back(conn)) {
+               qcc->next_bidi_l    = 0x00;
+               qcc->largest_bidi_r = 0x01;
+               qcc->next_uni_l     = 0x02;
+               qcc->largest_uni_r  = 0x03;
+       }
+       else {
+               qcc->largest_bidi_r = 0x00;
+               qcc->next_bidi_l    = 0x01;
+               qcc->largest_uni_r  = 0x02;
+               qcc->next_uni_l     = 0x03;
+       }
+
+       qcc->wait_event.tasklet = tasklet_new();
+       if (!qcc->wait_event.tasklet) {
+               TRACE_ERROR("taslket alloc failure", QMUX_EV_QCC_NEW);
+               goto err;
+       }
+
+       LIST_INIT(&qcc->recv_list);
+       LIST_INIT(&qcc->send_list);
+       LIST_INIT(&qcc->fctl_list);
+       LIST_INIT(&qcc->buf_wait_list);
+       LIST_INIT(&qcc->purg_list);
+
+       /* I/O are handled by the QoS specific tasklet handler. */
+       qcc->wait_event.tasklet->process = qcc_qos_io_cb;
+       qcc->wait_event.tasklet->context = qcc;
+       qcc->wait_event.tasklet->state  |= TASK_F_WANTS_TIME;
+       qcc->wait_event.events = 0;
+
+       qcc->proxy = prx;
+       /* haproxy timeouts */
+       if (conn_is_back(conn)) {
+               qcc->timeout = prx->timeout.server;
+               qcc->shut_timeout = tick_isset(prx->timeout.serverfin) ?
+                                   prx->timeout.serverfin : prx->timeout.server;
+       }
+       else {
+               qcc->timeout = prx->timeout.client;
+               qcc->shut_timeout = tick_isset(prx->timeout.clientfin) ?
+                                   prx->timeout.clientfin : prx->timeout.client;
+       }
+
+       /* Always allocate task even if timeout is unset. In MUX code, if task
+        * is NULL, it indicates that a timeout has struck earlier.
+        */
+       qcc->task = task_new_here();
+       if (!qcc->task) {
+               TRACE_ERROR("timeout task alloc failure", QMUX_EV_QCC_NEW);
+               goto err;
+       }
+       qcc->task->process = qcc_timeout_task;
+       qcc->task->context = qcc;
+       qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
+
+       qcc_reset_idle_start(qcc);
+       LIST_INIT(&qcc->opening_list);
+
+       /* Register conn as app_ops may use it. */
+       qcc->conn = conn;
+
+       /* TODO hardcoded HTTP/3 ops */
+       if (qcc_install_app_ops(qcc, &h3_ops)) {
+               TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn);
+               goto err;
+       }
+
+       if (qcc->app_ops == &h3_ops)
+               proxy_inc_fe_cum_sess_ver_ctr(sess->listener, prx, 3);
+
+       /* Register conn for idle front closing. This is done once everything is allocated. */
+       if (!conn_is_back(conn))
+               LIST_APPEND(&mux_stopping_data[tid].list, &conn->stopping_list);
+
+       /* init read cycle */
+       tasklet_wakeup(qcc->wait_event.tasklet);
+
+       TRACE_LEAVE(QMUX_EV_QCC_NEW, conn);
+       return 0;
+
+ err:
+       if (qcc) {
+               /* In case of MUX init failure, session will ensure connection is freed. */
+               qcc->conn = NULL;
+               qcc_release(qcc);
+       }
+
+       TRACE_DEVEL("leaving on error", QMUX_EV_QCC_NEW, conn);
+       return -1;
+}
+
+/* MUX operations table named "QOS". Initialization is performed by
+ * qmux_qos_init(); every other callback is a qmux_* function.
+ */
+static const struct mux_ops qmux_qos_ops = {
+       .init        = qmux_qos_init,
+       .destroy     = qmux_destroy,
+       .detach      = qmux_strm_detach,
+       .rcv_buf     = qmux_strm_rcv_buf,
+       .snd_buf     = qmux_strm_snd_buf,
+       .nego_fastfwd = qmux_strm_nego_ff,
+       .done_fastfwd = qmux_strm_done_ff,
+       .resume_fastfwd = qmux_strm_resume_ff,
+       .subscribe   = qmux_strm_subscribe,
+       .unsubscribe = qmux_strm_unsubscribe,
+       .wake        = qmux_wake,
+       .shut        = qmux_strm_shut,
+       .ctl         = qmux_ctl,
+       .sctl        = qmux_sctl,
+       .show_sd     = qmux_strm_show_sd,
+       .flags = MX_FL_HTX|MX_FL_NO_UPG,
+       .name = "QOS",
+};
+
+/* MUX protocol entry exposing qmux_qos_ops under the "qos" token, in HTTP
+ * mode and for the frontend side. It is registered at the STG_REGISTER init
+ * stage.
+ * NOTE(review): qmux_qos_init() contains conn_is_back() branches whereas
+ * only PROTO_SIDE_FE is declared here - confirm the intended sides.
+ */
+static struct mux_proto_list mux_proto_qos =
+  { .token = IST("qos"), .mode = PROTO_MODE_HTTP, .side = PROTO_SIDE_FE, .mux = &qmux_qos_ops };
+
+INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_qos);