]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: remove pacing specific code on qcc_io_cb
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 12 Dec 2024 13:53:49 +0000 (14:53 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 18 Dec 2024 08:49:20 +0000 (09:49 +0100)
Pacing was recently implemented by QUIC MUX. Its tasklet is rescheduled
until next emission timer is reached. To improve performance, an
alternate execution of qcc_io_cb was performed when rescheduled due to
pacing. This was implemented using TASK_F_USR1 flag.

However, this model is fragile, in particular when several events
happened alongside pacing scheduling. This has caused some issue
recently, most notably when MUX is subscribed on transport layer on
receive for handshake completion while pacing emission is performed in
parallel. MUX qcc_io_cb() would not execute the default code path, which
means the reception event is silently ignored.

Recent patches have reworked several parts of qcc_io_cb. The objective
was to improve performance with better algorithm on send and receive
part. Most notable, qcc frames list is only cleared when new data is
available for emission. With this, pacing alternative code is now mostly
unneeded. As such, this patch removes it. The following changes are
performed :

* TASK_F_USR1 is now not used by QUIC MUX. As such, tasklet_wakeup()
  default invokation can now replace obsolete wrappers
  qcc_wakeup/qcc_wakeup_pacing

* qcc_purge_sending is removed. On pacing rescheduling, all qcc_io_cb()
  is executed. This is less error-prone, in particular when pacing is
  mixed with other events like receive handling. This renders the code
  less fragile, as it completely solves the described issue above.

This should be backported up to 3.1.

src/mux_quic.c
src/quic_conn.c

index 56674a78a837bf4972a79378bfa2eb3f1eccfc86..7956f16b860465a17dbbf4b7ede28c09e9c4df17 100644 (file)
@@ -397,18 +397,6 @@ static void qcc_refresh_timeout(struct qcc *qcc)
        TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
 }
 
-void qcc_wakeup(struct qcc *qcc)
-{
-       HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
-       tasklet_wakeup(qcc->wait_event.tasklet);
-}
-
-static void qcc_wakeup_pacing(struct qcc *qcc)
-{
-       HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
-       tasklet_wakeup(qcc->wait_event.tasklet);
-}
-
 /* Mark a stream as open if it was idle. This can be used on every
  * successful emission/reception operation to update the stream state.
  */
@@ -755,7 +743,7 @@ void qcc_set_error(struct qcc *qcc, int err, int app)
         * is too tedious too not forget a wakeup outside of this function for
         * the moment.
         */
-       qcc_wakeup(qcc);
+       HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
 }
 
 /* Increment glitch counter for <qcc> connection by <inc> steps. If configured
@@ -1109,7 +1097,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
                frm->max_stream_data.max_stream_data = qcs->rx.msd;
 
                LIST_APPEND(&qcc->lfctl.frms, &frm->list);
-               qcc_wakeup(qcc);
+               tasklet_wakeup(qcc->wait_event.tasklet);
        }
 
  conn_fctl:
@@ -1127,7 +1115,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
                frm->max_data.max_data = qcc->lfctl.md;
 
                LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
-               qcc_wakeup(qcc);
+               tasklet_wakeup(qcc->wait_event.tasklet);
        }
 
        TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);
@@ -1397,7 +1385,7 @@ void qcc_reset_stream(struct qcs *qcs, int err)
        }
 
        qcc_send_stream(qcs, 1, 0);
-       qcc_wakeup(qcc);
+       tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
 /* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM.
@@ -1450,7 +1438,7 @@ void qcc_abort_stream_read(struct qcs *qcs)
        qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED);
 
        qcc_send_stream(qcs, 1, 0);
-       qcc_wakeup(qcc);
+       tasklet_wakeup(qcc->wait_event.tasklet);
 
  end:
        TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn, qcs);
@@ -1483,7 +1471,7 @@ int qcc_install_app_ops(struct qcc *qcc, const struct qcc_app_ops *app_ops)
                        TRACE_ERROR("app ops finalize error", QMUX_EV_QCC_NEW, qcc->conn);
                        goto err;
                }
-               qcc_wakeup(qcc);
+               tasklet_wakeup(qcc->wait_event.tasklet);
        }
 
        TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn);
@@ -1663,7 +1651,7 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max)
                TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn);
 
                if (unblock_real)
-                       qcc_wakeup(qcc);
+                       tasklet_wakeup(qcc->wait_event.tasklet);
 
                if (unblock_soft)
                        qcc_notify_fctl(qcc);
@@ -1712,7 +1700,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
                        TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
                        if (unblock_real) {
                                /* TODO optim: only wakeup IO-CB if stream has data to sent. */
-                               qcc_wakeup(qcc);
+                               tasklet_wakeup(qcc->wait_event.tasklet);
                        }
 
                        if (unblock_soft) {
@@ -1965,7 +1953,7 @@ static int qcc_release_remote_stream(struct qcc *qcc, uint64_t id)
                        frm->max_streams_bidi.max_streams = qcc->lfctl.ms_bidi +
                                                            qcc->lfctl.cl_bidi_r;
                        LIST_APPEND(&qcc->lfctl.frms, &frm->list);
-                       qcc_wakeup(qcc);
+                       tasklet_wakeup(qcc->wait_event.tasklet);
 
                        qcc->lfctl.ms_bidi += qcc->lfctl.cl_bidi_r;
                        qcc->lfctl.cl_bidi_r = 0;
@@ -2523,8 +2511,9 @@ static int qcc_io_send(struct qcc *qcc)
 
        if (qcc_is_pacing_active(qcc->conn)) {
                if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
-                       qcc_wakeup_pacing(qcc);
-                       return 1;
+                       tasklet_wakeup(qcc->wait_event.tasklet);
+                       total = 0;
+                       goto out;
                }
        }
 
