From 780b890b0e130de89df3b9fca66f6c78a1c00358 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Thu, 31 Oct 2024 09:45:45 +0000 Subject: [PATCH] TMP ms pacing with budget calcul --- include/haproxy/quic_pacing-t.h | 10 ++-- include/haproxy/quic_pacing.h | 18 +++++-- src/mux_quic.c | 79 ++++++++++++++++++++++-------- src/quic_pacing.c | 87 ++++++++++++--------------------- src/quic_tx.c | 9 +++- 5 files changed, 116 insertions(+), 87 deletions(-) diff --git a/include/haproxy/quic_pacing-t.h b/include/haproxy/quic_pacing-t.h index 6039acc49e..0c496de603 100644 --- a/include/haproxy/quic_pacing-t.h +++ b/include/haproxy/quic_pacing-t.h @@ -7,10 +7,14 @@ struct quic_pacer { struct list frms; const struct quic_cc_path *path; + //int next; + //unsigned int curr; + //int pkt_ms; + //int sent; + int burst; + int budget; + int last_sent; int next; - unsigned int curr; - int pkt_ms; - int sent; }; #endif /* _HAPROXY_QUIC_PACING_T_H */ diff --git a/include/haproxy/quic_pacing.h b/include/haproxy/quic_pacing.h index ee137ae8f1..5f7d6f2e48 100644 --- a/include/haproxy/quic_pacing.h +++ b/include/haproxy/quic_pacing.h @@ -13,12 +13,18 @@ static inline void quic_pacing_init(struct quic_pacer *pacer, LIST_INIT(&pacer->frms); pacer->path = path; //pacer->next = TICK_ETERNITY; - pacer->next = now_ms; + //pacer->next = now_ms; //pacer->curr = now_ms; - pacer->curr = TICK_ETERNITY; - pacer->pkt_ms = 0; - pacer->sent = 0; + //pacer->curr = TICK_ETERNITY; + //pacer->pkt_ms = 0; + //pacer->sent = 0; + + pacer->last_sent = now_ms; + //pacer->budget = global.tune.quic_frontend_max_tx_burst; + pacer->budget = 0; + pacer->burst = global.tune.quic_frontend_max_tx_burst; + pacer->next = TICK_ETERNITY; } static inline void quic_pacing_reset(struct quic_pacer *pacer) @@ -47,7 +53,7 @@ static inline int quic_pacing_ns_pkt(const struct quic_pacer *pacer, int sent) return (pacer->path->cwnd / (pacer->path->mtu + 1)) / (pacer->path->loss.srtt + 1) + 1; } -int quic_pacing_expired(const struct quic_pacer *pacer); +//int quic_pacing_expired(const struct quic_pacer *pacer); enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc); @@ -56,4 +62,6 @@ int quic_pacing_prepare(struct quic_pacer *pacer); //void quic_pacing_sent_done(struct quic_pacer *pacer, int sent); int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err); +int quic_pacing_next(struct quic_pacer *pacer); + #endif /* _HAPROXY_QUIC_PACING_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 95ebf48805..f877d8299a 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -277,6 +277,7 @@ static void qcc_refresh_timeout(struct qcc *qcc) void qcc_wakeup(struct qcc *qcc) { + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); tasklet_wakeup(qcc->wait_event.tasklet); @@ -287,12 +288,23 @@ void qcc_wakeup(struct qcc *qcc) static void qcc_wakeup_pacing(struct qcc *qcc) { + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); + BUG_ON(LIST_ISEMPTY(&qcc->tx.pacer.frms)); HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1); tasklet_wakeup(qcc->wait_event.tasklet); - //qcc->task->expire = qcc->tx.pacer.next; - //BUG_ON(tick_is_expired(qcc->task->expire, now_ms)); - //task_queue(qcc->task); - //TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn); + + qcc->task->expire = TICK_ETERNITY; + task_queue(qcc->task); +} + +static void qcc_task_pacing(struct qcc *qcc) +{ + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); + //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); + qcc->task->expire = now_ms == qcc->tx.pacer.next ? tick_add(qcc->tx.pacer.next, 1) : qcc->tx.pacer.next; + BUG_ON(tick_is_expired(qcc->task->expire, now_ms)); + task_queue(qcc->task); + TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn); } /* Mark a stream as open if it was idle. This can be used on every @@ -2176,6 +2188,7 @@ static int qcc_io_send(struct qcc *qcc) */ quic_pacing_reset(pacer); + //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); /* Check for transport error. */ if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) { @@ -2277,9 +2290,17 @@ static int qcc_io_send(struct qcc *qcc) } } - if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) { - qcc_wakeup_pacing(qcc); - return 1; + //if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) { + if (!LIST_ISEMPTY(frms)) { + if (!qcc->tx.pacer.budget) { + qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(pacer)); + //fprintf(stderr, "wait for %ldms\n", qcc->tx.pacer.burst * qcc->tx.pacer.path->loss.srtt * qcc->tx.pacer.path->mtu / qcc->tx.pacer.path->cwnd); + qcc_task_pacing(qcc); + return 1; + } + //else { + // qcc_wakeup_pacing(qcc); + //} } /* Retry sending until no frame to send, data rejected or connection @@ -2317,11 +2338,14 @@ static int qcc_io_send(struct qcc *qcc) sent_done: if (ret == 1) { + qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(pacer)); + //fprintf(stderr, "wait for %ldms\n", pacer->burst * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd); qcc_wakeup_pacing(qcc); } else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) { /* Deallocate frames that the transport layer has rejected. */ quic_pacing_reset(pacer); + //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); } /* Re-insert on-error QCS at the end of the send-list. */ @@ -2643,6 +2667,7 @@ static void qcc_release(struct qcc *qcc) } quic_pacing_reset(&qcc->tx.pacer); + //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); if (qcc->app_ops && qcc->app_ops->release) qcc->app_ops->release(qcc->ctx); @@ -2679,7 +2704,7 @@ static int qcc_purge_sending(struct qcc *qcc) if (ret == QUIC_TX_ERR_AGAIN) { BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer))); TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); - qcc_wakeup_pacing(qcc); + //qcc_wakeup_pacing(qcc); return 1; } else if (ret == QUIC_TX_ERR_FATAL) { @@ -2693,6 +2718,8 @@ static int qcc_purge_sending(struct qcc *qcc) TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) qcc_subscribe_send(qcc); + //else + // HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); return 0; } } @@ -2704,8 +2731,18 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status) TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); if (status & TASK_F_USR1) { + ++activity[tid].ctr0; + //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); //ABORT_NOW(); - qcc_purge_sending(qcc); + if (qcc_purge_sending(qcc)) { + if (!qcc->tx.pacer.budget) { + qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(&qcc->tx.pacer)); + //fprintf(stderr, "wait for %ldms\n", qcc->tx.pacer.burst * qcc->tx.pacer.path->loss.srtt * qcc->tx.pacer.path->mtu / qcc->tx.pacer.path->cwnd); + qcc_task_pacing(qcc); + } + else + qcc_wakeup_pacing(qcc); + } return NULL; } @@ -2745,21 +2782,21 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta goto requeue; } //fprintf(stderr, "woken up after %dms\n", now_ms - qcc->tx.pacer.next); - -#if 0 - if (!qcc_may_expire(qcc)) { - TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn); - t->expire = TICK_ETERNITY; - goto requeue; - } -#endif } + ++activity[tid].ctr1; if (qcc_purge_sending(qcc)) { - qcc->task->expire = qcc->tx.pacer.next; - BUG_ON(tick_is_expired(qcc->task->expire, now_ms)); - TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); - goto requeue; + //qcc->task->expire = qcc->tx.pacer.next; + if (!qcc->tx.pacer.budget) { + qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(&qcc->tx.pacer)); + qcc->task->expire = now_ms == qcc->tx.pacer.next ? tick_add(qcc->tx.pacer.next, 1) : qcc->tx.pacer.next; + BUG_ON(tick_is_expired(qcc->task->expire, now_ms)); + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); + goto requeue; + } + else { + qcc_wakeup_pacing(qcc); + } } t->expire = TICK_ETERNITY; goto requeue; diff --git a/src/quic_pacing.c b/src/quic_pacing.c index 8ed8638b68..139e6da0cc 100644 --- a/src/quic_pacing.c +++ b/src/quic_pacing.c @@ -5,23 +5,24 @@ struct quic_conn; -int quic_pacing_expired(const struct quic_pacer *pacer) -{ - //return !pacer->next || pacer->next <= now_mono_time(); - //return !pacer->next || pacer->next <= now_ms; - return tick_is_expired(pacer->next, now_ms); -} +//int quic_pacing_expired(const struct quic_pacer *pacer) +//{ +// //return !pacer->next || pacer->next <= now_mono_time(); +// //return !pacer->next || pacer->next <= now_ms; +// return tick_is_expired(pacer->next, now_ms); +//} enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc) { enum quic_tx_err ret; - if (!quic_pacing_expired(pacer)) - return QUIC_TX_ERR_AGAIN; + //if (!quic_pacing_expired(pacer)) + //if (!pacer->budget) + // return QUIC_TX_ERR_AGAIN; BUG_ON(LIST_ISEMPTY(&pacer->frms)); ret = qc_send_mux(qc, &pacer->frms, pacer); - BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms)); + //BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms)); /* TODO handle QUIC_TX_ERR_FATAL */ return ret; @@ -29,60 +30,34 @@ enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc int quic_pacing_prepare(struct quic_pacer *pacer) { - if (pacer->curr == now_ms) { - BUG_ON(pacer->sent > pacer->pkt_ms); - return pacer->pkt_ms - pacer->sent; - } - else { - int not_consumed = pacer->pkt_ms - pacer->sent; - BUG_ON(not_consumed < 0); - //if (not_consumed) - // fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pacer->pkt_ms, pacer->sent); + int idle = tick_remain(pacer->last_sent, now_ms); + int pkts = idle * pacer->path->cwnd / (pacer->path->loss.srtt * pacer->path->mtu + 1); - pacer->curr = now_ms; - pacer->sent = 0; - pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0); - //pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0) + not_consumed; + TRACE_POINT(QMUX_EV_QCC_WAKE, NULL); - BUG_ON(!pacer->pkt_ms); - return pacer->pkt_ms; + pacer->budget += pkts; + if (pacer->budget > pacer->burst * 2) { + TRACE_POINT(QMUX_EV_QCC_WAKE, NULL); + pacer->budget = pacer->burst * 2; } + //fprintf(stderr, "prepare = %d %d/%d\n", pkts, pacer->budget, pacer->burst); + return MIN(pacer->budget, pacer->burst); +} +int quic_pacing_next(struct quic_pacer *pacer) +{ + //return (pacer->burst / 4) * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd; + return 1; } int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err) { - //const int pkt_ms = quic_pacing_ns_pkt(pacer, 1); - -#if 0 - if (pacer->curr == now_ms) { - pacer->sent += sent; - } - else { - int not_consumed = pkt_ms - pacer->sent; - if (not_consumed < 0) - not_consumed = 0; - if (not_consumed) - fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pkt_ms, pacer->sent); - - //pacer->sent = 0; - //pacer->sent -= not_consumed; - - pacer->curr = now_ms; - pacer->sent = sent; - } -#endif - BUG_ON(pacer->curr != now_ms); - pacer->sent += sent; - - if (pacer->sent >= pacer->pkt_ms) { - //pacer->next = tick_add(now_ms, 1); - pacer->next = tick_add(now_ms, MAX((pacer->sent / pacer->pkt_ms), 1)); - BUG_ON(tick_is_expired(pacer->next, now_ms)); - //fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms); - return 1; - } - else { - return 0; + BUG_ON(sent > pacer->budget); + TRACE_POINT(QMUX_EV_QCC_WAKE, NULL); + pacer->budget -= sent; + if (sent) { + TRACE_POINT(QMUX_EV_QCC_WAKE, NULL); + pacer->last_sent = now_ms; } + return 0; } diff --git a/src/quic_tx.c b/src/quic_tx.c index e1a4c38bc7..ba268263a4 100644 --- a/src/quic_tx.c +++ b/src/quic_tx.c @@ -513,9 +513,12 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, } #endif max_dgram = quic_pacing_prepare(pacer); - BUG_ON(!max_dgram); - if (!max_dgram) + //BUG_ON(!max_dgram); + if (!max_dgram) { + pacer->next = tick_add(now_ms, quic_pacing_next(pacer)); + //fprintf(stderr, "wait for %ldms\n", pacer->burst * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd); return QUIC_TX_ERR_AGAIN; + } } TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc); @@ -638,6 +641,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf, // goto out; // } //} + BUG_ON(max_dgrams && dgram_cnt > max_dgrams); if (max_dgrams && dgram_cnt == max_dgrams) { BUG_ON(LIST_ISEMPTY(frms)); TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel); @@ -890,6 +894,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list, } ret += prep_pkts; + BUG_ON(max_dgrams && ret > max_dgrams); if (max_dgrams && ret == max_dgrams && !LIST_ISEMPTY(send_list)) { TRACE_DEVEL("stopping for artificial pacing", QUIC_EV_CONN_TXPKT, qc); break; -- 2.47.2