MAJOR: task: task scheduler rework.
Author:     Emeric Brun <ebrun@haproxy.com>
AuthorDate: Thu, 30 Mar 2017 13:37:25 +0000 (15:37 +0200)
Commit:     Willy Tarreau <w@1wt.eu>
CommitDate: Tue, 27 Jun 2017 12:38:02 +0000 (14:38 +0200)
In order to allow task_wakeup() to be called on a running task:
- from within the task handler itself;
- in the future, from another thread.

The lookups on the run queue and wait queue are reworked
to prepare for multithreading.

If task_wakeup() is called on a running task, the woken
message flags are saved in the task's 'pending_state'
attribute. The real wakeup is postponed until the end of the
handler's execution, and the woken messages are then copied
from pending_state into the task's state attribute.
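
As a minimal standalone sketch of the deferred wakeup (not the actual
HAProxy code: the enqueue() stub and the simplified flag values are
assumptions made for illustration):

#define TASK_RUNNING      0x0001
#define TASK_WOKEN_TIMER  0x0002
#define TASK_WOKEN_MSG    0x0004

struct task {
	unsigned short state;          /* current TASK_* flags */
	unsigned short pending_state;  /* flags received while the task was running */
};

static void enqueue(struct task *t) { (void)t; } /* stand-in for __task_wakeup()/eb32_insert() */

static struct task *task_wakeup(struct task *t, unsigned int f)
{
	if (t->state & TASK_RUNNING) {
		/* woken while running: remember the flags, defer the real wakeup */
		t->pending_state |= f;
		return t;
	}
	enqueue(t);
	t->state |= f;
	return t;
}

/* called by the scheduler once the handler has returned */
static void post_run(struct task *t)
{
	t->state &= ~TASK_RUNNING;
	if (t->pending_state) {
		/* replay the wakeup: pending flags become the new state */
		t->state = t->pending_state;
		enqueue(t);
	}
}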

It's important to note that this change will cause a very minor
(though measurable) performance loss but it is necessary to make
forward progress on a multi-threaded scheduler. Most users won't
ever notice.
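
For context, a rough standalone sketch of the batched run-queue walk that
the src/task.c hunk below introduces (dequeue_runnable(), requeue() and
wake_again() are hypothetical stand-ins for the ebtree lookup, task_queue()
and __task_wakeup()):

#define TASK_RUNNING 0x0001

struct task {
	unsigned short state;
	unsigned short pending_state;
	unsigned int calls;
	struct task *(*process)(struct task *t);
};

static struct task *dequeue_runnable(void) { return 0; }  /* ebtree lookup stand-in */
static void requeue(struct task *t) { (void)t; }          /* task_queue() stand-in */
static void wake_again(struct task *t) { (void)t; }       /* __task_wakeup() stand-in */

void process_runnable_tasks_sketch(void)
{
	struct task *local_tasks[16];
	int local_tasks_count, i;

	/* 1) collect up to 16 tasks, marking each as running */
	for (local_tasks_count = 0; local_tasks_count < 16; local_tasks_count++) {
		struct task *t = dequeue_runnable();
		if (!t)
			break;
		t->state |= TASK_RUNNING;
		t->pending_state = 0;
		t->calls++;
		local_tasks[local_tasks_count] = t;
	}

	/* 2) run the handlers of the whole batch */
	for (i = 0; i < local_tasks_count; i++)
		if (local_tasks[i]->process)
			local_tasks[i] = local_tasks[i]->process(local_tasks[i]);

	/* 3) a task woken while it was running is re-queued immediately,
	 *    otherwise it goes back to the wait queue
	 */
	for (i = 0; i < local_tasks_count; i++) {
		struct task *t = local_tasks[i];
		if (!t)
			continue;
		t->state &= ~TASK_RUNNING;
		if (t->pending_state)
			wake_again(t);
		else
			requeue(t);
	}
}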

include/proto/task.h
include/types/task.h
src/task.c

diff --git a/include/proto/task.h b/include/proto/task.h
index 70fd0b31c2fdb470a99f40746f38349c400137bf..e510cd9ec0122e175250bd621e804b67e776454d 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -85,8 +85,6 @@ extern unsigned int tasks_run_queue_cur;
 extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool2_task;
-extern struct eb32_node *last_timer;   /* optimization: last queued timer */
-extern struct eb32_node *rq_next;    /* optimization: next task except if delete/insert */
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -104,6 +102,13 @@ static inline int task_in_wq(struct task *t)
 struct task *__task_wakeup(struct task *t);
 static inline struct task *task_wakeup(struct task *t, unsigned int f)
 {
+       /* If task is running, we postpone the call
+        * and backup the state.
+        */
+       if (unlikely(t->state & TASK_RUNNING)) {
+               t->pending_state |= f;
+               return t;
+       }
        if (likely(!task_in_rq(t)))
                __task_wakeup(t);
        t->state |= f;
@@ -119,8 +124,6 @@ static inline struct task *task_wakeup(struct task *t, unsigned int f)
 static inline struct task *__task_unlink_wq(struct task *t)
 {
        eb32_delete(&t->wq);
-       if (last_timer == &t->wq)
-               last_timer = NULL;
        return t;
 }
 
@@ -153,8 +156,6 @@ static inline struct task *__task_unlink_rq(struct task *t)
 static inline struct task *task_unlink_rq(struct task *t)
 {
        if (likely(task_in_rq(t))) {
-               if (&t->rq == rq_next)
-                       rq_next = eb32_next(rq_next);
                __task_unlink_rq(t);
        }
        return t;
@@ -180,7 +181,7 @@ static inline struct task *task_init(struct task *t)
 {
        t->wq.node.leaf_p = NULL;
        t->rq.node.leaf_p = NULL;
-       t->state = TASK_SLEEPING;
+       t->pending_state = t->state = TASK_SLEEPING;
        t->nice = 0;
        t->calls = 0;
        return t;
diff --git a/include/types/task.h b/include/types/task.h
index 0379a22b857cf373d2693e30bb0e9a1e98e0cbb7..6dfae98acd0851f995b3b3d35911893c54638f75 100644
--- a/include/types/task.h
+++ b/include/types/task.h
@@ -54,6 +54,7 @@
 struct task {
        struct eb32_node rq;            /* ebtree node used to hold the task in the run queue */
        unsigned short state;           /* task state : bit field of TASK_* */
+       unsigned short pending_state;   /* pending states for a running task */
        short nice;                     /* the task's current nice value from -1024 to +1024 */
        unsigned int calls;             /* number of times ->process() was called */
        struct task * (*process)(struct task *t);  /* the function which processes the task */
diff --git a/src/task.c b/src/task.c
index c99cea89cdd1ebacb442cbd3821303c170feb942..63b46c87e5d7078ef924a4b6a805bdf03d667b14 100644
--- a/src/task.c
+++ b/src/task.c
@@ -30,8 +30,7 @@ unsigned int tasks_run_queue = 0;
 unsigned int tasks_run_queue_cur = 0;    /* copy of the run queue size */
 unsigned int nb_tasks_cur = 0;     /* copy of the tasks count */
 unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
-struct eb32_node *last_timer = NULL;  /* optimization: last queued timer */
-struct eb32_node *rq_next    = NULL;  /* optimization: next task except if delete/insert */
+
 
 static struct eb_root timers;      /* sorted timers tree */
 static struct eb_root rqueue;      /* tree constituting the run queue */
@@ -61,11 +60,12 @@ struct task *__task_wakeup(struct task *t)
                t->rq.key += offset;
        }
 
-       /* clear state flags at the same time */
-       t->state &= ~TASK_WOKEN_ANY;
-
+       /* reset flag to pending ones
+        * Note: __task_wakeup must not be called
+        * if task is running
+        */
+       t->state = t->pending_state;
        eb32_insert(&rqueue, &t->rq);
-       rq_next = NULL;
        return t;
 }
 
@@ -95,25 +95,8 @@ void __task_queue(struct task *task)
                return;
 #endif
 
-       if (likely(last_timer &&
-                  last_timer->node.bit < 0 &&
-                  last_timer->key == task->wq.key &&
-                  last_timer->node.node_p)) {
-               /* Most often, last queued timer has the same expiration date, so
-                * if it's not queued at the root, let's queue a dup directly there.
-                * Note that we can only use dups at the dup tree's root (most
-                * negative bit).
-                */
-               eb_insert_dup(&last_timer->node, &task->wq.node);
-               if (task->wq.node.bit < last_timer->node.bit)
-                       last_timer = &task->wq;
-               return;
-       }
        eb32_insert(&timers, &task->wq);
 
-       /* Make sure we don't assign the last_timer to a node-less entry */
-       if (task->wq.node.node_p && (!last_timer || (task->wq.node.bit < last_timer->node.bit)))
-               last_timer = &task->wq;
        return;
 }
 
@@ -126,8 +109,8 @@ int wake_expired_tasks()
        struct task *task;
        struct eb32_node *eb;
 
-       eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
        while (1) {
+               eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
                if (unlikely(!eb)) {
                        /* we might have reached the end of the tree, typically because
                        * <now_ms> is in the first half and we're first scanning the last
@@ -145,7 +128,6 @@ int wake_expired_tasks()
 
                /* timer looks expired, detach it from the queue */
                task = eb32_entry(eb, struct task, wq);
-               eb = eb32_next(eb);
                __task_unlink_wq(task);
 
                /* It is possible that this task was left at an earlier place in the
@@ -165,8 +147,6 @@ int wake_expired_tasks()
                        if (!tick_isset(task->expire))
                                continue;
                        __task_queue(task);
-                       if (!eb || eb->key > task->wq.key)
-                               eb = &task->wq;
                        continue;
                }
                task_wakeup(task, TASK_WOKEN_TIMER);
@@ -189,12 +169,15 @@ int wake_expired_tasks()
 void process_runnable_tasks()
 {
        struct task *t;
-       unsigned int max_processed;
-
+       int i;
+       int max_processed;
+       struct eb32_node *rq_next;
+       int rewind;
+       struct task *local_tasks[16];
+       int local_tasks_count;
        tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
        nb_tasks_cur = nb_tasks;
        max_processed = tasks_run_queue;
-
        if (!tasks_run_queue)
                return;
 
@@ -204,45 +187,75 @@ void process_runnable_tasks()
        if (likely(niced_tasks))
                max_processed = (max_processed + 3) / 4;
 
-       while (max_processed--) {
+       while (max_processed > 0) {
                /* Note: this loop is one of the fastest code path in
                 * the whole program. It should not be re-arranged
                 * without a good reason.
                 */
-               if (unlikely(!rq_next)) {
-                       rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
+
+               rewind = 0;
+               rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
+               if (!rq_next) {
+                       /* we might have reached the end of the tree, typically because
+                        * <rqueue_ticks> is in the first half and we're first scanning
+                        * the last half. Let's loop back to the beginning of the tree now.
+                        */
+                       rq_next = eb32_first(&rqueue);
                        if (!rq_next) {
-                               /* we might have reached the end of the tree, typically because
-                                * <rqueue_ticks> is in the first half and we're first scanning
-                                * the last half. Let's loop back to the beginning of the tree now.
-                                */
-                               rq_next = eb32_first(&rqueue);
-                               if (!rq_next)
+                               break;
+                       }
+                       rewind = 1;
+               }
+
+               local_tasks_count = 0;
+               while (local_tasks_count < 16) {
+                       t = eb32_entry(rq_next, struct task, rq);
+                       rq_next = eb32_next(rq_next);
+                       /* detach the task from the queue */
+                       __task_unlink_rq(t);
+                       t->state |= TASK_RUNNING;
+                       t->pending_state = 0;
+                       t->calls++;
+                       local_tasks[local_tasks_count++] = t;
+                       if (!rq_next) {
+                               if (rewind || !(rq_next = eb32_first(&rqueue))) {
                                        break;
+                               }
+                               rewind = 1;
                        }
                }
 
-               /* detach the task from the queue after updating the pointer to
-                * the next entry.
-                */
-               t = eb32_entry(rq_next, struct task, rq);
-               rq_next = eb32_next(rq_next);
-               __task_unlink_rq(t);
+               if (!local_tasks_count)
+                       break;
 
-               t->state |= TASK_RUNNING;
-               /* This is an optimisation to help the processor's branch
-                * predictor take this most common call.
-                */
-               t->calls++;
-               if (likely(t->process == process_stream))
-                       t = process_stream(t);
-               else
-                       t = t->process(t);
 
-               if (likely(t != NULL)) {
-                       t->state &= ~TASK_RUNNING;
-                       if (t->expire)
-                               task_queue(t);
+               for (i = 0; i < local_tasks_count ; i++) {
+                       t = local_tasks[i];
+                       /* This is an optimisation to help the processor's branch
+                        * predictor take this most common call.
+                        */
+                       if (likely(t->process == process_stream))
+                               t = process_stream(t);
+                       else
+                               t = t->process(t);
+                       local_tasks[i] = t;
+               }
+
+               max_processed -= local_tasks_count;
+               for (i = 0; i < local_tasks_count ; i++) {
+                       t = local_tasks[i];
+                       if (likely(t != NULL)) {
+                               t->state &= ~TASK_RUNNING;
+                               /* If there is a pending state
+                                * we have to wake up the task
+                                * immediatly, else we defer
+                                * it into wait queue
+                                */
+                               if (t->pending_state)
+                                       __task_wakeup(t);
+                               else
+                                       task_queue(t);
+                       }
                }
        }
 }