]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: task: remove the tasks_run_queue counter and have one per thread
authorWilly Tarreau <w@1wt.eu>
Wed, 24 Feb 2021 14:10:07 +0000 (15:10 +0100)
committerWilly Tarreau <w@1wt.eu>
Wed, 24 Feb 2021 16:42:04 +0000 (17:42 +0100)
This counter is solely used for reporting in the stats and is the hottest
thread contention point to date. Moving it to the scheduler and having a
separate one for the global run queue dramatically improves the performance,
showing a 12% boost on the request rate on 16 threads!

In addition, the thread debugging output which used to rely on rqueue_size
was not totally accurate as it would only report task counts. Now we can
return the exact thread's run queue length.

It is also interesting to note that there are still a few other task/tasklet
counters in the scheduler that are not efficiently updated because some cover
a single area and others cover multiple areas. It looks like having a distinct
counter for each of the following entries would help and would keep the code
a bit cleaner:
  - global run queue (tree)
  - per-thread run queue (tree)
  - per-thread shared tasklets list
  - per-thread local lists

Maybe even splitting the shared tasklets lists between pure tasklets and
tasks instead of having the whole and tasks would simplify the code because
there remain a number of places where several counters have to be updated.

include/haproxy/task-t.h
include/haproxy/task.h
src/debug.c
src/stats.c
src/task.c

index 1ba91409dfde716fbec8ee791948e577ba2024e8..34727fa4e2fa35561d0f2acaadab87c4117913b5 100644 (file)
@@ -81,6 +81,7 @@ struct task_per_thread {
        int rqueue_size;        /* Number of elements in the per-thread run queue */
        int current_queue;      /* points to current tasklet list being run, -1 if none */
        struct task *current;   /* current task (not tasklet) */
+       unsigned int rq_total;  /* total size of the run queue, prio_tree + tasklets */
        uint8_t tl_class_mask;  /* bit mask of non-empty tasklets classes */
        __attribute__((aligned(64))) char end[0];
 };
index f774ba224a59d320b1cd81c73ebbddada095c751..58418189eb6ed428217eec37f40d74ddd27dd0b1 100644 (file)
@@ -89,8 +89,7 @@
 /* a few exported variables */
 extern unsigned int nb_tasks;     /* total number of tasks */
 extern volatile unsigned long global_tasks_mask; /* Mask of threads with tasks in the global runqueue */
-extern unsigned int tasks_run_queue;    /* run queue size */
-extern unsigned int tasks_run_queue_cur;
+extern unsigned int grq_total;    /* total number of entries in the global run queue */
 extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool_head_task;
@@ -144,7 +143,22 @@ int next_timer_expiry();
  */
 void mworker_cleantasks();
 
+/* returns the number of running tasks+tasklets on the whole process. Note
+ * that this *is* racy since a task may move from the global to a local
+ * queue for example and be counted twice. This is only for statistics
+ * reporting.
+ */
+static inline int total_run_queues()
+{
+       int thr, ret = 0;
 
+#ifdef USE_THREAD
+       ret = _HA_ATOMIC_LOAD(&grq_total);
+#endif
+       for (thr = 0; thr < global.nbthread; thr++)
+               ret += _HA_ATOMIC_LOAD(&task_per_thread[thr].rq_total);
+       return ret;
+}
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -294,21 +308,26 @@ static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
 }
 
 /*
- * Unlink the task from the run queue. The tasks_run_queue size and number of
- * niced tasks are updated too. A pointer to the task itself is returned. The
- * task *must* already be in the run queue before calling this function. If
- * unsure, use the safer task_unlink_rq() function. Note that the pointer to the
- * next run queue entry is neither checked nor updated.
+ * Unlink the task from the run queue. The run queue size and number of niced
+ * tasks are updated too. A pointer to the task itself is returned. The task
+ * *must* already be in the run queue before calling this function. If the task
+ * is in the global run queue, the global run queue's lock must already be held.
+ * If unsure, use the safer task_unlink_rq() function. Note that the pointer to
+ * the next run queue entry is neither checked nor updated.
  */
 static inline struct task *__task_unlink_rq(struct task *t)
 {
-       _HA_ATOMIC_SUB(&tasks_run_queue, 1);
 #ifdef USE_THREAD
-       if (t->state & TASK_GLOBAL)
+       if (t->state & TASK_GLOBAL) {
+               grq_total--;
                _HA_ATOMIC_AND(&t->state, ~TASK_GLOBAL);
+       }
        else
 #endif
+       {
                sched->rqueue_size--;
+               _HA_ATOMIC_SUB(&sched->rq_total, 1);
+       }
        eb32sc_delete(&t->rq);
        if (likely(t->nice))
                _HA_ATOMIC_SUB(&niced_tasks, 1);
@@ -377,15 +396,16 @@ static inline void _tasklet_wakeup_on(struct tasklet *tl, int thr, const char *f
                        LIST_ADDQ(&sched->tasklets[sched->current_queue], &tl->list);
                        sched->tl_class_mask |= 1 << sched->current_queue;
                }
+               _HA_ATOMIC_ADD(&sched->rq_total, 1);
        } else {
                /* this tasklet runs on a specific thread. */
                MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
+               _HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1);
                if (sleeping_thread_mask & (1UL << thr)) {
                        _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
                        wake_thread(thr);
                }
        }
