]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: tasks: move the list walking code to its own function
authorWilly Tarreau <w@1wt.eu>
Thu, 30 Jan 2020 17:13:13 +0000 (18:13 +0100)
committerWilly Tarreau <w@1wt.eu>
Thu, 30 Jan 2020 17:13:13 +0000 (18:13 +0100)
New function run_tasks_from_list() will run over a tasklet list and will
run all the tasks and tasklets it finds there within a limit of <max>
that is passed in arggument. This is a preliminary work for scheduler QoS
improvements.

src/task.c

index 0c26063bc71234198c016333255031f18210eea0..a3c581a19a55494a40d78371493d841ffa543de8 100644 (file)
@@ -315,6 +315,85 @@ int next_timer_expiry()
        return ret;
 }
 
+/* Walks over tasklet list <list> and run at most <max> of them. Returns
+ * the number of entries effectively processed (tasks and tasklets merged).
+ * The count of tasks in the list for the current thread is adjusted.
+ */
+static int run_tasks_from_list(struct list *list, int max)
+{
+       struct task *(*process)(struct task *t, void *ctx, unsigned short state);
+       struct task *t;
+       unsigned short state;
+       void *ctx;
+       int done = 0;
+
+       while (done < max && !LIST_ISEMPTY(list)) {
+               t = (struct task *)LIST_ELEM(list->n, struct tasklet *, list);
+               state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING;
+               state = _HA_ATOMIC_XCHG(&t->state, state);
+               __ha_barrier_atomic_store();
+               __tasklet_remove_from_tasklet_list((struct tasklet *)t);
+
+               ti->flags &= ~TI_FL_STUCK; // this thread is still running
+               activity[tid].ctxsw++;
+               ctx = t->context;
+               process = t->process;
+               t->calls++;
+
+               if (TASK_IS_TASKLET(t)) {
+                       process(NULL, ctx, state);
+                       done++;
+                       continue;
+               }
+
+               /* OK then this is a regular task */
+
+               task_per_thread[tid].task_list_size--;
+               if (unlikely(t->call_date)) {
+                       uint64_t now_ns = now_mono_time();
+
+                       t->lat_time += now_ns - t->call_date;
+                       t->call_date = now_ns;
+               }
+
+               sched->current = t;
+               __ha_barrier_store();
+               if (likely(process == process_stream))
+                       t = process_stream(t, ctx, state);
+               else if (process != NULL)
+                       t = process(t, ctx, state);
+               else {
+                       __task_free(t);
+                       sched->current = NULL;
+                       __ha_barrier_store();
+                       /* We don't want max_processed to be decremented if
+                        * we're just freeing a destroyed task, we should only
+                        * do so if we really ran a task.
+                        */
+                       continue;
+               }
+               sched->current = NULL;
+               __ha_barrier_store();
+               /* If there is a pending state  we have to wake up the task
+                * immediately, else we defer it into wait queue
+                */
+               if (t != NULL) {
+                       if (unlikely(t->call_date)) {
+                               t->cpu_time += now_mono_time() - t->call_date;
+                               t->call_date = 0;
+                       }
+
+                       state = _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
+                       if (state & TASK_WOKEN_ANY)
+                               task_wakeup(t, 0);
+                       else
+                               task_queue(t);
+               }
+               done++;
+       }
+       return done;
+}
+
 /* The run queue is chronologically sorted in a tree. An insertion counter is
  * used to assign a position to each task. This counter may be combined with
  * other variables (eg: nice value) to set the final position in the tree. The
@@ -334,7 +413,7 @@ void process_runnable_tasks()
        struct eb32sc_node *lrq = NULL; // next local run queue entry
        struct eb32sc_node *grq = NULL; // next global run queue entry
        struct task *t;
-       int max_processed;
+       int max_processed, done;
        struct mt_list *tmp_list;
 
        ti->flags &= ~TI_FL_STUCK; // this thread is still running
@@ -421,76 +500,8 @@ void process_runnable_tasks()
                grq = NULL;
        }
 
-       while (max_processed > 0 && !LIST_ISEMPTY(&tt->task_list)) {
-               struct task *t;
-               unsigned short state;
-               void *ctx;
-               struct task *(*process)(struct task *t, void *ctx, unsigned short state);
-
-               t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
-               state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING;
-               state = _HA_ATOMIC_XCHG(&t->state, state);
-               __ha_barrier_atomic_store();
-               __tasklet_remove_from_tasklet_list((struct tasklet *)t);
-
-               ti->flags &= ~TI_FL_STUCK; // this thread is still running
-               activity[tid].ctxsw++;
-               ctx = t->context;
-               process = t->process;
-               t->calls++;
-
-               if (TASK_IS_TASKLET(t)) {
-                       process(NULL, ctx, state);
-                       max_processed--;
-                       continue;
-               }
-
-               /* OK then this is a regular task */
-
-               tt->task_list_size--;
-               if (unlikely(t->call_date)) {
-                       uint64_t now_ns = now_mono_time();
-
-                       t->lat_time += now_ns - t->call_date;
-                       t->call_date = now_ns;
-               }
-
-               sched->current = t;
-               __ha_barrier_store();
-               if (likely(process == process_stream))
-                       t = process_stream(t, ctx, state);
-               else if (process != NULL)
-                       t = process(t, ctx, state);
-               else {
-                       __task_free(t);
-                       sched->current = NULL;
-                       __ha_barrier_store();
-                       /* We don't want max_processed to be decremented if
-                        * we're just freeing a destroyed task, we should only
-                        * do so if we really ran a task.
-                        */
-                       continue;
-               }
-               sched->current = NULL;
-               __ha_barrier_store();
-               /* If there is a pending state  we have to wake up the task
-                * immediately, else we defer it into wait queue
-                */
-               if (t != NULL) {
-                       if (unlikely(t->call_date)) {
-                               t->cpu_time += now_mono_time() - t->call_date;
-                               t->call_date = 0;
-                       }
-
-                       state = _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
-                       if (state & TASK_WOKEN_ANY)
-                               task_wakeup(t, 0);
-                       else
-                               task_queue(t);
-               }
-
-               max_processed--;
-       }
+       done = run_tasks_from_list(&tt->task_list, max_processed);
+       max_processed -= done;
 
        if (!LIST_ISEMPTY(&tt->task_list))
                activity[tid].long_rq++;