]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
TMP ms pacing with budget calcul quic-experiment-20241030-pacing-ms
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 31 Oct 2024 09:45:45 +0000 (09:45 +0000)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 31 Oct 2024 09:45:45 +0000 (09:45 +0000)
include/haproxy/quic_pacing-t.h
include/haproxy/quic_pacing.h
src/mux_quic.c
src/quic_pacing.c
src/quic_tx.c

index 6039acc49e18f309ed78f48ba98244c99ff9f832..0c496de60345fa6188a4755b67bab43792ed86cc 100644 (file)
@@ -7,10 +7,14 @@
 struct quic_pacer {
        struct list frms;
        const struct quic_cc_path *path;
+       //int next;
+       //unsigned int curr;
+       //int pkt_ms;
+       //int sent;
+       int burst;
+       int budget;
+       int last_sent;
        int next;
-       unsigned int curr;
-       int pkt_ms;
-       int sent;
 };
 
 #endif /* _HAPROXY_QUIC_PACING_T_H */
index ee137ae8f1cc37c52358b4fde46d8831aee4c2ba..5f7d6f2e4889c450ab16fd99b79e971327f6de90 100644 (file)
@@ -13,12 +13,18 @@ static inline void quic_pacing_init(struct quic_pacer *pacer,
        LIST_INIT(&pacer->frms);
        pacer->path = path;
        //pacer->next = TICK_ETERNITY;
-       pacer->next = now_ms;
+       //pacer->next = now_ms;
 
        //pacer->curr = now_ms;
-       pacer->curr = TICK_ETERNITY;
-       pacer->pkt_ms = 0;
-       pacer->sent = 0;
+       //pacer->curr = TICK_ETERNITY;
+       //pacer->pkt_ms = 0;
+       //pacer->sent = 0;
+       
+       pacer->last_sent = now_ms;
+       //pacer->budget = global.tune.quic_frontend_max_tx_burst;
+       pacer->budget = 0;
+       pacer->burst = global.tune.quic_frontend_max_tx_burst;
+       pacer->next = TICK_ETERNITY;
 }
 
 static inline void quic_pacing_reset(struct quic_pacer *pacer)
@@ -47,7 +53,7 @@ static inline int quic_pacing_ns_pkt(const struct quic_pacer *pacer, int sent)
        return (pacer->path->cwnd / (pacer->path->mtu + 1)) / (pacer->path->loss.srtt + 1) + 1;
 }
 
-int quic_pacing_expired(const struct quic_pacer *pacer);
+//int quic_pacing_expired(const struct quic_pacer *pacer);
 
 enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc);
 
@@ -56,4 +62,6 @@ int quic_pacing_prepare(struct quic_pacer *pacer);
 //void quic_pacing_sent_done(struct quic_pacer *pacer, int sent);
 int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err);
 
+int quic_pacing_next(struct quic_pacer *pacer);
+
 #endif /* _HAPROXY_QUIC_PACING_H */
index 95ebf48805dc857437a6b14bdbf40ecd928b2a91..f877d8299ae6208dc02bd4968310e217c95875ce 100644 (file)
@@ -277,6 +277,7 @@ static void qcc_refresh_timeout(struct qcc *qcc)
 
 void qcc_wakeup(struct qcc *qcc)
 {
+       TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
        HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
        tasklet_wakeup(qcc->wait_event.tasklet);
 
@@ -287,12 +288,23 @@ void qcc_wakeup(struct qcc *qcc)
 
 static void qcc_wakeup_pacing(struct qcc *qcc)
 {
+       TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
+       BUG_ON(LIST_ISEMPTY(&qcc->tx.pacer.frms));
        HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
        tasklet_wakeup(qcc->wait_event.tasklet);
-       //qcc->task->expire = qcc->tx.pacer.next;
-       //BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
-       //task_queue(qcc->task);
-       //TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn);
+
+       qcc->task->expire = TICK_ETERNITY;
+       task_queue(qcc->task);
+}
+
+static void qcc_task_pacing(struct qcc *qcc)
+{
+       TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
+       //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+       qcc->task->expire = now_ms == qcc->tx.pacer.next ? tick_add(qcc->tx.pacer.next, 1) : qcc->tx.pacer.next;
+       BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
+       task_queue(qcc->task);
+       TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn);
 }
 
 /* Mark a stream as open if it was idle. This can be used on every
@@ -2176,6 +2188,7 @@ static int qcc_io_send(struct qcc *qcc)
         */
 
        quic_pacing_reset(pacer);
+       //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
 
        /* Check for transport error. */
        if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
@@ -2277,9 +2290,17 @@ static int qcc_io_send(struct qcc *qcc)
                }
        }
 
