]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
TMP ade-quic-implement-pacing-ms
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 31 Oct 2024 09:32:44 +0000 (10:32 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 31 Oct 2024 09:32:50 +0000 (10:32 +0100)
include/haproxy/quic_pacing-t.h
include/haproxy/quic_pacing.h
src/mux_quic.c
src/quic_pacing.c
src/quic_tx.c

index 25e6eecdba56c705575e414537eff9d0150654db..c12203b91207692d7be29d019e9d8187089f2adb 100644 (file)
@@ -7,7 +7,10 @@
 struct quic_pacer {
        struct list frms;
        const struct quic_cc_path *path;
-       ullong next;
+
+       unsigned int curr;
+       unsigned int next;
+       int sent;
 };
 
 #endif /* _HAPROXY_QUIC_PACING_T_H */
index ee536fb2e5dd64dd0503d2eb95fed7b5cf4006ed..10e869b954c77e32cfe032a6894945cdea0cc5c4 100644 (file)
@@ -11,7 +11,10 @@ static inline void quic_pacing_init(struct quic_pacer *pacer,
 {
        LIST_INIT(&pacer->frms);
        pacer->path = path;
-       pacer->next = 0;
+
+       pacer->curr = now_ms;
+       pacer->next = now_ms;
+       pacer->sent = 0;
 }
 
 static inline void quic_pacing_reset(struct quic_pacer *pacer)
@@ -30,9 +33,10 @@ static inline struct list *quic_pacing_frms(struct quic_pacer *pacer)
        return &pacer->frms;
 }
 
-static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer)
+static inline int quic_pacing_pkt_ms(const struct quic_pacer *pacer)
 {
-       return pacer->path->loss.srtt * 1000000 / (pacer->path->cwnd / pacer->path->mtu + 1);
+       return (pacer->path->cwnd / (pacer->path->mtu + 1)) /
+              (pacer->path->loss.srtt + 1) + 1;
 }
 
 int quic_pacing_expired(const struct quic_pacer *pacer);