@@ -2564,13 +2553,9 @@ static int qcc_io_send(struct qcc *qcc)
        if (ret == 1) {
                /* qcc_send_frames cannot return 1 if pacing not used. */
                BUG_ON(!qcc_is_pacing_active(qcc->conn));
-               qcc_wakeup_pacing(qcc);
+               tasklet_wakeup(qcc->wait_event.tasklet);
                ++qcc->tx.paced_sent_ctr;
        }
-       else if (LIST_ISEMPTY(frms)) {
-               /* Everything sent */
-               HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
-       }
 
  out:
        /* Re-insert on-error QCS at the end of the send-list. */
@@ -2581,7 +2566,7 @@ static int qcc_io_send(struct qcc *qcc)
                }
 
                if (!qfctl_rblocked(&qcc->tx.fc))
-                       qcc_wakeup(qcc);
+                       tasklet_wakeup(qcc->wait_event.tasklet);
        }
 
        if (qcc->conn->flags & CO_FL_ERROR && !(qcc->flags & QC_CF_ERR_CONN)) {
@@ -2878,59 +2863,12 @@ static void qcc_release(struct qcc *qcc)
        TRACE_LEAVE(QMUX_EV_QCC_END);
 }
 
-static void qcc_purge_sending(struct qcc *qcc)
-{
-       struct quic_pacer *pacer = &qcc->tx.pacer;
-       struct list *frms = &qcc->tx.frms;
-       enum quic_tx_err ret = QUIC_TX_ERR_PACING;
-       int sent = 0;
-
-       TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
-
-       /* This function is reserved for pacing usage. */
-       BUG_ON(!qcc_is_pacing_active(qcc->conn));
-
-       /* Only restart emission if pacing delay is reached. */
-       if (quic_pacing_expired(pacer)) {
-               ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
-               sent = 1;
-       }
-
-       if (ret == QUIC_TX_ERR_PACING) {
-               BUG_ON(LIST_ISEMPTY(frms));
-               qcc_wakeup_pacing(qcc);
-               if (sent)
-                       ++qcc->tx.paced_sent_ctr;
-       }
-       else if (ret == QUIC_TX_ERR_FATAL) {
-               TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
-               HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
-               qcc_subscribe_send(qcc);
-       }
-       else {
-               if (!LIST_ISEMPTY(frms)) {
-                       qcc_subscribe_send(qcc);
-               }
-               else {
-                       /* Everything sent */
-                       HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
-               }
-       }
-
-       TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
-}
-
 struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 {
        struct qcc *qcc = ctx;
 
        TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
-       if (status & TASK_F_USR1) {
-               qcc_purge_sending(qcc);
-               goto end;
-       }
-
        if (!(qcc->wait_event.events & SUB_RETRY_SEND))
                qcc_io_send(qcc);
 
@@ -2943,7 +2881,6 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 
        qcc_refresh_timeout(qcc);
 
- end:
        TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
        return NULL;
 
@@ -3139,7 +3076,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
                LIST_APPEND(&mux_stopping_data[tid].list, &conn->stopping_list);
 
        /* init read cycle */
-       qcc_wakeup(qcc);
+       tasklet_wakeup(qcc->wait_event.tasklet);
 
        /* MUX is initialized before QUIC handshake completion if early data
         * received. Flag connection to delay stream processing if
@@ -3301,7 +3238,7 @@ static size_t qmux_strm_rcv_buf(struct stconn *sc, struct buffer *buf,
                qcs->flags &= ~QC_SF_DEM_FULL;
                if (!(qcc->flags & QC_CF_ERRL)) {
                        LIST_APPEND(&qcc->recv_list, &qcs->el_recv);
-                       qcc_wakeup(qcc);
+                       tasklet_wakeup(qcc->wait_event.tasklet);
                }
        }
 
@@ -3366,14 +3303,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
                if (data || fin)
                        qcc_send_stream(qcs, 0, data);
 
-               /* Wake up MUX to emit newly transferred data. If blocked on
-                * send, ensure next emission will refresh data by removing
-                * pacing status info.
-                */
+               /* Wake up MUX to emit newly transferred data. */
                if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
-                       qcc_wakeup(qcs->qcc);
-               else
-                       HA_ATOMIC_AND(&qcs->qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+                       tasklet_wakeup(qcs->qcc->wait_event.tasklet);
        }
 
  end:
@@ -3496,11 +3428,8 @@ static size_t qmux_strm_done_ff(struct stconn *sc)
        qcs->sd->iobuf.offset = 0;
        qcs->sd->iobuf.data = 0;
 
-       /* Similar to snd_buf callback. */
        if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
-               qcc_wakeup(qcc);
-       else
-               HA_ATOMIC_AND(&qcs->qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+               tasklet_wakeup(qcc->wait_event.tasklet);
 
   end:
        TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
@@ -3598,7 +3527,7 @@ static void qmux_strm_shut(struct stconn *sc, unsigned int mode, struct se_abort
                        qcc_reset_stream(qcs, 0);
                }
 
-               qcc_wakeup(qcc);
+               tasklet_wakeup(qcc->wait_event.tasklet);
        }
 
  out:
index c28725af7a3769f1eb4a0801fcb73ed199d7a4e1..8f1b4849176fd9246f5f786b6239dda09a0c02f7 100644 (file)
@@ -1816,7 +1816,7 @@ void qc_notify_err(struct quic_conn *qc)
                 * is made between MUX and quic-conn layer, wake up could be
                 * conducted only with qc.subs.
                 */
-               qcc_wakeup(qc->qcc);
+               tasklet_wakeup(qc->qcc->wait_event.tasklet);
        }
 
        TRACE_LEAVE(QUIC_EV_CONN_CLOSE, qc);