]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: applets: Use tasks, instead of rolling our own scheduler.
authorOlivier Houchard <ohouchard@haproxy.com>
Fri, 25 May 2018 14:58:52 +0000 (16:58 +0200)
committerWilly Tarreau <w@1wt.eu>
Sat, 26 May 2018 18:03:30 +0000 (20:03 +0200)
There's no real reason to have a specific scheduler for applets anymore, so
nuke it and just use tasks. This comes with some benefits, the first one
being that applets cannot induce high latencies anymore since they share
nice values with other tasks. Later it will be possible to configure the
applets' nice value. The second benefit is that the applet scheduler was
not very thread-friendly, having a big lock around it in prevision of this
change. Thus applet-intensive workloads should now scale much better with
threads.

Some more improvement is possible now : some applets also use a task to
handle timers and timeouts. These ones could now be simplified to use only
one task.

include/proto/applet.h
include/proto/channel.h
include/types/applet.h
include/types/global.h
src/applet.c
src/cfgparse.c
src/cli.c
src/flt_spoe.c
src/haproxy.c
src/mux_h2.c
src/stream.c

index 7cc2c0ad148ef7858a407bf10fd57907acaf28c6..00ad63b6a1259fc89efb6b8cfb55eee17a994e2d 100644 (file)
 #include <common/mini-clist.h>
 #include <types/applet.h>
 #include <proto/connection.h>
+#include <proto/task.h>
 
 extern unsigned int nb_applets;
-extern unsigned long active_applets_mask;
-extern unsigned int applets_active_queue;
-__decl_hathreads(extern HA_SPINLOCK_T applet_active_lock);
-extern struct list applet_active_queue;
 
-void applet_run_active();
+struct task *task_run_applet(struct task *t, void *context, unsigned short state);
 
 
 static int inline appctx_res_wakeup(struct appctx *appctx);
@@ -52,7 +49,7 @@ static inline void appctx_init(struct appctx *appctx, unsigned long thread_mask)
        appctx->chunk = NULL;
        appctx->io_release = NULL;
        appctx->thread_mask = thread_mask;
-       appctx->state = APPLET_SLEEPING;
+       appctx->state = 0;
 }
 
 /* Tries to allocate a new appctx and initialize its main fields. The appctx
@@ -69,7 +66,13 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thr
                appctx->obj_type = OBJ_TYPE_APPCTX;
                appctx->applet = applet;
                appctx_init(appctx, thread_mask);
-               LIST_INIT(&appctx->runq);
+               appctx->t = task_new(thread_mask);
+               if (unlikely(appctx->t == NULL)) {
+                       pool_free(pool_head_connection, appctx);
+                       return NULL;
+               }
+               appctx->t->process = task_run_applet;
+               appctx->t->context = appctx;
                LIST_INIT(&appctx->buffer_wait.list);
                appctx->buffer_wait.target = appctx;
                appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
@@ -83,11 +86,8 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thr
  */
 static inline void __appctx_free(struct appctx *appctx)
 {
-       if (!LIST_ISEMPTY(&appctx->runq)) {
-               LIST_DEL(&appctx->runq);
-               applets_active_queue--;
-       }
-
+       task_delete(appctx->t);
+       task_free(appctx->t);
        if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
                HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
                LIST_DEL(&appctx->buffer_wait.list);
@@ -98,38 +98,27 @@ static inline void __appctx_free(struct appctx *appctx)
        pool_free(pool_head_connection, appctx);
        HA_ATOMIC_SUB(&nb_applets, 1);
 }