-       if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
-               qcc_wakeup_pacing(qcc);
-               return 1;
+       //if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
+       if (!LIST_ISEMPTY(frms)) {
+              if (!qcc->tx.pacer.budget) {
+                      qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(pacer));
+                      //fprintf(stderr, "wait for %ldms\n", qcc->tx.pacer.burst * qcc->tx.pacer.path->loss.srtt * qcc->tx.pacer.path->mtu / qcc->tx.pacer.path->cwnd);
+                      qcc_task_pacing(qcc);
+                      return 1;
+              }
+              //else {
+              //        qcc_wakeup_pacing(qcc);
+              //}
        }
 
        /* Retry sending until no frame to send, data rejected or connection
@@ -2317,11 +2338,14 @@ static int qcc_io_send(struct qcc *qcc)
 
  sent_done:
        if (ret == 1) {
+               qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(pacer));
+               //fprintf(stderr, "wait for %ldms\n", pacer->burst * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd);
                qcc_wakeup_pacing(qcc);
        }
        else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
                /* Deallocate frames that the transport layer has rejected. */
                quic_pacing_reset(pacer);
+               //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
        }
 
        /* Re-insert on-error QCS at the end of the send-list. */
@@ -2643,6 +2667,7 @@ static void qcc_release(struct qcc *qcc)
        }
 
        quic_pacing_reset(&qcc->tx.pacer);
+       //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
 
        if (qcc->app_ops && qcc->app_ops->release)
                qcc->app_ops->release(qcc->ctx);
@@ -2679,7 +2704,7 @@ static int qcc_purge_sending(struct qcc *qcc)
        if (ret == QUIC_TX_ERR_AGAIN) {
                BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
                TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
-               qcc_wakeup_pacing(qcc);
+               //qcc_wakeup_pacing(qcc);
                return 1;
        }
        else if (ret == QUIC_TX_ERR_FATAL) {
@@ -2693,6 +2718,8 @@ static int qcc_purge_sending(struct qcc *qcc)
                TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
                if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
                        qcc_subscribe_send(qcc);
+               //else
+               //      HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
                return 0;
        }
 }
@@ -2704,8 +2731,18 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
        TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
        if (status & TASK_F_USR1) {
+               ++activity[tid].ctr0;
+               //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
                //ABORT_NOW();
-               qcc_purge_sending(qcc);
+               if (qcc_purge_sending(qcc)) {
+                       if (!qcc->tx.pacer.budget) {
+                              qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(&qcc->tx.pacer));
+                              //fprintf(stderr, "wait for %ldms\n", qcc->tx.pacer.burst * qcc->tx.pacer.path->loss.srtt * qcc->tx.pacer.path->mtu / qcc->tx.pacer.path->cwnd);
+                              qcc_task_pacing(qcc);
+                       }
+                       else
+                               qcc_wakeup_pacing(qcc);
+               }
                return NULL;
        }
 
@@ -2745,21 +2782,21 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
                        goto requeue;
                }
                //fprintf(stderr, "woken up after %dms\n", now_ms - qcc->tx.pacer.next);
-
-#if 0
-               if (!qcc_may_expire(qcc)) {
-                       TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn);
-                       t->expire = TICK_ETERNITY;
-                       goto requeue;
-               }
-#endif
        }
 
+       ++activity[tid].ctr1;
        if (qcc_purge_sending(qcc)) {
-               qcc->task->expire = qcc->tx.pacer.next;
-               BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
-               TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
-               goto requeue;
+               //qcc->task->expire = qcc->tx.pacer.next;
+               if (!qcc->tx.pacer.budget) {
+                       qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(&qcc->tx.pacer));
+                       qcc->task->expire = now_ms == qcc->tx.pacer.next ? tick_add(qcc->tx.pacer.next, 1) : qcc->tx.pacer.next;
+                       BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
+                       TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
+                       goto requeue;
+               }
+               else {
+                       qcc_wakeup_pacing(qcc);
+               }
        }
        t->expire = TICK_ETERNITY;
        goto requeue;
index 8ed8638b68428dbf2d998b7338709f18bd479143..139e6da0ccedbc4bc52dd94c598dabd0b5544be7 100644 (file)
@@ -5,23 +5,24 @@
 
 struct quic_conn;
 
