git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: task/thread: move the task shared wait queues per thread group
author: Willy Tarreau <w@1wt.eu>
Thu, 7 Jul 2022 13:22:55 +0000 (15:22 +0200)
committer: Willy Tarreau <w@1wt.eu>
Fri, 15 Jul 2022 17:43:10 +0000 (19:43 +0200)
Their migration was postponed for convenience only, but now it is time to
make the shared wait queues per thread group rather than just per process;
otherwise the WQ lock alone uses a huge amount of CPU.

include/haproxy/task.h
include/haproxy/tinfo-t.h
src/task.c

index a79319eb59f9fdd8512cc888b16a4ae58cb260d2..9bb09a486431e548ddb8d24c68fa31bfc86682a7 100644 (file)
@@ -94,12 +94,6 @@ extern struct pool_head *pool_head_task;
 extern struct pool_head *pool_head_tasklet;
 extern struct pool_head *pool_head_notification;
 
-#ifdef USE_THREAD
-extern struct eb_root timers;      /* sorted timers tree, global */
-#endif
-
-__decl_thread(extern HA_RWLOCK_T wq_lock);    /* RW lock related to the wait queue */
-
 void __tasklet_wakeup_on(struct tasklet *tl, int thr);
 struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl);
 void task_kill(struct task *t);
@@ -273,10 +267,10 @@ static inline struct task *task_unlink_wq(struct task *t)
                locked = t->tid < 0;
                BUG_ON(t->tid >= 0 && t->tid != tid);
                if (locked)
-                       HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
+                       HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                __task_unlink_wq(t);
                if (locked)
-                       HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+                       HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
        }
        return t;
 }
