]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: threads/task: handle multithread on task scheduler
authorEmeric Brun <ebrun@haproxy.com>
Wed, 27 Sep 2017 12:59:38 +0000 (14:59 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:30 +0000 (13:58 +0100)
2 global locks have been added to protect, respectively, the run queue and the
wait queue. And a process mask has been added on each task. Like for FDs, this
mask is used to know which threads are allowed to process a task.

For many tasks, all threads are granted. And this must be your first intension
when you create a new task, else you have a good reason to make a task sticky on
some threads. This is then the responsibility to the process callback to lock
what have to be locked in the task context.

Nevertheless, all tasks linked to a session must be sticky on the thread
creating the session. It is important that I/O handlers processing session FDs
and these tasks run on the same thread to avoid conflicts.

15 files changed:
include/common/hathreads.h
include/proto/task.h
include/types/task.h
src/cfgparse.c
src/checks.c
src/dns.c
src/flt_spoe.c
src/haproxy.c
src/hlua.c
src/peers.c
src/proxy.c
src/session.c
src/stick_table.c
src/stream.c
src/task.c

index a2c047e2c9f40a5a000863a16ebe2d798617a6a9..19cdf83dbf117a6da924ebb82b6e447901097768 100644 (file)
@@ -142,6 +142,8 @@ enum lock_label {
        FDCACHE_LOCK,
        FD_LOCK,
        POLL_LOCK,
+       TASK_RQ_LOCK,
+       TASK_WQ_LOCK,
        POOL_LOCK,
        LOCK_LABELS
 };
@@ -226,7 +228,7 @@ struct ha_rwlock {
 static inline void show_lock_stats()
 {
        const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
-                                          "POOL" };
+                                          "TASK_RQ", "TASK_WQ", "POOL" };
        int lbl;
 
        for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
index c6177d08e1a0a4cc441739e4cba900f1193ae160..bc3a17316fbc8c2f7a79a8db2acf9a1e3e1ab2e1 100644 (file)
@@ -30,6 +30,8 @@
 #include <common/mini-clist.h>
 #include <common/standard.h>
 #include <common/ticks.h>
+#include <common/hathreads.h>
+
 #include <eb32tree.h>
 
 #include <types/global.h>
@@ -86,6 +88,10 @@ extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool2_task;
 extern struct pool_head *pool2_notification;
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T rq_lock;        /* spin lock related to run queue */
+extern HA_SPINLOCK_T wq_lock;        /* spin lock related to wait queue */
+#endif
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -103,19 +109,29 @@ static inline int task_in_wq(struct task *t)
 struct task *__task_wakeup(struct task *t);
 static inline struct task *task_wakeup(struct task *t, unsigned int f)
 {
+       SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+
        /* If task is running, we postpone the call
         * and backup the state.
         */
        if (unlikely(t->state & TASK_RUNNING)) {
                t->pending_state |= f;
+               SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
                return t;
        }
        if (likely(!task_in_rq(t)))
                __task_wakeup(t);
        t->state |= f;
+       SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+
        return t;
 }
 
+static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
+{
+
+       t->process_mask = thread_mask;
+}
 /*
  * Unlink the task from the wait queue, and possibly update the last_timer
  * pointer. A pointer to the task itself is returned. The task *must* already
@@ -130,8 +146,10 @@ static inline struct task *__task_unlink_wq(struct task *t)
 
 static inline struct task *task_unlink_wq(struct task *t)
 {
+       SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
        if (likely(task_in_wq(t)))
                __task_unlink_wq(t);
+       SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
        return t;
 }
 
@@ -156,9 +174,10 @@ static inline struct task *__task_unlink_rq(struct task *t)
  */
 static inline struct task *task_unlink_rq(struct task *t)
 {
-       if (likely(task_in_rq(t))) {
+       SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+       if (likely(task_in_rq(t)))
                __task_unlink_rq(t);
-       }
+       SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
        return t;
 }
 
@@ -178,11 +197,12 @@ static inline struct task *task_delete(struct task *t)
  * state).  The task is returned. This function should not be used outside of
  * task_new().
  */
