#include <haproxy/quic_enc.h>
#include <haproxy/quic_fctl.h>
#include <haproxy/quic_frame.h>
+#include <haproxy/quic_pacing.h>
#include <haproxy/quic_sock.h>
#include <haproxy/quic_stream.h>
#include <haproxy/quic_tp-t.h>
static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
+/* Returns true if pacing should be used for <conn> connection. */
+static int qcc_is_pacing_active(const struct connection *conn)
+{
+ const struct quic_conn *qc = conn->handle.qc;
+ return !!(qc->path->cc.algo->pacing_rate);
+}
+
/* Free <qcc> STREAM frames in Tx list. */
static void qcc_tx_frms_free(struct qcc *qcc)
{
void qcc_wakeup(struct qcc *qcc)
{
+ HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+ tasklet_wakeup(qcc->wait_event.tasklet);
+}
+
+static void qcc_wakeup_pacing(struct qcc *qcc)
+{
+ HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
tasklet_wakeup(qcc->wait_event.tasklet);
}
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
* connection <qcc>.
*
- * Returns 0 if all data sent with success else non-zero.
+ * Returns 0 if all data sent with success. On fatal error, a negative error
+ * code is returned. A positive 1 is used if emission should be paced.
*/
-static int qcc_send_frames(struct qcc *qcc, struct list *frms)
+static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
{
enum quic_tx_err ret;
+ struct quic_pacer *pacer = NULL;
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
if (LIST_ISEMPTY(frms)) {
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
- return 1;
+ return -1;
}
- ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL);
+ if (stream && qcc_is_pacing_active(qcc->conn))
+ pacer = &qcc->tx.pacer;
+
+ ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
if (ret == QUIC_TX_ERR_FATAL) {
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
qcc_subscribe_send(qcc);
/* If there is frames left at this stage, transport layer is blocked.
* Subscribe on it to retry later.
*/
- if (!LIST_ISEMPTY(frms)) {
+ if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_PACING) {
TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
qcc_subscribe_send(qcc);
goto err;
}
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
- return 0;
+ return ret == QUIC_TX_ERR_PACING ? 1 : 0;
err:
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
- return 1;
+ return -1;
}
/* Emit a RESET_STREAM on <qcs>.
frm->reset_stream.final_size = qcs->tx.fc.off_real;
LIST_APPEND(&frms, &frm->list);
- if (qcc_send_frames(qcs->qcc, &frms)) {
+ if (qcc_send_frames(qcs->qcc, &frms, 0)) {
if (!LIST_ISEMPTY(&frms))
qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
frm->stop_sending.app_error_code = qcs->err;
LIST_APPEND(&frms, &frm->list);
- if (qcc_send_frames(qcs->qcc, &frms)) {
+ if (qcc_send_frames(qcs->qcc, &frms, 0)) {
if (!LIST_ISEMPTY(&frms))
qc_frm_free(qcc->conn->handle.qc, &frm);
TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
- int ret, total = 0, resent;
+ int ret = 0, total = 0, resent;
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
* apply for STREAM frames.
*/
+ qcc_tx_frms_free(qcc);
+
/* Check for transport error. */
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
}
if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
- if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
+ if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
goto out;
}
}
}
+ if (qcc_is_pacing_active(qcc->conn)) {
+ if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
+ qcc_wakeup_pacing(qcc);
+ return 1;
+ }
+ }
+
/* Retry sending until no frame to send, data rejected or connection
* flow-control limit reached.
*/
- while (qcc_send_frames(qcc, frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
+ while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
window_conn = qfctl_rcap(&qcc->tx.fc);
resent = 0;
}
sent_done:
- /* Deallocate frames that the transport layer has rejected. */
- qcc_tx_frms_free(qcc);
+ if (ret == 1) {
+ /* qcc_send_frames cannot return 1 if pacing not used. */
+ BUG_ON(!qcc_is_pacing_active(qcc->conn));
+ qcc_wakeup_pacing(qcc);
+ }
+ else if (!LIST_ISEMPTY(&qcc->tx.frms)) {
+ /* Deallocate frames that the transport layer has rejected. */
+ qcc_tx_frms_free(qcc);
+ }
/* Re-insert on-error QCS at the end of the send-list. */
if (!LIST_ISEMPTY(&qcs_failed)) {
TRACE_LEAVE(QMUX_EV_QCC_END);
}
+static void qcc_purge_sending(struct qcc *qcc)
+{
+ struct quic_pacer *pacer = &qcc->tx.pacer;
+ struct list *frms = &qcc->tx.frms;
+ enum quic_tx_err ret = QUIC_TX_ERR_PACING;
+
+ /* This function is reserved for pacing usage. */
+ BUG_ON(!qcc_is_pacing_active(qcc->conn));
+
+ /* Only restart emission if pacing delay is reached. */
+ if (quic_pacing_expired(pacer))
+ ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
+
+ if (ret == QUIC_TX_ERR_PACING) {
+ BUG_ON(LIST_ISEMPTY(frms));
+ qcc_wakeup_pacing(qcc);
+ }
+ else if (ret == QUIC_TX_ERR_FATAL) {
+ TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
+ HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+ qcc_subscribe_send(qcc);
+ }
+ else {
+ if (!LIST_ISEMPTY(frms))
+ qcc_subscribe_send(qcc);
+ }
+}
+
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
{
struct qcc *qcc = ctx;
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+ if (status & TASK_F_USR1) {
+ qcc_purge_sending(qcc);
+ return NULL;
+ }
+
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
qcc_io_send(qcc);
qcc->tx.buf_in_flight = 0;
+ if (qcc_is_pacing_active(conn))
+ quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc);
+
if (conn_is_back(conn)) {
qcc->next_bidi_l = 0x00;
qcc->largest_bidi_r = 0x01;