]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: applet: Handle channel's STREAMER flags on applets size
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 21 Nov 2023 07:03:37 +0000 (08:03 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 6 Dec 2023 09:24:41 +0000 (10:24 +0100)
Till now, it was not possible to notify an producing applet is streaming
data. It means, it was not possible to set CF_STREAMER and CF_STREAMER_FLAGS
on the input channel of an applet streaming data.

While it is not a big deal for most of applets, it is interesting for the
cache. Because there are now dedicated functions to deal with these flags,
we can use them in task_run_applet() to set/unset these flags on the input
channel.

This patch relies on "MINOR: channel: Use dedicated functions to deal with
STREAMER flags".

src/applet.c

index a5b0946b9a275bbc3d7e27a54896500b85ac58a5..c6e08b0bf33399ded366135fcf3bf17659a91f7e 100644 (file)
@@ -404,8 +404,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
 {
        struct appctx *app = context;
        struct stconn *sc, *sco;
+       struct channel *ic, *oc;
        unsigned int rate;
-       size_t count;
+       size_t input, output;
        int did_send = 0;
 
        TRACE_ENTER(APPLET_EV_PROCESS, app);
@@ -434,6 +435,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
        sc = appctx_sc(app);
        sco = sc_opposite(sc);
 
+       ic = sc_ic(sc);
+       oc = sc_oc(sc);
+
        /* We always pretend the applet can't get and doesn't want to
         * put, it's up to it to change this if needed. This ensures
         * that one applet which ignores any event will not spin.
@@ -450,7 +454,10 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
        if (!sc_alloc_ibuf(sc, &app->buffer_wait))
                applet_have_more_data(app);
 
-       count = co_data(sc_oc(sc));
+       channel_check_idletimer(ic);
+
+       input  = channel_data(ic);
+       output = co_data(oc);
        app->applet->fct(app);
 
        TRACE_POINT(APPLET_EV_PROCESS, app);
@@ -458,9 +465,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
        /* now check if the applet has released some room and forgot to
         * notify the other side about it.
         */
-       if (count != co_data(sc_oc(sc))) {
-               sc_oc(sc)->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
-               if (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)
+       if (output != co_data(oc)) {
+               oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
+               if (sco->room_needed < 0 || channel_recv_max(oc) >= sco->room_needed)
                        sc_have_room(sco);
                did_send = 1;
        }
@@ -469,14 +476,17 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
                        sc_have_room(sco);
        }
 
-       if (sc_ic(sc)->flags & CF_READ_EVENT)
+       input = channel_data(ic) - input;
+       if (input) {
+               channel_check_xfer(ic, input);
                sc_ep_report_read_activity(sc);
+       }
 
        if (sc_waiting_room(sc) && (sc->flags & SC_FL_ABRT_DONE)) {
                sc_ep_set(sc, SE_FL_EOS|SE_FL_ERROR);
        }
 
-       if (!co_data(sc_oc(sc))) {
+       if (!co_data(oc)) {
                if (did_send)
                        sc_ep_report_send_activity(sc);
        }
@@ -495,7 +505,7 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
        }
 
        sc->app_ops->wake(sc);
-       channel_release_buffer(sc_ic(sc), &app->buffer_wait);
+       channel_release_buffer(ic, &app->buffer_wait);
        TRACE_LEAVE(APPLET_EV_PROCESS, app);
        return t;
 }