From: Olivier Houchard Date: Fri, 11 Oct 2019 14:35:01 +0000 (+0200) Subject: MEDIUM: task: Split the tasklet list into two lists. X-Git-Tag: v2.1-dev3~107 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=06910464ddece4b161cb38137c2841cc21577c63;p=thirdparty%2Fhaproxy.git MEDIUM: task: Split the tasklet list into two lists. As using an mt_list for the tasklet list is costly, instead use a regular list, but add an mt_list for tasklet woken up by other threads, to be run on the current thread. At the beginning of process_runnable_tasks(), we just take the new list, and merge it into the task_list. This should give us performances comparable to before we started using a mt_list, but allow us to use tasklet_wakeup() from other threads. --- diff --git a/include/proto/task.h b/include/proto/task.h index 261ee9c5df..2258448103 100644 --- a/include/proto/task.h +++ b/include/proto/task.h @@ -228,11 +228,18 @@ static inline struct task *task_unlink_rq(struct task *t) static inline void tasklet_wakeup(struct tasklet *tl) { - if (MT_LIST_ADDQ(&task_per_thread[tl->tid].task_list, &tl->list) == 1) { - _HA_ATOMIC_ADD(&tasks_run_queue, 1); - if (sleeping_thread_mask & (1 << tl->tid)) { - _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1 << tl->tid)); - wake_thread(tl->tid); + if (tl->tid == tid) { + if (LIST_ISEMPTY(&tl->list)) { + LIST_ADDQ(&task_per_thread[tl->tid].task_list, &tl->list); + _HA_ATOMIC_ADD(&tasks_run_queue, 1); + } + } else { + if (MT_LIST_ADDQ(&task_per_thread[tl->tid].shared_tasklet_list, (struct mt_list *)&tl->list) == 1) { + _HA_ATOMIC_ADD(&tasks_run_queue, 1); + if (sleeping_thread_mask & (1 << tl->tid)) { + _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1 << tl->tid)); + wake_thread(tl->tid); + } } } @@ -243,23 +250,25 @@ static inline void tasklet_wakeup(struct tasklet *tl) */ static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl) { - if (MT_LIST_ADDQ(&sched->task_list, &tl->list) == 1) - _HA_ATOMIC_ADD(&tasks_run_queue, 1); + _HA_ATOMIC_ADD(&tasks_run_queue, 1); + LIST_ADDQ(&sched->task_list, &tl->list); } /* Remove the tasklet from the tasklet list. The tasklet MUST already be there. * If unsure, use tasklet_remove_from_tasklet_list() instead. If used with a * plain task, the caller must update the task_list_size. + * This should only be used by the thread that owns the tasklet, any other + * thread should use tasklet_cancel(). */ static inline void __tasklet_remove_from_tasklet_list(struct tasklet *t) { - if (MT_LIST_DEL(&t->list) == 1) - _HA_ATOMIC_SUB(&tasks_run_queue, 1); + LIST_DEL_INIT(&t->list); + _HA_ATOMIC_SUB(&tasks_run_queue, 1); } static inline void tasklet_remove_from_tasklet_list(struct tasklet *t) { - if (likely(!MT_LIST_ISEMPTY(&t->list))) + if (likely(!LIST_ISEMPTY(&t->list))) __tasklet_remove_from_tasklet_list(t); } @@ -290,7 +299,7 @@ static inline void tasklet_init(struct tasklet *t) t->state = 0; t->process = NULL; t->tid = tid; - MT_LIST_INIT(&t->list); + LIST_INIT(&t->list); } static inline struct tasklet *tasklet_new(void) @@ -359,11 +368,12 @@ static inline void task_destroy(struct task *t) t->process = NULL; } +/* Should only be called by the thread responsible for the tasklet */ static inline void tasklet_free(struct tasklet *tl) { - if (!MT_LIST_ISEMPTY(&tl->list)) { - if(MT_LIST_DEL(&tl->list) == 1) - _HA_ATOMIC_SUB(&tasks_run_queue, 1); + if (!LIST_ISEMPTY(&tl->list)) { + LIST_DEL(&tl->list); + _HA_ATOMIC_SUB(&tasks_run_queue, 1); } pool_free(pool_head_tasklet, tl); @@ -545,7 +555,7 @@ static inline int thread_has_tasks(void) { return (!!(global_tasks_mask & tid_bit) | (sched->rqueue_size > 0) | - !MT_LIST_ISEMPTY(&sched->task_list)); + !LIST_ISEMPTY(&sched->task_list) | !MT_LIST_ISEMPTY(&sched->shared_tasklet_list)); } /* adds list item to work list and wake up the associated task */ diff --git a/include/types/task.h b/include/types/task.h index 40304eefd7..fbefeef426 100644 --- a/include/types/task.h +++ b/include/types/task.h @@ -61,7 +61,8 @@ struct notification { struct task_per_thread { struct eb_root timers; /* tree constituting the per-thread wait queue */ struct eb_root rqueue; /* tree constituting the per-thread run queue */ - struct mt_list task_list; /* List of tasks to be run, mixing tasks and tasklets */ + struct list task_list; /* List of tasks to be run, mixing tasks and tasklets */ + struct mt_list shared_tasklet_list; /* Tasklet to be run, woken up by other threads */ int task_list_size; /* Number of tasks in the task_list */ int rqueue_size; /* Number of elements in the per-thread run queue */ struct task *current; /* current task (not tasklet) */ @@ -95,7 +96,7 @@ struct task { /* lightweight tasks, without priority, mainly used for I/Os */ struct tasklet { TASK_COMMON; /* must be at the beginning! */ - struct mt_list list; + struct list list; int tid; /* TID of the tasklet owner */ }; diff --git a/src/debug.c b/src/debug.c index 1a1301c4fa..5c4aa880ae 100644 --- a/src/debug.c +++ b/src/debug.c @@ -57,7 +57,8 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid) !!(global_tasks_mask & thr_bit), !eb_is_empty(&task_per_thread[thr].timers), !eb_is_empty(&task_per_thread[thr].rqueue), - !MT_LIST_ISEMPTY(&task_per_thread[thr].task_list), + !(LIST_ISEMPTY(&task_per_thread[thr].task_list) | + MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)), task_per_thread[thr].task_list_size, task_per_thread[thr].rqueue_size, stuck, diff --git a/src/task.c b/src/task.c index 9ac02117b0..abf1583b3d 100644 --- a/src/task.c +++ b/src/task.c @@ -305,6 +305,7 @@ void process_runnable_tasks() struct eb32sc_node *grq = NULL; // next global run queue entry struct task *t; int max_processed; + struct mt_list *tmp_list; ti->flags &= ~TI_FL_STUCK; // this thread is still running @@ -312,6 +313,12 @@ void process_runnable_tasks() activity[tid].empty_rq++; return; } + /* Merge the list of tasklets waken up by other threads to the + * main list. + */ + tmp_list = MT_LIST_BEHEAD(&sched->shared_tasklet_list); + if (tmp_list) + LIST_SPLICE_END_DETACHED(&sched->task_list, (struct list *)tmp_list); tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */ nb_tasks_cur = nb_tasks; @@ -371,10 +378,10 @@ void process_runnable_tasks() #endif /* Make sure the entry doesn't appear to be in a list */ - MT_LIST_INIT(&((struct tasklet *)t)->list); + LIST_INIT(&((struct tasklet *)t)->list); /* And add it to the local task list */ tasklet_insert_into_tasklet_list((struct tasklet *)t); - HA_ATOMIC_ADD(&tt->task_list_size, 1); + tt->task_list_size++; activity[tid].tasksw++; } @@ -384,18 +391,16 @@ void process_runnable_tasks() grq = NULL; } - while (max_processed > 0 && !MT_LIST_ISEMPTY(&tt->task_list)) { + while (max_processed > 0 && !LIST_ISEMPTY(&tt->task_list)) { struct task *t; unsigned short state; void *ctx; struct task *(*process)(struct task *t, void *ctx, unsigned short state); - t = (struct task *)MT_LIST_POP(&tt->task_list, struct tasklet *, list); - if (!t) - break; - _HA_ATOMIC_SUB(&tasks_run_queue, 1); + t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list); state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING); __ha_barrier_atomic_store(); + __tasklet_remove_from_tasklet_list((struct tasklet *)t); ti->flags &= ~TI_FL_STUCK; // this thread is still running activity[tid].ctxsw++; @@ -411,7 +416,7 @@ void process_runnable_tasks() /* OK then this is a regular task */ - _HA_ATOMIC_SUB(&tt->task_list_size, 1); + tt->task_list_size--; if (unlikely(t->call_date)) { uint64_t now_ns = now_mono_time(); @@ -456,7 +461,7 @@ void process_runnable_tasks() max_processed--; } - if (!MT_LIST_ISEMPTY(&tt->task_list)) + if (!LIST_ISEMPTY(&tt->task_list)) activity[tid].long_rq++; } @@ -560,7 +565,8 @@ static void init_task() #endif memset(&task_per_thread, 0, sizeof(task_per_thread)); for (i = 0; i < MAX_THREADS; i++) { - MT_LIST_INIT(&task_per_thread[i].task_list); + LIST_INIT(&task_per_thread[i].task_list); + MT_LIST_INIT(&task_per_thread[i].shared_tasklet_list); } }