]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: tasks: Remove the per-thread group wait queue
authorOlivier Houchard <ohouchard@haproxy.com>
Tue, 28 Apr 2026 16:46:54 +0000 (18:46 +0200)
committerOlivier Houchard <cognet@ci0.org>
Fri, 12 Jun 2026 09:49:09 +0000 (11:49 +0200)
Totally remove the per-thread group wait queue. This was potentially a
source of contention, because there was only one global lock for all
those wait queues.
Instead, for shared tasks, there is now the concept of ownership for the
task. When a task is in a thread's wait queue or run queue, or is running
on that thread, the task's tid is set to -2 - thread_tid, and only that
thread will be responsible for it until it is no longer running and is
in none of that thread's queues.
When a shared task is scheduled to be run at a later time, if its
current tid is -1, then the current thread will take ownership, and put
it in its own wait queue. If it is already owned by another thread, then
TASK_WOKEN_WQ is added to the task's state, and a task_wakeup() is done,
so that the owner thread will add it to its wait queue.
If the task already has an owner, then a task_wakeup() will just add the
task to the owner's run queue; otherwise the current thread will become
the owner.

include/haproxy/task-t.h
include/haproxy/task.h
src/task.c

index 45d68a85ad9167aa321b7d280102d38ef2b4d568..787409f263576a5aa63985c10e6ba8d46237581c 100644 (file)
@@ -53,7 +53,7 @@
 /* use this to check a task state or to clean it up before queueing */
 #define TASK_WOKEN_ANY    (TASK_WOKEN_OTHER|TASK_WOKEN_INIT|TASK_WOKEN_TIMER| \
                            TASK_WOKEN_IO|TASK_WOKEN_SIGNAL|TASK_WOKEN_MSG| \
-                           TASK_WOKEN_RES)
+                           TASK_WOKEN_RES|TASK_WOKEN_WQ)
 
 #define TASK_F_TASKLET    0x00008000  /* nature of this task: 0=task 1=tasklet */
 #define TASK_F_USR1       0x00010000  /* preserved user flag 1, application-specific, def:0 */
@@ -61,7 +61,8 @@
 #define TASK_F_UEVT2      0x00040000  /* one-shot user event type 2, application specific, def:0 */
 #define TASK_F_WANTS_TIME 0x00080000  /* task/tasklet wants th_ctx->sched_call_date to be set */
 #define TASK_F_UEVT3      0x00100000  /* one-shot user event type 3, application specific, def:0 */
-/* unused: 0x200000..0x80000000 */
+#define TASK_WOKEN_WQ     0x00200000  /* The task has been waken up only to be put in the wait queue, because its expire changed */
+/* unused: 0x400000..0x80000000 */
 
 /* These flags are persistent across scheduler calls */
 #define TASK_PERSISTENT   (TASK_SELF_WAKING | TASK_KILLED | \
@@ -82,7 +83,7 @@ static forceinline char *task_show_state(char *buf, size_t len, const char *deli
        _(TASK_KILLED, _(TASK_HEAVY, _(TASK_WOKEN_INIT,
        _(TASK_WOKEN_TIMER, _(TASK_WOKEN_IO, _(TASK_WOKEN_SIGNAL,
        _(TASK_WOKEN_MSG, _(TASK_WOKEN_RES, _(TASK_WOKEN_OTHER,
-       _(TASK_F_TASKLET, _(TASK_F_USR1))))))))))))));
+       _(TASK_F_TASKLET, _(TASK_F_USR1, _(TASK_WOKEN_WQ)))))))))))))));
        /* epilogue */
        _(~0U);
        return buf;
index 6f76e43a856b5e836b3da7ca4b8227bbe7fdef9e..3ced8a9b29dc8a338122e8700bbf1ba5b59f60fb 100644 (file)
@@ -98,7 +98,8 @@ struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl);
 void task_kill(struct task *t);
 void tasklet_kill(struct tasklet *t);
 void __task_wakeup(struct task *t);
-void __task_queue(struct task *task, struct eb_root *wq);
+void __task_queue(struct task *task);
+static inline void _task_queue(struct task *task, const struct ha_caller *caller);
 
 unsigned int run_tasks_from_lists(unsigned int budgets[]);
 
