It is possible to enable pacing if the algorithm is compatible. This is done
by specifying an optional burst argument as described in the next paragraph.
The purpose of pacing is to smooth emission of data to reduce network losses.
- In some scenario, it can significantly improve network throughput by avoiding
- retransmissions. However, it can also increase CPU usage if haproxy is forced
- to wait too long between each emission. Pacing support is still experimental,
- as such it requires "expose-experimental-directives". The BBR congestion
- control algorithm depends on the pacing support which is in this case
- implicitly enabled by choosing the "bbr" algorithm. Note that haproxy's BBR
- implementation is still considered as experimental and cannot be enabled
- without "expose-experimental-directives".
+ In most scenarios, it can significantly improve network throughput by
+ avoiding retransmissions. Pacing support is still experimental; as such, it
+ requires "expose-experimental-directives". The BBR congestion control
+ algorithm depends on pacing support, which is in this case implicitly
+ enabled by choosing the "bbr" algorithm. Note that haproxy's BBR
+ implementation is still considered experimental and cannot be enabled
+ without "expose-experimental-directives".
For further customization, a list of parameters can be specified after the
algorithm token. It must be written between parenthesis, separated by a
return total;
}
+/* Schedule <qcc> pacing task after emission was interrupted due to pacing. */
+static void qcc_wakeup_pacing(struct qcc *qcc)
+{
+ /* Sleep long enough to be able to re-emit at least a single packet. */
+ const int inter = qcc->tx.pacer.cc->algo->pacing_inter(qcc->tx.pacer.cc);
+ /* Convert nanoseconds to milliseconds, rounded up, with 1ms as the minimal value. */
+ const int expire = MAX((inter + 999999) / 1000000, 1);
+ qcc->pacing_task->expire = tick_add_ifset(now_ms, MS_TO_TICKS(expire));
+ ++qcc->tx.paced_sent_ctr;
+}
+
/* Proceed to sending. Loop through all available streams for the <qcc>
* instance and try to send as much as possible.
*
* Returns the total of bytes sent to the transport layer.
*/
-static int qcc_io_send(struct qcc *qcc, int after_pacing)
+static int qcc_io_send(struct qcc *qcc)
{
struct list *frms = &qcc->tx.frms;
/* Temporary list for QCS on error. */
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
+ if (qcc_is_pacing_active(qcc->conn)) {
+ /* Always reset pacing_task timer to prevent unnecessary execution. */
+ qcc->pacing_task->expire = TICK_ETERNITY;
+ }
+
/* TODO if socket in transient error, sending should be temporarily
* disabled for all frames. However, checking for send subscription is
* not valid as this may be caused by a congestion error which only
if (!LIST_ISEMPTY(frms) && qcc_is_pacing_active(qcc->conn)) {
if (!quic_pacing_reload(&qcc->tx.pacer)) {
- if (!after_pacing)
- ++qcc->tx.paced_sent_ctr;
- tasklet_wakeup(qcc->wait_event.tasklet, TASK_F_UEVT1);
+ qcc_wakeup_pacing(qcc);
total = 0;
goto out;
}
if (ret == 1) {
/* qcc_send_frames cannot return 1 if pacing not used. */
BUG_ON(!qcc_is_pacing_active(qcc->conn));
- tasklet_wakeup(qcc->wait_event.tasklet, TASK_F_UEVT1);
- ++qcc->tx.paced_sent_ctr;
+ qcc_wakeup_pacing(qcc);
}
out:
TRACE_STATE("perform graceful shutdown", QMUX_EV_QCC_END, qcc->conn);
if (qcc->app_ops && qcc->app_ops->shutdown) {
qcc->app_ops->shutdown(qcc->ctx);
- qcc_io_send(qcc, 0);
+ qcc_io_send(qcc);
}
else {
qcc->err = quic_err_transport(QC_ERR_NO_ERROR);
if (!qc_test_fd(qcc->conn->handle.qc)) {
TRACE_DEVEL("proxy disabled with listener socket, closing connection", QMUX_EV_QCC_WAKE, qcc->conn);
qcc->conn->flags |= (CO_FL_SOCK_RD_SH|CO_FL_SOCK_WR_SH);
- qcc_io_send(qcc, 0);
+ qcc_io_send(qcc);
goto out;
}
TRACE_ENTER(QMUX_EV_QCC_END, conn);
+ task_destroy(qcc->pacing_task);
+
if (qcc->task) {
task_destroy(qcc->task);
qcc->task = NULL;
{
struct qcc *qcc = ctx;
- /* Check if woken up only for pacing but not yet expired. */
- if ((status & (TASK_F_UEVT1|TASK_WOKEN_ANY)) == TASK_F_UEVT1 &&
- !quic_pacing_expired(&qcc->tx.pacer)) {
- /* hide any trace as no progress should be performed on this invokation. */
- trace_disable();
- }
-
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
- qcc_io_send(qcc, status & TASK_F_UEVT1);
+ qcc_io_send(qcc);
qcc_io_recv(qcc);
qcc_refresh_timeout(qcc);
+ /* Trigger pacing task if emission should be retried after some delay. */
+ if (qcc_is_pacing_active(qcc->conn)) {
+ if (tick_isset(qcc->pacing_task->expire))
+ task_queue(qcc->pacing_task);
+ }
+
TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
- trace_resume();
return NULL;
qcc_release(qcc);
TRACE_LEAVE(QMUX_EV_QCC_WAKE);
- trace_resume();
return NULL;
}
+static struct task *qcc_pacing_task(struct task *t, void *ctx, unsigned int state)
+{
+ struct qcc *qcc = ctx;
+ int expired = tick_is_expired(t->expire, now_ms);
+
+ TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+
+ if (!expired) {
+ if (!tick_isset(t->expire))
+ TRACE_DEVEL("cancelled pacing task", QMUX_EV_QCC_WAKE, qcc->conn);
+ goto requeue;
+ }
+
+ /* Reschedule I/O immediately. */
+ tasklet_wakeup_after(NULL, qcc->wait_event.tasklet);
+
+ requeue:
+ TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
+ return t;
+}
+
static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int state)
{
struct qcc *qcc = ctx;
static void _qcc_init(struct qcc *qcc)
{
qcc->conn = NULL;
+ qcc->pacing_task = NULL;
qcc->task = NULL;
qcc->wait_event.tasklet = NULL;
qcc->app_ops = NULL;
if (qcc_is_pacing_active(conn)) {
quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc);
qcc->tx.paced_sent_ctr = 0;
+
+ /* Initialize pacing_task. */
+ qcc->pacing_task = task_new_here();
+ if (!qcc->pacing_task) {
+ TRACE_ERROR("pacing task alloc failure", QMUX_EV_QCC_NEW);
+ goto err;
+ }
+ qcc->pacing_task->process = qcc_pacing_task;
+ qcc->pacing_task->context = qcc;
+ qcc->pacing_task->expire = TICK_ETERNITY;
+ qcc->pacing_task->state |= TASK_F_WANTS_TIME;
}
if (conn_is_back(conn)) {