+
 static inline void appctx_free(struct appctx *appctx)
 {
-       HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-       if (appctx->state & APPLET_RUNNING) {
+       /* The task is supposed to be run on this thread, so we can just
+        * check if it's running already (or about to run) or not
+        */
+       if (!(appctx->t->state & TASK_RUNNING))
+               __appctx_free(appctx);
+       else {
+               /* if it's running, or about to run, defer the freeing
+                * until the callback is called.
+                */
                appctx->state |= APPLET_WANT_DIE;
-               HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-               return;
+               task_wakeup(appctx->t, TASK_WOKEN_OTHER);
        }
-       __appctx_free(appctx);
-       HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
 }
 
 /* wakes up an applet when conditions have changed */
-static inline void __appctx_wakeup(struct appctx *appctx)
-{
-       if (LIST_ISEMPTY(&appctx->runq)) {
-               LIST_ADDQ(&applet_active_queue, &appctx->runq);
-               applets_active_queue++;
-               active_applets_mask |= appctx->thread_mask;
-       }
-}
-
 static inline void appctx_wakeup(struct appctx *appctx)
 {
-       HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-       if (appctx->state & APPLET_RUNNING) {
-               appctx->state |= APPLET_WOKEN_UP;
-               HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-               return;
-       }
-       __appctx_wakeup(appctx);
-       HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
+       task_wakeup(appctx->t, TASK_WOKEN_OTHER);
 }
 
 /* Callback used to wake up an applet when a buffer is available. The applet
@@ -139,19 +128,17 @@ static inline void appctx_wakeup(struct appctx *appctx)
  * requested */
 static inline int appctx_res_wakeup(struct appctx *appctx)
 {
-       HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-       if (appctx->state & APPLET_RUNNING) {
-               if (appctx->state & APPLET_WOKEN_UP) {
-                       HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-                       return 0;
-               }
-               appctx->state |= APPLET_WOKEN_UP;
-               HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-               return 1;
-       }
-       __appctx_wakeup(appctx);
-       HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-       return 1;
+       int ret;
+
+       /* To detect if we have already been waken or not, we now that
+        * if the state contains TASK_RUNNING, but not just TASK_RUNNING.
+        * This is racy, but that's OK. At worst we will wake a little more
+        * tasks than necessary when a buffer is available.
+        */
+       ret = ((appctx->state & TASK_RUNNING) != 0) &&
+             ((appctx->state != TASK_RUNNING));
+       task_wakeup(appctx->t, TASK_WOKEN_OTHER);
+       return ret;
 }
 
 
index 274495f28c6c51c5f7ea5257264cc7bcf66addc6..d66fc911b5a8a87599b46fb727081e3eb9f47edc 100644 (file)
@@ -36,7 +36,6 @@
 #include <types/stream.h>
 #include <types/stream_interface.h>
 
-#include <proto/applet.h>
 #include <proto/task.h>
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -456,7 +455,7 @@ static inline void channel_release_buffer(struct channel *chn, struct buffer_wai
 {
        if (chn->buf->size && buffer_empty(chn->buf)) {
                b_free(&chn->buf);
-               offer_buffers(wait->target, tasks_run_queue + applets_active_queue);
+               offer_buffers(wait->target, tasks_run_queue);
        }
 }
 
index b0715866e2eda8b817fadcc5d69c4a8cad3ebcc8..8b7c28fc01786a63d603829bcb4ac15b63b5d0a6 100644 (file)
@@ -45,17 +45,13 @@ struct applet {
        unsigned int timeout;              /* execution timeout. */
 };
 
-#define APPLET_SLEEPING     0x00  /* applet is currently sleeping or pending in active queue */
-#define APPLET_RUNNING      0x01  /* applet is currently running */
-#define APPLET_WOKEN_UP     0x02  /* applet was running and requested to woken up again */
-#define APPLET_WANT_DIE     0x04  /* applet was running and requested to die */
+#define APPLET_WANT_DIE     0x01  /* applet was running and requested to die */
 
 #define APPCTX_CLI_ST1_PROMPT  (1 << 0)
 #define APPCTX_CLI_ST1_PAYLOAD (1 << 1)
 
 /* Context of a running applet. */
 struct appctx {
-       struct list runq;          /* chaining in the applet run queue */
        enum obj_type obj_type;    /* OBJ_TYPE_APPCTX */
        /* 3 unused bytes here */
        unsigned short state;      /* Internal appctx state */
@@ -72,6 +68,7 @@ struct appctx {
        int cli_severity_output;        /* used within the cli_io_handler to format severity output of informational feedback */
        struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
        unsigned long thread_mask;      /* mask of thread IDs authorized to process the applet */
+       struct task *t;                  /* task associated to the applet */
 
        union {
                struct {
index d603c4261180b35c332040874c78a2d63082e857..2ab124b6294f946750ec938b188d03768b4501f5 100644 (file)
@@ -183,7 +183,6 @@ struct activity {
        unsigned int loops;        // complete loops in run_poll_loop()
        unsigned int wake_cache;   // active fd_cache prevented poll() from sleeping
        unsigned int wake_tasks;   // active tasks prevented poll() from sleeping
-       unsigned int wake_applets; // active applets prevented poll() from sleeping
        unsigned int wake_signal;  // pending signal prevented poll() from sleeping
        unsigned int poll_exp;     // number of times poll() sees an expired timeout (includes wake_*)
        unsigned int poll_drop;    // poller dropped a dead FD from the update list
index 77f984d91f75c3e703c4c785e14c899c4faa9ad7..d7fbb53eb67843131acaef720a9847a440111c1b 100644 (file)
 #include <proto/channel.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
+#include <proto/task.h>
 
 unsigned int nb_applets = 0;
-unsigned long active_applets_mask = 0;
-unsigned int applets_active_queue = 0;
-__decl_hathreads(HA_SPINLOCK_T applet_active_lock);  /* spin lock related to applet active queue */
 
-struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
-
-void applet_run_active()
+struct task *task_run_applet(struct task *t, void *context, unsigned short state)
 {
-       struct appctx *curr, *next;
-       struct stream_interface *si;
-       struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue);
-       int max_processed;
-
-       max_processed = applets_active_queue;
-       if (max_processed > 200)
-               max_processed = 200;
+       struct appctx *app = context;
+       struct stream_interface *si = app->owner;
 
-       HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-       if (!(active_applets_mask & tid_bit)) {
-               HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-               return;
-       }
-       active_applets_mask &= ~tid_bit;
-       curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
-       while (&curr->runq != &applet_active_queue) {
-               next = LIST_NEXT(&curr->runq, typeof(next), runq);
-               if (curr->thread_mask & tid_bit) {
-                       LIST_DEL(&curr->runq);
-                       curr->state = APPLET_RUNNING;
-                       LIST_ADDQ(&applet_cur_queue, &curr->runq);
-                       applets_active_queue--;
-                       max_processed--;
-               }
-               curr = next;
-               if (max_processed <= 0) {
-                       active_applets_mask |= tid_bit;
-                       break;
-               }
+       if (app->state & APPLET_WANT_DIE) {
+               __appctx_free(app);
+               return NULL;
        }
-       HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-
-       /* The list is only scanned from the head. This guarantees that if any
-        * applet removes another one, there is no side effect while walking
-        * through the list.
+       /* Now we'll try to allocate the input buffer. We wake up the
+        * applet in all cases. So this is the applet responsibility to
+        * check if this buffer was allocated or not. This let a chance
+        * for applets to do some other processing if needed. */
+       if (!channel_alloc_buffer(si_ic(si), &app->buffer_wait))
+               si_applet_cant_put(si);
+
+       /* We always pretend the applet can't get and doesn't want to
+        * put, it's up to it to change this if needed. This ensures
+        * that one applet which ignores any event will not spin.
         */
-       while (!LIST_ISEMPTY(&applet_cur_queue)) {
-               curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq);
-               si = curr->owner;
-
-               /* Now we'll try to allocate the input buffer. We wake up the
-                * applet in all cases. So this is the applet responsibility to
-                * check if this buffer was allocated or not. This let a chance
-                * for applets to do some other processing if needed. */
-               if (!channel_alloc_buffer(si_ic(si), &curr->buffer_wait))
-                       si_applet_cant_put(si);
+       si_applet_cant_get(si);
+       si_applet_stop_put(si);
 
-               /* We always pretend the applet can't get and doesn't want to
-                * put, it's up to it to change this if needed. This ensures
-                * that one applet which ignores any event will not spin.
-                */
-               si_applet_cant_get(si);
-               si_applet_stop_put(si);
-
-               curr->applet->fct(curr);
-               si_applet_wake_cb(si);
-               channel_release_buffer(si_ic(si), &curr->buffer_wait);
-
-               if (applet_cur_queue.n == &curr->runq) {
-                       /* curr was left in the list, move it back to the active list */
-                       LIST_DEL(&curr->runq);
-                       LIST_INIT(&curr->runq);
-                       HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-                       if (curr->state & APPLET_WANT_DIE) {
-                               curr->state = APPLET_SLEEPING;
-                               __appctx_free(curr);
-                       }
-                       else {
-                               if (curr->state & APPLET_WOKEN_UP) {
-                                       curr->state = APPLET_SLEEPING;
-                                       __appctx_wakeup(curr);
-                               }
-                               else {
-                                       curr->state = APPLET_SLEEPING;
-                               }
-                       }
-                       HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-               }
-       }
+       app->applet->fct(app);
+       si_applet_wake_cb(si);
+       channel_release_buffer(si_ic(si), &app->buffer_wait);
+       return t;
 }
 
-__attribute__((constructor))
-static void __applet_init(void)
-{
-       HA_SPIN_INIT(&applet_active_lock);
-}
index 023973f40e0ac37d1452fa5587093b78739c49fa..3d224b9bee0d3fe93c81b6d04fd982d79e395bb4 100644 (file)
@@ -84,6 +84,7 @@
 #include <proto/stick_table.h>
 #include <proto/task.h>
 #include <proto/tcp_rules.h>
+#include <proto/connection.h>
 
 
 /* This is the SSLv3 CLIENT HELLO packet used in conjunction with the
index 4adf68401ee98ef2dd5684754679af7ecddfd172..c34078f98159de9ff15cfd4e96523601ae59ea07 100644 (file)
--- a/src/cli.c
+++ b/src/cli.c
@@ -953,7 +953,6 @@ static int cli_io_handler_show_activity(struct appctx *appctx)
        chunk_appendf(&trash, "\nloops:");        for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].loops);
        chunk_appendf(&trash, "\nwake_cache:");   for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_cache);
        chunk_appendf(&trash, "\nwake_tasks:");   for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_tasks);
-       chunk_appendf(&trash, "\nwake_applets:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_applets);
        chunk_appendf(&trash, "\nwake_signal:");  for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_signal);
        chunk_appendf(&trash, "\npoll_exp:");     for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_exp);
        chunk_appendf(&trash, "\npoll_drop:");    for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_drop);
index 856050d07d28515af91d25adcf646cfb43a4a38e..c2563030102cbb817dcd94f3a73a2d100b791f2f 100644 (file)
@@ -2840,8 +2840,7 @@ spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
        /* Release the buffer if needed */
        if ((*buf)->size) {
                b_free(buf);
-               offer_buffers(buffer_wait->target,
-                             tasks_run_queue + applets_active_queue);
+               offer_buffers(buffer_wait->target, tasks_run_queue);
        }
 }
 
index 4e61e4056fd6a7149eef7233e55c2d6f623930a8..766758f628f34377b68cbb6f478995a94fbfd888 100644 (file)
@@ -90,7 +90,6 @@
 #include <types/peers.h>
 
 #include <proto/acl.h>
-#include <proto/applet.h>
 #include <proto/arg.h>
 #include <proto/auth.h>
 #include <proto/backend.h>
@@ -2419,8 +2418,6 @@ static void run_poll_loop()
                        activity[tid].wake_cache++;
                else if (active_tasks_mask & tid_bit)
                        activity[tid].wake_tasks++;
-               else if (active_applets_mask & tid_bit)
-                       activity[tid].wake_applets++;
                else if (signal_queue_len)
                        activity[tid].wake_signal++;
                else
@@ -2429,7 +2426,6 @@ static void run_poll_loop()
                /* The poller will ensure it returns around <next> */
                cur_poller.poll(&cur_poller, exp);
                fd_process_cached_events();
-               applet_run_active();
 
 
                /* Synchronize all polling loops */
index 9c7b8284c2f5166dd58f2f4db71ae8d86b712c32..57b172250879c5d3440c26317901854a75f7ca4e 100644 (file)
@@ -17,7 +17,6 @@
 #include <common/hpack-enc.h>
 #include <common/hpack-tbl.h>
 #include <common/net_helper.h>
-#include <proto/applet.h>
 #include <proto/connection.h>
 #include <proto/h1.h>
 #include <proto/stream.h>
@@ -303,8 +302,7 @@ static inline void h2_release_buf(struct h2c *h2c, struct buffer **bptr)
 {
        if ((*bptr)->size) {
                b_free(bptr);
-               offer_buffers(h2c->buf_wait.target,
-                             tasks_run_queue + applets_active_queue);
+               offer_buffers(h2c->buf_wait.target, tasks_run_queue);
        }
 }
 
index 3ea89538c9a1f4b9740aed802596f3f89210adbd..7ad84e993cd4686498e2bb4c702b49c36b51c2a2 100644 (file)
@@ -339,7 +339,7 @@ static void stream_free(struct stream *s)
        if (s->req.buf->size || s->res.buf->size) {
                b_drop(&s->req.buf);
                b_drop(&s->res.buf);
-               offer_buffers(NULL, tasks_run_queue + applets_active_queue);
+               offer_buffers(NULL, tasks_run_queue);
        }
 
        hlua_ctx_destroy(s->hlua);
@@ -469,7 +469,7 @@ void stream_release_buffers(struct stream *s)
         * someone waiting, we can wake up a waiter and offer them.
         */
        if (offer)
-               offer_buffers(s, tasks_run_queue + applets_active_queue);
+               offer_buffers(s, tasks_run_queue);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */