]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: muxes: No longer use app_ops .wake() callback function from muxes
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 5 Mar 2026 16:17:49 +0000 (17:17 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 10 Mar 2026 14:10:34 +0000 (15:10 +0100)
Thanks to previous commits, it is now possible to wake the data layer up,
via a tasklet_wakeup, instead of using the app_ops .wake() callback
function.

When a data layer must be notified of a mux event (an error for instance),
we now always perform a tasklet_wakeup(). TASK_WOKEN_MSG state is used by
default. TASK_WOKEN_IO is eventually added if the data layer was subscribed
to receives or sends.

Changes are not trivial at all. We replaced a synchronous call to the
sc_conn_process() function by a tasklet_wakeup().

src/mux_fcgi.c
src/mux_h1.c
src/mux_h2.c
src/mux_pt.c
src/mux_quic.c
src/mux_spop.c

index c34514ae152b68e6b1059e91b83decc30dd87c2d..2ca2c5ee93cde4b1112def1347b0d9f1708daf89 100644 (file)
@@ -886,26 +886,28 @@ static void fcgi_strm_notify_send(struct fcgi_strm *fstrm)
        }
 }
 
-/* Alerts the data layer, trying to wake it up by all means, following
- * this sequence :
- *   - if the fcgi stream' data layer is subscribed to recv, then it's woken up
- *     for recv
- *   - if its subscribed to send, then it's woken up for send
- *   - if it was subscribed to neither, its ->wake() callback is called
- * It is safe to call this function with a closed stream which doesn't have a
- * stream connector anymore.
+
+/* Alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by
+ * default and if the data layer is also subscribed to recv or send,
+ * TASK_WOKEN_IO is added. But first of all, we check if the shut tasklet must
+ * be woken up or not instead.
  */
 static void fcgi_strm_alert(struct fcgi_strm *fstrm)
 {
        TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
-       if (fstrm->subs ||
-           (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
-               fcgi_strm_notify_recv(fstrm);
-               fcgi_strm_notify_send(fstrm);
-       }
-       else if (fcgi_strm_sc(fstrm) && fcgi_strm_sc(fstrm)->app_ops->wake != NULL) {
-               TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
-               fcgi_strm_sc(fstrm)->app_ops->wake(fcgi_strm_sc(fstrm));
+       if (!fstrm->subs && (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))
+               tasklet_wakeup(fstrm->shut_tl);
+       else if (fcgi_strm_sc(fstrm)) {
+               unsigned int state = TASK_WOKEN_MSG;
+
+               if (fstrm->subs) {
+                       if (fstrm->subs->events & SUB_RETRY_SEND)
+                               fstrm->flags |= FCGI_SF_NOTIFIED;
+                       fstrm->subs->events = 0;
+                       fstrm->subs = NULL;
+                       state |= TASK_WOKEN_IO;
+               }
+               tasklet_wakeup(fcgi_strm_sc(fstrm)->wait_event.tasklet, state);
        }
 }
 
index fa9c20119276f2122cb666cc6e8eaa407888035d..4be01c8f797c53ffa20e426d634718a71e594d6f 100644 (file)
@@ -3733,21 +3733,24 @@ static void h1_wake_stream_for_send(struct h1s *h1s)
        }
 }
 
