From: Amaury Denoyelle Date: Fri, 25 Oct 2024 14:55:43 +0000 (+0200) Subject: MAJOR: mux-quic: support pacing emission X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=2ba0856739a16236e5d90c206f1ad4fe0f4f7cfe;p=thirdparty%2Fhaproxy.git MAJOR: mux-quic: support pacing emission --- diff --git a/include/haproxy/quic_pacing.h b/include/haproxy/quic_pacing.h index e760d475ca..ee536fb2e5 100644 --- a/include/haproxy/quic_pacing.h +++ b/include/haproxy/quic_pacing.h @@ -37,6 +37,8 @@ static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer) int quic_pacing_expired(const struct quic_pacer *pacer); +enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc); + void quic_pacing_sent_done(struct quic_pacer *pacer, int sent); #endif /* _HAPROXY_QUIC_PACING_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 1d8268c5b1..10bfca79c7 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -390,6 +390,13 @@ static void qcc_refresh_timeout(struct qcc *qcc) void qcc_wakeup(struct qcc *qcc) { + HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); + tasklet_wakeup(qcc->wait_event.tasklet); +} + +static void qcc_wakeup_pacing(struct qcc *qcc) +{ + HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1); tasklet_wakeup(qcc->wait_event.tasklet); } @@ -2078,18 +2085,22 @@ static int qcc_subscribe_send(struct qcc *qcc) * * Returns 0 if all data sent with success else non-zero. */ -static int qcc_send_frames(struct qcc *qcc, struct list *frms) +static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream) { enum quic_tx_err ret; + struct quic_pacer *pacer = NULL; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); if (LIST_ISEMPTY(frms)) { TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn); - return 1; + return -1; } - ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL); + if (stream) + pacer = &qcc->tx.pacer; + + ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer); if (ret == QUIC_TX_ERR_FATAL) { TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn); qcc_subscribe_send(qcc); @@ -2099,18 +2110,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms) /* If there is frames left at this stage, transport layer is blocked. * Subscribe on it to retry later. */ - if (!LIST_ISEMPTY(frms)) { + if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) { TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn); qcc_subscribe_send(qcc); goto err; } TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); - return 0; + return ret == QUIC_TX_ERR_AGAIN ? 1 : 0; err: TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn); - return 1; + return -1; } /* Emit a RESET_STREAM on . @@ -2135,7 +2146,7 @@ static int qcs_send_reset(struct qcs *qcs) frm->reset_stream.final_size = qcs->tx.fc.off_real; LIST_APPEND(&frms, &frm->list); - if (qcc_send_frames(qcs->qcc, &frms)) { + if (qcc_send_frames(qcs->qcc, &frms, 0)) { if (!LIST_ISEMPTY(&frms)) qc_frm_free(qcs->qcc->conn->handle.qc, &frm); TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); @@ -2186,7 +2197,7 @@ static int qcs_send_stop_sending(struct qcs *qcs) frm->stop_sending.app_error_code = qcs->err; LIST_APPEND(&frms, &frm->list); - if (qcc_send_frames(qcs->qcc, &frms)) { + if (qcc_send_frames(qcs->qcc, &frms, 0)) { if (!LIST_ISEMPTY(&frms)) qc_frm_free(qcc->conn->handle.qc, &frm); TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); @@ -2254,11 +2265,12 @@ static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn) static int qcc_io_send(struct qcc *qcc) { /* Temporary list for QCS on error. */ - struct list *frms = quic_pacing_frms(&qcc->tx.pacer); + struct quic_pacer *pacer = &qcc->tx.pacer; + struct list *frms = quic_pacing_frms(pacer); struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); struct qcs *qcs, *qcs_tmp, *first_qcs = NULL; uint64_t window_conn = qfctl_rcap(&qcc->tx.fc); - int ret, total = 0, resent; + int ret = 0, total = 0, resent; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); @@ -2268,6 +2280,8 @@ static int qcc_io_send(struct qcc *qcc) * apply for STREAM frames. */ + quic_pacing_reset(pacer); + /* Check for transport error. */ if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) { TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn); @@ -2292,7 +2306,7 @@ static int qcc_io_send(struct qcc *qcc) } if (!LIST_ISEMPTY(&qcc->lfctl.frms)) { - if (qcc_send_frames(qcc, &qcc->lfctl.frms)) { + if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) { TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn); goto out; } @@ -2368,10 +2382,15 @@ static int qcc_io_send(struct qcc *qcc) } } + if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) { + qcc_wakeup_pacing(qcc); + return 1; + } + /* Retry sending until no frame to send, data rejected or connection * flow-control limit reached. */ - while ((ret = qcc_send_frames(qcc, frms)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { + while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { window_conn = qfctl_rcap(&qcc->tx.fc); resent = 0; @@ -2402,8 +2421,13 @@ static int qcc_io_send(struct qcc *qcc) } sent_done: - /* Deallocate frames that the transport layer has rejected. */ - quic_pacing_reset(&qcc->tx.pacer); + if (ret == 1) { + qcc_wakeup_pacing(qcc); + } + else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) { + /* Deallocate frames that the transport layer has rejected. */ + quic_pacing_reset(pacer); + } /* Re-insert on-error QCS at the end of the send-list. */ if (!LIST_ISEMPTY(&qcs_failed)) { @@ -2750,12 +2774,39 @@ static void qcc_release(struct qcc *qcc) TRACE_LEAVE(QMUX_EV_QCC_END); } +static void qcc_purge_sending(struct qcc *qcc) +{ + struct quic_conn *qc = qcc->conn->handle.qc; + struct quic_pacer *pacer = &qcc->tx.pacer; + enum quic_tx_err ret; + + ret = quic_pacing_send(pacer, qc); + if (ret == QUIC_TX_ERR_AGAIN) { + BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer))); + qcc_wakeup_pacing(qcc); + } + else if (ret == QUIC_TX_ERR_FATAL) { + TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn); + HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); + qcc_subscribe_send(qcc); + } + else { + if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) + qcc_subscribe_send(qcc); + } +} + struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status) { struct qcc *qcc = ctx; TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); + if (status & TASK_F_USR1) { + qcc_purge_sending(qcc); + return NULL; + } + if (!(qcc->wait_event.events & SUB_RETRY_SEND)) qcc_io_send(qcc); diff --git a/src/quic_pacing.c b/src/quic_pacing.c index df3fedb585..8a1e33f23c 100644 --- a/src/quic_pacing.c +++ b/src/quic_pacing.c @@ -9,6 +9,20 @@ int quic_pacing_expired(const struct quic_pacer *pacer) return !pacer->next || pacer->next <= now_mono_time(); } +enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc) +{ + enum quic_tx_err ret; + + if (!quic_pacing_expired(pacer)) + return QUIC_TX_ERR_AGAIN; + + BUG_ON(LIST_ISEMPTY(&pacer->frms)); + ret = qc_send_mux(qc, &pacer->frms, pacer); + + /* TODO handle QUIC_TX_ERR_FATAL */ + return ret; +} + void quic_pacing_sent_done(struct quic_pacer *pacer, int sent) { pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent;