/* QUIC pacing context: spreads datagram emission over time.
 * NOTE(review): this span is patch residue -- '+' lines are the new
 * token-bucket fields, '-' lines the removed per-millisecond counters.
 * Comments below describe the '+' (post-patch) state. */
struct quic_pacer {
struct list frms;
const struct quic_cc_path *path;
+ //int next;
+ //unsigned int curr;
+ //int pkt_ms;
+ //int sent;
/* max datagrams per burst, set from global.tune.quic_frontend_max_tx_burst */
+ int burst;
/* datagram credit accumulated while idle, consumed on each emission */
+ int budget;
/* now_ms tick of the last effective emission (refreshed in sent_done) */
+ int last_sent;
/* tick of the next pacing wakeup; TICK_ETERNITY when unarmed */
int next;
- unsigned int curr;
- int pkt_ms;
- int sent;
};
#endif /* _HAPROXY_QUIC_PACING_T_H */
LIST_INIT(&pacer->frms);
pacer->path = path;
//pacer->next = TICK_ETERNITY;
- pacer->next = now_ms;
+ //pacer->next = now_ms;
//pacer->curr = now_ms;
- pacer->curr = TICK_ETERNITY;
- pacer->pkt_ms = 0;
- pacer->sent = 0;
+ //pacer->curr = TICK_ETERNITY;
+ //pacer->pkt_ms = 0;
+ //pacer->sent = 0;
+
+ pacer->last_sent = now_ms;
+ //pacer->budget = global.tune.quic_frontend_max_tx_burst;
+ pacer->budget = 0;
+ pacer->burst = global.tune.quic_frontend_max_tx_burst;
+ pacer->next = TICK_ETERNITY;
}
static inline void quic_pacing_reset(struct quic_pacer *pacer)
return (pacer->path->cwnd / (pacer->path->mtu + 1)) / (pacer->path->loss.srtt + 1) + 1;
}
-int quic_pacing_expired(const struct quic_pacer *pacer);
+//int quic_pacing_expired(const struct quic_pacer *pacer);
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc);
//void quic_pacing_sent_done(struct quic_pacer *pacer, int sent);
int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err);
+int quic_pacing_next(struct quic_pacer *pacer);
+
#endif /* _HAPROXY_QUIC_PACING_H */
void qcc_wakeup(struct qcc *qcc)
{
+ TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
tasklet_wakeup(qcc->wait_event.tasklet);
/* Wake the QCC tasklet immediately for a paced emission: TASK_F_USR1
 * marks the wakeup as pacing-related, and the pacing timer task is
 * disarmed (expire = TICK_ETERNITY) since emission proceeds right away.
 * Caller must have pending paced frames (BUG_ON otherwise).
 * NOTE(review): '-' lines show the removed timer-based rescheduling. */
static void qcc_wakeup_pacing(struct qcc *qcc)
{
+ TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
+ BUG_ON(LIST_ISEMPTY(&qcc->tx.pacer.frms));
HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
tasklet_wakeup(qcc->wait_event.tasklet);
- //qcc->task->expire = qcc->tx.pacer.next;
- //BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
- //task_queue(qcc->task);
- //TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn);
+
+ qcc->task->expire = TICK_ETERNITY;
+ task_queue(qcc->task);
+}
+
/* Arm the pacing timer task to fire at pacer.next. When pacer.next is
 * the current tick, the expiry is pushed one tick forward so that the
 * BUG_ON below (expire must not already be expired) holds. */
+static void qcc_task_pacing(struct qcc *qcc)
+{
+ TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
+ //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+ qcc->task->expire = now_ms == qcc->tx.pacer.next ? tick_add(qcc->tx.pacer.next, 1) : qcc->tx.pacer.next;
+ BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
+ task_queue(qcc->task);
+ TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn);
}
/* Mark a stream as open if it was idle. This can be used on every
*/
quic_pacing_reset(pacer);
+ //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
/* Check for transport error. */
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
}
}
- if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
- qcc_wakeup_pacing(qcc);
- return 1;
+ //if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
+ if (!LIST_ISEMPTY(frms)) {
+ if (!qcc->tx.pacer.budget) {
+ qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(pacer));
+ //fprintf(stderr, "wait for %ldms\n", qcc->tx.pacer.burst * qcc->tx.pacer.path->loss.srtt * qcc->tx.pacer.path->mtu / qcc->tx.pacer.path->cwnd);
+ qcc_task_pacing(qcc);
+ return 1;
+ }
+ //else {
+ // qcc_wakeup_pacing(qcc);
+ //}
}
/* Retry sending until no frame to send, data rejected or connection
sent_done:
if (ret == 1) {
+ qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(pacer));
+ //fprintf(stderr, "wait for %ldms\n", pacer->burst * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd);
qcc_wakeup_pacing(qcc);
}
else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
/* Deallocate frames that the transport layer has rejected. */
quic_pacing_reset(pacer);
+ //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
}
/* Re-insert on-error QCS at the end of the send-list. */
}
quic_pacing_reset(&qcc->tx.pacer);
+ //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
if (qcc->app_ops && qcc->app_ops->release)
qcc->app_ops->release(qcc->ctx);
if (ret == QUIC_TX_ERR_AGAIN) {
BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
- qcc_wakeup_pacing(qcc);
+ //qcc_wakeup_pacing(qcc);
return 1;
}
else if (ret == QUIC_TX_ERR_FATAL) {
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
qcc_subscribe_send(qcc);
+ //else
+ // HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
return 0;
}
}
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
if (status & TASK_F_USR1) {
+ ++activity[tid].ctr0;
+ //HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
//ABORT_NOW();
- qcc_purge_sending(qcc);
+ if (qcc_purge_sending(qcc)) {
+ if (!qcc->tx.pacer.budget) {
+ qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(&qcc->tx.pacer));
+ //fprintf(stderr, "wait for %ldms\n", qcc->tx.pacer.burst * qcc->tx.pacer.path->loss.srtt * qcc->tx.pacer.path->mtu / qcc->tx.pacer.path->cwnd);
+ qcc_task_pacing(qcc);
+ }
+ else
+ qcc_wakeup_pacing(qcc);
+ }
return NULL;
}
goto requeue;
}
//fprintf(stderr, "woken up after %dms\n", now_ms - qcc->tx.pacer.next);
-
-#if 0
- if (!qcc_may_expire(qcc)) {
- TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn);
- t->expire = TICK_ETERNITY;
- goto requeue;
- }
-#endif
}
+ ++activity[tid].ctr1;
if (qcc_purge_sending(qcc)) {
- qcc->task->expire = qcc->tx.pacer.next;
- BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
- TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
- goto requeue;
+ //qcc->task->expire = qcc->tx.pacer.next;
+ if (!qcc->tx.pacer.budget) {
+ qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(&qcc->tx.pacer));
+ qcc->task->expire = now_ms == qcc->tx.pacer.next ? tick_add(qcc->tx.pacer.next, 1) : qcc->tx.pacer.next;
+ BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
+ TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
+ goto requeue;
+ }
+ else {
+ qcc_wakeup_pacing(qcc);
+ }
}
t->expire = TICK_ETERNITY;
goto requeue;
struct quic_conn;
/* quic_pacing_expired() is retired by this patch: emission readiness is
 * now decided by the budget computed in quic_pacing_prepare() rather
 * than by comparing pacer->next against now_ms. */
-int quic_pacing_expired(const struct quic_pacer *pacer)
-{
- //return !pacer->next || pacer->next <= now_mono_time();
- //return !pacer->next || pacer->next <= now_ms;
- return tick_is_expired(pacer->next, now_ms);
-}
+//int quic_pacing_expired(const struct quic_pacer *pacer)
+//{
+// //return !pacer->next || pacer->next <= now_mono_time();
+// //return !pacer->next || pacer->next <= now_ms;
+// return tick_is_expired(pacer->next, now_ms);
+//}
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
{
enum quic_tx_err ret;
- if (!quic_pacing_expired(pacer))
- return QUIC_TX_ERR_AGAIN;
+ //if (!quic_pacing_expired(pacer))
+ //if (!pacer->budget)
+ // return QUIC_TX_ERR_AGAIN;
BUG_ON(LIST_ISEMPTY(&pacer->frms));
ret = qc_send_mux(qc, &pacer->frms, pacer);
- BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms));
+ //BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms));
/* TODO handle QUIC_TX_ERR_FATAL */
return ret;
/* Refill the pacing budget from the idle time since the last emission:
 * credit ~= idle_ms * cwnd / (srtt * mtu), where the '+1' in the
 * denominator guards against a division by zero; the budget is clamped
 * at twice the configured burst. Returns how many datagrams may be
 * sent now (at most one burst); 0 means emission must be delayed.
 * NOTE(review): '-' lines show the removed per-ms quota bookkeeping. */
int quic_pacing_prepare(struct quic_pacer *pacer)
{
- if (pacer->curr == now_ms) {
- BUG_ON(pacer->sent > pacer->pkt_ms);
- return pacer->pkt_ms - pacer->sent;
- }
- else {
- int not_consumed = pacer->pkt_ms - pacer->sent;
- BUG_ON(not_consumed < 0);
- //if (not_consumed)
- // fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pacer->pkt_ms, pacer->sent);
+ int idle = tick_remain(pacer->last_sent, now_ms);
+ int pkts = idle * pacer->path->cwnd / (pacer->path->loss.srtt * pacer->path->mtu + 1);
- pacer->curr = now_ms;
- pacer->sent = 0;
- pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0);
- //pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0) + not_consumed;
+ TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
- BUG_ON(!pacer->pkt_ms);
- return pacer->pkt_ms;
+ pacer->budget += pkts;
+ if (pacer->budget > pacer->burst * 2) {
+ TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
+ pacer->budget = pacer->burst * 2;
}
+ //fprintf(stderr, "prepare = %d %d/%d\n", pkts, pacer->budget, pacer->burst);
+ return MIN(pacer->budget, pacer->burst);
+}
/* Delay, in ticks (ms), before the next pacing wakeup; callers use it
 * as tick_add(now_ms, quic_pacing_next(pacer)). Currently hard-coded
 * to 1 tick; the commented formula would derive it from the burst
 * size, srtt, mtu and cwnd instead. */
+int quic_pacing_next(struct quic_pacer *pacer)
+{
+ //return (pacer->burst / 4) * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd;
+ return 1;
+}
/* Account for <sent> datagrams just emitted: the pacing budget is
 * consumed and last_sent refreshed when something was actually sent.
 * The new ('+') implementation always returns 0 and does not read
 * <err>; the removed '-' code returned 1 once the per-millisecond
 * quota was exhausted to request a paced wakeup. */
int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err)
{
- //const int pkt_ms = quic_pacing_ns_pkt(pacer, 1);
-
-#if 0
- if (pacer->curr == now_ms) {
- pacer->sent += sent;
- }
- else {
- int not_consumed = pkt_ms - pacer->sent;
- if (not_consumed < 0)
- not_consumed = 0;
- if (not_consumed)
- fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pkt_ms, pacer->sent);
-
- //pacer->sent = 0;
- //pacer->sent -= not_consumed;
-
- pacer->curr = now_ms;
- pacer->sent = sent;
- }
-#endif
- BUG_ON(pacer->curr != now_ms);
- pacer->sent += sent;
-
- if (pacer->sent >= pacer->pkt_ms) {
- //pacer->next = tick_add(now_ms, 1);
- pacer->next = tick_add(now_ms, MAX((pacer->sent / pacer->pkt_ms), 1));
- BUG_ON(tick_is_expired(pacer->next, now_ms));
- //fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms);
- return 1;
- }
- else {
- return 0;
+ BUG_ON(sent > pacer->budget);
+ TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
+ pacer->budget -= sent;
+ if (sent) {
+ TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
+ pacer->last_sent = now_ms;
}
+ return 0;
}
}
#endif
max_dgram = quic_pacing_prepare(pacer);
- BUG_ON(!max_dgram);
- if (!max_dgram)
+ //BUG_ON(!max_dgram);
+ if (!max_dgram) {
+ pacer->next = tick_add(now_ms, quic_pacing_next(pacer));
+ //fprintf(stderr, "wait for %ldms\n", pacer->burst * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd);
return QUIC_TX_ERR_AGAIN;
+ }
}
TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
// goto out;
// }
//}
+ BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
if (max_dgrams && dgram_cnt == max_dgrams) {
BUG_ON(LIST_ISEMPTY(frms));
TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
}
ret += prep_pkts;
+ BUG_ON(max_dgrams && ret > max_dgrams);
if (max_dgrams && ret == max_dgrams && !LIST_ISEMPTY(send_list)) {
TRACE_DEVEL("stopping for artificial pacing", QUIC_EV_CONN_TXPKT, qc);
break;