From: Amaury Denoyelle
Date: Thu, 12 Dec 2024 13:53:49 +0000 (+0100)
Subject: MEDIUM: mux-quic: remove pacing specific code on qcc_io_cb
X-Git-Tag: v3.2-dev2~39
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=41f0472d967b2deb095d5adc8a167da973fbee3d;p=thirdparty%2Fhaproxy.git

MEDIUM: mux-quic: remove pacing specific code on qcc_io_cb

Pacing was recently implemented by QUIC MUX. Its tasklet is rescheduled
until the next emission timer is reached. To improve performance, an
alternate execution of qcc_io_cb() was performed when rescheduled due to
pacing. This was implemented using the TASK_F_USR1 flag.

However, this model is fragile, in particular when several events happen
alongside pacing scheduling. This has caused some issues recently, most
notably when the MUX is subscribed on the transport layer on receive for
handshake completion while pacing emission is performed in parallel. MUX
qcc_io_cb() would not execute the default code path, which means the
reception event is silently ignored.

Recent patches have reworked several parts of qcc_io_cb(). The objective
was to improve performance with better algorithms on the send and receive
parts. Most notably, the qcc frames list is only cleared when new data is
available for emission. With this, the pacing alternative code is now
mostly unneeded. As such, this patch removes it. The following changes
are performed:

* TASK_F_USR1 is no longer used by QUIC MUX. As such, the default
  tasklet_wakeup() invocation can now replace the obsolete wrappers
  qcc_wakeup()/qcc_wakeup_pacing().

* qcc_purge_sending() is removed. On pacing rescheduling, the whole
  qcc_io_cb() is executed. This is less error-prone, in particular when
  pacing is mixed with other events such as receive handling.

This renders the code less fragile, as it completely solves the issue
described above.

This should be backported up to 3.1.
---

diff --git a/src/mux_quic.c b/src/mux_quic.c
index 56674a78a8..7956f16b86 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -397,18 +397,6 @@ static void qcc_refresh_timeout(struct qcc *qcc)
 	TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
 }
 
-void qcc_wakeup(struct qcc *qcc)
-{
-	HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
-	tasklet_wakeup(qcc->wait_event.tasklet);
-}
-
-static void qcc_wakeup_pacing(struct qcc *qcc)
-{
-	HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
-	tasklet_wakeup(qcc->wait_event.tasklet);
-}
-
 /* Mark a stream as open if it was idle. This can be used on every
  * successful emission/reception operation to update the stream state.
  */
@@ -755,7 +743,7 @@ void qcc_set_error(struct qcc *qcc, int err, int app)
 	 * is too tedious too not forget a wakeup outside of this function for
 	 * the moment.
 	 */
-	qcc_wakeup(qcc);
+	tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
 /* Increment glitch counter for connection by steps. If configured
@@ -1109,7 +1097,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
 		frm->max_stream_data.max_stream_data = qcs->rx.msd;
 
 		LIST_APPEND(&qcc->lfctl.frms, &frm->list);
-		qcc_wakeup(qcc);
+		tasklet_wakeup(qcc->wait_event.tasklet);
 	}
 
  conn_fctl:
@@ -1127,7 +1115,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
 		frm->max_data.max_data = qcc->lfctl.md;
 
 		LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
-		qcc_wakeup(qcc);
+		tasklet_wakeup(qcc->wait_event.tasklet);
 	}
 
 	TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);
@@ -1397,7 +1385,7 @@ void qcc_reset_stream(struct qcs *qcs, int err)
 	}
 
 	qcc_send_stream(qcs, 1, 0);
-	qcc_wakeup(qcc);
+	tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
 /* Register stream for emission of STREAM, STOP_SENDING or RESET_STREAM.
@@ -1450,7 +1438,7 @@ void qcc_abort_stream_read(struct qcs *qcs)
 	qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED);
 
 	qcc_send_stream(qcs, 1, 0);
-	qcc_wakeup(qcc);
+	tasklet_wakeup(qcc->wait_event.tasklet);
 
  end:
 	TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn, qcs);
@@ -1483,7 +1471,7 @@ int qcc_install_app_ops(struct qcc *qcc, const struct qcc_app_ops *app_ops)
 			TRACE_ERROR("app ops finalize error", QMUX_EV_QCC_NEW, qcc->conn);
 			goto err;
 		}
-		qcc_wakeup(qcc);
+		tasklet_wakeup(qcc->wait_event.tasklet);
 	}
 
 	TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn);
@@ -1663,7 +1651,7 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max)
 			TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn);
 
 			if (unblock_real)
-				qcc_wakeup(qcc);
+				tasklet_wakeup(qcc->wait_event.tasklet);
 
 			if (unblock_soft)
 				qcc_notify_fctl(qcc);
@@ -1712,7 +1700,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
 			TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
 			if (unblock_real) {
 				/* TODO optim: only wakeup IO-CB if stream has data to sent. */
-				qcc_wakeup(qcc);
+				tasklet_wakeup(qcc->wait_event.tasklet);
 			}
 
 			if (unblock_soft) {
@@ -1965,7 +1953,7 @@ static int qcc_release_remote_stream(struct qcc *qcc, uint64_t id)
 		frm->max_streams_bidi.max_streams = qcc->lfctl.ms_bidi + qcc->lfctl.cl_bidi_r;
 		LIST_APPEND(&qcc->lfctl.frms, &frm->list);
-		qcc_wakeup(qcc);
+		tasklet_wakeup(qcc->wait_event.tasklet);
 
 		qcc->lfctl.ms_bidi += qcc->lfctl.cl_bidi_r;
 		qcc->lfctl.cl_bidi_r = 0;
@@ -2523,8 +2511,9 @@ static int qcc_io_send(struct qcc *qcc)
 
 	if (qcc_is_pacing_active(qcc->conn)) {
 		if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
-			qcc_wakeup_pacing(qcc);
-			return 1;
+			tasklet_wakeup(qcc->wait_event.tasklet);
+			total = 0;
+			goto out;
 		}
 	}
 
@@ -2564,13 +2553,9 @@ static int qcc_io_send(struct qcc *qcc)
 		if (ret == 1) {
 			/* qcc_send_frames cannot return 1 if pacing not used. */
 			BUG_ON(!qcc_is_pacing_active(qcc->conn));
-			qcc_wakeup_pacing(qcc);
+			tasklet_wakeup(qcc->wait_event.tasklet);
 			++qcc->tx.paced_sent_ctr;
 		}
-		else if (LIST_ISEMPTY(frms)) {
-			/* Everything sent */
-			HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
-		}
 
  out:
 	/* Re-insert on-error QCS at the end of the send-list.
 	 */
@@ -2581,7 +2566,7 @@ static int qcc_io_send(struct qcc *qcc)
 		}
 
 		if (!qfctl_rblocked(&qcc->tx.fc))
-			qcc_wakeup(qcc);
+			tasklet_wakeup(qcc->wait_event.tasklet);
 	}
 
 	if (qcc->conn->flags & CO_FL_ERROR && !(qcc->flags & QC_CF_ERR_CONN)) {
@@ -2878,59 +2863,12 @@ static void qcc_release(struct qcc *qcc)
 	TRACE_LEAVE(QMUX_EV_QCC_END);
 }
 
-static void qcc_purge_sending(struct qcc *qcc)
-{
-	struct quic_pacer *pacer = &qcc->tx.pacer;
-	struct list *frms = &qcc->tx.frms;
-	enum quic_tx_err ret = QUIC_TX_ERR_PACING;
-	int sent = 0;
-
-	TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
-
-	/* This function is reserved for pacing usage. */
-	BUG_ON(!qcc_is_pacing_active(qcc->conn));
-
-	/* Only restart emission if pacing delay is reached. */
-	if (quic_pacing_expired(pacer)) {
-		ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
-		sent = 1;
-	}
-
-	if (ret == QUIC_TX_ERR_PACING) {
-		BUG_ON(LIST_ISEMPTY(frms));
-		qcc_wakeup_pacing(qcc);
-		if (sent)
-			++qcc->tx.paced_sent_ctr;
-	}
-	else if (ret == QUIC_TX_ERR_FATAL) {
-		TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
-		HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
-		qcc_subscribe_send(qcc);
-	}
-	else {
-		if (!LIST_ISEMPTY(frms)) {
-			qcc_subscribe_send(qcc);
-		}
-		else {
-			/* Everything sent */
-			HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
-		}
-	}
-
-	TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
-}
-
 struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 {
 	struct qcc *qcc = ctx;
 
 	TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
-	if (status & TASK_F_USR1) {
-		qcc_purge_sending(qcc);
-		goto end;
-	}
-
 	if (!(qcc->wait_event.events & SUB_RETRY_SEND))
 		qcc_io_send(qcc);
 
@@ -2943,7 +2881,6 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 
 	qcc_refresh_timeout(qcc);
 
- end:
 	TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
 
 	return NULL;
@@ -3139,7 +3076,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
 	LIST_APPEND(&mux_stopping_data[tid].list, &conn->stopping_list);
 
 	/* init read cycle */
-	qcc_wakeup(qcc);
+	tasklet_wakeup(qcc->wait_event.tasklet);
 
 	/* MUX is initialized before QUIC handshake completion if early data
 	 * received. Flag connection to delay stream processing if
@@ -3301,7 +3238,7 @@ static size_t qmux_strm_rcv_buf(struct stconn *sc, struct buffer *buf,
 		qcs->flags &= ~QC_SF_DEM_FULL;
 		if (!(qcc->flags & QC_CF_ERRL)) {
 			LIST_APPEND(&qcc->recv_list, &qcs->el_recv);
-			qcc_wakeup(qcc);
+			tasklet_wakeup(qcc->wait_event.tasklet);
 		}
 	}
 
@@ -3366,14 +3303,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
 		if (data || fin)
 			qcc_send_stream(qcs, 0, data);
 
-		/* Wake up MUX to emit newly transferred data. If blocked on
-		 * send, ensure next emission will refresh data by removing
-		 * pacing status info.
-		 */
+		/* Wake up MUX to emit newly transferred data. */
 		if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
-			qcc_wakeup(qcs->qcc);
-		else
-			HA_ATOMIC_AND(&qcs->qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+			tasklet_wakeup(qcs->qcc->wait_event.tasklet);
 	}
 
  end:
@@ -3496,11 +3428,8 @@ static size_t qmux_strm_done_ff(struct stconn *sc)
 	qcs->sd->iobuf.offset = 0;
 	qcs->sd->iobuf.data = 0;
 
-	/* Similar to snd_buf callback.
-	 */
+	/* Similar to snd_buf callback. */
 	if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
-		qcc_wakeup(qcc);
-	else
-		HA_ATOMIC_AND(&qcs->qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+		tasklet_wakeup(qcc->wait_event.tasklet);
  end:
 	TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
@@ -3598,7 +3527,7 @@ static void qmux_strm_shut(struct stconn *sc, unsigned int mode, struct se_abort
 			qcc_reset_stream(qcs, 0);
 		}
 
-		qcc_wakeup(qcc);
+		tasklet_wakeup(qcc->wait_event.tasklet);
 	}
 
  out:
diff --git a/src/quic_conn.c b/src/quic_conn.c
index c28725af7a..8f1b484917 100644
--- a/src/quic_conn.c
+++ b/src/quic_conn.c
@@ -1816,7 +1816,7 @@ void qc_notify_err(struct quic_conn *qc)
 		 * is made between MUX and quic-conn layer, wake up could be
 		 * conducted only with qc.subs.
 		 */
-		qcc_wakeup(qc->qcc);
+		tasklet_wakeup(qc->qcc->wait_event.tasklet);
 	}
 
 	TRACE_LEAVE(QUIC_EV_CONN_CLOSE, qc);
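The control-flow change described in the commit message can be summed up with a small standalone sketch. This is a toy model only, not HAProxy code: the types and functions below (toy_mux, old_io_cb, new_io_cb, TOY_F_USR1) are hypothetical stand-ins for the tasklet, its USR1 flag and the MUX I/O callback. It shows why the old flagged wakeup could swallow a receive event, while the new model always runs the full callback and lets the send path reschedule itself when the pacing timer has not expired yet.

/* Toy model of the two wakeup schemes (hypothetical names, not HAProxy APIs). */
#include <stdio.h>

#define TOY_F_USR1 0x1           /* stands in for TASK_F_USR1 */

struct toy_mux {
	unsigned int state;       /* tasklet state flags */
	int pacing_expired;       /* 1 if the pacing timer has elapsed */
	int frames_pending;       /* 1 if frames are queued for emission */
};

/* Old scheme: a pacing wakeup sets the flag and the callback then skips
 * the regular receive/send processing entirely.
 */
static void old_io_cb(struct toy_mux *m)
{
	if (m->state & TOY_F_USR1) {
		puts("old: purge-sending only, receive event ignored");
		return;
	}
	puts("old: full receive + send processing");
}

/* New scheme: the send path itself reschedules the tasklet when the
 * pacing timer has not expired, instead of flagging an alternate path.
 */
static void new_io_send(struct toy_mux *m)
{
	if (m->frames_pending && !m->pacing_expired) {
		puts("new: pacing not expired, reschedule tasklet and return");
		return;
	}
	puts("new: emit pending frames");
}

/* New scheme: every wakeup runs the whole callback, so receive handling
 * is never skipped because of a pending paced emission.
 */
static void new_io_cb(struct toy_mux *m)
{
	puts("new: receive processing always runs");
	new_io_send(m);
}

int main(void)
{
	struct toy_mux m = { .state = TOY_F_USR1, .pacing_expired = 0, .frames_pending = 1 };

	old_io_cb(&m);  /* a concurrent receive event would be lost here */
	new_io_cb(&m);  /* receive handled, emission simply delayed */
	return 0;
}

Running the toy model prints one line per scheme for the same "pacing pending + receive ready" situation, which is exactly the scenario the commit message describes for the handshake-completion wakeup.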