]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: task: move the shared runqueue to one per thread
authorWilly Tarreau <w@1wt.eu>
Thu, 16 Jun 2022 13:30:50 +0000 (15:30 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 1 Jul 2022 17:15:14 +0000 (19:15 +0200)
Since the shared runqueue is only used to queue tasks assigned to
known threads, let's move that runqueue to each of these threads. The
goal will be to arrange an N*(N-1) mesh instead of a central contention
point.

The global_rqueue_ticks had to be dropped (for good) since we'll now
use the per-thread rqueue_ticks counter for both trees.

A few points to note:
  - the rq_lock still remains the global one for now so there should not
    be any gain in doing this, but should this trigger any regression, it
    is important to detect whether it's related to the lock or to the tree.

  - there's no more reason for using the scope-based version of the ebtree
    now, we could switch back to the regular eb32_tree.

  - it's worth checking if we still need TASK_GLOBAL (probably only to
    delete a task from one's own shared queue).

include/haproxy/task.h
include/haproxy/tinfo-t.h
src/activity.c
src/task.c

index 58910fe7172a18d8947b776feed6cd1e0834d4ea..6c4a5f7fb8502e0259c5843f2fd3353c801de175 100644 (file)
@@ -99,7 +99,6 @@ extern struct pool_head *pool_head_notification;
 
 #ifdef USE_THREAD
 extern struct eb_root timers;      /* sorted timers tree, global */
-extern struct eb_root rqueue;      /* tree constituting the run queue */
 #endif
 
 __decl_thread(extern HA_SPINLOCK_T rq_lock);  /* spin lock related to run queue */
index df5ac7ab32e8fabe1eb75fbb3c71a036d1ab312c..94d08d4f772828a2e6ab4ebb940d5dd40a67f9b1 100644 (file)
@@ -102,6 +102,9 @@ struct thread_ctx {
 
        uint64_t prev_cpu_time;             /* previous per thread CPU time */
        uint64_t prev_mono_time;            /* previous system wide monotonic time  */
+
+       struct eb_root rqueue_shared;       /* run queue fed by other threads */
+
        ALWAYS_ALIGN(128);
 };
 
index eb49169b581d1e4102ca879590f98fcd8b3eb9c4..c86d4d7de573042f0f0f44110e2df1af65ebc359 100644 (file)
@@ -873,17 +873,20 @@ static int cli_io_handler_show_tasks(struct appctx *appctx)
        /* 1. global run queue */
 
 #ifdef USE_THREAD
-       rqnode = eb32sc_first(&rqueue, ~0UL);
-       while (rqnode) {
-               t = eb32sc_entry(rqnode, struct task, rq);
-               entry = sched_activity_entry(tmp_activity, t->process);
-               if (t->call_date) {
-                       lat = now_ns - t->call_date;
-                       if ((int64_t)lat > 0)
-                               entry->lat_time += lat;
+       for (thr = 0; thr < global.nbthread; thr++) {
+               /* task run queue */
+               rqnode = eb32sc_first(&ha_thread_ctx[thr].rqueue_shared, ~0UL);
+               while (rqnode) {
+                       t = eb32sc_entry(rqnode, struct task, rq);
+                       entry = sched_activity_entry(tmp_activity, t->process);
+                       if (t->call_date) {
+                               lat = now_ns - t->call_date;
+                               if ((int64_t)lat > 0)
+                                       entry->lat_time += lat;
+                       }
+                       entry->calls++;
+                       rqnode = eb32sc_next(rqnode, ~0UL);
                }
-               entry->calls++;
-               rqnode = eb32sc_next(rqnode, ~0UL);
        }
 #endif
        /* 2. all threads's local run queues */
index 247ed807a0a86bbff91af9604d115056b984c0d4..f972142243fed5adf7cbce5cf68eff3e42ca4760 100644 (file)
@@ -43,9 +43,7 @@ __decl_aligned_rwlock(wq_lock);   /* RW lock related to the wait queue */
 
 #ifdef USE_THREAD
 struct eb_root timers;      /* sorted timers tree, global, accessed under wq_lock */
-struct eb_root rqueue;      /* tree constituting the global run queue, accessed under rq_lock */
 unsigned int grq_total;     /* total number of entries in the global run queue, atomic */
-static unsigned int global_rqueue_ticks;  /* insertion count in the grq, use rq_lock */
 #endif
 
 
@@ -234,7 +232,7 @@ void __task_wakeup(struct task *t)
 
 #ifdef USE_THREAD
        if (thr != tid) {
-               root = &rqueue;
+               root = &ha_thread_ctx[thr].rqueue_shared;
 
                _HA_ATOMIC_INC(&grq_total);
                HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
@@ -243,7 +241,7 @@ void __task_wakeup(struct task *t)
                        global_tasks_mask = all_threads_mask;
                else
                        global_tasks_mask |= 1UL << thr;
-               t->rq.key = ++global_rqueue_ticks;
+               t->rq.key = _HA_ATOMIC_ADD_FETCH(&ha_thread_ctx[thr].rqueue_ticks, 1);
                __ha_barrier_store();
        } else
 #endif
@@ -838,9 +836,9 @@ void process_runnable_tasks()
                if ((global_tasks_mask & tid_bit) && !grq) {
 #ifdef USE_THREAD
                        HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
-                       grq = eb32sc_lookup_ge(&rqueue, global_rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+                       grq = eb32sc_lookup_ge(&th_ctx->rqueue_shared, _HA_ATOMIC_LOAD(&tt->rqueue_ticks) - TIMER_LOOK_BACK, tid_bit);
                        if (unlikely(!grq)) {
-                               grq = eb32sc_first(&rqueue, tid_bit);
+                               grq = eb32sc_first(&th_ctx->rqueue_shared, tid_bit);
                                if (!grq) {
                                        global_tasks_mask &= ~tid_bit;
                                        HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
@@ -875,7 +873,7 @@ void process_runnable_tasks()
                        eb32sc_delete(&t->rq);
 
                        if (unlikely(!grq)) {
-                               grq = eb32sc_first(&rqueue, tid_bit);
+                               grq = eb32sc_first(&th_ctx->rqueue_shared, tid_bit);
                                if (!grq) {
                                        global_tasks_mask &= ~tid_bit;
                                        HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
@@ -942,7 +940,7 @@ void mworker_cleantasks()
 
 #ifdef USE_THREAD
        /* cleanup the global run queue */
-       tmp_rq = eb32sc_first(&rqueue, ~0UL);
+       tmp_rq = eb32sc_first(&th_ctx->rqueue_shared, ~0UL);
        while (tmp_rq) {
                t = eb32sc_entry(tmp_rq, struct task, rq);
                tmp_rq = eb32sc_next(tmp_rq, ~0UL);
@@ -981,7 +979,6 @@ static void init_task()
 
 #ifdef USE_THREAD
        memset(&timers, 0, sizeof(timers));
-       memset(&rqueue, 0, sizeof(rqueue));
 #endif
        for (i = 0; i < MAX_THREADS; i++) {
                for (q = 0; q < TL_CLASSES; q++)