From: Christopher Faulet Date: Fri, 1 Apr 2022 14:34:53 +0000 (+0200) Subject: MEDIUM: conn-stream/applet: Add a data callback for applets X-Git-Tag: v2.6-dev6~55 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=6059ba4acc27e5ea03a3537aab92245f12146a9f;p=thirdparty%2Fhaproxy.git MEDIUM: conn-stream/applet: Add a data callback for applets data callbacks were only used for streams attached to a connection and for health-checks. However there is a callback used by task_run_applet. So, si_applet_wake_cb() is first renamed to cs_applet_process() and it is defined as the data callback for streams attached to an applet. This way, this part now manipulates a conn-stream instead of a stream-interface. In addition, applets are no longer handled as an exception for this part. --- diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h index 45a8bacd3f..a9adb8719e 100644 --- a/include/haproxy/stream_interface.h +++ b/include/haproxy/stream_interface.h @@ -30,13 +30,14 @@ #include extern struct data_cb si_conn_cb; +extern struct data_cb cs_data_applet_cb; extern struct data_cb check_conn_cb; struct stream_interface *si_new(struct conn_stream *cs); void si_free(struct stream_interface *si); /* main event functions used to move data between sockets and buffers */ -void si_applet_wake_cb(struct stream_interface *si); +int cs_applet_process(struct conn_stream *cs); struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state); int si_sync_recv(struct stream_interface *si); void si_sync_send(struct stream_interface *si); diff --git a/src/applet.c b/src/applet.c index 65e4337050..b403c239e8 100644 --- a/src/applet.c +++ b/src/applet.c @@ -175,7 +175,7 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) stream_dump_and_crash(&app->obj_type, read_freq_ctr(&app->call_rate)); } - si_applet_wake_cb(cs->si); + cs->data_cb->wake(cs); channel_release_buffer(cs_ic(cs), &app->buffer_wait); return t; } diff --git a/src/conn_stream.c b/src/conn_stream.c index 6686ee48e8..352624dc91 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -252,7 +252,7 @@ void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx) appctx->owner = cs; if (cs_strm(cs)) { cs->ops = &cs_app_applet_ops; - cs->data_cb = NULL; + cs->data_cb = &cs_data_applet_cb; } } @@ -282,7 +282,7 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm) } else if (cs->endp->flags & CS_EP_T_APPLET) { cs->ops = &cs_app_applet_ops; - cs->data_cb = NULL; + cs->data_cb = &cs_data_applet_cb; } else { cs->ops = &cs_app_embedded_ops; diff --git a/src/stream_interface.c b/src/stream_interface.c index 0f060ec05b..da8955be6e 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -53,6 +53,11 @@ struct data_cb si_conn_cb = { }; +struct data_cb cs_data_applet_cb = { + .wake = cs_applet_process, + .name = "STRM", +}; + struct stream_interface *si_new(struct conn_stream *cs) { struct stream_interface *si; @@ -908,36 +913,37 @@ static void stream_int_read0(struct stream_interface *si) * may re-enable the applet's based on the channels and stream interface's final * states. */ -void si_applet_wake_cb(struct stream_interface *si) +int cs_applet_process(struct conn_stream *cs) { - struct channel *ic = si_ic(si); + struct channel *ic = cs_ic(cs); - BUG_ON(!cs_appctx(si->cs)); + BUG_ON(!cs_appctx(cs)); /* If the applet wants to write and the channel is closed, it's a * broken pipe and it must be reported. */ - if (!(si->flags & SI_FL_RX_WAIT_EP) && (ic->flags & CF_SHUTR)) - si->cs->endp->flags |= CS_EP_ERROR; + if (!(cs->si->flags & SI_FL_RX_WAIT_EP) && (ic->flags & CF_SHUTR)) + cs->endp->flags |= CS_EP_ERROR; /* automatically mark the applet having data available if it reported * begin blocked by the channel. */ - if (si_rx_blocked(si)) - si_rx_endp_more(si); + if (si_rx_blocked(cs->si)) + si_rx_endp_more(cs->si); /* update the stream-int, channels, and possibly wake the stream up */ - stream_int_notify(si); - stream_release_buffers(si_strm(si)); + stream_int_notify(cs->si); + stream_release_buffers(__cs_strm(cs)); /* stream_int_notify may have passed through chk_snd and released some * RXBLK flags. Process_stream will consider those flags to wake up the * appctx but in the case the task is not in runqueue we may have to * wakeup the appctx immediately. */ - if ((si_rx_endp_ready(si) && !si_rx_blocked(si)) || - (si_tx_endp_ready(si) && !si_tx_blocked(si))) - appctx_wakeup(__cs_appctx(si->cs)); + if ((si_rx_endp_ready(cs->si) && !si_rx_blocked(cs->si)) || + (si_tx_endp_ready(cs->si) && !si_tx_blocked(cs->si))) + appctx_wakeup(__cs_appctx(cs)); + return 0; } /*