]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
TMP
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 30 Oct 2024 13:11:54 +0000 (13:11 +0000)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 30 Oct 2024 13:11:54 +0000 (13:11 +0000)
include/haproxy/quic_pacing-t.h
include/haproxy/quic_pacing.h
src/mux_quic.c
src/qmux_trace.c
src/quic_pacing.c
src/quic_tx.c

index 25e6eecdba56c705575e414537eff9d0150654db..6039acc49e18f309ed78f48ba98244c99ff9f832 100644 (file)
@@ -7,7 +7,10 @@
 struct quic_pacer {
        struct list frms;
        const struct quic_cc_path *path;
-       ullong next;
+       int next;
+       unsigned int curr;
+       int pkt_ms;
+       int sent;
 };
 
 #endif /* _HAPROXY_QUIC_PACING_T_H */
index ee536fb2e5dd64dd0503d2eb95fed7b5cf4006ed..ee137ae8f1cc37c52358b4fde46d8831aee4c2ba 100644 (file)
@@ -5,13 +5,20 @@
 
 #include <haproxy/list.h>
 #include <haproxy/quic_frame.h>
+#include <haproxy/quic_tx-t.h>
 
 static inline void quic_pacing_init(struct quic_pacer *pacer,
                                     const struct quic_cc_path *path)
 {
        LIST_INIT(&pacer->frms);
        pacer->path = path;
-       pacer->next = 0;
+       //pacer->next = TICK_ETERNITY;
+       pacer->next = now_ms;
+
+       //pacer->curr = now_ms;
+       pacer->curr = TICK_ETERNITY;
+       pacer->pkt_ms = 0;
+       pacer->sent = 0;
 }
 
 static inline void quic_pacing_reset(struct quic_pacer *pacer)
@@ -30,15 +37,23 @@ static inline struct list *quic_pacing_frms(struct quic_pacer *pacer)
        return &pacer->frms;
 }
 
-static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer)
+static inline int quic_pacing_ns_pkt(const struct quic_pacer *pacer, int sent)
 {
-       return pacer->path->loss.srtt * 1000000 / (pacer->path->cwnd / pacer->path->mtu + 1);
+       //return pacer->path->loss.srtt * 1000000 / (pacer->path->cwnd / pacer->path->mtu + 1);
+       //ullong val = pacer->path->loss.srtt / (pacer->path->cwnd / (pacer->path->mtu * sent) + 1);
+       //fprintf(stderr, "val=%llu %d/(%lu/(%zu * %d) + 1\n",
+       //        val, pacer->path->loss.srtt, pacer->path->cwnd, pacer->path->mtu, sent);
+       //return pacer->path->loss.srtt / (pacer->path->cwnd / (pacer->path->mtu * sent) + 1);
+       return (pacer->path->cwnd / (pacer->path->mtu + 1)) / (pacer->path->loss.srtt + 1) + 1;
 }
 
 int quic_pacing_expired(const struct quic_pacer *pacer);
 
 enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc);
 
-void quic_pacing_sent_done(struct quic_pacer *pacer, int sent);
+int quic_pacing_prepare(struct quic_pacer *pacer);
+
+//void quic_pacing_sent_done(struct quic_pacer *pacer, int sent);
+int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err);
 
 #endif /* _HAPROXY_QUIC_PACING_H */
index adac5125abf91bbeb057efdbc0a8971eff8f42ce..95ebf48805dc857437a6b14bdbf40ecd928b2a91 100644 (file)
@@ -273,131 +273,26 @@ static inline int qcc_may_expire(struct qcc *qcc)
 /* Refresh the timeout on <qcc> if needed depending on its state. */
 static void qcc_refresh_timeout(struct qcc *qcc)
 {
-       const struct proxy *px = qcc->proxy;
-
-       TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
-
-       if (!qcc->task) {
-               TRACE_DEVEL("already expired", QMUX_EV_QCC_WAKE, qcc->conn);
-               goto leave;
-       }
-
-       /* Check if upper layer is responsible of timeout management. */
-       if (!qcc_may_expire(qcc)) {
-               TRACE_DEVEL("not eligible for timeout", QMUX_EV_QCC_WAKE, qcc->conn);
-               qcc->task->expire = TICK_ETERNITY;
-               task_queue(qcc->task);
-               goto leave;
-       }
-
-       /* Frontend timeout management
-        * - shutdown done -> timeout client-fin
-        * - detached streams with data left to send -> default timeout
-        * - stream waiting on incomplete request or no stream yet activated -> timeout http-request
-        * - idle after stream processing -> timeout http-keep-alive
-        *
-        * If proxy stop-stop in progress, immediate or spread close will be
-        * processed if shutdown already one or connection is idle.
-        */
-       if (!conn_is_back(qcc->conn)) {
-               if (qcc->nb_hreq && !(qcc->flags & QC_CF_APP_SHUT)) {
-                       TRACE_DEVEL("one or more requests still in progress", QMUX_EV_QCC_WAKE, qcc->conn);
-                       qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
-                       task_queue(qcc->task);
-                       goto leave;
-               }
-
-               if ((!LIST_ISEMPTY(&qcc->opening_list) || unlikely(!qcc->largest_bidi_r)) &&
-                   !(qcc->flags & QC_CF_APP_SHUT)) {
-                       int timeout = px->timeout.httpreq;
-                       struct qcs *qcs = NULL;
-                       int base_time;
-
-                       /* Use start time of first stream waiting on HTTP or
-                        * qcc idle if no stream not yet used.
-                        */
-                       if (likely(!LIST_ISEMPTY(&qcc->opening_list)))
-                               qcs = LIST_ELEM(qcc->opening_list.n, struct qcs *, el_opening);
-                       base_time = qcs ? qcs->start : qcc->idle_start;
-
-                       TRACE_DEVEL("waiting on http request", QMUX_EV_QCC_WAKE, qcc->conn, qcs);
-                       qcc->task->expire = tick_add_ifset(base_time, timeout);
-               }
-               else {
-                       if (qcc->flags & QC_CF_APP_SHUT) {
-                               TRACE_DEVEL("connection in closing", QMUX_EV_QCC_WAKE, qcc->conn);
-                               qcc->task->expire = tick_add_ifset(now_ms,
-                                                                  qcc->shut_timeout);
-                       }
-                       else {
-                               /* Use http-request timeout if keep-alive timeout not set */
-                               int timeout = tick_isset(px->timeout.httpka) ?
-                                             px->timeout.httpka : px->timeout.httpreq;
-                               TRACE_DEVEL("at least one request achieved but none currently in progress", QMUX_EV_QCC_WAKE, qcc->conn);
-                               qcc->task->expire = tick_add_ifset(qcc->idle_start, timeout);
-                       }
-
-                       /* If proxy soft-stop in progress and connection is
-                        * inactive, close the connection immediately. If a
-                        * close-spread-time is configured, randomly spread the
-                        * timer over a closing window.
-                        */
-                       if ((qcc->proxy->flags & (PR_FL_DISABLED|PR_FL_STOPPED)) &&
-                           !(global.tune.options & GTUNE_DISABLE_ACTIVE_CLOSE)) {
-
-                               /* Wake timeout task immediately if window already expired. */
-                               int remaining_window = tick_isset(global.close_spread_end) ?
-                                 tick_remain(now_ms, global.close_spread_end) : 0;
-
-                               TRACE_DEVEL("proxy disabled, prepare connection soft-stop", QMUX_EV_QCC_WAKE, qcc->conn);
-                               if (remaining_window) {
-                                       /* We don't need to reset the expire if it would
-                                        * already happen before the close window end.
-                                        */
-                                       if (!tick_isset(qcc->task->expire) ||
-                                           tick_is_le(global.close_spread_end, qcc->task->expire)) {
-                                               /* Set an expire value shorter than the current value
-                                                * because the close spread window end comes earlier.
-                                                */
-                                               qcc->task->expire = tick_add(now_ms,
-                                                                            statistical_prng_range(remaining_window));
-                                       }
-                               }
-                               else {
-                                       /* We are past the soft close window end, wake the timeout
-                                        * task up immediately.
-                                        */
-                                       qcc->task->expire = now_ms;
-                                       task_wakeup(qcc->task, TASK_WOKEN_TIMER);
-                               }
-                       }
-               }
-       }
-
-       /* fallback to default timeout if frontend specific undefined or for
-        * backend connections.
-        */
-       if (!tick_isset(qcc->task->expire)) {
-               TRACE_DEVEL("fallback to default timeout", QMUX_EV_QCC_WAKE, qcc->conn);
-               qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
-       }
-
-       task_queue(qcc->task);
-
- leave:
-       TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
 }
 
 void qcc_wakeup(struct qcc *qcc)
 {
        HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
        tasklet_wakeup(qcc->wait_event.tasklet);
+
+       //qcc->task->expire = TICK_ETERNITY;
+       //task_queue(qcc->task);
+       //TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn);
 }
 
 static void qcc_wakeup_pacing(struct qcc *qcc)
 {
        HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
        tasklet_wakeup(qcc->wait_event.tasklet);
+       //qcc->task->expire = qcc->tx.pacer.next;
+       //BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
+       //task_queue(qcc->task);
+       //TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn);
 }
 
 /* Mark a stream as open if it was idle. This can be used on every
@@ -674,7 +569,7 @@ void qcc_notify_buf(struct qcc *qcc, uint64_t free_size)
 {
        struct qcs *qcs;
 
-       TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+       //TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
        /* Cannot have a negative buf_in_flight counter */
        BUG_ON(qcc->tx.buf_in_flight < free_size);
