From: Emeric Brun
Date: Wed, 27 Sep 2017 12:59:38 +0000 (+0200)
Subject: MAJOR: threads/task: handle multithread on task scheduler
X-Git-Tag: v1.8-rc1~150
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c60def8368f73e578685c4f02df7d6dbe63c85d3;p=thirdparty%2Fhaproxy.git

MAJOR: threads/task: handle multithread on task scheduler

Two global locks have been added to protect, respectively, the run
queue and the wait queue, and a process mask has been added to each
task. As for FDs, this mask is used to know which threads are allowed
to process a task.

For most tasks, all threads are granted, and this should be your first
intention when you create a new task, unless you have a good reason to
make a task sticky on some threads. It is then the responsibility of
the process callback to lock whatever has to be locked in the task
context.

Nevertheless, all tasks linked to a session must be sticky on the
thread that created the session. It is important that the I/O handlers
processing the session's FDs and these tasks run on the same thread to
avoid conflicts.
---
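Note (not part of the patch): the mask given to task_new() is a plain bitmask of thread IDs, so the eligibility test added to the scheduler boils down to a single AND against the current thread's bit. The standalone sketch below models this; MAX_THREADS_MASK is assumed to be all-ones here, and runnable_on() is an illustrative helper, not a HAProxy function.

    /* Standalone model of the per-task thread mask introduced by this
     * patch. The real definitions live in hathreads.h and types/task.h. */
    #include <stdio.h>

    #define MAX_THREADS_MASK (~0UL)       /* assumption: all threads allowed */

    struct task {
            unsigned long process_mask;   /* threads allowed to run the task */
    };

    /* illustrative helper: may thread <tid> pick up task <t>? */
    static int runnable_on(const struct task *t, int tid)
    {
            return !!(t->process_mask & (1UL << tid));
    }

    int main(void)
    {
            struct task any  = { .process_mask = MAX_THREADS_MASK };
            struct task mine = { .process_mask = 1UL << 2 };  /* "tid_bit" of thread 2 */

            printf("any:  tid 0 -> %d, tid 5 -> %d\n", runnable_on(&any, 0),  runnable_on(&any, 5));
            printf("mine: tid 2 -> %d, tid 3 -> %d\n", runnable_on(&mine, 2), runnable_on(&mine, 3));
            return 0;
    }

A task created with tid_bit (the calling thread's bit) can therefore never be picked up by another thread, which is what session_accept_fd() and stream_new() rely on below.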
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index a2c047e2c9..19cdf83dbf 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -142,6 +142,8 @@ enum lock_label {
 	FDCACHE_LOCK,
 	FD_LOCK,
 	POLL_LOCK,
+	TASK_RQ_LOCK,
+	TASK_WQ_LOCK,
 	POOL_LOCK,
 	LOCK_LABELS
 };
@@ -226,7 +228,7 @@ struct ha_rwlock {
 static inline void show_lock_stats()
 {
 	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
-	                                   "POOL" };
+	                                   "TASK_RQ", "TASK_WQ", "POOL" };
 	int lbl;
 
 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/proto/task.h b/include/proto/task.h
index c6177d08e1..bc3a17316f 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -30,6 +30,8 @@
 #include
 #include
 #include
+#include
+
 #include
 #include
@@ -86,6 +88,10 @@ extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool2_task;
 extern struct pool_head *pool2_notification;
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T rq_lock;  /* spin lock related to run queue */
+extern HA_SPINLOCK_T wq_lock;  /* spin lock related to wait queue */
+#endif
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -103,19 +109,29 @@ static inline int task_in_wq(struct task *t)
 struct task *__task_wakeup(struct task *t);
 static inline struct task *task_wakeup(struct task *t, unsigned int f)
 {
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+
 	/* If task is running, we postpone the call
 	 * and backup the state.
 	 */
 	if (unlikely(t->state & TASK_RUNNING)) {
 		t->pending_state |= f;
+		SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 		return t;
 	}
 	if (likely(!task_in_rq(t)))
 		__task_wakeup(t);
 	t->state |= f;
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+
 	return t;
 }
 
+static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
+{
+	t->process_mask = thread_mask;
+}
+
 /*
  * Unlink the task from the wait queue, and possibly update the last_timer
  * pointer. A pointer to the task itself is returned. The task *must* already
@@ -130,8 +146,10 @@ static inline struct task *__task_unlink_wq(struct task *t)
 static inline struct task *task_unlink_wq(struct task *t)
 {
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (likely(task_in_wq(t)))
 		__task_unlink_wq(t);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 	return t;
 }
 
@@ -156,9 +174,10 @@ static inline struct task *__task_unlink_rq(struct task *t)
  */
 static inline struct task *task_unlink_rq(struct task *t)
 {
-	if (likely(task_in_rq(t))) {
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	if (likely(task_in_rq(t)))
 		__task_unlink_rq(t);
-	}
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 	return t;
 }
 
@@ -178,11 +197,12 @@ static inline struct task *task_delete(struct task *t)
  * state). The task is returned. This function should not be used outside of
 * task_new().
 */
-static inline struct task *task_init(struct task *t)
+static inline struct task *task_init(struct task *t, unsigned long thread_mask)
 {
 	t->wq.node.leaf_p = NULL;
 	t->rq.node.leaf_p = NULL;
 	t->pending_state = t->state = TASK_SLEEPING;
+	t->process_mask = thread_mask;
 	t->nice = 0;
 	t->calls = 0;
 	t->expire = TICK_ETERNITY;
@@ -194,12 +214,12 @@ static inline struct task *task_init(struct task *t)
 * case of lack of memory. The task count is incremented. Tasks should only
 * be allocated this way, and must be freed using task_free().
 */
-static inline struct task *task_new(void)
+static inline struct task *task_new(unsigned long thread_mask)
 {
 	struct task *t = pool_alloc2(pool2_task);
 	if (t) {
-		nb_tasks++;
-		task_init(t);
+		HA_ATOMIC_ADD(&nb_tasks, 1);
+		task_init(t, thread_mask);
 	}
 	return t;
 }
@@ -213,7 +233,7 @@ static inline void task_free(struct task *t)
 	pool_free2(pool2_task, t);
 	if (unlikely(stopping))
 		pool_flush2(pool2_task);
-	nb_tasks--;
+	HA_ATOMIC_SUB(&nb_tasks, 1);
 }
 
 /* Place <task> into the wait queue, where it may already be. If the expiration
@@ -234,8 +254,10 @@ static inline void task_queue(struct task *task)
 	if (!tick_isset(task->expire))
 		return;
 
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 		__task_queue(task);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -244,15 +266,18 @@ static inline void task_queue(struct task *task)
 */
 static inline void task_schedule(struct task *task, int when)
 {
+	/* TODO: mthread, check if there is no risk with this test */
 	if (task_in_rq(task))
 		return;
 
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (task_in_wq(task))
 		when = tick_first(when, task->expire);
 
 	task->expire = when;
 	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 		__task_queue(task);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* This function register a new signal. "lua" is the current lua
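One subtlety in the task_wakeup() path above: a task that is currently executing must not be re-linked into the run queue, so its wakeup flags are only accumulated in pending_state under rq_lock and folded back into its state when the call ends. A reduced standalone model of that state machine (pthread_spinlock_t stands in for HA_SPINLOCK_T, and the in_rq flag and enqueue() stand in for task_in_rq()/__task_wakeup()):

    #include <pthread.h>

    #define TASK_RUNNING 0x0001

    struct task {
            unsigned short state;
            unsigned short pending_state;
            int in_rq;                   /* stands in for task_in_rq() */
    };

    /* pthread_spin_init(&rq_lock, PTHREAD_PROCESS_PRIVATE) assumed at startup */
    static pthread_spinlock_t rq_lock;

    static void enqueue(struct task *t) { t->in_rq = 1; }  /* __task_wakeup() */

    static struct task *wakeup_model(struct task *t, unsigned int f)
    {
            pthread_spin_lock(&rq_lock);
            if (t->state & TASK_RUNNING) {
                    /* being executed: record the cause only; the scheduler
                     * folds pending_state back into state after the call */
                    t->pending_state |= f;
                    pthread_spin_unlock(&rq_lock);
                    return t;
            }
            if (!t->in_rq)
                    enqueue(t);              /* link into the run queue */
            t->state |= f;
            pthread_spin_unlock(&rq_lock);
            return t;
    }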
"lua" is the current lua diff --git a/include/types/task.h b/include/types/task.h index e0ae3823e7..da7c929bcd 100644 --- a/include/types/task.h +++ b/include/types/task.h @@ -69,6 +69,7 @@ struct task { void *context; /* the task's context */ struct eb32_node wq; /* ebtree node used to hold the task in the wait queue */ int expire; /* next expiration date for this task, in ticks */ + unsigned long process_mask; /* mask of thread IDs authorized to process the task */ }; /* diff --git a/src/cfgparse.c b/src/cfgparse.c index 1cb98f3546..e765fdb1ac 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -42,6 +42,7 @@ #include #include #include +#include #include #include @@ -8862,7 +8863,7 @@ out_uri_auth_compat: } /* create the task associated with the proxy */ - curproxy->task = task_new(); + curproxy->task = task_new(MAX_THREADS_MASK); if (curproxy->task) { curproxy->task->context = curproxy; curproxy->task->process = manage_proxy; diff --git a/src/checks.c b/src/checks.c index 704bed2966..3d60237724 100644 --- a/src/checks.c +++ b/src/checks.c @@ -2226,7 +2226,7 @@ static int start_check_task(struct check *check, int mininter, { struct task *t; /* task for the check */ - if ((t = task_new()) == NULL) { + if ((t = task_new(MAX_THREADS_MASK)) == NULL) { Alert("Starting [%s:%s] check: out of memory.\n", check->server->proxy->id, check->server->id); return 0; @@ -2272,7 +2272,7 @@ static int start_checks() for (px = proxy; px; px = px->next) { for (s = px->srv; s; s = s->next) { if (s->slowstart) { - if ((t = task_new()) == NULL) { + if ((t = task_new(MAX_THREADS_MASK)) == NULL) { Alert("Starting [%s:%s] check: out of memory.\n", px->id, s->id); return ERR_ALERT | ERR_FATAL; } @@ -3130,7 +3130,7 @@ int init_email_alert(struct mailers *mls, struct proxy *p, char **err) check->port = 587; //check->server = s; - if ((t = task_new()) == NULL) { + if ((t = task_new(MAX_THREADS_MASK)) == NULL) { memprintf(err, "out of memory while allocating mailer alerts task"); goto error; } diff --git a/src/dns.c b/src/dns.c index c2b87c015b..3383faa701 100644 --- a/src/dns.c +++ b/src/dns.c @@ -1851,7 +1851,7 @@ static int dns_finalize_config(void) } /* Create the task associated to the resolvers section */ - if ((t = task_new()) == NULL) { + if ((t = task_new(MAX_THREADS_MASK)) == NULL) { Alert("config : resolvers '%s' : out of memory.\n", resolvers->id); err_code |= (ERR_ALERT|ERR_ABORT); goto err; diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 51730a2145..9543f8fe24 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1939,7 +1939,7 @@ spoe_create_appctx(struct spoe_config *conf) memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size); appctx->st0 = SPOE_APPCTX_ST_CONNECT; - if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL) + if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL) goto out_free_spoe_appctx; SPOE_APPCTX(appctx)->owner = appctx; @@ -1975,10 +1975,10 @@ spoe_create_appctx(struct spoe_config *conf) strm->do_log = NULL; strm->res.flags |= CF_READ_DONTWAIT; - task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT); LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list); conf->agent->applets_act++; + task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT); task_wakeup(strm->task, TASK_WOKEN_INIT); return appctx; diff --git a/src/haproxy.c b/src/haproxy.c index 170b002e39..ff638445eb 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -1511,7 +1511,7 @@ static void init(int argc, char **argv) exit(2); } - global_listener_queue_task = task_new(); + global_listener_queue_task = 
diff --git a/src/hlua.c b/src/hlua.c
index f5fed044ad..137117c586 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -5450,7 +5450,7 @@ static int hlua_register_task(lua_State *L)
 	if (!hlua)
 		WILL_LJMP(luaL_error(L, "lua out of memory error."));
 
-	task = task_new();
+	task = task_new(MAX_THREADS_MASK);
 	task->context = hlua;
 	task->process = hlua_process_task;
 
@@ -6031,7 +6031,7 @@ static int hlua_applet_tcp_init(struct appctx *ctx, struct proxy *px, struct str
 	ctx->ctx.hlua_apptcp.flags = 0;
 
 	/* Create task used by signal to wakeup applets. */
-	task = task_new();
+	task = task_new(MAX_THREADS_MASK);
 	if (!task) {
 		SEND_ERR(px, "Lua applet tcp '%s': out of memory.\n",
 		         ctx->rule->arg.hlua_rule->fcn.name);
@@ -6232,7 +6232,7 @@ static int hlua_applet_http_init(struct appctx *ctx, struct proxy *px, struct st
 		ctx->ctx.hlua_apphttp.flags |= APPLET_HTTP11;
 
 	/* Create task used by signal to wakeup applets. */
-	task = task_new();
+	task = task_new(MAX_THREADS_MASK);
 	if (!task) {
 		SEND_ERR(px, "Lua applet http '%s': out of memory.\n",
 		         ctx->rule->arg.hlua_rule->fcn.name);
@@ -6777,7 +6777,7 @@ static int hlua_cli_parse_fct(char **args, struct appctx *appctx, void *private)
 	 * We use the same wakeup fonction than the Lua applet_tcp and
 	 * applet_http. It is absolutely compatible.
 	 */
-	appctx->ctx.hlua_cli.task = task_new();
+	appctx->ctx.hlua_cli.task = task_new(MAX_THREADS_MASK);
 	if (!appctx->ctx.hlua_cli.task) {
 		SEND_ERR(NULL, "Lua cli '%s': out of memory.\n", fcn->name);
 		goto error;
diff --git a/src/peers.c b/src/peers.c
index 4c85af5aac..b98ec61a86 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -2055,7 +2055,7 @@ void peers_init_sync(struct peers *peers)
 	list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
 		listener->maxconn = peers->peers_fe->maxconn;
 
-	peers->sync_task = task_new();
+	peers->sync_task = task_new(MAX_THREADS_MASK);
 	peers->sync_task->process = process_peer_sync;
 	peers->sync_task->context = (void *)peers;
 	peers->sighandler = signal_register_task(0, peers->sync_task, 0);
diff --git a/src/proxy.c b/src/proxy.c
index 53f886ec5b..71091d01df 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -979,7 +979,7 @@ void soft_stop(void)
 	stopping = 1;
 	if (tick_isset(global.hard_stop_after)) {
-		task = task_new();
+		task = task_new(MAX_THREADS_MASK);
 		if (task) {
 			task->process = hard_stop;
 			task_schedule(task, tick_add(now_ms, global.hard_stop_after));
diff --git a/src/session.c b/src/session.c
index ecfa2f14d9..54a879bcf7 100644
--- a/src/session.c
+++ b/src/session.c
@@ -241,7 +241,7 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
 	 *           conn -- owner ---> task <-----+
 	 */
 	if (cli_conn->flags & (CO_FL_HANDSHAKE | CO_FL_EARLY_SSL_HS)) {
-		if (unlikely((sess->task = task_new()) == NULL))
+		if (unlikely((sess->task = task_new(tid_bit)) == NULL))
 			goto out_free_sess;
 
 		conn_set_xprt_done_cb(cli_conn, conn_complete_session);
diff --git a/src/stick_table.c b/src/stick_table.c
index d95c77f745..5e8211623c 100644
--- a/src/stick_table.c
+++ b/src/stick_table.c
@@ -436,7 +436,7 @@ int stktable_init(struct stktable *t)
 	t->exp_next = TICK_ETERNITY;
 	if ( t->expire ) {
-		t->exp_task = task_new();
+		t->exp_task = task_new(MAX_THREADS_MASK);
 		t->exp_task->process = process_table_expire;
 		t->exp_task->context = (void *)t;
 	}
diff --git a/src/stream.c b/src/stream.c
index a78ee969a2..522441fd02 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -163,7 +163,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
 	s->flags |= SF_INITIALIZED;
 	s->unique_id = NULL;
 
-	if ((t = task_new()) == NULL)
+	if ((t = task_new(tid_bit)) == NULL)
 		goto out_fail_alloc;
 
 	s->task = t;
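Nothing in this commit calls the new task_set_affinity() helper yet; a hypothetical in-tree caller (adopt_task_on_this_thread() is illustrative only, not part of the patch) would presumably use it to re-pin an existing task to the calling thread, mirroring the tid_bit convention that session_accept_fd() and stream_new() follow above:

    #include <proto/task.h>

    /* Hypothetical helper, not in this patch: make the calling thread the
     * only one allowed to run <t>, then schedule it locally. */
    static void adopt_task_on_this_thread(struct task *t)
    {
            task_set_affinity(t, tid_bit);     /* pin to the current thread */
            task_wakeup(t, TASK_WOKEN_OTHER);  /* queue it for the next run */
    }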
diff --git a/src/task.c b/src/task.c
index 8d4ab39e57..0022bff302 100644
--- a/src/task.c
+++ b/src/task.c
@@ -36,6 +36,10 @@ unsigned int tasks_run_queue_cur = 0;   /* copy of the run queue size */
 unsigned int nb_tasks_cur = 0;     /* copy of the tasks count */
 unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
 
+#ifdef USE_THREAD
+HA_SPINLOCK_T rq_lock;        /* spin lock related to run queue */
+HA_SPINLOCK_T wq_lock;        /* spin lock related to wait queue */
+#endif
 static struct eb_root timers;      /* sorted timers tree */
 static struct eb_root rqueue;      /* tree constituting the run queue */
@@ -113,22 +117,29 @@ int wake_expired_tasks()
 {
 	struct task *task;
 	struct eb32_node *eb;
+	int ret;
 
 	while (1) {
+		SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+  lookup_next:
 		eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
-		if (unlikely(!eb)) {
+		if (!eb) {
 			/* we might have reached the end of the tree, typically because
 			 * <now_ms> is in the first half and we're first scanning the last
 			 * half. Let's loop back to the beginning of the tree now.
 			 */
 			eb = eb32_first(&timers);
-			if (likely(!eb))
+			if (likely(!eb)) {
+				SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 				break;
+			}
 		}
 
 		if (likely(tick_is_lt(now_ms, eb->key))) {
+			ret = eb->key;
+			SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 			/* timer not expired yet, revisit it later */
-			return eb->key;
+			return ret;
 		}
 
 		/* timer looks expired, detach it from the queue */
@@ -150,10 +161,11 @@ int wake_expired_tasks()
 		 */
 		if (!tick_is_expired(task->expire, now_ms)) {
 			if (!tick_isset(task->expire))
-				continue;
+				goto lookup_next;
 			__task_queue(task);
-			continue;
+			goto lookup_next;
 		}
+		SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 		task_wakeup(task, TASK_WOKEN_TIMER);
 	}
 
@@ -192,6 +204,7 @@ void process_runnable_tasks()
 	if (likely(niced_tasks))
 		max_processed = (max_processed + 3) / 4;
 
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 	while (max_processed > 0) {
 		/* Note: this loop is one of the fastest code path in
 		 * the whole program. It should not be re-arranged
@@ -216,12 +229,14 @@ void process_runnable_tasks()
 		while (local_tasks_count < 16) {
 			t = eb32_entry(rq_next, struct task, rq);
 			rq_next = eb32_next(rq_next);
-			/* detach the task from the queue */
-			__task_unlink_rq(t);
-			t->state |= TASK_RUNNING;
-			t->pending_state = 0;
-			t->calls++;
-			local_tasks[local_tasks_count++] = t;
+			if (t->process_mask & (1UL << tid)) {
+				/* detach the task from the queue */
+				__task_unlink_rq(t);
+				t->state |= TASK_RUNNING;
+				t->pending_state = 0;
+				t->calls++;
+				local_tasks[local_tasks_count++] = t;
+			}
 			if (!rq_next) {
 				if (rewind || !(rq_next = eb32_first(&rqueue))) {
 					break;
@@ -233,6 +248,7 @@ void process_runnable_tasks()
 		if (!local_tasks_count)
 			break;
 
+		SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 		for (i = 0; i < local_tasks_count ; i++) {
 			t = local_tasks[i];
@@ -247,6 +263,7 @@ void process_runnable_tasks()
 		}
 
 		max_processed -= local_tasks_count;
+		SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 		for (i = 0; i < local_tasks_count ; i++) {
 			t = local_tasks[i];
 			if (likely(t != NULL)) {
@@ -263,6 +280,7 @@ void process_runnable_tasks()
 			}
 		}
 	}
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK.
 */
@@ -270,6 +288,8 @@ int init_task()
 {
 	memset(&timers, 0, sizeof(timers));
 	memset(&rqueue, 0, sizeof(rqueue));
+	SPIN_INIT(&wq_lock);
+	SPIN_INIT(&rq_lock);
 	pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
 	if (!pool2_task)
 		return 0;
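To summarize the scheduler-side change: process_runnable_tasks() now detaches up to 16 eligible tasks under rq_lock, skips tasks whose process_mask does not include the current thread (leaving them queued for another thread), and runs the callbacks with the lock released. A standalone sketch of that batching pattern, with a simplified singly-linked run queue in place of the ebtree and a pthread spinlock in place of HA_SPINLOCK_T:

    #include <pthread.h>

    struct task {
            unsigned long process_mask;           /* threads allowed to run it */
            struct task *(*process)(struct task *t);
            struct task *next;                    /* simplified run-queue link */
    };

    static struct task *rqueue;                   /* simplified run queue */
    static pthread_spinlock_t rq_lock;

    static void queue_init(void)
    {
            pthread_spin_init(&rq_lock, PTHREAD_PROCESS_PRIVATE);
    }

    static void run_batch(int tid)
    {
            struct task *local[16];
            struct task **prev, *t;
            int count = 0, i;

            pthread_spin_lock(&rq_lock);
            for (prev = &rqueue; (t = *prev) && count < 16; ) {
                    if (t->process_mask & (1UL << tid)) {
                            *prev = t->next;      /* detach, as __task_unlink_rq() */
                            local[count++] = t;
                    } else {
                            prev = &t->next;      /* not ours: leave it queued */
                    }
            }
            pthread_spin_unlock(&rq_lock);

            /* the callbacks run without the lock held, so other threads can
             * keep dequeuing their own tasks in parallel */
            for (i = 0; i < count; i++)
                    local[i]->process(local[i]);
    }

Dropping the lock around the callbacks is the design point that keeps one slow task from stalling every other thread's scheduler pass.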