]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIM: applet: Add the applet handler based on IN/OUT buffers
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 11 Jan 2024 08:57:01 +0000 (09:57 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 7 Feb 2024 14:03:26 +0000 (15:03 +0100)
A dedicated function to run applets was introduced, in addition to the old
one, to deal with applets that use their own buffers. The main differnce
here is that this handler does not use channels at all. It performs a
synchronous send before calling the applet and performs a synchronous
receive just after.

No applets are plugged on this handler for now.

include/haproxy/applet.h
src/applet.c

index ebd26f0c68717053680046d18562574da7480aac..9b4f5744b25a897d93d2c3203565ba503fb60ddb 100644 (file)
@@ -38,6 +38,7 @@ extern unsigned int nb_applets;
 extern struct pool_head *pool_head_appctx;
 
 struct task *task_run_applet(struct task *t, void *context, unsigned int state);
+struct task *task_process_applet(struct task *t, void *context, unsigned int state);
 int appctx_buf_available(void *arg);
 void *applet_reserve_svcctx(struct appctx *appctx, size_t size);
 void applet_reset_svcctx(struct appctx *appctx);
index 89bea0238a876e2e07d08ba34ba56f996d361833..81bb99964df2bee6df8cc1dc2868ed87844c6461 100644 (file)
@@ -238,7 +238,10 @@ struct appctx *appctx_new_on(struct applet *applet, struct sedesc *sedesc, int t
        }
 
        appctx->sedesc = sedesc;
-       appctx->t->process = task_run_applet;
+       if (applet->rcv_buf != NULL && applet->snd_buf != NULL)
+               appctx->t->process = task_process_applet;
+       else
+               appctx->t->process = task_run_applet;
        appctx->t->context = appctx;
 
        appctx->inbuf = BUF_NULL;
@@ -663,3 +666,72 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
        TRACE_LEAVE(APPLET_EV_PROCESS, app);
        return t;
 }
+
+
+/* Default applet handler based on IN/OUT buffers. It is a true task here, no a tasklet  */
+struct task *task_process_applet(struct task *t, void *context, unsigned int state)
+{
+       struct appctx *app = context;
+       struct stconn *sc;
+       unsigned int rate;
+
+       TRACE_ENTER(APPLET_EV_PROCESS, app);
+
+       if (app->state & APPLET_WANT_DIE) {
+               TRACE_DEVEL("APPCTX want die, release it", APPLET_EV_FREE, app);
+               __appctx_free(app);
+               return NULL;
+       }
+
+       if (se_fl_test(app->sedesc, SE_FL_ORPHAN)) {
+               /* Finalize init of orphan appctx. .init callback function must
+                * be defined and it must finalize appctx startup.
+                */
+               BUG_ON(!app->applet->init);
+
+               if (appctx_init(app) == -1) {
+                       TRACE_DEVEL("APPCTX init failed", APPLET_EV_FREE|APPLET_EV_ERR, app);
+                       appctx_free_on_early_error(app);
+                       return NULL;
+               }
+               BUG_ON(!app->sess || !appctx_sc(app) || !appctx_strm(app));
+               TRACE_DEVEL("APPCTX initialized", APPLET_EV_PROCESS, app);
+       }
+
+       sc = appctx_sc(app);
+
+       sc_applet_sync_send(sc);
+
+       /* We always pretend the applet can't get and doesn't want to
+        * put, it's up to it to change this if needed. This ensures
+        * that one applet which ignores any event will not spin.
+        */
+       applet_need_more_data(app);
+       applet_have_no_more_data(app);
+
+       app->applet->fct(app);
+
+       TRACE_POINT(APPLET_EV_PROCESS, app);
+
+       sc_applet_sync_recv(sc);
+
+       /* TODO: May be move in appctx_rcv_buf or sc_applet_process ? */
+       if (sc_waiting_room(sc) && (sc->flags & SC_FL_ABRT_DONE)) {
+               sc_ep_set(sc, SE_FL_EOS|SE_FL_ERROR);
+       }
+
+       /* measure the call rate and check for anomalies when too high */
+       if (((b_size(sc_ib(sc)) && sc->flags & SC_FL_NEED_BUFF) || // asks for a buffer which is present
+            (b_size(sc_ib(sc)) && !b_data(sc_ib(sc)) && sc->flags & SC_FL_NEED_ROOM) || // asks for room in an empty buffer
+            (b_data(sc_ob(sc)) && sc_is_send_allowed(sc)) || // asks for data already present
+            (!b_data(sc_ib(sc)) && b_data(sc_ob(sc)) && // didn't return anything ...
+             (!(sc_oc(sc)->flags & CF_WRITE_EVENT) && (sc->flags & SC_FL_SHUT_WANTED))))) { // ... and left data pending after a shut
+               rate = update_freq_ctr(&app->call_rate, 1);
+               if (rate >= 100000 && app->call_rate.prev_ctr) // looped like this more than 100k times over last second
+                       stream_dump_and_crash(&app->obj_type, read_freq_ctr(&app->call_rate));
+       }
+
+       sc->app_ops->wake(sc);
+       TRACE_LEAVE(APPLET_EV_PROCESS, app);
+       return t;
+}