]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: conn-stream/applet: Add a data callback for applets
authorChristopher Faulet <cfaulet@haproxy.com>
Fri, 1 Apr 2022 14:34:53 +0000 (16:34 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 13 Apr 2022 13:10:15 +0000 (15:10 +0200)
data callbacks were only used for streams attached to a connection and
for health-checks. However there is a callback used by task_run_applet. So,
si_applet_wake_cb() is first renamed to cs_applet_process() and it is
defined as the data callback for streams attached to an applet. This way,
this part now manipulates a conn-stream instead of a stream-interface. In
addition, applets are no longer handled as an exception for this part.

include/haproxy/stream_interface.h
src/applet.c
src/conn_stream.c
src/stream_interface.c

index 45a8bacd3f8963ca38def10ef58602ddd848bfc3..a9adb8719eb08644eba58cd7bc58841a1beac825 100644 (file)
 #include <haproxy/obj_type.h>
 
 extern struct data_cb si_conn_cb;
+extern struct data_cb cs_data_applet_cb;
 extern struct data_cb check_conn_cb;
 
 struct stream_interface *si_new(struct conn_stream *cs);
 void si_free(struct stream_interface *si);
 
 /* main event functions used to move data between sockets and buffers */
-void si_applet_wake_cb(struct stream_interface *si);
+int cs_applet_process(struct conn_stream *cs);
 struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state);
 int si_sync_recv(struct stream_interface *si);
 void si_sync_send(struct stream_interface *si);
index 65e4337050282a7f66677f15c66b2168e08c6d61..b403c239e84fe66b4236149ea3f02d0be66a9275 100644 (file)
@@ -175,7 +175,7 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
                stream_dump_and_crash(&app->obj_type, read_freq_ctr(&app->call_rate));
        }
 
-       si_applet_wake_cb(cs->si);
+       cs->data_cb->wake(cs);
        channel_release_buffer(cs_ic(cs), &app->buffer_wait);
        return t;
 }
index 6686ee48e8a6484cedcc5f6824acac87b5b80624..352624dc91e61ce752376a85f2cf58d12e3bb7a3 100644 (file)
@@ -252,7 +252,7 @@ void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
        appctx->owner = cs;
        if (cs_strm(cs)) {
                cs->ops = &cs_app_applet_ops;
-               cs->data_cb = NULL;
+               cs->data_cb = &cs_data_applet_cb;
        }
 }
 
@@ -282,7 +282,7 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
        }
        else if (cs->endp->flags & CS_EP_T_APPLET) {
                cs->ops = &cs_app_applet_ops;
-               cs->data_cb = NULL;
+               cs->data_cb = &cs_data_applet_cb;
        }
        else {
                cs->ops = &cs_app_embedded_ops;
index 0f060ec05be8f022eeb4caada99f8f7604a038a0..da8955be6eed6389c61e98def394d33124005eb3 100644 (file)
@@ -53,6 +53,11 @@ struct data_cb si_conn_cb = {
 };
 
 
+struct data_cb cs_data_applet_cb = {
+       .wake    = cs_applet_process,
+       .name    = "STRM",
+};
+
 struct stream_interface *si_new(struct conn_stream *cs)
 {
        struct stream_interface *si;
@@ -908,36 +913,37 @@ static void stream_int_read0(struct stream_interface *si)
  * may re-enable the applet's based on the channels and stream interface's final
  * states.
  */
-void si_applet_wake_cb(struct stream_interface *si)
+int cs_applet_process(struct conn_stream *cs)
 {
-       struct channel *ic = si_ic(si);
+       struct channel *ic = cs_ic(cs);
 
-       BUG_ON(!cs_appctx(si->cs));
+       BUG_ON(!cs_appctx(cs));
 
        /* If the applet wants to write and the channel is closed, it's a
         * broken pipe and it must be reported.
         */
-       if (!(si->flags & SI_FL_RX_WAIT_EP) && (ic->flags & CF_SHUTR))
-               si->cs->endp->flags |= CS_EP_ERROR;
+       if (!(cs->si->flags & SI_FL_RX_WAIT_EP) && (ic->flags & CF_SHUTR))
+               cs->endp->flags |= CS_EP_ERROR;
 
        /* automatically mark the applet having data available if it reported
         * begin blocked by the channel.
         */
-       if (si_rx_blocked(si))
-               si_rx_endp_more(si);
+       if (si_rx_blocked(cs->si))
+               si_rx_endp_more(cs->si);
 
        /* update the stream-int, channels, and possibly wake the stream up */
-       stream_int_notify(si);
-       stream_release_buffers(si_strm(si));
+       stream_int_notify(cs->si);
+       stream_release_buffers(__cs_strm(cs));
 
        /* stream_int_notify may have passed through chk_snd and released some
         * RXBLK flags. Process_stream will consider those flags to wake up the
         * appctx but in the case the task is not in runqueue we may have to
         * wakeup the appctx immediately.
         */
-       if ((si_rx_endp_ready(si) && !si_rx_blocked(si)) ||
-           (si_tx_endp_ready(si) && !si_tx_blocked(si)))
-               appctx_wakeup(__cs_appctx(si->cs));
+       if ((si_rx_endp_ready(cs->si) && !si_rx_blocked(cs->si)) ||
+           (si_tx_endp_ready(cs->si) && !si_tx_blocked(cs->si)))
+               appctx_wakeup(__cs_appctx(cs));
+       return 0;
 }
 
 /*