@@ -700,7 +595,7 @@ void qcc_notify_buf(struct qcc *qcc, uint64_t free_size)
                qcs_notify_send(qcs);
        }
 
-       TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
+       //TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
 }
 
 /* A fatal error is detected locally for <qcc> connection. It should be closed
@@ -2774,7 +2669,7 @@ static void qcc_release(struct qcc *qcc)
        TRACE_LEAVE(QMUX_EV_QCC_END);
 }
 
-static void qcc_purge_sending(struct qcc *qcc)
+static int qcc_purge_sending(struct qcc *qcc)
 {
        struct quic_conn *qc = qcc->conn->handle.qc;
        struct quic_pacer *pacer = &qcc->tx.pacer;
@@ -2783,16 +2678,22 @@ static void qcc_purge_sending(struct qcc *qcc)
        ret = quic_pacing_send(pacer, qc);
        if (ret == QUIC_TX_ERR_AGAIN) {
                BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
+               TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
                qcc_wakeup_pacing(qcc);
+               return 1;
        }
        else if (ret == QUIC_TX_ERR_FATAL) {
                TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
+               TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
                HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
                qcc_subscribe_send(qcc);
+               return 0;
        }
        else {
+               TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
                if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
                        qcc_subscribe_send(qcc);
+               return 0;
        }
 }
 
@@ -2803,6 +2704,7 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
        TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
        if (status & TASK_F_USR1) {
+               //ABORT_NOW();
                qcc_purge_sending(qcc);
                return NULL;
        }
@@ -2842,14 +2744,27 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
                        TRACE_DEVEL("not expired", QMUX_EV_QCC_WAKE, qcc->conn);
                        goto requeue;
                }
+               //fprintf(stderr, "woken up after %dms\n", now_ms - qcc->tx.pacer.next);
 
+#if 0
                if (!qcc_may_expire(qcc)) {
                        TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn);
                        t->expire = TICK_ETERNITY;
                        goto requeue;
                }
+#endif
+       }
+
+       if (qcc_purge_sending(qcc)) {
+               qcc->task->expire = qcc->tx.pacer.next;
+               BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
+               TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
+               goto requeue;
        }
+       t->expire = TICK_ETERNITY;
+       goto requeue;
 
+#if 0
        task_destroy(t);
 
        if (!qcc) {
@@ -2870,6 +2785,7 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
                qcc_shutdown(qcc);
                qcc_release(qcc);
        }
+#endif
 
  out:
        TRACE_LEAVE(QMUX_EV_QCC_WAKE);
@@ -2984,7 +2900,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
        }
        qcc->task->process = qcc_timeout_task;
        qcc->task->context = qcc;
-       qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
+       //qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
+       qcc->task->expire = TICK_ETERNITY;
 
        qcc_reset_idle_start(qcc);
        LIST_INIT(&qcc->opening_list);
index f75e702ccba1baa8337c31a976a659b6f8a42266..68c89b4d7245eedcaea673e4a56d2db115e3c7ed 100644 (file)
@@ -139,9 +139,9 @@ void qmux_dump_qcc_info(struct buffer *msg, const struct qcc *qcc)
                chunk_appendf(msg, " qc=%p", qcc->conn->handle.qc);
        chunk_appendf(msg, " .sc=%llu .hreq=%llu .flg=0x%04x", (ullong)qcc->nb_sc, (ullong)qcc->nb_hreq, qcc->flags);
 
-       chunk_appendf(msg, " .tx=%llu %llu/%llu bwnd=%llu/%llu",
+       chunk_appendf(msg, " .tx=%llu %llu/%llu bwnd=%llu/%llu exp=%llu",
                      (ullong)qcc->tx.fc.off_soft, (ullong)qcc->tx.fc.off_real, (ullong)qcc->tx.fc.limit,
-                     (ullong)qcc->tx.buf_in_flight, (ullong)qc->path->cwnd);
+                     (ullong)qcc->tx.buf_in_flight, (ullong)qc->path->cwnd, qcc->task && tick_isset(qcc->task->expire) ? (ullong)tick_remain(now_ms, qcc->task->expire) : 0);
 }
 
 void qmux_dump_qcs_info(struct buffer *msg, const struct qcs *qcs)
index 8a1e33f23cbfbe1d2919db07a7d08334a0285d98..8ed8638b68428dbf2d998b7338709f18bd479143 100644 (file)
@@ -1,12 +1,15 @@
 #include <haproxy/quic_pacing.h>
 
+#include <haproxy/qmux_trace.h>
 #include <haproxy/quic_tx.h>
 
 struct quic_conn;
 
 int quic_pacing_expired(const struct quic_pacer *pacer)
 {
-       return !pacer->next || pacer->next <= now_mono_time();
+       //return !pacer->next || pacer->next <= now_mono_time();
+       //return !pacer->next || pacer->next <= now_ms;
+       return tick_is_expired(pacer->next, now_ms);
 }
 
 enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
@@ -18,12 +21,68 @@ enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc
 
        BUG_ON(LIST_ISEMPTY(&pacer->frms));
        ret = qc_send_mux(qc, &pacer->frms, pacer);
+       BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms));
 
        /* TODO handle QUIC_TX_ERR_FATAL */
        return ret;
 }
 
