]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: tasks: create per-thread wait queues
authorWilly Tarreau <w@1wt.eu>
Mon, 15 Oct 2018 12:52:21 +0000 (14:52 +0200)
committerWilly Tarreau <w@1wt.eu>
Mon, 15 Oct 2018 17:04:40 +0000 (19:04 +0200)
Now we still have a main contention point with the timers in the main
wait queue, but the vast majority of the tasks are pinned to a single
thread. This patch creates a per-thread wait queue and queues a task
to the local wait queue without any locking if the task is bound to a
single thread (the current one) otherwise to the shared queue using
locking. This significantly reduces contention on the wait queue. A
test with 12 threads showed 11 ms spent in the WQ lock compared to
4.7 seconds in the same test without this change. The cache miss ratio
decreased from 19.7% to 19.2% on the 12-thread test, and its performance
increased by 1.5%.

Another indirect benefit is that the average queue size is divided
by the number of threads, which roughly removes log(nbthreads) levels
in the tree and further speeds up lookups.

include/proto/task.h
src/task.c

index 5445c99067a8e2980495f6c99ee7948d7a296ace..6bc4f43cf20efa681f3ffb3c972f5ea580e60f30 100644 (file)
@@ -94,10 +94,12 @@ extern struct pool_head *pool_head_notification;
 extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
 extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
 #ifdef USE_THREAD
+extern struct eb_root timers;      /* sorted timers tree, global */
 extern struct eb_root rqueue;      /* tree constituting the run queue */
 extern int global_rqueue_size; /* Number of element sin the global runqueue */
 #endif
 
+extern struct eb_root timers_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 extern int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
@@ -167,12 +169,19 @@ static inline struct task *__task_unlink_wq(struct task *t)
        return t;
 }
 
+/* remove a task from its wait queue. It may either be the local wait queue if
+ * the task is bound to a single thread (in which case there's no locking
+ * involved) or the global queue, with locking.
+ */
 static inline struct task *task_unlink_wq(struct task *t)
 {
-       HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-       if (likely(task_in_wq(t)))
+       if (likely(task_in_wq(t))) {
+               if (atleast2(t->thread_mask))
+                       HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
                __task_unlink_wq(t);
-       HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+               if (atleast2(t->thread_mask))
+                       HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+       }
        return t;
 }
 
@@ -356,10 +365,14 @@ static inline void tasklet_free(struct tasklet *tl)
                pool_flush(pool_head_tasklet);
 }
 
+void __task_queue(struct task *task, struct eb_root *wq);
+
 /* Place <task> into the wait queue, where it may already be. If the expiration
  * timer is infinite, do nothing and rely on wake_expired_task to clean up.
+ * If the task is bound to a single thread, it's assumed to be bound to the
+ * current thread's queue and is queued without locking. Otherwise it's queued
+ * into the global wait queue, protected by locks.
  */