@@ -325,12 +326,16 @@ static inline void task_drop_running(struct task *t, unsigned int f)
 
                state = _HA_ATOMIC_LOAD(&t->state);
                new_state = (state | f) &~ TASK_RUNNING;
+               cur_tid = t->tid;
+               if ((new_state & TASK_WOKEN_WQ) && __task_get_current_owner(cur_tid) == tid) {
+                       _task_queue(t, NULL);
+                       new_state &= ~TASK_WOKEN_WQ;
+               }
                if (new_state & TASK_WOKEN_ANY)
                        new_state |= TASK_QUEUED;
 
-               cur_tid = t->tid;
 
-               if ((new_state & TASK_QUEUED) || cur_tid >= 0)
+               if ((new_state & TASK_QUEUED) || cur_tid >= 0 || task_in_wq(t))
                        new_tid = cur_tid;
                else
                        new_tid = -1;
@@ -399,34 +404,17 @@ static inline void _task_queue(struct task *task, const struct ha_caller *caller
        if (!tick_isset(task->expire))
                return;
 
-#ifdef USE_THREAD
-       if (task->tid < 0) {
-               HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
-               if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
-                       if (likely(caller)) {
-                               caller = HA_ATOMIC_XCHG(&task->caller, caller);
-                               BUG_ON((ulong)caller & 1);
-#ifdef DEBUG_TASK
-                               HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
-#endif
-                       }
-                       __task_queue(task, &tg_ctx->timers);
-               }
-               HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
-       } else
-#endif
-       {
-               BUG_ON(task->tid != tid);
-               if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
-                       if (likely(caller)) {
-                               caller = HA_ATOMIC_XCHG(&task->caller, caller);
-                               BUG_ON((ulong)caller & 1);
+       BUG_ON(task->tid >= 0 && task->tid != tid);
+
+       if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
+               if (likely(caller)) {
+                       caller = HA_ATOMIC_XCHG(&task->caller, caller);
+                       BUG_ON((ulong)caller & 1);
 #ifdef DEBUG_TASK
-                               HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
+                       HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
 #endif
-                       }
-                       __task_queue(task, &th_ctx->timers);
                }
+               __task_queue(task);
        }
 }
 
@@ -446,6 +434,11 @@ static inline void task_set_thread(struct task *t, int thr)
        /* no shared queue without threads */
        thr = 0;
 #endif