-static inline struct task *task_init(struct task *t)
+static inline struct task *task_init(struct task *t, unsigned long thread_mask)
 {
        t->wq.node.leaf_p = NULL;
        t->rq.node.leaf_p = NULL;
        t->pending_state = t->state = TASK_SLEEPING;
+       t->process_mask = thread_mask;
        t->nice = 0;
        t->calls = 0;
        t->expire = TICK_ETERNITY;
@@ -194,12 +214,12 @@ static inline struct task *task_init(struct task *t)
  * case of lack of memory. The task count is incremented. Tasks should only
  * be allocated this way, and must be freed using task_free().
  */
-static inline struct task *task_new(void)
+static inline struct task *task_new(unsigned long thread_mask)
 {
        struct task *t = pool_alloc2(pool2_task);
        if (t) {
-               nb_tasks++;
-               task_init(t);
+               HA_ATOMIC_ADD(&nb_tasks, 1);
+               task_init(t, thread_mask);
        }
        return t;
 }
@@ -213,7 +233,7 @@ static inline void task_free(struct task *t)
        pool_free2(pool2_task, t);
        if (unlikely(stopping))
                pool_flush2(pool2_task);
-       nb_tasks--;
+       HA_ATOMIC_SUB(&nb_tasks, 1);
 }
 
 /* Place <task> into the wait queue, where it may already be. If the expiration
@@ -234,8 +254,10 @@ static inline void task_queue(struct task *task)
        if (!tick_isset(task->expire))
                return;
 
+       SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
        if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
                __task_queue(task);
+       SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -244,15 +266,18 @@ static inline void task_queue(struct task *task)
  */
 static inline void task_schedule(struct task *task, int when)
 {
+       /* TODO: mthread, check if there is no tisk with this test */
        if (task_in_rq(task))
                return;
 
+       SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
        if (task_in_wq(task))
                when = tick_first(when, task->expire);
 
        task->expire = when;
        if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
                __task_queue(task);
+       SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* This function register a new signal. "lua" is the current lua
index e0ae3823e74dca2287ddb6888fc9980ce8edde63..da7c929bcd430b202e1011a9f247bf1ee7f95c21 100644 (file)
@@ -69,6 +69,7 @@ struct task {
        void *context;                  /* the task's context */
        struct eb32_node wq;            /* ebtree node used to hold the task in the wait queue */
        int expire;                     /* next expiration date for this task, in ticks */
+       unsigned long process_mask;     /* mask of thread IDs authorized to process the task */
 };
 
 /*
index 1cb98f354625ae740ef234c9dd37b08e7bb2d737..e765fdb1acabe999d5272ac629aa74db1bdaa347 100644 (file)
@@ -42,6 +42,7 @@
 #include <common/time.h>
 #include <common/uri_auth.h>
 #include <common/namespace.h>
+#include <common/hathreads.h>
 
 #include <types/capture.h>
 #include <types/compression.h>
@@ -8862,7 +8863,7 @@ out_uri_auth_compat:
                }
 
                /* create the task associated with the proxy */
-               curproxy->task = task_new();
+               curproxy->task = task_new(MAX_THREADS_MASK);
                if (curproxy->task) {
                        curproxy->task->context = curproxy;
                        curproxy->task->process = manage_proxy;
index 704bed2966c7a35714e6a8ac0b8618c3b295ff1d..3d60237724923a1f82e5d7a7a8ac8d2001c8b296 100644 (file)
@@ -2226,7 +2226,7 @@ static int start_check_task(struct check *check, int mininter,
 {
        struct task *t;
        /* task for the check */
-       if ((t = task_new()) == NULL) {
+       if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
                Alert("Starting [%s:%s] check: out of memory.\n",
                      check->server->proxy->id, check->server->id);
                return 0;
@@ -2272,7 +2272,7 @@ static int start_checks()
        for (px = proxy; px; px = px->next) {
                for (s = px->srv; s; s = s->next) {
                        if (s->slowstart) {
-                               if ((t = task_new()) == NULL) {
+                               if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
                                        Alert("Starting [%s:%s] check: out of memory.\n", px->id, s->id);
                                        return ERR_ALERT | ERR_FATAL;
                                }
@@ -3130,7 +3130,7 @@ int init_email_alert(struct mailers *mls, struct proxy *p, char **err)
                        check->port = 587;
                //check->server = s;
 
-               if ((t = task_new()) == NULL) {
+               if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
                        memprintf(err, "out of memory while allocating mailer alerts task");
                        goto error;
                }
index c2b87c015b82ce4d46342b034b53e9c566c9c086..3383faa701b65b19035935f508cedeaa20cb51c4 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
@@ -1851,7 +1851,7 @@ static int dns_finalize_config(void)
                }
 
                /* Create the task associated to the resolvers section */
-               if ((t = task_new()) == NULL) {
+               if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
                        Alert("config : resolvers '%s' : out of memory.\n", resolvers->id);
                        err_code |= (ERR_ALERT|ERR_ABORT);
                        goto err;
index 51730a214505f2262df6230c50ceda4957773997..9543f8fe244becc1160aaa349a128eeea2a40787 100644 (file)
@@ -1939,7 +1939,7 @@ spoe_create_appctx(struct spoe_config *conf)
        memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
 
        appctx->st0 = SPOE_APPCTX_ST_CONNECT;
-       if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL)
+       if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL)
                goto out_free_spoe_appctx;
 
        SPOE_APPCTX(appctx)->owner           = appctx;
@@ -1975,10 +1975,10 @@ spoe_create_appctx(struct spoe_config *conf)
        strm->do_log = NULL;
        strm->res.flags |= CF_READ_DONTWAIT;
 
-       task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
        LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
        conf->agent->applets_act++;
 
+       task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
        task_wakeup(strm->task, TASK_WOKEN_INIT);
        return appctx;
 
index 170b002e393a191faf195db1f740eae440f1bea9..ff638445eb6f1465626393ca88dfe6feea968b2a 100644 (file)
@@ -1511,7 +1511,7 @@ static void init(int argc, char **argv)
                exit(2);
        }
 
-       global_listener_queue_task = task_new();
+       global_listener_queue_task = task_new(MAX_THREADS_MASK);
        if (!global_listener_queue_task) {
                Alert("Out of memory when initializing global task\n");
                exit(1);
index f5fed044ad976ed316a876c71ee1330f73089ddc..137117c586532de654bd16e93c111a139da0b473 100644 (file)
@@ -5450,7 +5450,7 @@ static int hlua_register_task(lua_State *L)
        if (!hlua)
                WILL_LJMP(luaL_error(L, "lua out of memory error."));
 
-       task = task_new();
+       task = task_new(MAX_THREADS_MASK);
        task->context = hlua;
        task->process = hlua_process_task;
 
@@ -6031,7 +6031,7 @@ static int hlua_applet_tcp_init(struct appctx *ctx, struct proxy *px, struct str
        ctx->ctx.hlua_apptcp.flags = 0;
 
        /* Create task used by signal to wakeup applets. */
-       task = task_new();
+       task = task_new(MAX_THREADS_MASK);
        if (!task) {
                SEND_ERR(px, "Lua applet tcp '%s': out of memory.\n",
                         ctx->rule->arg.hlua_rule->fcn.name);
@@ -6232,7 +6232,7 @@ static int hlua_applet_http_init(struct appctx *ctx, struct proxy *px, struct st
                ctx->ctx.hlua_apphttp.flags |= APPLET_HTTP11;
 
        /* Create task used by signal to wakeup applets. */
-       task = task_new();
+       task = task_new(MAX_THREADS_MASK);
        if (!task) {
                SEND_ERR(px, "Lua applet http '%s': out of memory.\n",
                         ctx->rule->arg.hlua_rule->fcn.name);
@@ -6777,7 +6777,7 @@ static int hlua_cli_parse_fct(char **args, struct appctx *appctx, void *private)
         * We use the same wakeup fonction than the Lua applet_tcp and
         * applet_http. It is absolutely compatible.
         */
-       appctx->ctx.hlua_cli.task = task_new();
+       appctx->ctx.hlua_cli.task = task_new(MAX_THREADS_MASK);
        if (!appctx->ctx.hlua_cli.task) {
                SEND_ERR(NULL, "Lua cli '%s': out of memory.\n", fcn->name);
                goto error;
index 4c85af5aac46a87ced0076f62c4a45adb93e06d4..b98ec61a8607dab60e8153b47d735b94cb80621f 100644 (file)
@@ -2055,7 +2055,7 @@ void peers_init_sync(struct peers *peers)
 
        list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
                listener->maxconn = peers->peers_fe->maxconn;
-       peers->sync_task = task_new();
+       peers->sync_task = task_new(MAX_THREADS_MASK);
        peers->sync_task->process = process_peer_sync;
        peers->sync_task->context = (void *)peers;
        peers->sighandler = signal_register_task(0, peers->sync_task, 0);
index 53f886ec5b0087f92715562768d0291fd71554e6..71091d01df65c22492863235c6993ce4ac0b21d4 100644 (file)
@@ -979,7 +979,7 @@ void soft_stop(void)
 
        stopping = 1;
        if (tick_isset(global.hard_stop_after)) {
-               task = task_new();
+               task = task_new(MAX_THREADS_MASK);
                if (task) {
                        task->process = hard_stop;
                        task_schedule(task, tick_add(now_ms, global.hard_stop_after));
index ecfa2f14d9a0c44844d7676800c4c36d0b2b0c19..54a879bcf77daf8a6edda8a3d8c38997c89c437a 100644 (file)
@@ -241,7 +241,7 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
         *          conn -- owner ---> task <-----+
         */
        if (cli_conn->flags & (CO_FL_HANDSHAKE | CO_FL_EARLY_SSL_HS)) {
-               if (unlikely((sess->task = task_new()) == NULL))
+               if (unlikely((sess->task = task_new(tid_bit)) == NULL))
                        goto out_free_sess;
 
                conn_set_xprt_done_cb(cli_conn, conn_complete_session);
index d95c77f745da469e81f669a3c038f76b1da9ce52..5e8211623cda3f9a0a5857b3e9141c2bef76a829 100644 (file)
@@ -436,7 +436,7 @@ int stktable_init(struct stktable *t)
 
                t->exp_next = TICK_ETERNITY;
                if ( t->expire ) {
-                       t->exp_task = task_new();
+                       t->exp_task = task_new(MAX_THREADS_MASK);
                        t->exp_task->process = process_table_expire;
                        t->exp_task->context = (void *)t;
                }
index a78ee969a2f54bf897fef2d1fcc1239e44ad5a27..522441fd029483c8bd9b5c222696d8148445a839 100644 (file)
@@ -163,7 +163,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
        s->flags |= SF_INITIALIZED;
        s->unique_id = NULL;
 
-       if ((t = task_new()) == NULL)
+       if ((t = task_new(tid_bit)) == NULL)
                goto out_fail_alloc;
 
        s->task = t;
index 8d4ab39e571b97dea3b032082e1620cd7c977fe9..0022bff302912c76e5a74f42cfc910bd5f329e51 100644 (file)
@@ -36,6 +36,10 @@ unsigned int tasks_run_queue_cur = 0;    /* copy of the run queue size */
 unsigned int nb_tasks_cur = 0;     /* copy of the tasks count */
 unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
 
+#ifdef USE_THREAD
+HA_SPINLOCK_T rq_lock;        /* spin lock related to run queue */
+HA_SPINLOCK_T wq_lock;        /* spin lock related to wait queue */
+#endif
 
 static struct eb_root timers;      /* sorted timers tree */
 static struct eb_root rqueue;      /* tree constituting the run queue */
@@ -113,22 +117,29 @@ int wake_expired_tasks()
 {
        struct task *task;
        struct eb32_node *eb;
+       int ret;
 
        while (1) {
+               SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+  lookup_next:
                eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
-               if (unlikely(!eb)) {
+               if (!eb) {
                        /* we might have reached the end of the tree, typically because
                        * <now_ms> is in the first half and we're first scanning the last
                        * half. Let's loop back to the beginning of the tree now.
                        */
                        eb = eb32_first(&timers);
-                       if (likely(!eb))
+                       if (likely(!eb)) {
+                               SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
                                break;
+                       }
                }
 
                if (likely(tick_is_lt(now_ms, eb->key))) {
+                       ret = eb->key;
+                       SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
                        /* timer not expired yet, revisit it later */
-                       return eb->key;
+                       return ret;
                }
 
                /* timer looks expired, detach it from the queue */
@@ -150,10 +161,11 @@ int wake_expired_tasks()
                 */
                if (!tick_is_expired(task->expire, now_ms)) {
                        if (!tick_isset(task->expire))
-                               continue;
+                               goto lookup_next;
                        __task_queue(task);
-                       continue;
+                       goto lookup_next;
                }
+               SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
                task_wakeup(task, TASK_WOKEN_TIMER);
        }
 
@@ -192,6 +204,7 @@ void process_runnable_tasks()
        if (likely(niced_tasks))
                max_processed = (max_processed + 3) / 4;
 
+       SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
        while (max_processed > 0) {
                /* Note: this loop is one of the fastest code path in
                 * the whole program. It should not be re-arranged
@@ -216,12 +229,14 @@ void process_runnable_tasks()
                while (local_tasks_count < 16) {
                        t = eb32_entry(rq_next, struct task, rq);
                        rq_next = eb32_next(rq_next);
-                       /* detach the task from the queue */
-                       __task_unlink_rq(t);
-                       t->state |= TASK_RUNNING;
-                       t->pending_state = 0;
-                       t->calls++;
-                       local_tasks[local_tasks_count++] = t;
+                       if (t->process_mask & (1UL << tid)) {
+                               /* detach the task from the queue */
+                               __task_unlink_rq(t);
+                               t->state |= TASK_RUNNING;
+                               t->pending_state = 0;
+                               t->calls++;
+                               local_tasks[local_tasks_count++] = t;
+                       }
                        if (!rq_next) {
                                if (rewind || !(rq_next = eb32_first(&rqueue))) {
                                        break;
@@ -233,6 +248,7 @@ void process_runnable_tasks()
                if (!local_tasks_count)
                        break;
 
+               SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 
                for (i = 0; i < local_tasks_count ; i++) {
                        t = local_tasks[i];
@@ -247,6 +263,7 @@ void process_runnable_tasks()
                }
 
                max_processed -= local_tasks_count;
+               SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
                for (i = 0; i < local_tasks_count ; i++) {
                        t = local_tasks[i];
                        if (likely(t != NULL)) {
@@ -263,6 +280,7 @@ void process_runnable_tasks()
                        }
                }
        }
+       SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -270,6 +288,8 @@ int init_task()
 {
        memset(&timers, 0, sizeof(timers));
        memset(&rqueue, 0, sizeof(rqueue));
+       SPIN_INIT(&wq_lock);
+       SPIN_INIT(&rq_lock);
        pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
        if (!pool2_task)
                return 0;