From b20aa9eef354b5cca3f9ea43aab38d568e7fdeb7 Mon Sep 17 00:00:00 2001
From: Willy Tarreau
Date: Mon, 15 Oct 2018 14:52:21 +0200
Subject: [PATCH] MAJOR: tasks: create per-thread wait queues

Now we still have a main contention point with the timers in the main
wait queue, but the vast majority of the tasks are pinned to a single
thread. This patch creates a per-thread wait queue and queues a task
to the local wait queue without any locking if the task is bound to a
single thread (the current one), otherwise to the shared queue using
locking. This significantly reduces contention on the wait queue. A
test with 12 threads showed 11 ms spent in the WQ lock compared to
4.7 seconds in the same test without this change. The cache miss
ratio decreased from 19.7% to 19.2% on the 12-thread test, and its
performance increased by 1.5%.

Another indirect benefit is that the average queue size is divided by
the number of threads, which roughly removes log(nbthreads) levels in
the tree and further speeds up lookups.
---
 include/proto/task.h | 63 ++++++++++++++++++++++++++++++----------
 src/task.c           | 69 +++++++++++++++++++++++++++++++++++++------
 2 files changed, 107 insertions(+), 25 deletions(-)

diff --git a/include/proto/task.h b/include/proto/task.h
index 5445c99067..6bc4f43cf2 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -94,10 +94,12 @@ extern struct pool_head *pool_head_notification;
 extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
 extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
 #ifdef USE_THREAD
+extern struct eb_root timers;      /* sorted timers tree, global */
 extern struct eb_root rqueue;      /* tree constituting the run queue */
 extern int global_rqueue_size; /* Number of elements in the global runqueue */
 #endif
+extern struct eb_root timers_local[MAX_THREADS]; /* sorted timers tree, per thread */
 extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 extern int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
@@ -167,12 +169,19 @@ static inline struct task *__task_unlink_wq(struct task *t)
 	return t;
 }
 
+/* remove a task from its wait queue. It may either be the local wait queue if
+ * the task is bound to a single thread (in which case there's no locking
+ * involved) or the global queue, with locking.
+ */
 static inline struct task *task_unlink_wq(struct task *t)
 {
-	HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-	if (likely(task_in_wq(t)))
+	if (likely(task_in_wq(t))) {
+		if (atleast2(t->thread_mask))
+			HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 		__task_unlink_wq(t);
-	HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+		if (atleast2(t->thread_mask))
+			HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+	}
 	return t;
 }
 
@@ -356,10 +365,14 @@ static inline void tasklet_free(struct tasklet *tl)
 	pool_flush(pool_head_tasklet);
 }
 
+void __task_queue(struct task *task, struct eb_root *wq);
+
 /* Place <task> into the wait queue, where it may already be. If the expiration
  * timer is infinite, do nothing and rely on wake_expired_tasks to clean up.
+ * If the task is bound to a single thread, it's assumed to be bound to the
+ * current thread's queue and is queued without locking. Otherwise it's queued
+ * into the global wait queue, protected by locks.
 */
-void __task_queue(struct task *task);
 static inline void task_queue(struct task *task)
 {
 	/* If we already have a place in the wait queue no later than the
@@ -374,10 +387,18 @@ static inline void task_queue(struct task *task)
 	if (!tick_isset(task->expire))
 		return;
 
-	HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-		__task_queue(task);
-	HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#ifdef USE_THREAD
+	if (atleast2(task->thread_mask)) {
+		HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+			__task_queue(task, &timers);
+		HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+	} else
+#endif
+	{
+		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+			__task_queue(task, &timers_local[tid]);
+	}
 }
 
 /* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -390,14 +411,26 @@ static inline void task_schedule(struct task *task, int when)
 	if (task_in_rq(task))
 		return;
 
-	HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-	if (task_in_wq(task))
-		when = tick_first(when, task->expire);
+#ifdef USE_THREAD
+	if (atleast2(task->thread_mask)) {
+		HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+		if (task_in_wq(task))
+			when = tick_first(when, task->expire);
+
+		task->expire = when;
+		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+			__task_queue(task, &timers);
+		HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+	} else
+#endif
+	{
+		if (task_in_wq(task))
+			when = tick_first(when, task->expire);
 
-	task->expire = when;
-	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-		__task_queue(task);
-	HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+		task->expire = when;
+		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+			__task_queue(task, &timers_local[tid]);
+	}
 }
 
 /* This function registers a new signal. "lua" is the current lua
diff --git a/src/task.c b/src/task.c
index 3f193f2d09..27408b103e 100644
--- a/src/task.c
+++ b/src/task.c
@@ -50,14 +50,16 @@ int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
 
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
 
-static struct eb_root timers;      /* sorted timers tree */
 #ifdef USE_THREAD
+struct eb_root timers;      /* sorted timers tree, global */
 struct eb_root rqueue;      /* tree constituting the run queue */
 int global_rqueue_size; /* Number of elements in the global runqueue */
 #endif
+
 struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 static unsigned int rqueue_ticks;  /* insertion count */
+struct eb_root timers_local[MAX_THREADS]; /* sorted timers tree, per thread */
 
 /* Puts the task <t> in run queue at a position depending on t->nice. <t> is
  * returned. The nice value assigns boosts in 32ths of the run queue size. A
@@ -170,7 +172,7 @@ redo:
 /*
  * __task_queue()
  *
- * Inserts a task into the wait queue at the position given by its expiration
+ * Inserts a task into wait queue <wq> at the position given by its expiration
  * date. It does not matter if the task was already in the wait queue or not,
  * as it will be unlinked. The task must not have an infinite expiration timer.
 * Last, tasks must not be queued further than the end of the tree, which is
@@ -178,9 +180,11 @@ redo:
 *
 * This function should not be used directly, it is meant to be called by the
 * inline version of task_queue() which performs a few cheap preliminary tests
- * before deciding to call __task_queue().
+ * before deciding to call __task_queue(). Moreover this function doesn't care
+ * at all about locking so the caller must be careful when deciding whether to
+ * lock or not around this call.
 */
-void __task_queue(struct task *task)
+void __task_queue(struct task *task, struct eb_root *wq)
 {
 	if (likely(task_in_wq(task)))
 		__task_unlink_wq(task);
@@ -193,9 +197,7 @@ void __task_queue(struct task *task)
 		return;
 #endif
 
-	eb32_insert(&timers, &task->wq);
-
-	return;
+	eb32_insert(wq, &task->wq);
 }
 
 /*
@@ -208,6 +210,51 @@ int wake_expired_tasks()
 	struct eb32_node *eb;
 	int ret = TICK_ETERNITY;
 
+	while (1) {
+  lookup_next_local:
+		eb = eb32_lookup_ge(&timers_local[tid], now_ms - TIMER_LOOK_BACK);
+		if (!eb) {
+			/* we might have reached the end of the tree, typically because
+			 * <now_ms> is in the first half and we're first scanning the last
+			 * half. Let's loop back to the beginning of the tree now.
+			 */
+			eb = eb32_first(&timers_local[tid]);
+			if (likely(!eb))
+				break;
+		}
+
+		if (tick_is_lt(now_ms, eb->key)) {
+			/* timer not expired yet, revisit it later */
+			ret = eb->key;
+			break;
+		}
+
+		/* timer looks expired, detach it from the queue */
+		task = eb32_entry(eb, struct task, wq);
+		__task_unlink_wq(task);
+
+		/* It is possible that this task was left at an earlier place in the
+		 * tree because a recent call to task_queue() has not moved it. This
+		 * happens when the new expiration date is later than the old one.
+		 * Since it is very unlikely that we reach a timeout anyway, it's a
+		 * lot cheaper to proceed like this because we almost never update
+		 * the tree. We may also find disabled expiration dates there. Since
+		 * we have detached the task from the tree, we simply call task_queue
+		 * to take care of this. Note that we might occasionally requeue it at
+		 * the same place, before <now_ms>, so we have to check if this happens,
+		 * and adjust <ret>, otherwise we may skip it which is not what we want.
+		 * We may also not requeue the task (and not point eb at it) if its
+		 * expiration time is not set.
+ */ + if (!tick_is_expired(task->expire, now_ms)) { + if (tick_isset(task->expire)) + __task_queue(task, &timers_local[tid]); + goto lookup_next_local; + } + task_wakeup(task, TASK_WOKEN_TIMER); + } + +#ifdef USE_THREAD while (1) { HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock); lookup_next: @@ -224,7 +271,7 @@ int wake_expired_tasks() if (tick_is_lt(now_ms, eb->key)) { /* timer not expired yet, revisit it later */ - ret = eb->key; + ret = tick_first(ret, eb->key); break; } @@ -247,7 +294,7 @@ int wake_expired_tasks() */ if (!tick_is_expired(task->expire, now_ms)) { if (tick_isset(task->expire)) - __task_queue(task); + __task_queue(task, &timers); goto lookup_next; } task_wakeup(task, TASK_WOKEN_TIMER); @@ -255,6 +302,7 @@ int wake_expired_tasks() } HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock); +#endif return ret; } @@ -415,13 +463,14 @@ int init_task() { int i; - memset(&timers, 0, sizeof(timers)); #ifdef USE_THREAD + memset(&timers, 0, sizeof(timers)); memset(&rqueue, 0, sizeof(rqueue)); #endif HA_SPIN_INIT(&wq_lock); HA_SPIN_INIT(&rq_lock); for (i = 0; i < MAX_THREADS; i++) { + memset(&timers_local[i], 0, sizeof(timers_local[i])); memset(&rqueue_local[i], 0, sizeof(rqueue_local[i])); LIST_INIT(&task_list[i]); task_list_size[i] = 0; -- 2.39.5
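
A note on the locking policy for readers following the patch: the rule
implemented in task_queue(), task_schedule() and task_unlink_wq() above is
that a thread_mask with exactly one bit set means the task belongs to one
thread's local wait queue and needs no lock, while two or more bits mean the
shared tree and the spinlock. The sketch below models that routing rule in
isolation. It is illustration only, not HAProxy code: my_task, local_wq,
global_wq and queue_task are hypothetical names, the queues are single slots
rather than the real ebtrees, and only atleast2() and tid mirror identifiers
used by the patch.

	#include <pthread.h>
	#include <stdint.h>
	#include <stdio.h>

	#define MAX_THREADS 64

	struct my_task {
		uint64_t thread_mask; /* bitmask of threads allowed to run the task */
		int expire;           /* expiration tick */
	};

	static struct my_task *local_wq[MAX_THREADS]; /* per-thread "wait queues" */
	static struct my_task *global_wq;             /* shared "wait queue" */
	static pthread_spinlock_t global_wq_lock;

	static __thread int tid; /* calling thread's id, as in HAProxy */

	/* non-zero when at least two bits are set, i.e. the task may run on
	 * several threads; same idea as HAProxy's atleast2() macro.
	 */
	static inline int atleast2(uint64_t mask)
	{
		return (mask & (mask - 1)) != 0;
	}

	/* Route a task the way the patched task_queue() does: a task bound to
	 * a single thread (assumed to be the caller) is queued locally without
	 * any locking, anything else goes to the shared queue under the lock.
	 */
	static void queue_task(struct my_task *t)
	{
		if (atleast2(t->thread_mask)) {
			pthread_spin_lock(&global_wq_lock);
			global_wq = t;
			pthread_spin_unlock(&global_wq_lock);
		} else {
			local_wq[tid] = t; /* uncontended: only this thread touches it */
		}
	}

	int main(void)
	{
		pthread_spin_init(&global_wq_lock, PTHREAD_PROCESS_PRIVATE);

		struct my_task pinned = { .thread_mask = 1ULL << tid, .expire = 42 };
		struct my_task shared = { .thread_mask = 0xf, .expire = 42 };

		queue_task(&pinned); /* lands in local_wq[tid], lock never taken */
		queue_task(&shared); /* lands in global_wq under global_wq_lock */

		printf("local=%p global=%p\n", (void *)local_wq[tid], (void *)global_wq);
		return 0;
	}

The same split explains the two scan loops in wake_expired_tasks(): the local
tree is walked first without any lock, then the global tree under the lock,
and the change from "ret = eb->key" to "ret = tick_first(ret, eb->key)" in the
global loop merges the two candidate wake-up dates so the earlier of the local
and global next-expiry wins.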