-/* alerts the data layer following this sequence :
- *   - if the h1s' data layer is subscribed to recv, then it's woken up for recv
- *   - if its subscribed to send, then it's woken up for send
- *   - if it was subscribed to neither, its ->wake() callback is called
+/* Alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by
+ * default and if the data layer is also subscribed to recv or send,
+ * TASK_WOKEN_IO is added.
  */
 static void h1_alert(struct h1s *h1s)
 {
+       unsigned int state = TASK_WOKEN_MSG;
+
+       TRACE_POINT(H1_EV_STRM_WAKE, h1s->h1c->conn, h1s);
+       if (!h1s_sc(h1s))
+               return;
+
        if (h1s->subs) {
-               h1_wake_stream_for_recv(h1s);
-               h1_wake_stream_for_send(h1s);
-       }
-       else if (h1s_sc(h1s) && h1s_sc(h1s)->app_ops->wake != NULL) {
-               TRACE_POINT(H1_EV_STRM_WAKE, h1s->h1c->conn, h1s);
-               h1s_sc(h1s)->app_ops->wake(h1s_sc(h1s));
+               h1s->subs->events = 0;
+               h1s->subs = NULL;
+               state |= TASK_WOKEN_IO;
        }
+       tasklet_wakeup(h1s_sc(h1s)->wait_event.tasklet, state);
 }
 
 /* Try to send an HTTP error with h1c->errcode status code. It returns 1 on success
index d09eb7110d6c904e6874960abcc6019f7d003d14..89439a129c6a83549175a6baa2ccedf86e4eba58 100644 (file)
@@ -1695,26 +1695,27 @@ static void __maybe_unused h2s_notify_send(struct h2s *h2s)
        }
 }
 
-/* alerts the data layer, trying to wake it up by all means, following
- * this sequence :
- *   - if the h2s' data layer is subscribed to recv, then it's woken up for recv
- *   - if its subscribed to send, then it's woken up for send
- *   - if it was subscribed to neither, its ->wake() callback is called
- * It is safe to call this function with a closed stream which doesn't have a
- * stream connector anymore.
+/* alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by
+ * default and if the data layer is also subscribed to recv or send,
+ * TASK_WOKEN_IO is added. But first of all, we check if the shut tasklet must
+ * be woken up or not instead.
  */
 static void __maybe_unused h2s_alert(struct h2s *h2s)
 {
        TRACE_ENTER(H2_EV_H2S_WAKE, h2s->h2c->conn, h2s);
+       if (!h2s->subs && (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)))
+               tasklet_wakeup(h2s->shut_tl);
+       else if (h2s_sc(h2s)) {
+               unsigned int state = TASK_WOKEN_MSG;
 
-       if (h2s->subs ||
-           (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW))) {
-               h2s_notify_recv(h2s);
-               h2s_notify_send(h2s);
-       }
-       else if (h2s_sc(h2s) && h2s_sc(h2s)->app_ops->wake != NULL) {
-               TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s);
-               h2s_sc(h2s)->app_ops->wake(h2s_sc(h2s));
+               if (h2s->subs) {
+                       if (h2s->subs->events & SUB_RETRY_SEND)
+                               h2s->flags |= H2_SF_NOTIFIED;
+                       h2s->subs->events = 0;
+                       h2s->subs = NULL;
+                       state |= TASK_WOKEN_IO;
+               }
+               tasklet_wakeup(h2s_sc(h2s)->wait_event.tasklet, state);
        }
 
        TRACE_LEAVE(H2_EV_H2S_WAKE, h2s->h2c->conn, h2s);
index 0b4c9ce78cdd092a9e83a0fab1ebc658ef086554..0c6a1f972a9e0da0eeb8fcc579a1ddc67dc690bb 100644 (file)
@@ -252,20 +252,21 @@ struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned int status)
 
        TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn);
        if (!se_fl_test(ctx->sd, SE_FL_ORPHAN)) {
+               unsigned int state = TASK_WOKEN_MSG;
+
                /* There's a small race condition.
                 * mux_pt_io_cb() is only supposed to be called if we have no
                 * stream attached. However, maybe the tasklet got woken up,
                 * and this connection was then attached to a new stream.
-                * If this happened, just wake the tasklet up if anybody
-                * subscribed to receive events, and otherwise call the wake
-                * method, to make sure the event is noticed.
+                * If this happened, just wake the tasklet up.
                 */
                if (ctx->conn->subs) {
                        ctx->conn->subs->events = 0;
-                       tasklet_wakeup(ctx->conn->subs->tasklet);
                        ctx->conn->subs = NULL;
-               } else if (pt_sc(ctx)->app_ops->wake)
-                       pt_sc(ctx)->app_ops->wake(pt_sc(ctx));
+                       state |= TASK_WOKEN_IO;
+               }
+               tasklet_wakeup(pt_sc(ctx)->wait_event.tasklet, state);
+
                TRACE_DEVEL("leaving waking up SC", PT_EV_CONN_WAKE, ctx->conn);
                return t;
        }
@@ -360,14 +361,9 @@ static int mux_pt_wake(struct connection *conn)
        int ret = 0;
 
        TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn);