-void quic_pacing_sent_done(struct quic_pacer *pacer, int sent)
+int quic_pacing_prepare(struct quic_pacer *pacer)
 {
-       pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent;
+       if (pacer->curr == now_ms) {
+               BUG_ON(pacer->sent > pacer->pkt_ms);
+               return pacer->pkt_ms - pacer->sent;
+       }
+       else {
+               int not_consumed = pacer->pkt_ms - pacer->sent;
+               BUG_ON(not_consumed < 0);
+               //if (not_consumed)
+               //      fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pacer->pkt_ms, pacer->sent);
+
+               pacer->curr = now_ms;
+               pacer->sent = 0;
+               pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0);
+               //pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0) + not_consumed;
+
+               BUG_ON(!pacer->pkt_ms);
+               return pacer->pkt_ms;
+       }
+
+}
+
+int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err)
+{
+       //const int pkt_ms = quic_pacing_ns_pkt(pacer, 1);
+
+#if 0
+       if (pacer->curr == now_ms) {
+               pacer->sent += sent;
+       }
+       else {
+               int not_consumed = pkt_ms - pacer->sent;
+               if (not_consumed < 0)
+                       not_consumed = 0;       
+               if (not_consumed)
+                       fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pkt_ms, pacer->sent);
+
+               //pacer->sent = 0;
+               //pacer->sent -= not_consumed;
+
+               pacer->curr = now_ms;
+               pacer->sent = sent;
+       }
+#endif
+       BUG_ON(pacer->curr != now_ms);
+       pacer->sent += sent;
+
+       if (pacer->sent >= pacer->pkt_ms) {
+               //pacer->next = tick_add(now_ms, 1);
+               pacer->next = tick_add(now_ms, MAX((pacer->sent / pacer->pkt_ms), 1));
+               BUG_ON(tick_is_expired(pacer->next, now_ms));
+               //fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms);
+               return 1;
+       }
+       else {
+               return 0;
+       }
 }