-void __task_queue(struct task *task);
 static inline void task_queue(struct task *task)
 {
        /* If we already have a place in the wait queue no later than the
@@ -374,10 +387,18 @@ static inline void task_queue(struct task *task)
        if (!tick_isset(task->expire))
                return;
 
-       HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-       if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-               __task_queue(task);
-       HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#ifdef USE_THREAD
+       if (atleast2(task->thread_mask)) {
+               HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+               if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+                       __task_queue(task, &timers);
+               HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+       } else
+#endif
+       {
+               if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+                       __task_queue(task, &timers_local[tid]);
+       }
 }
 
 /* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -390,14 +411,26 @@ static inline void task_schedule(struct task *task, int when)
        if (task_in_rq(task))
                return;
 
-       HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-       if (task_in_wq(task))
-               when = tick_first(when, task->expire);
+#ifdef USE_THREAD
+       if (atleast2(task->thread_mask)) {
+               HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+               if (task_in_wq(task))
+                       when = tick_first(when, task->expire);
+
+               task->expire = when;
+               if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+                       __task_queue(task, &timers);
+               HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+       } else
+#endif
+       {
+               if (task_in_wq(task))
+                       when = tick_first(when, task->expire);
 
-       task->expire = when;
-       if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-               __task_queue(task);
-       HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+               task->expire = when;
+               if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+                       __task_queue(task, &timers_local[tid]);
+       }
 }
 
 /* This function register a new signal. "lua" is the current lua
index 3f193f2d092cdd5ce802a3041d15179da5deb0e7..27408b103ee5819cf956976e1753a96b0e78320d 100644 (file)
@@ -50,14 +50,16 @@ int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
 
-static struct eb_root timers;      /* sorted timers tree */
 #ifdef USE_THREAD
+struct eb_root timers;      /* sorted timers tree, global */
 struct eb_root rqueue;      /* tree constituting the run queue */
 int global_rqueue_size; /* Number of element sin the global runqueue */
 #endif
+
 struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 static unsigned int rqueue_ticks;  /* insertion count */
+struct eb_root timers_local[MAX_THREADS];  /* sorted timers tree, per thread */
 
 /* Puts the task <t> in run queue at a position depending on t->nice. <t> is
  * returned. The nice value assigns boosts in 32th of the run queue size. A
@@ -170,7 +172,7 @@ redo:
 /*
  * __task_queue()
  *
- * Inserts a task into the wait queue at the position given by its expiration
+ * Inserts a task into wait queue <wq> at the position given by its expiration
  * date. It does not matter if the task was already in the wait queue or not,
  * as it will be unlinked. The task must not have an infinite expiration timer.
  * Last, tasks must not be queued further than the end of the tree, which is
@@ -178,9 +180,11 @@ redo:
  *
  * This function should not be used directly, it is meant to be called by the
  * inline version of task_queue() which performs a few cheap preliminary tests
- * before deciding to call __task_queue().
+ * before deciding to call __task_queue(). Moreover this function doesn't care
+ * at all about locking so the caller must be careful when deciding whether to
+ * lock or not around this call.
  */
-void __task_queue(struct task *task)
+void __task_queue(struct task *task, struct eb_root *wq)
 {
        if (likely(task_in_wq(task)))
                __task_unlink_wq(task);
@@ -193,9 +197,7 @@ void __task_queue(struct task *task)
                return;
 #endif
 
-       eb32_insert(&timers, &task->wq);
-
-       return;
+       eb32_insert(wq, &task->wq);
 }
 
 /*
@@ -208,6 +210,51 @@ int wake_expired_tasks()
        struct eb32_node *eb;
        int ret = TICK_ETERNITY;
 
+       while (1) {
+  lookup_next_local:
+               eb = eb32_lookup_ge(&timers_local[tid], now_ms - TIMER_LOOK_BACK);
+               if (!eb) {
+                       /* we might have reached the end of the tree, typically because
+                       * <now_ms> is in the first half and we're first scanning the last
+                       * half. Let's loop back to the beginning of the tree now.
+                       */
+                       eb = eb32_first(&timers_local[tid]);
+                       if (likely(!eb))
+                               break;
+               }
+
+               if (tick_is_lt(now_ms, eb->key)) {
+                       /* timer not expired yet, revisit it later */
+                       ret = eb->key;
+                       break;
+               }
+
+               /* timer looks expired, detach it from the queue */
+               task = eb32_entry(eb, struct task, wq);
+               __task_unlink_wq(task);
+
+               /* It is possible that this task was left at an earlier place in the
+                * tree because a recent call to task_queue() has not moved it. This
+                * happens when the new expiration date is later than the old one.
+                * Since it is very unlikely that we reach a timeout anyway, it's a
+                * lot cheaper to proceed like this because we almost never update
+                * the tree. We may also find disabled expiration dates there. Since
+                * we have detached the task from the tree, we simply call task_queue
+                * to take care of this. Note that we might occasionally requeue it at
+                * the same place, before <eb>, so we have to check if this happens,
+                * and adjust <eb>, otherwise we may skip it which is not what we want.
+                * We may also not requeue the task (and not point eb at it) if its
+                * expiration time is not set.
+                */
+               if (!tick_is_expired(task->expire, now_ms)) {
+                       if (tick_isset(task->expire))
+                               __task_queue(task, &timers_local[tid]);
+                       goto lookup_next_local;
+               }
+               task_wakeup(task, TASK_WOKEN_TIMER);
+       }
+
+#ifdef USE_THREAD
        while (1) {
                HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
   lookup_next:
@@ -224,7 +271,7 @@ int wake_expired_tasks()
 
                if (tick_is_lt(now_ms, eb->key)) {
                        /* timer not expired yet, revisit it later */
-                       ret = eb->key;
+                       ret = tick_first(ret, eb->key);
                        break;
                }
 
@@ -247,7 +294,7 @@ int wake_expired_tasks()
                 */
                if (!tick_is_expired(task->expire, now_ms)) {
                        if (tick_isset(task->expire))
-                               __task_queue(task);
+                               __task_queue(task, &timers);
                        goto lookup_next;
                }
                task_wakeup(task, TASK_WOKEN_TIMER);
@@ -255,6 +302,7 @@ int wake_expired_tasks()
        }
 
        HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#endif
        return ret;
 }
 
@@ -415,13 +463,14 @@ int init_task()
 {
        int i;
 
-       memset(&timers, 0, sizeof(timers));
 #ifdef USE_THREAD
+       memset(&timers, 0, sizeof(timers));
        memset(&rqueue, 0, sizeof(rqueue));
 #endif
        HA_SPIN_INIT(&wq_lock);
        HA_SPIN_INIT(&rq_lock);
        for (i = 0; i < MAX_THREADS; i++) {
+               memset(&timers_local[i], 0, sizeof(timers_local[i]));
                memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
                LIST_INIT(&task_list[i]);
                task_list_size[i] = 0;