-       _HA_ATOMIC_ADD(&tasks_run_queue, 1);
 }
 
 /* schedules tasklet <tl> to run onto the thread designated by tl->tid, which
@@ -419,7 +439,7 @@ static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
 {
        if (MT_LIST_DEL((struct mt_list *)&t->list)) {
                _HA_ATOMIC_AND(&t->state, ~TASK_IN_LIST);
-               _HA_ATOMIC_SUB(&tasks_run_queue, 1);
+               _HA_ATOMIC_SUB(&task_per_thread[t->tid >= 0 ? t->tid : tid].rq_total, 1);
        }
 }
 
@@ -547,7 +567,7 @@ static inline void task_destroy(struct task *t)
 static inline void tasklet_free(struct tasklet *tl)
 {
        if (MT_LIST_DEL((struct mt_list *)&tl->list))
-               _HA_ATOMIC_SUB(&tasks_run_queue, 1);
+               _HA_ATOMIC_SUB(&task_per_thread[tl->tid >= 0 ? tl->tid : tid].rq_total, 1);
 
 #ifdef DEBUG_TASK
        if ((unsigned int)tl->debug.caller_idx > 1)
index bf64f19f05daa52023beb918de04acdf1628e1d1..3162d3282fade8d9219b57b3b8712b92a507d685 100644 (file)
@@ -175,7 +175,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
                        LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) &&
                        MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
                      task_per_thread[thr].task_list_size,
-                     task_per_thread[thr].rqueue_size,
+                     task_per_thread[thr].rq_total,
                      stuck,
                      !!(task_profiling_mask & thr_bit));
 
index a63178d5a7364fe8920a5b4d240c82fbad3a9780..e124f28ba1657d1cb8b77ee51d68259b27366575 100644 (file)
@@ -3339,7 +3339,7 @@ static void stats_dump_html_info(struct stream_interface *si, struct uri_auth *u
                      actconn, pipes_used, pipes_used+pipes_free, read_freq_ctr(&global.conn_per_sec),
                      bps >= 1000000000UL ? (bps / 1000000000.0) : bps >= 1000000UL ? (bps / 1000000.0) : (bps / 1000.0),
                      bps >= 1000000000UL ? 'G' : bps >= 1000000UL ? 'M' : 'k',
-                     tasks_run_queue_cur, nb_tasks_cur, ti->idle_pct
+                     total_run_queues(), nb_tasks_cur, ti->idle_pct
                      );
 
        /* scope_txt = search query, appctx->ctx.stats.scope_len is always <= STAT_SCOPE_TXT_MAXLEN */
@@ -4366,7 +4366,7 @@ int stats_fill_info(struct field *info, int len)
        info[INF_MAX_ZLIB_MEM_USAGE]             = mkf_u32(FO_CONFIG|FN_LIMIT, global.maxzlibmem);
 #endif
        info[INF_TASKS]                          = mkf_u32(0, nb_tasks_cur);