index fa3511475506a4a00e8b96eb72aff8f46f65281a..e1a4c38bc7dbc42efa85960135fa9b2c52460a81 100644 (file)
@@ -495,8 +495,27 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
        }
 
        if (pacer) {
-               const ullong ns_pkts = quic_pacing_ns_pkt(pacer);
-               max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
+#if 0
+               const int ns_pkts = quic_pacing_ns_pkt(pacer, 1);
+               //max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
+               //max_dgram = global.tune.quic_frontend_max_tx_burst / (ns_pkts + 1) + 1;
+               max_dgram = ns_pkts;
+               if (global.tune.quic_frontend_max_tx_burst)
+                       max_dgram *= global.tune.quic_frontend_max_tx_burst;
+               //fprintf(stderr, "max_dgram=%d\n", max_dgram);
+               //fprintf(stderr, "max_dgram = %d (%lu/%d), sent = %d\n", max_dgram, qc->path->cwnd, qc->path->loss.srtt, pacer->curr == now_ms ? pacer->sent : 0);
+               if (now_ms == pacer->curr) {
+                       if (max_dgram <= pacer->sent) {
+                               BUG_ON(tick_is_expired(pacer->next, now_ms));
+                               return QUIC_TX_ERR_AGAIN;
+                       }
+                       max_dgram -= pacer->sent;
+               }
+#endif
+               max_dgram = quic_pacing_prepare(pacer);
+               BUG_ON(!max_dgram);
+               if (!max_dgram)
+                       return QUIC_TX_ERR_AGAIN;
        }
 
        TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
@@ -508,7 +527,9 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
        else if (pacer) {
                if (max_dgram && max_dgram == sent && !LIST_ISEMPTY(frms))
                        ret = QUIC_TX_ERR_AGAIN;
-               quic_pacing_sent_done(pacer, sent);
+               quic_pacing_sent_done(pacer, sent, ret);
+               //if (quic_pacing_sent_done(pacer, sent, ret))
+               //      ret = QUIC_TX_ERR_AGAIN;
        }
 
        TRACE_LEAVE(QUIC_EV_CONN_TXPKT, qc);
@@ -609,13 +630,18 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
 
                        TRACE_PROTO("TX prep pkts", QUIC_EV_CONN_PHPKTS, qc, qel);
 
-                       /* Start to decrement <max_dgrams> after the first packet built. */
-                       if (!dglen && pos != (unsigned char *)b_head(buf)) {
-                               if (max_dgrams && !--max_dgrams) {
-                                       BUG_ON(LIST_ISEMPTY(frms));
-                                       TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
-                                       goto out;
-                               }
+                       ///* Start to decrement <max_dgrams> after the first packet built. */
+                       //if (!dglen && pos != (unsigned char *)b_head(buf)) {
+                       //      if (max_dgrams && !--max_dgrams) {
+                       //              BUG_ON(LIST_ISEMPTY(frms));
+                       //              TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
+                       //              goto out;
+                       //      }
+                       //}
+                       if (max_dgrams && dgram_cnt == max_dgrams) {
+                               BUG_ON(LIST_ISEMPTY(frms));
+                               TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
+                               goto out;
                        }
 
                        if (!first_pkt)
@@ -778,7 +804,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
  out:
        if (first_pkt) {
                qc_txb_store(buf, wrlen, first_pkt);
-               ++dgram_cnt;
+               //++dgram_cnt;
        }
 
        if (cc && total) {
@@ -849,7 +875,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list,
                BUG_ON_HOT(b_data(buf));
                b_reset(buf);
 
-               prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams);
+               prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams ? max_dgrams - ret : 0);
 
                if (b_data(buf) && !qc_send_ppkts(buf, qc->xprt_ctx)) {
                        ret = -1;