From: Willy Tarreau Date: Thu, 30 Jan 2020 17:13:13 +0000 (+0100) Subject: MINOR: tasks: move the list walking code to its own function X-Git-Tag: v2.2-dev2~57 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4ffa0b526a52a026fbabfa72f1b393ece1889dc2;p=thirdparty%2Fhaproxy.git MINOR: tasks: move the list walking code to its own function New function run_tasks_from_list() will run over a tasklet list and will run all the tasks and tasklets it finds there within a limit of that is passed in arggument. This is a preliminary work for scheduler QoS improvements. --- diff --git a/src/task.c b/src/task.c index 0c26063bc7..a3c581a19a 100644 --- a/src/task.c +++ b/src/task.c @@ -315,6 +315,85 @@ int next_timer_expiry() return ret; } +/* Walks over tasklet list and run at most of them. Returns + * the number of entries effectively processed (tasks and tasklets merged). + * The count of tasks in the list for the current thread is adjusted. + */ +static int run_tasks_from_list(struct list *list, int max) +{ + struct task *(*process)(struct task *t, void *ctx, unsigned short state); + struct task *t; + unsigned short state; + void *ctx; + int done = 0; + + while (done < max && !LIST_ISEMPTY(list)) { + t = (struct task *)LIST_ELEM(list->n, struct tasklet *, list); + state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING; + state = _HA_ATOMIC_XCHG(&t->state, state); + __ha_barrier_atomic_store(); + __tasklet_remove_from_tasklet_list((struct tasklet *)t); + + ti->flags &= ~TI_FL_STUCK; // this thread is still running + activity[tid].ctxsw++; + ctx = t->context; + process = t->process; + t->calls++; + + if (TASK_IS_TASKLET(t)) { + process(NULL, ctx, state); + done++; + continue; + } + + /* OK then this is a regular task */ + + task_per_thread[tid].task_list_size--; + if (unlikely(t->call_date)) { + uint64_t now_ns = now_mono_time(); + + t->lat_time += now_ns - t->call_date; + t->call_date = now_ns; + } + + sched->current = t; + __ha_barrier_store(); + if (likely(process == process_stream)) + t = process_stream(t, ctx, state); + else if (process != NULL) + t = process(t, ctx, state); + else { + __task_free(t); + sched->current = NULL; + __ha_barrier_store(); + /* We don't want max_processed to be decremented if + * we're just freeing a destroyed task, we should only + * do so if we really ran a task. + */ + continue; + } + sched->current = NULL; + __ha_barrier_store(); + /* If there is a pending state we have to wake up the task + * immediately, else we defer it into wait queue + */ + if (t != NULL) { + if (unlikely(t->call_date)) { + t->cpu_time += now_mono_time() - t->call_date; + t->call_date = 0; + } + + state = _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING); + if (state & TASK_WOKEN_ANY) + task_wakeup(t, 0); + else + task_queue(t); + } + done++; + } + return done; +} + /* The run queue is chronologically sorted in a tree. An insertion counter is * used to assign a position to each task. This counter may be combined with * other variables (eg: nice value) to set the final position in the tree. The @@ -334,7 +413,7 @@ void process_runnable_tasks() struct eb32sc_node *lrq = NULL; // next local run queue entry struct eb32sc_node *grq = NULL; // next global run queue entry struct task *t; - int max_processed; + int max_processed, done; struct mt_list *tmp_list; ti->flags &= ~TI_FL_STUCK; // this thread is still running @@ -421,76 +500,8 @@ void process_runnable_tasks() grq = NULL; } - while (max_processed > 0 && !LIST_ISEMPTY(&tt->task_list)) { - struct task *t; - unsigned short state; - void *ctx; - struct task *(*process)(struct task *t, void *ctx, unsigned short state); - - t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list); - state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING; - state = _HA_ATOMIC_XCHG(&t->state, state); - __ha_barrier_atomic_store(); - __tasklet_remove_from_tasklet_list((struct tasklet *)t); - - ti->flags &= ~TI_FL_STUCK; // this thread is still running - activity[tid].ctxsw++; - ctx = t->context; - process = t->process; - t->calls++; - - if (TASK_IS_TASKLET(t)) { - process(NULL, ctx, state); - max_processed--; - continue; - } - - /* OK then this is a regular task */ - - tt->task_list_size--; - if (unlikely(t->call_date)) { - uint64_t now_ns = now_mono_time(); - - t->lat_time += now_ns - t->call_date; - t->call_date = now_ns; - } - - sched->current = t; - __ha_barrier_store(); - if (likely(process == process_stream)) - t = process_stream(t, ctx, state); - else if (process != NULL) - t = process(t, ctx, state); - else { - __task_free(t); - sched->current = NULL; - __ha_barrier_store(); - /* We don't want max_processed to be decremented if - * we're just freeing a destroyed task, we should only - * do so if we really ran a task. - */ - continue; - } - sched->current = NULL; - __ha_barrier_store(); - /* If there is a pending state we have to wake up the task - * immediately, else we defer it into wait queue - */ - if (t != NULL) { - if (unlikely(t->call_date)) { - t->cpu_time += now_mono_time() - t->call_date; - t->call_date = 0; - } - - state = _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING); - if (state & TASK_WOKEN_ANY) - task_wakeup(t, 0); - else - task_queue(t); - } - - max_processed--; - } + done = run_tasks_from_list(&tt->task_list, max_processed); + max_processed -= done; if (!LIST_ISEMPTY(&tt->task_list)) activity[tid].long_rq++;