-int quic_pacing_expired(const struct quic_pacer *pacer)
-{
-       //return !pacer->next || pacer->next <= now_mono_time();
-       //return !pacer->next || pacer->next <= now_ms;
-       return tick_is_expired(pacer->next, now_ms);
-}
+//int quic_pacing_expired(const struct quic_pacer *pacer)
+//{
+//     //return !pacer->next || pacer->next <= now_mono_time();
+//     //return !pacer->next || pacer->next <= now_ms;
+//     return tick_is_expired(pacer->next, now_ms);
+//}
 
 enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
 {
        enum quic_tx_err ret;
 
-       if (!quic_pacing_expired(pacer))
-               return QUIC_TX_ERR_AGAIN;
+       //if (!quic_pacing_expired(pacer))
+       //if (!pacer->budget)
+       //      return QUIC_TX_ERR_AGAIN;
 
        BUG_ON(LIST_ISEMPTY(&pacer->frms));
        ret = qc_send_mux(qc, &pacer->frms, pacer);
-       BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms));
+       //BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms));
 
        /* TODO handle QUIC_TX_ERR_FATAL */
        return ret;
@@ -29,60 +30,34 @@ enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc
 
 int quic_pacing_prepare(struct quic_pacer *pacer)
 {
-       if (pacer->curr == now_ms) {
-               BUG_ON(pacer->sent > pacer->pkt_ms);
-               return pacer->pkt_ms - pacer->sent;
-       }
-       else {
-               int not_consumed = pacer->pkt_ms - pacer->sent;
-               BUG_ON(not_consumed < 0);
-               //if (not_consumed)
-               //      fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pacer->pkt_ms, pacer->sent);
+       int idle = tick_remain(pacer->last_sent, now_ms);
+       int pkts = idle * pacer->path->cwnd / (pacer->path->loss.srtt * pacer->path->mtu + 1); 
 
-               pacer->curr = now_ms;
-               pacer->sent = 0;
-               pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0);
-               //pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0) + not_consumed;
+       TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
 
-               BUG_ON(!pacer->pkt_ms);
-               return pacer->pkt_ms;
+       pacer->budget += pkts;
+       if (pacer->budget > pacer->burst * 2) {
+               TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
+               pacer->budget = pacer->burst * 2;
        }
+       //fprintf(stderr, "prepare = %d %d/%d\n", pkts, pacer->budget, pacer->burst);
+       return MIN(pacer->budget, pacer->burst);
+}
 
+int quic_pacing_next(struct quic_pacer *pacer)
+{
+       //return (pacer->burst / 4) * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd;
+       return 1;
 }
 
 int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err)
 {
-       //const int pkt_ms = quic_pacing_ns_pkt(pacer, 1);
-
-#if 0
-       if (pacer->curr == now_ms) {
-               pacer->sent += sent;
-       }
-       else {
-               int not_consumed = pkt_ms - pacer->sent;
-               if (not_consumed < 0)
-                       not_consumed = 0;       
-               if (not_consumed)
-                       fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pkt_ms, pacer->sent);
-
-               //pacer->sent = 0;
-               //pacer->sent -= not_consumed;
-
-               pacer->curr = now_ms;
-               pacer->sent = sent;
-       }
-#endif
-       BUG_ON(pacer->curr != now_ms);
-       pacer->sent += sent;
-
-       if (pacer->sent >= pacer->pkt_ms) {
-               //pacer->next = tick_add(now_ms, 1);
-               pacer->next = tick_add(now_ms, MAX((pacer->sent / pacer->pkt_ms), 1));
-               BUG_ON(tick_is_expired(pacer->next, now_ms));
-               //fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms);
-               return 1;
-       }
-       else {
-               return 0;
+       BUG_ON(sent > pacer->budget);
+       TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
+       pacer->budget -= sent;
+       if (sent) {
+               TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
+               pacer->last_sent = now_ms;
        }
+       return 0;
 }
index e1a4c38bc7dbc42efa85960135fa9b2c52460a81..ba268263a47fae41fc6c63b198730f1a3b6322af 100644 (file)
@@ -513,9 +513,12 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
                }
 #endif
                max_dgram = quic_pacing_prepare(pacer);
-               BUG_ON(!max_dgram);
-               if (!max_dgram)
+               //BUG_ON(!max_dgram);
+               if (!max_dgram) {
+                       pacer->next = tick_add(now_ms, quic_pacing_next(pacer));
+                       //fprintf(stderr, "wait for %ldms\n", pacer->burst * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd);
                        return QUIC_TX_ERR_AGAIN;
+               }
        }
 
        TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
@@ -638,6 +641,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
                        //              goto out;
                        //      }
                        //}
+                       BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
                        if (max_dgrams && dgram_cnt == max_dgrams) {
                                BUG_ON(LIST_ISEMPTY(frms));
                                TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
@@ -890,6 +894,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list,
                }
 
                ret += prep_pkts;
+               BUG_ON(max_dgrams && ret > max_dgrams);
                if (max_dgrams && ret == max_dgrams && !LIST_ISEMPTY(send_list)) {
                        TRACE_DEVEL("stopping for artificial pacing", QUIC_EV_CONN_TXPKT, qc);
                        break;