From: Amaury Denoyelle
Date: Thu, 31 Oct 2024 09:32:44 +0000 (+0100)
Subject: TMP
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fheads%2Fade-quic-implement-pacing-ms;p=thirdparty%2Fhaproxy.git

TMP
---

diff --git a/include/haproxy/quic_pacing-t.h b/include/haproxy/quic_pacing-t.h
index 25e6eecdba..c12203b912 100644
--- a/include/haproxy/quic_pacing-t.h
+++ b/include/haproxy/quic_pacing-t.h
@@ -7,7 +7,10 @@
 struct quic_pacer {
 	struct list frms;
 	const struct quic_cc_path *path;
-	ullong next;
+
+	unsigned int curr;
+	unsigned int next;
+	int sent;
 };
 
 #endif /* _HAPROXY_QUIC_PACING_T_H */
diff --git a/include/haproxy/quic_pacing.h b/include/haproxy/quic_pacing.h
index ee536fb2e5..10e869b954 100644
--- a/include/haproxy/quic_pacing.h
+++ b/include/haproxy/quic_pacing.h
@@ -11,7 +11,10 @@ static inline void quic_pacing_init(struct quic_pacer *pacer,
 {
 	LIST_INIT(&pacer->frms);
 	pacer->path = path;
-	pacer->next = 0;
+
+	pacer->curr = now_ms;
+	pacer->next = now_ms;
+	pacer->sent = 0;
 }
 
 static inline void quic_pacing_reset(struct quic_pacer *pacer)
@@ -30,9 +33,10 @@ static inline struct list *quic_pacing_frms(struct quic_pacer *pacer)
 	return &pacer->frms;
 }
 
-static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer)
+static inline int quic_pacing_pkt_ms(const struct quic_pacer *pacer)
 {
-	return pacer->path->loss.srtt * 1000000 / (pacer->path->cwnd / pacer->path->mtu + 1);
+	return (pacer->path->cwnd / (pacer->path->mtu + 1)) /
+	       (pacer->path->loss.srtt + 1) + 1;
 }
 
 int quic_pacing_expired(const struct quic_pacer *pacer);
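The old helper returned a delay in nanoseconds per packet (srtt divided by the number of packets per window); the new one inverts the ratio and returns a budget of datagrams per millisecond tick. A minimal standalone sketch of the arithmetic, with plain integers standing in for the quic_cc_path fields and a hypothetical pkt_per_ms() name (the "+ 1" terms are divide-by-zero guards and also keep the budget at a minimum of one datagram per tick):

	/* sketch: datagrams allowed per 1ms tick, assuming srtt is in ms */
	static int pkt_per_ms(unsigned long long cwnd, unsigned int mtu,
	                      unsigned int srtt)
	{
		/* cwnd / mtu packets fit in one srtt window; spread them
		 * evenly over the srtt's 1ms ticks */
		return (cwnd / (mtu + 1)) / (srtt + 1) + 1;
	}

	/* e.g. cwnd=1200000 bytes, mtu=1252, srtt=40ms
	 *  -> 957 packets per window / 41 -> 24 datagrams per ms */

Note that dividing by (mtu + 1) rather than mtu slightly underestimates the window in packets; presumably acceptable here since the result is only a pacing heuristic.
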
diff --git a/src/mux_quic.c b/src/mux_quic.c
index adac5125ab..6b9cb8df43 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -267,12 +267,14 @@ static inline int qcc_is_dead(const struct qcc *qcc)
 /* Return true if the mux timeout should be armed. */
 static inline int qcc_may_expire(struct qcc *qcc)
 {
-	return !qcc->nb_sc;
+	//return !qcc->nb_sc;
+	return 1;
 }
 
 /* Refresh the timeout on the connection if needed depending on its state. */
 static void qcc_refresh_timeout(struct qcc *qcc)
 {
+#if 0
 	const struct proxy *px = qcc->proxy;
 
 	TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
@@ -386,18 +388,25 @@ static void qcc_refresh_timeout(struct qcc *qcc)
 
  leave:
 	TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
+#endif
 }
 
 void qcc_wakeup(struct qcc *qcc)
 {
 	HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
 	tasklet_wakeup(qcc->wait_event.tasklet);
+
+	qcc->task->expire = TICK_ETERNITY;
+	task_queue(qcc->task);
 }
 
 static void qcc_wakeup_pacing(struct qcc *qcc)
 {
-	HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
-	tasklet_wakeup(qcc->wait_event.tasklet);
+	//HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
+	//tasklet_wakeup(qcc->wait_event.tasklet);
+	qcc->task->expire = qcc->tx.pacer.next;
+	BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
+	task_queue(qcc->task);
 }
 
 /* Mark a stream as open if it was idle. This can be used on every
@@ -2774,7 +2783,7 @@ static void qcc_release(struct qcc *qcc)
 	TRACE_LEAVE(QMUX_EV_QCC_END);
 }
 
-static void qcc_purge_sending(struct qcc *qcc)
+static int qcc_purge_sending(struct qcc *qcc)
 {
 	struct quic_conn *qc = qcc->conn->handle.qc;
 	struct quic_pacer *pacer = &qcc->tx.pacer;
@@ -2783,9 +2792,10 @@ static void qcc_purge_sending(struct qcc *qcc)
 	ret = quic_pacing_send(pacer, qc);
 	if (ret == QUIC_TX_ERR_AGAIN) {
 		BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
-		qcc_wakeup_pacing(qcc);
+		return 1;
 	}
-	else if (ret == QUIC_TX_ERR_FATAL) {
+
+	if (ret == QUIC_TX_ERR_FATAL) {
 		TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
 		HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
 		qcc_subscribe_send(qcc);
@@ -2794,6 +2804,8 @@ static int qcc_purge_sending(struct qcc *qcc)
 		if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
 			qcc_subscribe_send(qcc);
 	}
+
+	return 0;
 }
 
 struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
@@ -2844,12 +2856,14 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
 		}
 
 		if (!qcc_may_expire(qcc)) {
+			ABORT_NOW();
 			TRACE_DEVEL("cannot expire", QMUX_EV_QCC_WAKE, qcc->conn);
 			t->expire = TICK_ETERNITY;
 			goto requeue;
 		}
 	}
 
+#if 0
 	task_destroy(t);
 
 	if (!qcc) {
@@ -2870,12 +2884,23 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
 		qcc_shutdown(qcc);
 		qcc_release(qcc);
 	}
+#endif
+
+	if (qcc_purge_sending(qcc)) {
+		t->expire = qcc->tx.pacer.next;
+		goto requeue;
+	}
+	else {
+		t->expire = TICK_ETERNITY;
+		goto requeue;
+	}
 
  out:
 	TRACE_LEAVE(QMUX_EV_QCC_WAKE);
 	return NULL;
 
 requeue:
+	BUG_ON(tick_is_expired(t->expire, now_ms));
 	TRACE_LEAVE(QMUX_EV_QCC_WAKE);
 	return t;
 }
@@ -2984,7 +3009,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
 	}
 	qcc->task->process = qcc_timeout_task;
 	qcc->task->context = qcc;
-	qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
+	//qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
+	qcc->task->expire = TICK_ETERNITY;
 	qcc_reset_idle_start(qcc);
 
 	LIST_INIT(&qcc->opening_list);
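The mux side stops self-waking the I/O tasklet through TASK_F_USR1 and instead reuses the qcc timeout task as the pacing timer: qcc_wakeup_pacing() arms qcc->task with the pacer deadline, and qcc_timeout_task() re-emits through qcc_purge_sending(), requeueing itself while datagrams remain. A condensed sketch of the resulting control flow, under the assumption (valid in this WIP state, where qcc_refresh_timeout() is compiled out) that the task serves pacing only; pacing_timer is a hypothetical name:

	/* sketch: the timeout task doubles as a millisecond pacing timer */
	static struct task *pacing_timer(struct task *t, void *ctx,
	                                 unsigned int state)
	{
		struct qcc *qcc = ctx;

		if (qcc_purge_sending(qcc))          /* QUIC_TX_ERR_AGAIN */
			t->expire = qcc->tx.pacer.next;  /* sleep until deadline */
		else
			t->expire = TICK_ETERNITY;       /* disarm until rearmed */

		return t;                            /* returning t requeues it */
	}

The trade-off versus the tasklet loop is a scheduler-driven sleep at millisecond granularity instead of busy self-wakeups; the cost, for now, is that the regular mux timeouts are disabled, as the #if 0 blocks and the unconditional qcc_may_expire() show.
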
diff --git a/src/quic_pacing.c b/src/quic_pacing.c
index 8a1e33f23c..2a762acbab 100644
--- a/src/quic_pacing.c
+++ b/src/quic_pacing.c
@@ -6,7 +6,7 @@ struct quic_conn;
 
 int quic_pacing_expired(const struct quic_pacer *pacer)
 {
-	return !pacer->next || pacer->next <= now_mono_time();
+	return tick_is_expired(pacer->next, now_ms);
 }
 
 enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
@@ -25,5 +25,18 @@ enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc
 
 void quic_pacing_sent_done(struct quic_pacer *pacer, int sent)
 {
-	pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent;
+	const int pkt_ms = quic_pacing_pkt_ms(pacer);
+
+	if (pacer->curr == now_ms) {
+		pacer->sent += sent;
+	}
+	else {
+		pacer->curr = now_ms;
+		pacer->sent = sent;
+	}
+
+	if (pacer->sent >= pkt_ms) {
+		pacer->next = now_ms + (pacer->sent / pkt_ms);
+		fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms);
+	}
 }
diff --git a/src/quic_tx.c b/src/quic_tx.c
index fa35114755..d658f4a644 100644
--- a/src/quic_tx.c
+++ b/src/quic_tx.c
@@ -495,13 +495,20 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
 	}
 
 	if (pacer) {
-		const ullong ns_pkts = quic_pacing_ns_pkt(pacer);
-		max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
+		//const ullong ns_pkts = quic_pacing_ns_pkt(pacer);
+		//max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
+		const int pkt_ms = quic_pacing_pkt_ms(pacer);
+		max_dgram = pkt_ms;
+		if (global.tune.quic_frontend_max_tx_burst)
+			max_dgram *= global.tune.quic_frontend_max_tx_burst;
+		fprintf(stderr, "max_dgram = %d (%lu/%d)\n", max_dgram, qc->path->cwnd, qc->path->loss.srtt);
 	}
 
 	TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
 	qel_register_send(&send_list, qc->ael, frms);
 
 	sent = qc_send(qc, 0, &send_list, max_dgram);
+	BUG_ON(max_dgram && sent > max_dgram);
+
 	if (sent <= 0) {
 		ret = QUIC_TX_ERR_FATAL;
 	}
@@ -552,6 +559,7 @@ static inline void qc_select_tls_ver(struct quic_conn *qc,
 static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
                         struct list *qels, int max_dgrams)
 {
+	//int max_dgrams_copy = max_dgrams;
 	int ret, cc, padding;
 	struct quic_tx_packet *first_pkt, *prv_pkt;
 	unsigned char *end, *pos;
@@ -609,13 +617,11 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
 
 			TRACE_PROTO("TX prep pkts", QUIC_EV_CONN_PHPKTS, qc, qel);
 
-			/* Start to decrement after the first packet built. */
-			if (!dglen && pos != (unsigned char *)b_head(buf)) {
-				if (max_dgrams && !--max_dgrams) {
-					BUG_ON(LIST_ISEMPTY(frms));
-					TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
-					goto out;
-				}
+			TRACE_PRINTF(TRACE_LEVEL_ERROR, QUIC_EV_CONN_PHPKTS, qc, 0, 0, 0, "%d/%d", dgram_cnt, max_dgrams);
+			if (max_dgrams && dgram_cnt == max_dgrams) {
+				BUG_ON(LIST_ISEMPTY(frms));
+				TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
+				goto out;
 			}
 
 			if (!first_pkt)
@@ -678,6 +684,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
 				    wrlen >= QUIC_INITIAL_PACKET_MINLEN)) {
 					qc_txb_store(buf, wrlen, first_pkt);
 					++dgram_cnt;
+					BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
 				}
 				TRACE_PROTO("could not prepare anymore packet", QUIC_EV_CONN_PHPKTS, qc, qel);
 				break;
@@ -748,6 +755,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
 
 				dglen = 0;
 				++dgram_cnt;
+				BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
 				/* man 7 udp UDP_SEGMENT
 				 * The segment size must be chosen such that at
@@ -763,6 +771,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
 				padding = 0;
 				prv_pkt = NULL;
 				++dgram_cnt;
+				BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
 				gso_dgram_cnt = 0;
 			}
 
@@ -778,7 +787,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
  out:
 	if (first_pkt) {
 		qc_txb_store(buf, wrlen, first_pkt);
-		++dgram_cnt;
+		//++dgram_cnt;
 	}
 
 	if (cc && total) {
@@ -787,8 +796,10 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
 		qc->tx.cc_dgram_len = dglen;
 	}
 
+	BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
 	ret = dgram_cnt;
  leave:
+	TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QUIC_EV_CONN_PHPKTS, qc, 0, 0, 0, "ret=%d", ret);
 	TRACE_LEAVE(QUIC_EV_CONN_PHPKTS, qc);
 	return ret;
 }
@@ -849,7 +860,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list,
 		BUG_ON_HOT(b_data(buf));
 		b_reset(buf);
 
-		prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams);
+		prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams ? max_dgrams - ret : 0);
 		if (b_data(buf) && !qc_send_ppkts(buf, qc->xprt_ctx)) {
 			ret = -1;
@@ -864,6 +875,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list,
 		}
 
 		ret += prep_pkts;
+		BUG_ON(max_dgrams && ret > max_dgrams);
 		if (max_dgrams && ret == max_dgrams && !LIST_ISEMPTY(send_list)) {
 			TRACE_DEVEL("stopping for artificial pacing", QUIC_EV_CONN_TXPKT, qc);
 			break;
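
On the accounting side, quic_pacing_sent_done() now counts datagrams per now_ms tick instead of deriving a nanosecond timestamp: emissions within the same tick accumulate into pacer->sent, and once the count reaches the per-tick budget the next deadline is pushed back by one millisecond per exhausted budget. A self-contained model of that logic, with hypothetical pacer_model/sent_done names and now_ms passed explicitly so it can be exercised outside of HAProxy:

	/* model of the credit accounting in quic_pacing_sent_done() */
	struct pacer_model {
		unsigned int curr;   /* tick <sent> was last accumulated on */
		unsigned int next;   /* earliest tick for the next emission */
		int sent;            /* datagrams emitted during tick <curr> */
	};

	static void sent_done(struct pacer_model *p, int sent,
	                      unsigned int now_ms, int pkt_ms)
	{
		if (p->curr == now_ms) {
			p->sent += sent;     /* same tick: accumulate */
		}
		else {
			p->curr = now_ms;    /* new tick: restart the count */
			p->sent = sent;
		}

		/* budget exhausted: delay 1ms per <pkt_ms> datagrams sent */
		if (p->sent >= pkt_ms)
			p->next = now_ms + (p->sent / pkt_ms);
	}

qc_prep_pkts() matches this by counting built datagrams upward (dgram_cnt == max_dgrams) instead of decrementing max_dgrams, which lets qc_send() pass a shrinking remainder (max_dgrams - ret) across loop iterations; the sprinkled BUG_ON()s and the fprintf()/TRACE_PRINTF() calls are debug scaffolding consistent with the TMP subject.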