From: Amaury Denoyelle Date: Wed, 30 Oct 2024 13:11:54 +0000 (+0000) Subject: TMP X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bbaf5bccdf0caeee29e32d93ae5fd7960e7becc2;p=thirdparty%2Fhaproxy.git TMP --- diff --git a/include/haproxy/quic_pacing-t.h b/include/haproxy/quic_pacing-t.h index 25e6eecdba..6039acc49e 100644 --- a/include/haproxy/quic_pacing-t.h +++ b/include/haproxy/quic_pacing-t.h @@ -7,7 +7,10 @@ struct quic_pacer { struct list frms; const struct quic_cc_path *path; - ullong next; + int next; + unsigned int curr; + int pkt_ms; + int sent; }; #endif /* _HAPROXY_QUIC_PACING_T_H */ diff --git a/include/haproxy/quic_pacing.h b/include/haproxy/quic_pacing.h index ee536fb2e5..ee137ae8f1 100644 --- a/include/haproxy/quic_pacing.h +++ b/include/haproxy/quic_pacing.h @@ -5,13 +5,20 @@ #include #include +#include static inline void quic_pacing_init(struct quic_pacer *pacer, const struct quic_cc_path *path) { LIST_INIT(&pacer->frms); pacer->path = path; - pacer->next = 0; + //pacer->next = TICK_ETERNITY; + pacer->next = now_ms; + + //pacer->curr = now_ms; + pacer->curr = TICK_ETERNITY; + pacer->pkt_ms = 0; + pacer->sent = 0; } static inline void quic_pacing_reset(struct quic_pacer *pacer) @@ -30,15 +37,23 @@ static inline struct list *quic_pacing_frms(struct quic_pacer *pacer) return &pacer->frms; } -static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer) +static inline int quic_pacing_ns_pkt(const struct quic_pacer *pacer, int sent) { - return pacer->path->loss.srtt * 1000000 / (pacer->path->cwnd / pacer->path->mtu + 1); + //return pacer->path->loss.srtt * 1000000 / (pacer->path->cwnd / pacer->path->mtu + 1); + //ullong val = pacer->path->loss.srtt / (pacer->path->cwnd / (pacer->path->mtu * sent) + 1); + //fprintf(stderr, "val=%llu %d/(%lu/(%zu * %d) + 1\n", + // val, pacer->path->loss.srtt, pacer->path->cwnd, pacer->path->mtu, sent); + //return pacer->path->loss.srtt / (pacer->path->cwnd / (pacer->path->mtu * sent) + 1); + return (pacer->path->cwnd / (pacer->path->mtu + 1)) / (pacer->path->loss.srtt + 1) + 1; } int quic_pacing_expired(const struct quic_pacer *pacer); enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc); -void quic_pacing_sent_done(struct quic_pacer *pacer, int sent); +int quic_pacing_prepare(struct quic_pacer *pacer); + +//void quic_pacing_sent_done(struct quic_pacer *pacer, int sent); +int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err); #endif /* _HAPROXY_QUIC_PACING_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index adac5125ab..95ebf48805 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -273,131 +273,26 @@ static inline int qcc_may_expire(struct qcc *qcc) /* Refresh the timeout on if needed depending on its state. */ static void qcc_refresh_timeout(struct qcc *qcc) { - const struct proxy *px = qcc->proxy; - - TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); - - if (!qcc->task) { - TRACE_DEVEL("already expired", QMUX_EV_QCC_WAKE, qcc->conn); - goto leave; - } - - /* Check if upper layer is responsible of timeout management. */ - if (!qcc_may_expire(qcc)) { - TRACE_DEVEL("not eligible for timeout", QMUX_EV_QCC_WAKE, qcc->conn); - qcc->task->expire = TICK_ETERNITY; - task_queue(qcc->task); - goto leave; - } - - /* Frontend timeout management - * - shutdown done -> timeout client-fin - * - detached streams with data left to send -> default timeout - * - stream waiting on incomplete request or no stream yet activated -> timeout http-request - * - idle after stream processing -> timeout http-keep-alive - * - * If proxy stop-stop in progress, immediate or spread close will be - * processed if shutdown already one or connection is idle. - */ - if (!conn_is_back(qcc->conn)) { - if (qcc->nb_hreq && !(qcc->flags & QC_CF_APP_SHUT)) { - TRACE_DEVEL("one or more requests still in progress", QMUX_EV_QCC_WAKE, qcc->conn); - qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout); - task_queue(qcc->task); - goto leave; - } - - if ((!LIST_ISEMPTY(&qcc->opening_list) || unlikely(!qcc->largest_bidi_r)) && - !(qcc->flags & QC_CF_APP_SHUT)) { - int timeout = px->timeout.httpreq; - struct qcs *qcs = NULL; - int base_time; - - /* Use start time of first stream waiting on HTTP or - * qcc idle if no stream not yet used. - */ - if (likely(!LIST_ISEMPTY(&qcc->opening_list))) - qcs = LIST_ELEM(qcc->opening_list.n, struct qcs *, el_opening); - base_time = qcs ? qcs->start : qcc->idle_start; - - TRACE_DEVEL("waiting on http request", QMUX_EV_QCC_WAKE, qcc->conn, qcs); - qcc->task->expire = tick_add_ifset(base_time, timeout); - } - else { - if (qcc->flags & QC_CF_APP_SHUT) { - TRACE_DEVEL("connection in closing", QMUX_EV_QCC_WAKE, qcc->conn); - qcc->task->expire = tick_add_ifset(now_ms, - qcc->shut_timeout); - } - else { - /* Use http-request timeout if keep-alive timeout not set */ - int timeout = tick_isset(px->timeout.httpka) ? - px->timeout.httpka : px->timeout.httpreq; - TRACE_DEVEL("at least one request achieved but none currently in progress", QMUX_EV_QCC_WAKE, qcc->conn); - qcc->task->expire = tick_add_ifset(qcc->idle_start, timeout); - } - - /* If proxy soft-stop in progress and connection is - * inactive, close the connection immediately. If a - * close-spread-time is configured, randomly spread the - * timer over a closing window. - */ - if ((qcc->proxy->flags & (PR_FL_DISABLED|PR_FL_STOPPED)) && - !(global.tune.options & GTUNE_DISABLE_ACTIVE_CLOSE)) { - - /* Wake timeout task immediately if window already expired. */ - int remaining_window = tick_isset(global.close_spread_end) ? - tick_remain(now_ms, global.close_spread_end) : 0; - - TRACE_DEVEL("proxy disabled, prepare connection soft-stop", QMUX_EV_QCC_WAKE, qcc->conn); - if (remaining_window) { - /* We don't need to reset the expire if it would - * already happen before the close window end. - */ - if (!tick_isset(qcc->task->expire) || - tick_is_le(global.close_spread_end, qcc->task->expire)) { - /* Set an expire value shorter than the current value - * because the close spread window end comes earlier. - */ - qcc->task->expire = tick_add(now_ms, - statistical_prng_range(remaining_window)); - } - } - else { - /* We are past the soft close window end, wake the timeout - * task up immediately. - */ - qcc->task->expire = now_ms; - task_wakeup(qcc->task, TASK_WOKEN_TIMER); - } - } - } - } - - /* fallback to default timeout if frontend specific undefined or for - * backend connections. - */ - if (!tick_isset(qcc->task->expire)) { - TRACE_DEVEL("fallback to default timeout", QMUX_EV_QCC_WAKE, qcc->conn); - qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout); - } - - task_queue(qcc->task); - - leave: - TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn); } void qcc_wakeup(struct qcc *qcc) { HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); tasklet_wakeup(qcc->wait_event.tasklet); + + //qcc->task->expire = TICK_ETERNITY; + //task_queue(qcc->task); + //TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn); } static void qcc_wakeup_pacing(struct qcc *qcc) { HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1); tasklet_wakeup(qcc->wait_event.tasklet); + //qcc->task->expire = qcc->tx.pacer.next; + //BUG_ON(tick_is_expired(qcc->task->expire, now_ms)); + //task_queue(qcc->task); + //TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn); } /* Mark a stream as open if it was idle. This can be used on every @@ -674,7 +569,7 @@ void qcc_notify_buf(struct qcc *qcc, uint64_t free_size) { struct qcs *qcs; - TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); + //TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); /* Cannot have a negative buf_in_flight counter */ BUG_ON(qcc->tx.buf_in_flight < free_size); @@ -700,7 +595,7 @@ void qcc_notify_buf(struct qcc *qcc, uint64_t free_size) qcs_notify_send(qcs); } - TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); + //TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); } /* A fatal error is detected locally for connection. It should be closed @@ -2774,7 +2669,7 @@ static void qcc_release(struct qcc *qcc) TRACE_LEAVE(QMUX_EV_QCC_END); } -static void qcc_purge_sending(struct qcc *qcc) +static int qcc_purge_sending(struct qcc *qcc) { struct quic_conn *qc = qcc->conn->handle.qc; struct quic_pacer *pacer = &qcc->tx.pacer; @@ -2783,16 +2678,22 @@ static void qcc_purge_sending(struct qcc *qcc) ret = quic_pacing_send(pacer, qc); if (ret == QUIC_TX_ERR_AGAIN) { BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer))); + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); qcc_wakeup_pacing(qcc); + return 1; } else if (ret == QUIC_TX_ERR_FATAL) { TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn); + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); qcc_subscribe_send(qcc); + return 0; } else { + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) qcc_subscribe_send(qcc); + return 0; } } @@ -2803,6 +2704,7 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status) TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); if (status & TASK_F_USR1) { + //ABORT_NOW(); qcc_purge_sending(qcc); return NULL; } @@ -2842,14 +2744,27 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta TRACE_DEVEL("not expired", QMUX_EV_QCC_WAKE, qcc->conn); goto requeue; } + //fprintf(stderr, "woken up after %dms\n", now_ms - qcc->tx.pacer.next); +#if 0 if (!qcc_may_expire(qcc)) { TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn); t->expire = TICK_ETERNITY; goto requeue; } +#endif + } + + if (qcc_purge_sending(qcc)) { + qcc->task->expire = qcc->tx.pacer.next; + BUG_ON(tick_is_expired(qcc->task->expire, now_ms)); + TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn); + goto requeue; } + t->expire = TICK_ETERNITY; + goto requeue; +#if 0 task_destroy(t); if (!qcc) { @@ -2870,6 +2785,7 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta qcc_shutdown(qcc); qcc_release(qcc); } +#endif out: TRACE_LEAVE(QMUX_EV_QCC_WAKE); @@ -2984,7 +2900,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx, } qcc->task->process = qcc_timeout_task; qcc->task->context = qcc; - qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout); + //qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout); + qcc->task->expire = TICK_ETERNITY; qcc_reset_idle_start(qcc); LIST_INIT(&qcc->opening_list); diff --git a/src/qmux_trace.c b/src/qmux_trace.c index f75e702ccb..68c89b4d72 100644 --- a/src/qmux_trace.c +++ b/src/qmux_trace.c @@ -139,9 +139,9 @@ void qmux_dump_qcc_info(struct buffer *msg, const struct qcc *qcc) chunk_appendf(msg, " qc=%p", qcc->conn->handle.qc); chunk_appendf(msg, " .sc=%llu .hreq=%llu .flg=0x%04x", (ullong)qcc->nb_sc, (ullong)qcc->nb_hreq, qcc->flags); - chunk_appendf(msg, " .tx=%llu %llu/%llu bwnd=%llu/%llu", + chunk_appendf(msg, " .tx=%llu %llu/%llu bwnd=%llu/%llu exp=%llu", (ullong)qcc->tx.fc.off_soft, (ullong)qcc->tx.fc.off_real, (ullong)qcc->tx.fc.limit, - (ullong)qcc->tx.buf_in_flight, (ullong)qc->path->cwnd); + (ullong)qcc->tx.buf_in_flight, (ullong)qc->path->cwnd, qcc->task && tick_isset(qcc->task->expire) ? (ullong)tick_remain(now_ms, qcc->task->expire) : 0); } void qmux_dump_qcs_info(struct buffer *msg, const struct qcs *qcs) diff --git a/src/quic_pacing.c b/src/quic_pacing.c index 8a1e33f23c..8ed8638b68 100644 --- a/src/quic_pacing.c +++ b/src/quic_pacing.c @@ -1,12 +1,15 @@ #include +#include #include struct quic_conn; int quic_pacing_expired(const struct quic_pacer *pacer) { - return !pacer->next || pacer->next <= now_mono_time(); + //return !pacer->next || pacer->next <= now_mono_time(); + //return !pacer->next || pacer->next <= now_ms; + return tick_is_expired(pacer->next, now_ms); } enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc) @@ -18,12 +21,68 @@ enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc BUG_ON(LIST_ISEMPTY(&pacer->frms)); ret = qc_send_mux(qc, &pacer->frms, pacer); + BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms)); /* TODO handle QUIC_TX_ERR_FATAL */ return ret; } -void quic_pacing_sent_done(struct quic_pacer *pacer, int sent) +int quic_pacing_prepare(struct quic_pacer *pacer) { - pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent; + if (pacer->curr == now_ms) { + BUG_ON(pacer->sent > pacer->pkt_ms); + return pacer->pkt_ms - pacer->sent; + } + else { + int not_consumed = pacer->pkt_ms - pacer->sent; + BUG_ON(not_consumed < 0); + //if (not_consumed) + // fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pacer->pkt_ms, pacer->sent); + + pacer->curr = now_ms; + pacer->sent = 0; + pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0); + //pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0) + not_consumed; + + BUG_ON(!pacer->pkt_ms); + return pacer->pkt_ms; + } + +} + +int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err) +{ + //const int pkt_ms = quic_pacing_ns_pkt(pacer, 1); + +#if 0 + if (pacer->curr == now_ms) { + pacer->sent += sent; + } + else { + int not_consumed = pkt_ms - pacer->sent; + if (not_consumed < 0) + not_consumed = 0; + if (not_consumed) + fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pkt_ms, pacer->sent); + + //pacer->sent = 0; + //pacer->sent -= not_consumed; + + pacer->curr = now_ms; + pacer->sent = sent; + } +#endif + BUG_ON(pacer->curr != now_ms); + pacer->sent += sent; + + if (pacer->sent >= pacer->pkt_ms) { + //pacer->next = tick_add(now_ms, 1); + pacer->next = tick_add(now_ms, MAX((pacer->sent / pacer->pkt_ms), 1)); + BUG_ON(tick_is_expired(pacer->next, now_ms)); + //fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms); + return 1; + } + else { + return 0; + } } diff --git a/src/quic_tx.c b/src/quic_tx.c index fa35114755..e1a4c38bc7 100644 --- a/src/quic_tx.c +++ b/src/quic_tx.c @@ -495,8 +495,27 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, } if (pacer) { - const ullong ns_pkts = quic_pacing_ns_pkt(pacer); - max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1; +#if 0 + const int ns_pkts = quic_pacing_ns_pkt(pacer, 1); + //max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1; + //max_dgram = global.tune.quic_frontend_max_tx_burst / (ns_pkts + 1) + 1; + max_dgram = ns_pkts; + if (global.tune.quic_frontend_max_tx_burst) + max_dgram *= global.tune.quic_frontend_max_tx_burst; + //fprintf(stderr, "max_dgram=%d\n", max_dgram); + //fprintf(stderr, "max_dgram = %d (%lu/%d), sent = %d\n", max_dgram, qc->path->cwnd, qc->path->loss.srtt, pacer->curr == now_ms ? pacer->sent : 0); + if (now_ms == pacer->curr) { + if (max_dgram <= pacer->sent) { + BUG_ON(tick_is_expired(pacer->next, now_ms)); + return QUIC_TX_ERR_AGAIN; + } + max_dgram -= pacer->sent; + } +#endif + max_dgram = quic_pacing_prepare(pacer); + BUG_ON(!max_dgram); + if (!max_dgram) + return QUIC_TX_ERR_AGAIN; } TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc); @@ -508,7 +527,9 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, else if (pacer) { if (max_dgram && max_dgram == sent && !LIST_ISEMPTY(frms)) ret = QUIC_TX_ERR_AGAIN; - quic_pacing_sent_done(pacer, sent); + quic_pacing_sent_done(pacer, sent, ret); + //if (quic_pacing_sent_done(pacer, sent, ret)) + // ret = QUIC_TX_ERR_AGAIN; } TRACE_LEAVE(QUIC_EV_CONN_TXPKT, qc); @@ -609,13 +630,18 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf, TRACE_PROTO("TX prep pkts", QUIC_EV_CONN_PHPKTS, qc, qel); - /* Start to decrement after the first packet built. */ - if (!dglen && pos != (unsigned char *)b_head(buf)) { - if (max_dgrams && !--max_dgrams) { - BUG_ON(LIST_ISEMPTY(frms)); - TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel); - goto out; - } + ///* Start to decrement after the first packet built. */ + //if (!dglen && pos != (unsigned char *)b_head(buf)) { + // if (max_dgrams && !--max_dgrams) { + // BUG_ON(LIST_ISEMPTY(frms)); + // TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel); + // goto out; + // } + //} + if (max_dgrams && dgram_cnt == max_dgrams) { + BUG_ON(LIST_ISEMPTY(frms)); + TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel); + goto out; } if (!first_pkt) @@ -778,7 +804,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf, out: if (first_pkt) { qc_txb_store(buf, wrlen, first_pkt); - ++dgram_cnt; + //++dgram_cnt; } if (cc && total) { @@ -849,7 +875,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list, BUG_ON_HOT(b_data(buf)); b_reset(buf); - prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams); + prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams ? max_dgrams - ret : 0); if (b_data(buf) && !qc_send_ppkts(buf, qc->xprt_ctx)) { ret = -1;