index adac5125abf91bbeb057efdbc0a8971eff8f42ce..6b9cb8df436b3335128a930852befa585d458ade 100644 (file)
@@ -267,12 +267,14 @@ static inline int qcc_is_dead(const struct qcc *qcc)
 /* Return true if the mux timeout should be armed. */
 static inline int qcc_may_expire(struct qcc *qcc)
 {
-       return !qcc->nb_sc;
+       //return !qcc->nb_sc;
+       return 1;
 }
 
 /* Refresh the timeout on <qcc> if needed depending on its state. */
 static void qcc_refresh_timeout(struct qcc *qcc)
 {
+#if 0
        const struct proxy *px = qcc->proxy;
 
        TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
@@ -386,18 +388,25 @@ static void qcc_refresh_timeout(struct qcc *qcc)
 
  leave:
        TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
+#endif
 }
 
 void qcc_wakeup(struct qcc *qcc)
 {
        HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
        tasklet_wakeup(qcc->wait_event.tasklet);
+
+       qcc->task->expire = TICK_ETERNITY;
+       task_queue(qcc->task);
 }
 
 static void qcc_wakeup_pacing(struct qcc *qcc)
 {
-       HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
-       tasklet_wakeup(qcc->wait_event.tasklet);
+       //HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
+       //tasklet_wakeup(qcc->wait_event.tasklet);
+       qcc->task->expire = qcc->tx.pacer.next;
+       BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
+       task_queue(qcc->task);
 }
 
 /* Mark a stream as open if it was idle. This can be used on every
@@ -2774,7 +2783,7 @@ static void qcc_release(struct qcc *qcc)
        TRACE_LEAVE(QMUX_EV_QCC_END);
 }
 
-static void qcc_purge_sending(struct qcc *qcc)
+static int qcc_purge_sending(struct qcc *qcc)
 {
        struct quic_conn *qc = qcc->conn->handle.qc;
        struct quic_pacer *pacer = &qcc->tx.pacer;
@@ -2783,9 +2792,10 @@ static void qcc_purge_sending(struct qcc *qcc)
        ret = quic_pacing_send(pacer, qc);
        if (ret == QUIC_TX_ERR_AGAIN) {
                BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
-               qcc_wakeup_pacing(qcc);
+               return 1;
        }
-       else if (ret == QUIC_TX_ERR_FATAL) {
+
+       if (ret == QUIC_TX_ERR_FATAL) {
                TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
                HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
                qcc_subscribe_send(qcc);
@@ -2794,6 +2804,8 @@ static void qcc_purge_sending(struct qcc *qcc)
                if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
                        qcc_subscribe_send(qcc);
        }
+
+       return 0;
 }
 
 struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
@@ -2844,12 +2856,14 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
                }
 
                if (!qcc_may_expire(qcc)) {
+                       ABORT_NOW();
                        TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn);
                        t->expire = TICK_ETERNITY;
                        goto requeue;
                }
        }
 
+#if 0
        task_destroy(t);
 
        if (!qcc) {
@@ -2870,12 +2884,23 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
                qcc_shutdown(qcc);
                qcc_release(qcc);
        }
+#endif
+
+       if (qcc_purge_sending(qcc)) {
+               t->expire = qcc->tx.pacer.next;
+               goto requeue;
+       }
+       else {
+               t->expire = TICK_ETERNITY;
+               goto requeue;
+       }
 
  out:
        TRACE_LEAVE(QMUX_EV_QCC_WAKE);
        return NULL;
 
  requeue:
+       BUG_ON(tick_is_expired(t->expire, now_ms));
        TRACE_LEAVE(QMUX_EV_QCC_WAKE);
        return t;
 }
@@ -2984,7 +3009,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
        }
        qcc->task->process = qcc_timeout_task;
        qcc->task->context = qcc;
-       qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
+       //qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
+       qcc->task->expire = TICK_ETERNITY;
 
        qcc_reset_idle_start(qcc);
        LIST_INIT(&qcc->opening_list);
index 8a1e33f23cbfbe1d2919db07a7d08334a0285d98..2a762acbab0f0332c1f05b98c3b5147cdb6b055b 100644 (file)
@@ -6,7 +6,7 @@ struct quic_conn;
 
 int quic_pacing_expired(const struct quic_pacer *pacer)
 {
-       return !pacer->next || pacer->next <= now_mono_time();
+       return tick_is_expired(pacer->next, now_ms);
 }
 
 enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
@@ -25,5 +25,18 @@ enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc
 
 void quic_pacing_sent_done(struct quic_pacer *pacer, int sent)
 {
-       pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent;
+       const int pkt_ms = quic_pacing_pkt_ms(pacer);
+
+       if (pacer->curr == now_ms) {
+               pacer->sent += sent;
+       }
+       else {
+               pacer->curr = now_ms;
+               pacer->sent = sent;
+       }
+
+       if (pacer->sent >= pkt_ms) {
+               pacer->next = now_ms + (pacer->sent / pkt_ms);
+               fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms);
+       }
 }
index fa3511475506a4a00e8b96eb72aff8f46f65281a..d658f4a6446e61cc974c699994a7dc2379a85ac1 100644 (file)
@@ -495,13 +495,20 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
        }
 
        if (pacer) {
-               const ullong ns_pkts = quic_pacing_ns_pkt(pacer);
-               max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
+               //const ullong ns_pkts = quic_pacing_ns_pkt(pacer);
+               //max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
+               const int pkt_ms = quic_pacing_pkt_ms(pacer);
+               max_dgram = pkt_ms;
+               if (global.tune.quic_frontend_max_tx_burst)
+                       max_dgram *= global.tune.quic_frontend_max_tx_burst;
+               fprintf(stderr, "max_dgram = %d (%lu/%d)\n", max_dgram, qc->path->cwnd, qc->path->loss.srtt);
        }
 
        TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
        qel_register_send(&send_list, qc->ael, frms);
        sent = qc_send(qc, 0, &send_list, max_dgram);
+       BUG_ON(max_dgram && sent > max_dgram);
+
        if (sent <= 0) {
                ret = QUIC_TX_ERR_FATAL;
        }
@@ -552,6 +559,7 @@ static inline void qc_select_tls_ver(struct quic_conn *qc,
 static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
                          struct list *qels, int max_dgrams)
 {
+       //int max_dgrams_copy = max_dgrams;
        int ret, cc, padding;
        struct quic_tx_packet *first_pkt, *prv_pkt;
        unsigned char *end, *pos;
@@ -609,13 +617,11 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
 
                        TRACE_PROTO("TX prep pkts", QUIC_EV_CONN_PHPKTS, qc, qel);
 
-                       /* Start to decrement <max_dgrams> after the first packet built. */
-                       if (!dglen && pos != (unsigned char *)b_head(buf)) {
-                               if (max_dgrams && !--max_dgrams) {
-                                       BUG_ON(LIST_ISEMPTY(frms));
-                                       TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
-                                       goto out;
-                               }
+                       TRACE_PRINTF(TRACE_LEVEL_ERROR, QUIC_EV_CONN_PHPKTS, qc, 0, 0, 0, "%d/%d", dgram_cnt, max_dgrams);
+                       if (max_dgrams && dgram_cnt == max_dgrams) {
+                               BUG_ON(LIST_ISEMPTY(frms));
+                               TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
+                               goto out;
                        }
 
                        if (!first_pkt)
@@ -678,6 +684,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
                                                          wrlen >= QUIC_INITIAL_PACKET_MINLEN)) {
                                                qc_txb_store(buf, wrlen, first_pkt);
                                                ++dgram_cnt;
+                                               BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
                                        }
                                        TRACE_PROTO("could not prepare anymore packet", QUIC_EV_CONN_PHPKTS, qc, qel);
                                        break;
@@ -748,6 +755,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
                                dglen = 0;
 
                                ++dgram_cnt;
+                               BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
 
                                /* man 7 udp UDP_SEGMENT
                                 * The segment size must be chosen such that at
@@ -763,6 +771,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
                                padding = 0;
                                prv_pkt = NULL;
                                ++dgram_cnt;
+                               BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
                                gso_dgram_cnt = 0;
                        }
 
@@ -778,7 +787,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
  out:
        if (first_pkt) {
                qc_txb_store(buf, wrlen, first_pkt);
-               ++dgram_cnt;
+               //++dgram_cnt;
        }
 
        if (cc && total) {
@@ -787,8 +796,10 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
                qc->tx.cc_dgram_len = dglen;
        }
 
+       BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
        ret = dgram_cnt;
  leave:
+       TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QUIC_EV_CONN_PHPKTS, qc, 0, 0, 0, "ret=%d", ret);
        TRACE_LEAVE(QUIC_EV_CONN_PHPKTS, qc);
        return ret;
 }
@@ -849,7 +860,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list,
                BUG_ON_HOT(b_data(buf));
                b_reset(buf);
 
-               prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams);
+               prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams ? max_dgrams - ret : 0);
 
                if (b_data(buf) && !qc_send_ppkts(buf, qc->xprt_ctx)) {
                        ret = -1;
@@ -864,6 +875,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list,
                }
 
                ret += prep_pkts;
+               BUG_ON(max_dgrams && ret > max_dgrams);
                if (max_dgrams && ret == max_dgrams && !LIST_ISEMPTY(send_list)) {
                        TRACE_DEVEL("stopping for artificial pacing", QUIC_EV_CONN_TXPKT, qc);
                        break;