struct quic_pacer {
struct list frms;
const struct quic_cc_path *path;
- ullong next;
+
+ unsigned int curr;
+ unsigned int next;
+ int sent;
};
#endif /* _HAPROXY_QUIC_PACING_T_H */
{
LIST_INIT(&pacer->frms);
pacer->path = path;
- pacer->next = 0;
+
+ pacer->curr = now_ms;
+ pacer->next = now_ms;
+ pacer->sent = 0;
}
static inline void quic_pacing_reset(struct quic_pacer *pacer)
return &pacer->frms;
}
-static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer)
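+/* Return the number of datagrams which may be emitted per millisecond,
+ * derived from the congestion window, the path MTU and the smoothed RTT.
+ * The +1 adjustments guard against null divisors and guarantee a strictly
+ * positive rate; as an illustration, cwnd=120000, mtu=1200 and srtt=10ms
+ * give about 10 datagrams per millisecond.
+ */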
+static inline int quic_pacing_pkt_ms(const struct quic_pacer *pacer)
{
- return pacer->path->loss.srtt * 1000000 / (pacer->path->cwnd / pacer->path->mtu + 1);
+ return (pacer->path->cwnd / (pacer->path->mtu + 1)) /
+ (pacer->path->loss.srtt + 1) + 1;
}
int quic_pacing_expired(const struct quic_pacer *pacer);
/* Return true if the mux timeout should be armed. */
static inline int qcc_may_expire(struct qcc *qcc)
{
- return !qcc->nb_sc;
+ //return !qcc->nb_sc;
+ return 1;
}
/* Refresh the timeout on <qcc> if needed depending on its state. */
static void qcc_refresh_timeout(struct qcc *qcc)
{
+#if 0
const struct proxy *px = qcc->proxy;
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
leave:
TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
+#endif
}
void qcc_wakeup(struct qcc *qcc)
{
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
tasklet_wakeup(qcc->wait_event.tasklet);
+
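+ /* The tasklet will handle emission: disarm the qcc task for now. It is
+ * re-armed on the pacer next emission date if sending must be delayed.
+ */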
+ qcc->task->expire = TICK_ETERNITY;
+ task_queue(qcc->task);
}
static void qcc_wakeup_pacing(struct qcc *qcc)
{
- HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
- tasklet_wakeup(qcc->wait_event.tasklet);
+ //HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
+ //tasklet_wakeup(qcc->wait_event.tasklet);
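+ /* Pacing now relies on the qcc task timer: schedule it on the next
+ * emission date computed by the pacer instead of re-waking the tasklet
+ * with TASK_F_USR1.
+ */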
+ qcc->task->expire = qcc->tx.pacer.next;
+ BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
+ task_queue(qcc->task);
}
/* Mark a stream as open if it was idle. This can be used on every
TRACE_LEAVE(QMUX_EV_QCC_END);
}
-static void qcc_purge_sending(struct qcc *qcc)
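+/* Emit as much as possible of the paced frames. Returns a non-zero value
+ * if emission was interrupted by pacing and must be rescheduled by the
+ * caller, 0 otherwise.
+ */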
+static int qcc_purge_sending(struct qcc *qcc)
{
struct quic_conn *qc = qcc->conn->handle.qc;
struct quic_pacer *pacer = &qcc->tx.pacer;
ret = quic_pacing_send(pacer, qc);
if (ret == QUIC_TX_ERR_AGAIN) {
BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
- qcc_wakeup_pacing(qcc);
+ return 1;
}
- else if (ret == QUIC_TX_ERR_FATAL) {
+
+ if (ret == QUIC_TX_ERR_FATAL) {
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
qcc_subscribe_send(qcc);
if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
qcc_subscribe_send(qcc);
}
+
+ return 0;
}
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
}
if (!qcc_may_expire(qcc)) {
+ ABORT_NOW();
TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn);
t->expire = TICK_ETERNITY;
goto requeue;
}
}
+#if 0
task_destroy(t);
if (!qcc) {
qcc_shutdown(qcc);
qcc_release(qcc);
}
+#endif
+
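+ /* Emit paced frames: if emission must still be delayed, re-arm the task
+ * on the pacer next date, otherwise leave it disarmed.
+ */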
+ if (qcc_purge_sending(qcc)) {
+ t->expire = qcc->tx.pacer.next;
+ goto requeue;
+ }
+ else {
+ t->expire = TICK_ETERNITY;
+ goto requeue;
+ }
out:
TRACE_LEAVE(QMUX_EV_QCC_WAKE);
return NULL;
requeue:
+ BUG_ON(tick_is_expired(t->expire, now_ms));
TRACE_LEAVE(QMUX_EV_QCC_WAKE);
return t;
}
}
qcc->task->process = qcc_timeout_task;
qcc->task->context = qcc;
- qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
+ //qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
+ qcc->task->expire = TICK_ETERNITY;
qcc_reset_idle_start(qcc);
LIST_INIT(&qcc->opening_list);
int quic_pacing_expired(const struct quic_pacer *pacer)
{
- return !pacer->next || pacer->next <= now_mono_time();
+ return tick_is_expired(pacer->next, now_ms);
}
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
void quic_pacing_sent_done(struct quic_pacer *pacer, int sent)
{
- pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent;
+ const int pkt_ms = quic_pacing_pkt_ms(pacer);
+
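+ /* Account for the datagrams emitted during the current millisecond tick;
+ * restart the counter when a new tick is entered.
+ */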
+ if (pacer->curr == now_ms) {
+ pacer->sent += sent;
+ }
+ else {
+ pacer->curr = now_ms;
+ pacer->sent = sent;
+ }
+
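+ /* More datagrams than the per-millisecond budget were emitted: delay the
+ * next emission accordingly, e.g. 25 datagrams sent against a budget of
+ * 10 per millisecond delays it by 2ms.
+ */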
+ if (pacer->sent >= pkt_ms) {
+ pacer->next = now_ms + (pacer->sent / pkt_ms);
+ fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms);
+ }
}
}
if (pacer) {
- const ullong ns_pkts = quic_pacing_ns_pkt(pacer);
- max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
+ //const ullong ns_pkts = quic_pacing_ns_pkt(pacer);
+ //max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
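+ /* Allow one millisecond worth of datagrams per invocation, scaled by
+ * global.tune.quic_frontend_max_tx_burst when set.
+ */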
+ const int pkt_ms = quic_pacing_pkt_ms(pacer);
+ max_dgram = pkt_ms;
+ if (global.tune.quic_frontend_max_tx_burst)
+ max_dgram *= global.tune.quic_frontend_max_tx_burst;
+ fprintf(stderr, "max_dgram = %d (%lu/%d)\n", max_dgram, qc->path->cwnd, qc->path->loss.srtt);
}
TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
qel_register_send(&send_list, qc->ael, frms);
sent = qc_send(qc, 0, &send_list, max_dgram);
+ BUG_ON(max_dgram && sent > max_dgram);
+
if (sent <= 0) {
ret = QUIC_TX_ERR_FATAL;
}
static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
struct list *qels, int max_dgrams)
{
+ //int max_dgrams_copy = max_dgrams;
int ret, cc, padding;
struct quic_tx_packet *first_pkt, *prv_pkt;
unsigned char *end, *pos;
TRACE_PROTO("TX prep pkts", QUIC_EV_CONN_PHPKTS, qc, qel);
- /* Start to decrement <max_dgrams> after the first packet built. */
- if (!dglen && pos != (unsigned char *)b_head(buf)) {
- if (max_dgrams && !--max_dgrams) {
- BUG_ON(LIST_ISEMPTY(frms));
- TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
- goto out;
- }
+ TRACE_PRINTF(TRACE_LEVEL_ERROR, QUIC_EV_CONN_PHPKTS, qc, 0, 0, 0, "%d/%d", dgram_cnt, max_dgrams);
+ if (max_dgrams && dgram_cnt == max_dgrams) {
+ BUG_ON(LIST_ISEMPTY(frms));
+ TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
+ goto out;
}
if (!first_pkt)
wrlen >= QUIC_INITIAL_PACKET_MINLEN)) {
qc_txb_store(buf, wrlen, first_pkt);
++dgram_cnt;
+ BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
}
TRACE_PROTO("could not prepare anymore packet", QUIC_EV_CONN_PHPKTS, qc, qel);
break;
dglen = 0;
++dgram_cnt;
+ BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
/* man 7 udp UDP_SEGMENT
* The segment size must be chosen such that at
padding = 0;
prv_pkt = NULL;
++dgram_cnt;
+ BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
gso_dgram_cnt = 0;
}
out:
if (first_pkt) {
qc_txb_store(buf, wrlen, first_pkt);
- ++dgram_cnt;
+ //++dgram_cnt;
}
if (cc && total) {
qc->tx.cc_dgram_len = dglen;
}
+ BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
ret = dgram_cnt;
leave:
+ TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QUIC_EV_CONN_PHPKTS, qc, 0, 0, 0, "ret=%d", ret);
TRACE_LEAVE(QUIC_EV_CONN_PHPKTS, qc);
return ret;
}
BUG_ON_HOT(b_data(buf));
b_reset(buf);
- prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams);
+ prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams ? max_dgrams - ret : 0);
if (b_data(buf) && !qc_send_ppkts(buf, qc->xprt_ctx)) {
ret = -1;
}
ret += prep_pkts;
+ BUG_ON(max_dgrams && ret > max_dgrams);
if (max_dgrams && ret == max_dgrams && !LIST_ISEMPTY(send_list)) {
TRACE_DEVEL("stopping for artificial pacing", QUIC_EV_CONN_TXPKT, qc);
break;