-       info[INF_RUN_QUEUE]                      = mkf_u32(0, tasks_run_queue_cur);
+       info[INF_RUN_QUEUE]                      = mkf_u32(0, total_run_queues());
        info[INF_IDLE_PCT]                       = mkf_u32(FN_AVG, ti->idle_pct);
        info[INF_NODE]                           = mkf_str(FO_CONFIG|FN_OUTPUT|FS_SERVICE, global.node);
        if (global.desc)
index 9c8312cbea6f6bbc6bb25f57fcb52af280aacc09..153f7d63879036bd673f13568e0f6c7ec9a42498 100644 (file)
@@ -37,8 +37,6 @@ DECLARE_POOL(pool_head_notification, "notification", sizeof(struct notification)
 
 unsigned int nb_tasks = 0;
 volatile unsigned long global_tasks_mask = 0; /* Mask of threads with tasks in the global runqueue */
-unsigned int tasks_run_queue = 0;
-unsigned int tasks_run_queue_cur = 0;    /* copy of the run queue size */
 unsigned int nb_tasks_cur = 0;     /* copy of the tasks count */
 unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
 
@@ -50,6 +48,7 @@ __decl_aligned_rwlock(wq_lock);   /* RW lock related to the wait queue */
 #ifdef USE_THREAD
 struct eb_root timers;      /* sorted timers tree, global, accessed under wq_lock */
 struct eb_root rqueue;      /* tree constituting the global run queue, accessed under rq_lock */
+unsigned int grq_total;     /* total number of entries in the global run queue, use grq_lock */
 static unsigned int global_rqueue_ticks;  /* insertion count in the grq, use rq_lock */
 #endif
 
@@ -97,7 +96,7 @@ void task_kill(struct task *t)
                        /* Beware: tasks that have never run don't have their ->list empty yet! */
                        MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list,
                                     (struct mt_list *)&((struct tasklet *)t)->list);
-                       _HA_ATOMIC_ADD(&tasks_run_queue, 1);
+                       _HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1);
                        _HA_ATOMIC_ADD(&task_per_thread[thr].task_list_size, 1);
                        if (sleeping_thread_mask & (1UL << thr)) {
                                _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
@@ -122,19 +121,18 @@ void __task_wakeup(struct task *t, struct eb_root *root)
        if (root == &rqueue) {
                HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
        }
-#endif
-       /* Make sure if the task isn't in the runqueue, nobody inserts it
-        * in the meanwhile.
-        */
-       _HA_ATOMIC_ADD(&tasks_run_queue, 1);
-#ifdef USE_THREAD
+
        if (root == &rqueue) {
                global_tasks_mask |= t->thread_mask;
+               grq_total++;
                t->rq.key = ++global_rqueue_ticks;
                __ha_barrier_store();
        } else
 #endif
+       {
+               _HA_ATOMIC_ADD(&sched->rq_total, 1);
                t->rq.key = ++sched->rqueue_ticks;
+       }
 
        if (likely(t->nice)) {
                int offset;
@@ -460,7 +458,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
                t->calls++;
                sched->current = t;
 
-               _HA_ATOMIC_SUB(&tasks_run_queue, 1);
+               _HA_ATOMIC_SUB(&sched->rq_total, 1);
 
                if (TASK_IS_TASKLET(t)) {
                        LIST_DEL_INIT(&((struct tasklet *)t)->list);
@@ -595,7 +593,6 @@ void process_runnable_tasks()
                return;
        }
 
-       tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
        nb_tasks_cur = nb_tasks;
        max_processed = global.tune.runqueue_depth;
 
@@ -702,7 +699,7 @@ void process_runnable_tasks()
        if (picked) {
                tt->tl_class_mask |= 1 << TL_NORMAL;
                _HA_ATOMIC_ADD(&tt->task_list_size, picked);
-               _HA_ATOMIC_ADD(&tasks_run_queue, picked);
+               _HA_ATOMIC_ADD(&tt->rq_total, picked);
                activity[tid].tasksw += picked;
        }