@@ -303,10 +297,10 @@ static inline void task_queue(struct task *task)
 
 #ifdef USE_THREAD
        if (task->tid < 0) {
-               HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
+               HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-                       __task_queue(task, &timers);
-               HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+                       __task_queue(task, &tg_ctx->timers);
+               HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
        } else
 #endif
        {
@@ -666,14 +660,14 @@ static inline void task_schedule(struct task *task, int when)
 #ifdef USE_THREAD
        if (task->tid < 0) {
                /* FIXME: is it really needed to lock the WQ during the check ? */
-               HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
+               HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                if (task_in_wq(task))
                        when = tick_first(when, task->expire);
 
                task->expire = when;
                if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-                       __task_queue(task, &timers);
-               HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+                       __task_queue(task, &tg_ctx->timers);
+               HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
        } else
 #endif
        {
index 16baffd6019835df5c34eefd434c9f080d8e077b..6df31f117a84b1817ade0082f1a2a632e070b96f 100644 (file)
@@ -69,6 +69,10 @@ struct tgroup_ctx {
        ulong threads_harmless;           /* mask of threads that are not modifying anything */
        ulong threads_idle;               /* mask of threads idling in the poller */
        ulong stopping_threads;           /* mask of threads currently stopping */
+
+       HA_RWLOCK_T wq_lock;              /* RW lock related to the wait queue below */
+       struct eb_root timers;            /* wait queue (sorted timers tree, global, accessed under wq_lock) */
+
        /* pad to cache line (64B) */
        char __pad[0];                    /* unused except to check remaining room */
        char __end[0] __attribute__((aligned(64)));
index 00b2887c0998a471cafb31658ce90b2c9fe1bf6c..11a1b2ccd2c6b70b104b9d0348de4e28dc537d3c 100644 (file)
@@ -36,13 +36,6 @@ DECLARE_POOL(pool_head_notification, "notification", sizeof(struct notification)
 
 unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
 
-__decl_aligned_rwlock(wq_lock);   /* RW lock related to the wait queue */
-
-#ifdef USE_THREAD
-struct eb_root timers;      /* sorted timers tree, global, accessed under wq_lock */
-#endif
-
-
 
 /* Flags the task <t> for immediate destruction and puts it into its first
  * thread's shared tasklet list if not yet queued/running. This will bypass
@@ -277,9 +270,9 @@ void __task_wakeup(struct task *t)
 void __task_queue(struct task *task, struct eb_root *wq)
 {
 #ifdef USE_THREAD
-       BUG_ON((wq == &timers && task->tid >= 0) ||
+       BUG_ON((wq == &tg_ctx->timers && task->tid >= 0) ||
               (wq == &th_ctx->timers && task->tid < 0) ||
-              (wq != &timers && wq != &th_ctx->timers));
+              (wq != &tg_ctx->timers && wq != &th_ctx->timers));
 #endif
        /* if this happens the process is doomed anyway, so better catch it now
         * so that we have the caller in the stack.
@@ -367,32 +360,32 @@ void wake_expired_tasks()
        }
 
 #ifdef USE_THREAD
-       if (eb_is_empty(&timers))
+       if (eb_is_empty(&tg_ctx->timers))
                goto leave;
 
-       HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
-       eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+       HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
+       eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
        if (!eb) {
-               eb = eb32_first(&timers);
+               eb = eb32_first(&tg_ctx->timers);
                if (likely(!eb)) {
-                       HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+                       HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                        goto leave;
                }
        }
        key = eb->key;
 
        if (tick_is_lt(now_ms, key)) {
-               HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+               HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                goto leave;
        }
 
        /* There's really something of interest here, let's visit the queue */
 
-       if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &wq_lock)) {
+       if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &tg_ctx->wq_lock)) {
                /* if we failed to grab the lock it means another thread is
                 * already doing the same here, so let it do the job.
                 */
-               HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+               HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                goto leave;
        }
 
@@ -400,13 +393,13 @@ void wake_expired_tasks()
   lookup_next:
                if (max_processed-- <= 0)
                        break;
-               eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+               eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
                if (!eb) {
                        /* we might have reached the end of the tree, typically because
                        * <now_ms> is in the first half and we're first scanning the last
                        * half. Let's loop back to the beginning of the tree now.
                        */
-                       eb = eb32_first(&timers);
+                       eb = eb32_first(&tg_ctx->timers);
                        if (likely(!eb))
                                break;
                }
@@ -431,20 +424,20 @@ void wake_expired_tasks()
 
                if (tick_is_expired(task->expire, now_ms)) {
                        /* expired task, wake it up */
-                       HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
+                       HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                        __task_unlink_wq(task);
-                       HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
+                       HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                        task_drop_running(task, TASK_WOKEN_TIMER);
                }
                else if (task->expire != eb->key) {
                        /* task is not expired but its key doesn't match so let's
                         * update it and skip to next apparently expired task.
                         */
-                       HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
+                       HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                        __task_unlink_wq(task);
                        if (tick_isset(task->expire))
-                               __task_queue(task, &timers);
-                       HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
+                               __task_queue(task, &tg_ctx->timers);
+                       HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                        task_drop_running(task, 0);
                        goto lookup_next;
                }
@@ -456,7 +449,7 @@ void wake_expired_tasks()
                }
        }
 
-       HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &wq_lock);
+       HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 #endif
 leave:
        return;
@@ -487,14 +480,14 @@ int next_timer_expiry()
                ret = eb->key;
 
 #ifdef USE_THREAD
-       if (!eb_is_empty(&timers)) {
-               HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
-               eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+       if (!eb_is_empty(&tg_ctx->timers)) {
+               HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
+               eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
                if (!eb)
-                       eb = eb32_first(&timers);
+                       eb = eb32_first(&tg_ctx->timers);
                if (eb)
                        key = eb->key;
-               HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+               HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
                if (eb)
                        ret = tick_first(ret, key);
        }
@@ -914,7 +907,7 @@ void mworker_cleantasks()
                task_destroy(t);
        }
        /* cleanup the timers queue */
-       tmp_wq = eb32_first(&timers);
+       tmp_wq = eb32_first(&tg_ctx->timers);
        while (tmp_wq) {
                t = eb32_entry(tmp_wq, struct task, wq);
                tmp_wq = eb32_next(tmp_wq);
@@ -944,9 +937,9 @@ static void init_task()
 {
        int i, q;
 
-#ifdef USE_THREAD
-       memset(&timers, 0, sizeof(timers));
-#endif
+       for (i = 0; i < MAX_TGROUPS; i++)
+               memset(&ha_tgroup_ctx[i].timers, 0, sizeof(ha_tgroup_ctx[i].timers));
+
        for (i = 0; i < MAX_THREADS; i++) {
                for (q = 0; q < TL_CLASSES; q++)
                        LIST_INIT(&ha_thread_ctx[i].tasklets[q]);