]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: mux-quic: support pacing emission
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 25 Oct 2024 14:55:43 +0000 (16:55 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 31 Oct 2024 14:35:31 +0000 (15:35 +0100)
include/haproxy/quic_pacing.h
src/mux_quic.c
src/quic_pacing.c

index e760d475ca784a4e8014d650e3bd8cab34fd6ce0..ee536fb2e5dd64dd0503d2eb95fed7b5cf4006ed 100644 (file)
@@ -37,6 +37,8 @@ static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer)
 
 int quic_pacing_expired(const struct quic_pacer *pacer);
 
+enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc);
+
 void quic_pacing_sent_done(struct quic_pacer *pacer, int sent);
 
 #endif /* _HAPROXY_QUIC_PACING_H */
index 1d8268c5b11d16a16ea22c4214d6f65c9a41380b..10bfca79c7487faacd621b21be10b28a40f7b910 100644 (file)
@@ -390,6 +390,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
 
 void qcc_wakeup(struct qcc *qcc)
 {
+       HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+       tasklet_wakeup(qcc->wait_event.tasklet);
+}
+
+static void qcc_wakeup_pacing(struct qcc *qcc)
+{
+       HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
        tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
@@ -2078,18 +2085,22 @@ static int qcc_subscribe_send(struct qcc *qcc)
  *
  * Returns 0 if all data sent with success else non-zero.
  */
-static int qcc_send_frames(struct qcc *qcc, struct list *frms)
+static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
 {
        enum quic_tx_err ret;
+       struct quic_pacer *pacer = NULL;
 
        TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
 
        if (LIST_ISEMPTY(frms)) {
                TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
-               return 1;
+               return -1;
        }
 
-       ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL);
+       if (stream)
+               pacer = &qcc->tx.pacer;
+
+       ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
        if (ret == QUIC_TX_ERR_FATAL) {
                TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
                qcc_subscribe_send(qcc);
@@ -2099,18 +2110,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms)
        /* If there is frames left at this stage, transport layer is blocked.
         * Subscribe on it to retry later.
         */
-       if (!LIST_ISEMPTY(frms)) {
+       if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) {
                TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
                qcc_subscribe_send(qcc);
                goto err;
        }
 
        TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
-       return 0;
+       return ret == QUIC_TX_ERR_AGAIN ? 1 : 0;
 
  err:
        TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
-       return 1;
+       return -1;
 }
 
 /* Emit a RESET_STREAM on <qcs>.
@@ -2135,7 +2146,7 @@ static int qcs_send_reset(struct qcs *qcs)
        frm->reset_stream.final_size = qcs->tx.fc.off_real;
 
        LIST_APPEND(&frms, &frm->list);
-       if (qcc_send_frames(qcs->qcc, &frms)) {
+       if (qcc_send_frames(qcs->qcc, &frms, 0)) {
                if (!LIST_ISEMPTY(&frms))
                        qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
                TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2186,7 +2197,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
        frm->stop_sending.app_error_code = qcs->err;
 
        LIST_APPEND(&frms, &frm->list);
-       if (qcc_send_frames(qcs->qcc, &frms)) {
+       if (qcc_send_frames(qcs->qcc, &frms, 0)) {
                if (!LIST_ISEMPTY(&frms))
                        qc_frm_free(qcc->conn->handle.qc, &frm);
                TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2254,11 +2265,12 @@ static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn)
 static int qcc_io_send(struct qcc *qcc)
 {
        /* Temporary list for QCS on error. */
-       struct list *frms = quic_pacing_frms(&qcc->tx.pacer);
+       struct quic_pacer *pacer = &qcc->tx.pacer;
+       struct list *frms = quic_pacing_frms(pacer);
        struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
        struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
        uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
-       int ret, total = 0, resent;
+       int ret = 0, total = 0, resent;
 
        TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
 
@@ -2268,6 +2280,8 @@ static int qcc_io_send(struct qcc *qcc)
         * apply for STREAM frames.
         */
 
+       quic_pacing_reset(pacer);
+
        /* Check for transport error. */
        if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
                TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
@@ -2292,7 +2306,7 @@ static int qcc_io_send(struct qcc *qcc)
        }
 
        if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
-               if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
+               if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
                        TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
                        goto out;
                }
@@ -2368,10 +2382,15 @@ static int qcc_io_send(struct qcc *qcc)
                }
        }
 
+       if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
+               qcc_wakeup_pacing(qcc);
+               return 1;
+       }
+
        /* Retry sending until no frame to send, data rejected or connection
         * flow-control limit reached.
         */
-       while ((ret = qcc_send_frames(qcc, frms)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
+       while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
                window_conn = qfctl_rcap(&qcc->tx.fc);
                resent = 0;
 
@@ -2402,8 +2421,13 @@ static int qcc_io_send(struct qcc *qcc)
        }
 
  sent_done:
-       /* Deallocate frames that the transport layer has rejected. */
-       quic_pacing_reset(&qcc->tx.pacer);
+       if (ret == 1) {
+               qcc_wakeup_pacing(qcc);
+       }
+       else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
+               /* Deallocate frames that the transport layer has rejected. */
+               quic_pacing_reset(pacer);
+       }
 
        /* Re-insert on-error QCS at the end of the send-list. */
        if (!LIST_ISEMPTY(&qcs_failed)) {
@@ -2750,12 +2774,39 @@ static void qcc_release(struct qcc *qcc)
        TRACE_LEAVE(QMUX_EV_QCC_END);
 }
 
+static void qcc_purge_sending(struct qcc *qcc)
+{
+       struct quic_conn *qc = qcc->conn->handle.qc;
+       struct quic_pacer *pacer = &qcc->tx.pacer;
+       enum quic_tx_err ret;
+
+       ret = quic_pacing_send(pacer, qc);
+       if (ret == QUIC_TX_ERR_AGAIN) {
+               BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
+               qcc_wakeup_pacing(qcc);
+       }
+       else if (ret == QUIC_TX_ERR_FATAL) {
+               TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
+               HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+               qcc_subscribe_send(qcc);
+       }
+       else {
+               if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
+                       qcc_subscribe_send(qcc);
+       }
+}
+
 struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 {
        struct qcc *qcc = ctx;
 
        TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
+       if (status & TASK_F_USR1) {
+               qcc_purge_sending(qcc);
+               return NULL;
+       }
+
        if (!(qcc->wait_event.events & SUB_RETRY_SEND))
                qcc_io_send(qcc);
 
index df3fedb585248ef7e3556f734dbb8f8f06bbd06f..8a1e33f23cbfbe1d2919db07a7d08334a0285d98 100644 (file)
@@ -9,6 +9,20 @@ int quic_pacing_expired(const struct quic_pacer *pacer)
        return !pacer->next || pacer->next <= now_mono_time();
 }
 
+enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
+{
+       enum quic_tx_err ret;
+
+       if (!quic_pacing_expired(pacer))
+               return QUIC_TX_ERR_AGAIN;
+
+       BUG_ON(LIST_ISEMPTY(&pacer->frms));
+       ret = qc_send_mux(qc, &pacer->frms, pacer);
+
+       /* TODO handle QUIC_TX_ERR_FATAL */
+       return ret;
+}
+
 void quic_pacing_sent_done(struct quic_pacer *pacer, int sent)
 {
        pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent;