TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
}
-void qcc_wakeup(struct qcc *qcc)
-{
- HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
- tasklet_wakeup(qcc->wait_event.tasklet);
-}
-
-static void qcc_wakeup_pacing(struct qcc *qcc)
-{
- HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
- tasklet_wakeup(qcc->wait_event.tasklet);
-}
-
/* Mark a stream as open if it was idle. This can be used on every
* successful emission/reception operation to update the stream state.
*/
* is too tedious too not forget a wakeup outside of this function for
* the moment.
*/
- qcc_wakeup(qcc);
+ HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
}
/* Increment glitch counter for <qcc> connection by <inc> steps. If configured
frm->max_stream_data.max_stream_data = qcs->rx.msd;
LIST_APPEND(&qcc->lfctl.frms, &frm->list);
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
}
conn_fctl:
frm->max_data.max_data = qcc->lfctl.md;
LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
}
TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);
}
qcc_send_stream(qcs, 1, 0);
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
}
/* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM.
qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED);
qcc_send_stream(qcs, 1, 0);
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
end:
TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn, qcs);
TRACE_ERROR("app ops finalize error", QMUX_EV_QCC_NEW, qcc->conn);
goto err;
}
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
}
TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn);
TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn);
if (unblock_real)
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
if (unblock_soft)
qcc_notify_fctl(qcc);
TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
if (unblock_real) {
/* TODO optim: only wakeup IO-CB if stream has data to sent. */
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
}
if (unblock_soft) {
frm->max_streams_bidi.max_streams = qcc->lfctl.ms_bidi +
qcc->lfctl.cl_bidi_r;
LIST_APPEND(&qcc->lfctl.frms, &frm->list);
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
qcc->lfctl.ms_bidi += qcc->lfctl.cl_bidi_r;
qcc->lfctl.cl_bidi_r = 0;
if (qcc_is_pacing_active(qcc->conn)) {
if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
- qcc_wakeup_pacing(qcc);
- return 1;
+ tasklet_wakeup(qcc->wait_event.tasklet);
+ total = 0;
+ goto out;
}
}
if (ret == 1) {
/* qcc_send_frames cannot return 1 if pacing not used. */
BUG_ON(!qcc_is_pacing_active(qcc->conn));
- qcc_wakeup_pacing(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
++qcc->tx.paced_sent_ctr;
}
- else if (LIST_ISEMPTY(frms)) {
- /* Everything sent */
- HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
- }
out:
/* Re-insert on-error QCS at the end of the send-list. */
}
if (!qfctl_rblocked(&qcc->tx.fc))
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
}
if (qcc->conn->flags & CO_FL_ERROR && !(qcc->flags & QC_CF_ERR_CONN)) {
TRACE_LEAVE(QMUX_EV_QCC_END);
}
-static void qcc_purge_sending(struct qcc *qcc)
-{
- struct quic_pacer *pacer = &qcc->tx.pacer;
- struct list *frms = &qcc->tx.frms;
- enum quic_tx_err ret = QUIC_TX_ERR_PACING;
- int sent = 0;
-
- TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
-
- /* This function is reserved for pacing usage. */
- BUG_ON(!qcc_is_pacing_active(qcc->conn));
-
- /* Only restart emission if pacing delay is reached. */
- if (quic_pacing_expired(pacer)) {
- ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
- sent = 1;
- }
-
- if (ret == QUIC_TX_ERR_PACING) {
- BUG_ON(LIST_ISEMPTY(frms));
- qcc_wakeup_pacing(qcc);
- if (sent)
- ++qcc->tx.paced_sent_ctr;
- }
- else if (ret == QUIC_TX_ERR_FATAL) {
- TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
- HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
- qcc_subscribe_send(qcc);
- }
- else {
- if (!LIST_ISEMPTY(frms)) {
- qcc_subscribe_send(qcc);
- }
- else {
- /* Everything sent */
- HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
- }
- }
-
- TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
-}
-
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
{
struct qcc *qcc = ctx;
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
- if (status & TASK_F_USR1) {
- qcc_purge_sending(qcc);
- goto end;
- }
-
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
qcc_io_send(qcc);
qcc_refresh_timeout(qcc);
- end:
TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
return NULL;
LIST_APPEND(&mux_stopping_data[tid].list, &conn->stopping_list);
/* init read cycle */
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
/* MUX is initialized before QUIC handshake completion if early data
* received. Flag connection to delay stream processing if
qcs->flags &= ~QC_SF_DEM_FULL;
if (!(qcc->flags & QC_CF_ERRL)) {
LIST_APPEND(&qcc->recv_list, &qcs->el_recv);
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
}
}
if (data || fin)
qcc_send_stream(qcs, 0, data);
- /* Wake up MUX to emit newly transferred data. If blocked on
- * send, ensure next emission will refresh data by removing
- * pacing status info.
- */
+ /* Wake up MUX to emit newly transferred data. */
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
- qcc_wakeup(qcs->qcc);
- else
- HA_ATOMIC_AND(&qcs->qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+ tasklet_wakeup(qcs->qcc->wait_event.tasklet);
}
end:
qcs->sd->iobuf.offset = 0;
qcs->sd->iobuf.data = 0;
- /* Similar to snd_buf callback. */
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
- qcc_wakeup(qcc);
- else
- HA_ATOMIC_AND(&qcs->qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+ tasklet_wakeup(qcc->wait_event.tasklet);
end:
TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
qcc_reset_stream(qcs, 0);
}
- qcc_wakeup(qcc);
+ tasklet_wakeup(qcc->wait_event.tasklet);
}
out: