]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: task: Split the tasklet list into two lists.
authorOlivier Houchard <ohouchard@haproxy.com>
Fri, 11 Oct 2019 14:35:01 +0000 (16:35 +0200)
committerOlivier Houchard <cognet@ci0.org>
Fri, 11 Oct 2019 14:37:41 +0000 (16:37 +0200)
As using an mt_list for the tasklet list is costly, instead use a regular list,
but add an mt_list for tasklet woken up by other threads, to be run on the
current thread. At the beginning of process_runnable_tasks(), we just take
the new list, and merge it into the task_list.
This should give us performances comparable to before we started using a
mt_list, but allow us to use tasklet_wakeup() from other threads.

include/proto/task.h
include/types/task.h
src/debug.c
src/task.c

index 261ee9c5dff63413b453ac0d5fd121759e6a9d7a..2258448103d960c219961da907f1e03570ba07a4 100644 (file)
@@ -228,11 +228,18 @@ static inline struct task *task_unlink_rq(struct task *t)
 
 static inline void tasklet_wakeup(struct tasklet *tl)
 {
-       if (MT_LIST_ADDQ(&task_per_thread[tl->tid].task_list, &tl->list) == 1) {
-               _HA_ATOMIC_ADD(&tasks_run_queue, 1);
-               if (sleeping_thread_mask & (1 << tl->tid)) {
-                       _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1 << tl->tid));
-                       wake_thread(tl->tid);
+       if (tl->tid == tid) {
+               if (LIST_ISEMPTY(&tl->list)) {
+                       LIST_ADDQ(&task_per_thread[tl->tid].task_list, &tl->list);
+                       _HA_ATOMIC_ADD(&tasks_run_queue, 1);
+               }
+       } else {
+               if (MT_LIST_ADDQ(&task_per_thread[tl->tid].shared_tasklet_list, (struct mt_list *)&tl->list) == 1) {
+                       _HA_ATOMIC_ADD(&tasks_run_queue, 1);
+                       if (sleeping_thread_mask & (1 << tl->tid)) {
+                               _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1 << tl->tid));
+                               wake_thread(tl->tid);
+                       }
                }
        }
 
@@ -243,23 +250,25 @@ static inline void tasklet_wakeup(struct tasklet *tl)
  */
 static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl)
 {
-       if (MT_LIST_ADDQ(&sched->task_list, &tl->list) == 1)
-               _HA_ATOMIC_ADD(&tasks_run_queue, 1);
+       _HA_ATOMIC_ADD(&tasks_run_queue, 1);
+       LIST_ADDQ(&sched->task_list, &tl->list);
 }
 
 /* Remove the tasklet from the tasklet list. The tasklet MUST already be there.
  * If unsure, use tasklet_remove_from_tasklet_list() instead. If used with a
  * plain task, the caller must update the task_list_size.
+ * This should only be used by the thread that owns the tasklet, any other
+ * thread should use tasklet_cancel().
  */
 static inline void __tasklet_remove_from_tasklet_list(struct tasklet *t)
 {
-       if (MT_LIST_DEL(&t->list) == 1)
-               _HA_ATOMIC_SUB(&tasks_run_queue, 1);
+       LIST_DEL_INIT(&t->list);
+       _HA_ATOMIC_SUB(&tasks_run_queue, 1);
 }
 
 static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
 {
-       if (likely(!MT_LIST_ISEMPTY(&t->list)))
+       if (likely(!LIST_ISEMPTY(&t->list)))
                __tasklet_remove_from_tasklet_list(t);
 }
 
@@ -290,7 +299,7 @@ static inline void tasklet_init(struct tasklet *t)
        t->state = 0;
        t->process = NULL;
        t->tid = tid;
-       MT_LIST_INIT(&t->list);
+       LIST_INIT(&t->list);
 }
 
 static inline struct tasklet *tasklet_new(void)
@@ -359,11 +368,12 @@ static inline void task_destroy(struct task *t)
                t->process = NULL;
 }
 
+/* Should only be called by the thread responsible for the tasklet */
 static inline void tasklet_free(struct tasklet *tl)
 {
-       if (!MT_LIST_ISEMPTY(&tl->list)) {
-               if(MT_LIST_DEL(&tl->list) == 1)
-                       _HA_ATOMIC_SUB(&tasks_run_queue, 1);
+       if (!LIST_ISEMPTY(&tl->list)) {
+               LIST_DEL(&tl->list);
+               _HA_ATOMIC_SUB(&tasks_run_queue, 1);
        }
 
        pool_free(pool_head_tasklet, tl);
@@ -545,7 +555,7 @@ static inline int thread_has_tasks(void)
 {
        return (!!(global_tasks_mask & tid_bit) |
                (sched->rqueue_size > 0) |
-               !MT_LIST_ISEMPTY(&sched->task_list));
+               !LIST_ISEMPTY(&sched->task_list) | !MT_LIST_ISEMPTY(&sched->shared_tasklet_list));
 }
 
 /* adds list item <item> to work list <work> and wake up the associated task */
index 40304eefd782def86a8ff524d4ae9f10999a9503..fbefeef426c42b4729333d990b1ffeceae8c95fe 100644 (file)
@@ -61,7 +61,8 @@ struct notification {
 struct task_per_thread {
        struct eb_root timers;  /* tree constituting the per-thread wait queue */
        struct eb_root rqueue;  /* tree constituting the per-thread run queue */
-       struct mt_list task_list; /* List of tasks to be run, mixing tasks and tasklets */
+       struct list task_list;  /* List of tasks to be run, mixing tasks and tasklets */
+       struct mt_list shared_tasklet_list; /* Tasklet to be run, woken up by other threads */
        int task_list_size;     /* Number of tasks in the task_list */
        int rqueue_size;        /* Number of elements in the per-thread run queue */
        struct task *current;   /* current task (not tasklet) */
@@ -95,7 +96,7 @@ struct task {
 /* lightweight tasks, without priority, mainly used for I/Os */
 struct tasklet {
        TASK_COMMON;                    /* must be at the beginning! */
-       struct mt_list list;
+       struct list list;
        int tid;                        /* TID of the tasklet owner */
 };
 
index 1a1301c4faaf26309a46eaf4c34be7049f354539..5c4aa880ae57fcecd29519e9ae5e48da606b166f 100644 (file)
@@ -57,7 +57,8 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
                      !!(global_tasks_mask & thr_bit),
                      !eb_is_empty(&task_per_thread[thr].timers),
                      !eb_is_empty(&task_per_thread[thr].rqueue),
-                     !MT_LIST_ISEMPTY(&task_per_thread[thr].task_list),
+                     !(LIST_ISEMPTY(&task_per_thread[thr].task_list) |
+                       MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
                      task_per_thread[thr].task_list_size,
                      task_per_thread[thr].rqueue_size,
                      stuck,
index 9ac02117b0ea7c616d5dea80acd64503b4e415d1..abf1583b3de2f330b952a84d82250bb4317e5cbe 100644 (file)
@@ -305,6 +305,7 @@ void process_runnable_tasks()
        struct eb32sc_node *grq = NULL; // next global run queue entry
        struct task *t;
        int max_processed;
+       struct mt_list *tmp_list;
 
        ti->flags &= ~TI_FL_STUCK; // this thread is still running
 
@@ -312,6 +313,12 @@ void process_runnable_tasks()
                activity[tid].empty_rq++;
                return;
        }
+       /* Merge the list of tasklets waken up by other threads to the
+        * main list.
+        */
+       tmp_list = MT_LIST_BEHEAD(&sched->shared_tasklet_list);
+       if (tmp_list)
+               LIST_SPLICE_END_DETACHED(&sched->task_list, (struct list *)tmp_list);
 
        tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
        nb_tasks_cur = nb_tasks;
@@ -371,10 +378,10 @@ void process_runnable_tasks()
 #endif
 
                /* Make sure the entry doesn't appear to be in a list */
-               MT_LIST_INIT(&((struct tasklet *)t)->list);
+               LIST_INIT(&((struct tasklet *)t)->list);
                /* And add it to the local task list */
                tasklet_insert_into_tasklet_list((struct tasklet *)t);
-               HA_ATOMIC_ADD(&tt->task_list_size, 1);
+               tt->task_list_size++;
                activity[tid].tasksw++;
        }
 
@@ -384,18 +391,16 @@ void process_runnable_tasks()
                grq = NULL;
        }
 
-       while (max_processed > 0 && !MT_LIST_ISEMPTY(&tt->task_list)) {
+       while (max_processed > 0 && !LIST_ISEMPTY(&tt->task_list)) {
                struct task *t;
                unsigned short state;
                void *ctx;
                struct task *(*process)(struct task *t, void *ctx, unsigned short state);
 
-               t = (struct task *)MT_LIST_POP(&tt->task_list, struct tasklet *, list);
-               if (!t)
-                       break;
-               _HA_ATOMIC_SUB(&tasks_run_queue, 1);
+               t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
                state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
                __ha_barrier_atomic_store();
+               __tasklet_remove_from_tasklet_list((struct tasklet *)t);
 
                ti->flags &= ~TI_FL_STUCK; // this thread is still running
                activity[tid].ctxsw++;
@@ -411,7 +416,7 @@ void process_runnable_tasks()
 
                /* OK then this is a regular task */
 
-               _HA_ATOMIC_SUB(&tt->task_list_size, 1);
+               tt->task_list_size--;
                if (unlikely(t->call_date)) {
                        uint64_t now_ns = now_mono_time();
 
@@ -456,7 +461,7 @@ void process_runnable_tasks()
                max_processed--;
        }
 
-       if (!MT_LIST_ISEMPTY(&tt->task_list))
+       if (!LIST_ISEMPTY(&tt->task_list))
                activity[tid].long_rq++;
 }
 
@@ -560,7 +565,8 @@ static void init_task()
 #endif
        memset(&task_per_thread, 0, sizeof(task_per_thread));
        for (i = 0; i < MAX_THREADS; i++) {
-               MT_LIST_INIT(&task_per_thread[i].task_list);
+               LIST_INIT(&task_per_thread[i].task_list);
+               MT_LIST_INIT(&task_per_thread[i].shared_tasklet_list);
        }
 }