]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: applet: applet scheduler rework.
authorEmeric Brun <ebrun@haproxy.com>
Mon, 26 Jun 2017 14:36:53 +0000 (16:36 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 27 Jun 2017 12:38:02 +0000 (14:38 +0200)
In order to authorize call of appctx_wakeup on running task:
- from within the task handler itself.
- in futur, from another thread.

The appctx is considered paused as default after running the handler.

The handler should explicitly call appctx_wakeup to be re-called.

When the appctx_free is called on a running handler. The real
free is postponed at the end of the handler process.

include/proto/applet.h
include/types/applet.h
src/applet.c
src/stream.c
src/stream_interface.c

index 653be31e486899aff069c2025a4aa2c20d1336e3..3cf8578c1e40e9614912468c59375c7526ace9ce 100644 (file)
@@ -48,6 +48,7 @@ static inline void appctx_init(struct appctx *appctx)
 {
        appctx->st0 = appctx->st1 = appctx->st2 = 0;
        appctx->io_release = NULL;
+       appctx->state = APPLET_SLEEPING;
 }
 
 /* Tries to allocate a new appctx and initialize its main fields. The appctx
@@ -76,7 +77,7 @@ static inline struct appctx *appctx_new(struct applet *applet)
 /* Releases an appctx previously allocated by appctx_new(). Note that
  * we share the connection pool.
  */
-static inline void appctx_free(struct appctx *appctx)
+static inline void __appctx_free(struct appctx *appctx)
 {
        if (!LIST_ISEMPTY(&appctx->runq)) {
                LIST_DEL(&appctx->runq);
@@ -89,9 +90,17 @@ static inline void appctx_free(struct appctx *appctx)
        pool_free2(pool2_connection, appctx);
        nb_applets--;
 }
+static inline void appctx_free(struct appctx *appctx)
+{
+       if (appctx->state & APPLET_RUNNING) {
+               appctx->state |= APPLET_WANT_DIE;
+               return;
+       }
+       __appctx_free(appctx);
+}
 
 /* wakes up an applet when conditions have changed */
-static inline void appctx_wakeup(struct appctx *appctx)
+static inline void __appctx_wakeup(struct appctx *appctx)
 {
        if (LIST_ISEMPTY(&appctx->runq)) {
                LIST_ADDQ(&applet_active_queue, &appctx->runq);
@@ -99,25 +108,34 @@ static inline void appctx_wakeup(struct appctx *appctx)
        }
 }
 
-/* removes an applet from the list of active applets */
-static inline void appctx_pause(struct appctx *appctx)
+static inline void appctx_wakeup(struct appctx *appctx)
 {
-       if (!LIST_ISEMPTY(&appctx->runq)) {
-               LIST_DEL(&appctx->runq);
-               LIST_INIT(&appctx->runq);
-               applets_active_queue--;
+       if (appctx->state & APPLET_RUNNING) {
+               appctx->state |= APPLET_WOKEN_UP;
+               return;
        }
+       __appctx_wakeup(appctx);
 }
 
 /* Callback used to wake up an applet when a buffer is available. The applet
  * <appctx> is woken up is if it is not already in the list of "active"
  * applets. This functions returns 1 is the stream is woken up, otherwise it
- * returns 0. */
+ * returns 0. If task is running we request we check if woken was already
+ * requested */
 static inline int appctx_res_wakeup(struct appctx *appctx)
 {
-       if (!LIST_ISEMPTY(&appctx->runq))
+       if (appctx->state & APPLET_RUNNING) {
+               if (appctx->state & APPLET_WOKEN_UP) {
+                       return 0;
+               }
+               appctx->state |= APPLET_WOKEN_UP;
+               return 1;
+       }
+
+       if (!LIST_ISEMPTY(&appctx->runq)) {
                return 0;
-       appctx_wakeup(appctx);
+       }
+       __appctx_wakeup(appctx);
        return 1;
 }
 
index 90484f6070ed6c70ee294f0b83d9b2e071e18a93..71976d88cedee09178a418111d357f9a87aba98c 100644 (file)
@@ -44,11 +44,17 @@ struct applet {
        unsigned int timeout;              /* execution timeout. */
 };
 
+#define APPLET_SLEEPING     0x00  /* applet is currently sleeping or pending in active queue */
+#define APPLET_RUNNING      0x01  /* applet is currently running */
+#define APPLET_WOKEN_UP     0x02  /* applet was running and requested to woken up again */
+#define APPLET_WANT_DIE     0x04  /* applet was running and requested to die */
+
 /* Context of a running applet. */
 struct appctx {
        struct list runq;          /* chaining in the applet run queue */
        enum obj_type obj_type;    /* OBJ_TYPE_APPCTX */
        /* 3 unused bytes here */
+       unsigned short state;      /* Internal appctx state */
        unsigned int st0;          /* CLI state for stats, session state for peers */
        unsigned int st1;          /* prompt for stats, session error for peers */
        unsigned int st2;          /* output state for stats, unused by peers  */
@@ -59,6 +65,7 @@ struct appctx {
        void (*io_release)(struct appctx *appctx);  /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK,
                                                       if the command is terminated or the session released */
        struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
+       unsigned long process_mask;     /* mask of thread IDs authorized to process the applet */
 
        union {
                struct {
index f5bc79d8003bf7c2d48166bfd8113c588eae02f4..324dfd3b92e5e46e019da3876771c3abda782cb2 100644 (file)
@@ -24,22 +24,22 @@ unsigned int nb_applets = 0;
 unsigned int applets_active_queue = 0;
 
 struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
-struct list applet_cur_queue    = LIST_HEAD_INIT(applet_cur_queue);
 
 void applet_run_active()
 {
-       struct appctx *curr;
+       struct appctx *curr, *next;
        struct stream_interface *si;
+       struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue);
 
-       if (LIST_ISEMPTY(&applet_active_queue))
-               return;
-
-       /* move active queue to run queue */
-       applet_active_queue.n->p = &applet_cur_queue;
-       applet_active_queue.p->n = &applet_cur_queue;
-
-       applet_cur_queue = applet_active_queue;
-       LIST_INIT(&applet_active_queue);
+       curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
+       while (&curr->runq != &applet_active_queue) {
+               next = LIST_NEXT(&curr->runq, typeof(next), runq);
+               LIST_DEL(&curr->runq);
+               curr->state = APPLET_RUNNING;
+               LIST_ADDQ(&applet_cur_queue, &curr->runq);
+               applets_active_queue--;
+               curr = next;
+       }
 
        /* The list is only scanned from the head. This guarantees that if any
         * applet removes another one, there is no side effect while walking
@@ -70,7 +70,20 @@ void applet_run_active()
                if (applet_cur_queue.n == &curr->runq) {
                        /* curr was left in the list, move it back to the active list */
                        LIST_DEL(&curr->runq);
-                       LIST_ADDQ(&applet_active_queue, &curr->runq);
+                       LIST_INIT(&curr->runq);
+                       if (curr->state & APPLET_WANT_DIE) {
+                               curr->state = APPLET_SLEEPING;
+                               __appctx_free(curr);
+                       }
+                       else {
+                               if (curr->state & APPLET_WOKEN_UP) {
+                                       curr->state = APPLET_SLEEPING;
+                                       __appctx_wakeup(curr);
+                               }
+                               else {
+                                       curr->state = APPLET_SLEEPING;
+                               }
+                       }
                }
        }
 }
index 3781ac75fc054017b7f702ae56cd92f9c4cf325e..4e34f3848c4c26a894ed577dbba0adba438e96ad 100644 (file)
@@ -1061,7 +1061,6 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px,
        /* Stops the applet sheduling, in case of the init function miss
         * some data.
         */
-       appctx_pause(appctx);
        si_applet_stop_get(&s->si[1]);
 
        /* Call initialisation. */
index 47ba8c1f600ec89c2f9c3d2cf1fbd79fa7c4ada5..52e2df4674419a6cf6a988d8a3b639e5350a410d 100644 (file)
@@ -1369,16 +1369,6 @@ void si_applet_wake_cb(struct stream_interface *si)
 
        /* update the stream-int, channels, and possibly wake the stream up */
        stream_int_notify(si);
-
-       /* Get away from the active list if we can't work anymore.
-        * We also do that if the main task has already scheduled, because it
-        * saves a useless wakeup/pause/wakeup cycle causing one useless call
-        * per session on average.
-        */
-       if (task_in_rq(si_task(si)) ||
-           (((si->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) != SI_FL_WANT_PUT) &&
-            ((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) != SI_FL_WANT_GET)))
-               appctx_pause(si_appctx(si));
 }
 
 
@@ -1393,8 +1383,6 @@ void stream_int_update_applet(struct stream_interface *si)
        if (((si->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) ||
            ((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET))
                appctx_wakeup(si_appctx(si));
-       else
-               appctx_pause(si_appctx(si));
 }
 
 /*