+       /*
+        * Nothing to do, the task is only temporarily owned
+        */
+       if (thr == -1 && t->tid == -2 - tid)
+               return;
        if (unlikely(task_in_wq(t))) {
                task_unlink_wq(t);
                t->tid = thr;
@@ -790,11 +783,11 @@ static inline void tasklet_set_tid(struct tasklet *tl, int tid)
 
 static inline void _task_schedule(struct task *task, int when, const struct ha_caller *caller)
 {
+       int did_lock = 0;
        /* TODO: mthread, check if there is no task with this test */
        if (task_in_rq(task))
                return;
 
-#ifdef USE_THREAD
        if (task->tid < 0) {
                /*
                 * If the task is already running, then just wake it up, just
@@ -812,44 +805,26 @@ static inline void _task_schedule(struct task *task, int when, const struct ha_c
                        task_wakeup(task, TASK_WOKEN_OTHER);
                        return;
                }
-
-               /* FIXME: is it really needed to lock the WQ during the check ? */
-               HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
-               if (task_in_wq(task))
-                       when = tick_first(when, task->expire);
-
-               task->expire = when;
-               if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
-                       if (likely(caller)) {
-                               caller = HA_ATOMIC_XCHG(&task->caller, caller);
-                               BUG_ON((ulong)caller & 1);
-#ifdef DEBUG_TASK
-                               HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
-#endif
-                       }
-                       __task_queue(task, &tg_ctx->timers);
-               }
-               task_drop_running(task, 0);
-               HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+               did_lock = 1;
        } else
-#endif
-       {
                BUG_ON(task->tid != tid);
-               if (task_in_wq(task))
-                       when = tick_first(when, task->expire);
 
-               task->expire = when;
-               if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
-                       if (likely(caller)) {
-                               caller = HA_ATOMIC_XCHG(&task->caller, caller);
-                               BUG_ON((ulong)caller & 1);
+       if (task_in_wq(task))
+               when = tick_first(when, task->expire);
+
+       task->expire = when;
+       if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
+               if (likely(caller)) {
+                       caller = HA_ATOMIC_XCHG(&task->caller, caller);
+                       BUG_ON((ulong)caller & 1);
 #ifdef DEBUG_TASK
-                               HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
+                       HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
 #endif
-                       }
-                       __task_queue(task, &th_ctx->timers);
                }
+               __task_queue(task);
        }
+       if (did_lock)
+               task_drop_running(task, 0);
 }
 
 /* returns the string corresponding to a task type as found in the task caller
index 80f3368033752da9a8883107142e16b06a08204f..de584993847e381355fddca0f7a8fcafc995a41e 100644 (file)
@@ -222,7 +222,13 @@ struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl)
 void __task_wakeup(struct task *t)
 {
        struct eb_root *root = &th_ctx->rqueue;
-       int thr __maybe_unused = t->tid >= 0 ? t->tid : tid;
+       /*
+        * At this point the task tid should always be set to the relevant
+        * thread, so we can just use __task_get_current_owner();
+        */
+       int thr __maybe_unused = __task_get_current_owner(t->tid);
+
+       BUG_ON(t->tid == -1);
 
 #ifdef USE_THREAD
        if (thr != tid) {
@@ -281,18 +287,38 @@ void __task_wakeup(struct task *t)
  * at all about locking so the caller must be careful when deciding whether to
  * lock or not around this call.
  */
-void __task_queue(struct task *task, struct eb_root *wq)
+void __task_queue(struct task *task)
 {
-#ifdef USE_THREAD
-       BUG_ON((wq == &tg_ctx->timers && task->tid >= 0) ||
-              (wq == &th_ctx->timers && task->tid < 0) ||
-              (wq != &tg_ctx->timers && wq != &th_ctx->timers));
-#endif
+       int old_state, new_state;
+       int old_tid;
+       int cur_owner;
+
        /* if this happens the process is doomed anyway, so better catch it now
         * so that we have the caller in the stack.
         */
        BUG_ON(task->expire == TICK_ETERNITY);
 
+       do {
+               new_state = old_state = _HA_ATOMIC_LOAD(&task->state);
+               if (old_state & TASK_KILLED)
+                       return;
+               old_tid = _HA_ATOMIC_LOAD(&task->tid);
+               cur_owner = __task_get_current_owner(old_tid);
+               if (old_tid != -1 && cur_owner != tid)
+                       new_state |= TASK_WOKEN_WQ;
+       } while (!(__task_set_state_and_tid(task, old_tid, __task_get_new_tid_field(old_tid), old_state, new_state)));
+
+       if (cur_owner != tid && cur_owner != -1) {
+               /*
+                * If the task has already been woken up to be added in the
+                * wait queue, nothing left to do, the target thread will
+                * eventually do the right thing.
+                */
+               if (!(old_state & TASK_WOKEN_WQ))
+                       _task_wakeup(task, 0, NULL);
+               return;
+       }
+
        if (likely(task_in_wq(task)))
                __task_unlink_wq(task);
 
@@ -304,7 +330,7 @@ void __task_queue(struct task *task, struct eb_root *wq)
                return;
 #endif
 
-       eb32_insert(wq, &task->wq);
+       eb32_insert(&th_ctx->timers, &task->wq);
 }
 
 /*
@@ -317,7 +343,6 @@ void wake_expired_tasks()
        int max_processed = global.tune.runqueue_depth;
        struct task *task;
        struct eb32_node *eb;
-       __decl_thread(int key);
 
        while (1) {
                if (max_processed-- <= 0)
@@ -364,107 +389,14 @@ void wake_expired_tasks()
                         */
                        __task_unlink_wq(task);
                        if (tick_isset(task->expire))
-                               __task_queue(task, &tt->timers);
-               }
-               else {
-                       /* task not expired and correctly placed. It may not be eternal. */
-                       BUG_ON(task->expire == TICK_ETERNITY);
-                       break;
-               }
-       }
-
-#ifdef USE_THREAD
-       if (eb_is_empty(&tg_ctx->timers))
-               goto leave;
-
-       HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
-       eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
-       if (!eb) {
-               eb = eb32_first(&tg_ctx->timers);
-               if (likely(!eb)) {
-                       HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
-                       goto leave;
-               }
-       }
-       key = eb->key;
-
-       if (tick_is_lt(now_ms, key)) {
-               HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
-               goto leave;
-       }
-
-       /* There's really something of interest here, let's visit the queue */
-
-       if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &wq_lock)) {
-               /* if we failed to grab the lock it means another thread is
-                * already doing the same here, so let it do the job.
-                */
-               HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
-               goto leave;
-       }
-
-       while (1) {
-  lookup_next:
-               if (max_processed-- <= 0)
-                       break;
-               eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
-               if (!eb) {
-                       /* we might have reached the end of the tree, typically because
-                       * <now_ms> is in the first half and we're first scanning the last
-                       * half. Let's loop back to the beginning of the tree now.
-                       */
-                       eb = eb32_first(&tg_ctx->timers);
-                       if (likely(!eb))
-                               break;
-               }
-
-               task = eb32_entry(eb, struct task, wq);
-
-               /* Check for any competing run of the task (quite rare but may
-                * involve a dangerous concurrent access on task->expire). In
-                * order to protect against this, we'll take an exclusive access
-                * on TASK_RUNNING before checking/touching task->expire. If the
-                * task is already RUNNING on another thread, it will deal by
-                * itself with the requeuing so we must not do anything and
-                * simply quit the loop for now, because we cannot wait with the
-                * WQ lock held as this would prevent the running thread from
-                * requeuing the task. One annoying effect of holding RUNNING
-                * here is that a concurrent task_wakeup() will refrain from
-                * waking it up. This forces us to check for a wakeup after
-                * releasing the flag.
-                */
-               if (HA_ATOMIC_FETCH_OR(&task->state, TASK_RUNNING) & TASK_RUNNING)
-                       break;
-
-               if (tick_is_expired(task->expire, now_ms)) {
-                       /* expired task, wake it up */
-                       HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
-                       __task_unlink_wq(task);
-                       HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
-                       task_drop_running(task, TASK_WOKEN_TIMER);
-               }
-               else if (task->expire != eb->key) {
-                       /* task is not expired but its key doesn't match so let's
-                        * update it and skip to next apparently expired task.
-                        */
-                       HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
-                       __task_unlink_wq(task);
-                       if (tick_isset(task->expire))
-                               __task_queue(task, &tg_ctx->timers);
-                       HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
-                       task_drop_running(task, 0);
-                       goto lookup_next;
+                               __task_queue(task);
                }
                else {
                        /* task not expired and correctly placed. It may not be eternal. */
                        BUG_ON(task->expire == TICK_ETERNITY);
-                       task_drop_running(task, 0);
                        break;
                }
        }
-
-       HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &wq_lock);
-#endif
 leave:
        return;
 }
@@ -658,6 +590,19 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
                        goto next;
                }
 
+               if (state & TASK_WOKEN_WQ) {
+                       /* We should add this task to our wait queue */
+                       task_queue(t);
+                       /*
+                        * If this is the only reason the task got scheduled,
+                        * then we don't actually have to run it.
+                        */
+                       if ((state & TASK_WOKEN_ANY) == TASK_WOKEN_WQ) {
+                               task_drop_running(t, 0);
+                               goto next;
+                       }
+                       state &= ~TASK_WOKEN_WQ;
+               }
                /* OK now the task or tasklet is well alive and is going to be run */
                if (state & TASK_F_TASKLET) {
                        /* this is a tasklet */
@@ -686,7 +631,8 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
                                        __task_free(t);
                                }
                                else {
-                                       task_queue(t);
+                                       if (__task_get_current_owner(t->tid) == tid)
+                                               task_queue(t);
                                        task_drop_running(t, 0);
                                }
                        }