-       if (!se_fl_test(ctx->sd, SE_FL_ORPHAN)) {
-               ret = pt_sc(ctx)->app_ops->wake ? pt_sc(ctx)->app_ops->wake(pt_sc(ctx)) : 0;
-
-               if (ret < 0) {
-                       TRACE_DEVEL("leaving waking up SC", PT_EV_CONN_WAKE, ctx->conn);
-                       return ret;
-               }
-       } else {
+       if (!se_fl_test(ctx->sd, SE_FL_ORPHAN))
+               tasklet_wakeup(pt_sc(ctx)->wait_event.tasklet, TASK_WOKEN_MSG);
+       else {
                conn_ctrl_drain(conn);
                if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
                        TRACE_DEVEL("leaving destroying PT context", PT_EV_CONN_WAKE, ctx->conn);
index 248c787bbe46cb9fbeb34edce3e6b7524f7dff76..3fd283d9fe091ecfbe253dbcf991ba663cdd7608 100644 (file)
@@ -506,19 +506,23 @@ static struct ncbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
        return ncbuf;
 }
 
-/* Notify an eventual subscriber on <qcs> or else wakeup up the stconn layer if
- * initialized.
+/* Notify the stconn layer if initialized with TASK_WOKEN_MSG state and
+ * eventually TASK_WOKEN_IO.
  */
 static void qcs_alert(struct qcs *qcs)
 {
+       unsigned int state = TASK_WOKEN_MSG;
+
+       TRACE_POINT(QMUX_EV_STRM_WAKE, qcs->qcc->conn, qcs);
+       if (!qcs_sc(qcs))
+               return;
+
        if (qcs->subs) {
-               qcs_notify_recv(qcs);
-               qcs_notify_send(qcs);
-       }
-       else if (qcs_sc(qcs) && qcs->sd->sc->app_ops->wake) {
-               TRACE_POINT(QMUX_EV_STRM_WAKE, qcs->qcc->conn, qcs);
-               qcs->sd->sc->app_ops->wake(qcs->sd->sc);
+               qcs->subs->events = 0;
+               qcs->subs = NULL;
+               state |= TASK_WOKEN_IO;
        }
+       tasklet_wakeup(qcs_sc(qcs)->wait_event.tasklet, state);
 }
 
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
index 1fa07e7fb7773a719895140c195232d54e42a529..162842ab25f45f68727c8032a02ec28e519888da 100644 (file)
@@ -972,26 +972,26 @@ static void spop_strm_notify_send(struct spop_strm *spop_strm)
        }
 }
 
-/* Alerts the data layer, trying to wake it up by all means, following
- * this sequence :
- *   - if the spop stream' data layer is subscribed to recv, then it's woken up
- *     for recv
- *   - if its subscribed to send, then it's woken up for send
- *   - if it was subscribed to neither, its ->wake() callback is called
- * It is safe to call this function with a closed stream which doesn't have a
- * stream connector anymore.
+/* Alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by
+ * default and if the data layer is also subscribed to recv or send,
+ * TASK_WOKEN_IO is added.
  */
 static void spop_strm_alert(struct spop_strm *spop_strm)
 {
+       unsigned int state = TASK_WOKEN_MSG;
+
        TRACE_POINT(SPOP_EV_STRM_WAKE, spop_strm->spop_conn->conn, spop_strm);
+       if (!spop_strm_sc(spop_strm))
+               return;
+
        if (spop_strm->subs) {
-               spop_strm_notify_recv(spop_strm);
-               spop_strm_notify_send(spop_strm);
-       }
-       else if (spop_strm_sc(spop_strm) && spop_strm_sc(spop_strm)->app_ops->wake != NULL) {
-               TRACE_POINT(SPOP_EV_STRM_WAKE, spop_strm->spop_conn->conn, spop_strm);
-               spop_strm_sc(spop_strm)->app_ops->wake(spop_strm_sc(spop_strm));
+               if (spop_strm->subs->events & SUB_RETRY_SEND)
+                       spop_strm->flags |= SPOP_SF_NOTIFIED;
+               spop_strm->subs->events  = 0;
+               spop_strm->subs = NULL;
+                       state |= TASK_WOKEN_IO;
        }
+       tasklet_wakeup(spop_strm_sc(spop_strm)->wait_event.tasklet, state);
 }
 
 /* Writes the 32-bit frame size <len> at address <frame> */