git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: tasks: Introduce tasklets.
author     Olivier Houchard <ohouchard@haproxy.com>
           Fri, 18 May 2018 16:45:28 +0000 (18:45 +0200)
committer  Willy Tarreau <w@1wt.eu>
           Sat, 26 May 2018 18:03:19 +0000 (20:03 +0200)
Introduce tasklets, lightweight tasks. They have no notion of priority,
they are just run as soon as possible, and will probably be used for I/O
later.
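
As a rough caller-side illustration of the API added below (a sketch only: my_io_handler and
my_ctx are made-up names, not part of this patch; note that pure tasklets are invoked with a
NULL task pointer, as the scheduler change in src/task.c shows):

    #include <proto/task.h>

    /* hypothetical handler: a pure tasklet is called with t == NULL */
    static struct task *my_io_handler(struct task *t, void *ctx, unsigned short state)
    {
            /* do the short piece of work attached to ctx here */
            return NULL;                         /* nothing for the scheduler to requeue */
    }

    static void schedule_io_work(void *my_ctx)
    {
            struct tasklet *tl = tasklet_new();  /* pool-allocated, nice preset to -32768 */

            if (!tl)
                    return;                      /* allocation failure */
            tl->process = my_io_handler;         /* fields come from TASK_COMMON */
            tl->context = my_ctx;
            tasklet_wakeup(tl);                  /* appended to task_list[tid], run soon */
    }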

For the moment they're used to replace the temporary thread-local list
that was used in the scheduler. The first part of the struct is common
with tasks so that tasks can be cast to tasklets and queued in this list.
Once a task is in the tasklet list, it has its leaf_p set to 0x1 so that
it cannot accidentally be mistaken for a task that is not queued.
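
A condensed sketch of that mechanism (illustrative only: queue_task_as_tasklet() and
my_task_in_rq() are simplified stand-ins for the task_insert_into_tasklet_list() and
task_in_rq() helpers below, without the atomic CAS the real code uses):

    /* a task queued in the tasklet list is tagged through its run-queue node,
     * so the tree state and the list state cannot be confused
     */
    static inline void queue_task_as_tasklet(struct task *t)
    {
            t->rq.node.leaf_p = (void *)0x1;   /* not in a tree: in the tasklet list */
            LIST_ADDQ(&task_list[tid], &((struct tasklet *)t)->list); /* cast is valid: common prefix */
    }

    static inline int my_task_in_rq(struct task *t)
    {
            /* "in the run queue" now means leaf_p is neither NULL nor 0x1 */
            return t->rq.node.leaf_p != NULL && t->rq.node.leaf_p != (void *)0x1;
    }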

Pure tasklets are identifiable by their nice value of -32768 (a value a
regular task can never have).
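
Put differently (a sketch mirroring the TASK_IS_TASKLET macro and the dispatch added to
process_runnable_tasks() below): nice is a short whose regular task range is -1024..+1024,
so -32768 is free to act as the type tag, and the scheduler relies on it to call tasklet
handlers with a NULL task pointer:

    #define TASK_IS_TASKLET(t) ((t)->nice == -32768)

    /* once an element has been taken from task_list[tid] */
    process = t->process;
    ctx = t->context;
    if (TASK_IS_TASKLET(t))
            t = NULL;                   /* a tasklet has no full struct task behind it */
    t = process(t, ctx, state);         /* one callback signature for both types */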

include/proto/task.h
include/types/task.h
src/task.c

index 59ac38258f2d78fdb321ae3d9a51b6a13e225c99..c8004143bb132b4645a587d0e3328f62b60cadac 100644 (file)
@@ -89,11 +89,14 @@ extern unsigned int tasks_run_queue_cur;
 extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool_head_task;
+extern struct pool_head *pool_head_tasklet;
 extern struct pool_head *pool_head_notification;
 extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
 extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
 extern struct eb_root rqueue;      /* tree constituting the run queue */
 extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
+extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
+extern int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
 
 __decl_hathreads(extern HA_SPINLOCK_T rq_lock);  /* spin lock related to run queue */
 __decl_hathreads(extern HA_SPINLOCK_T wq_lock);  /* spin lock related to wait queue */
@@ -101,7 +104,10 @@ __decl_hathreads(extern HA_SPINLOCK_T wq_lock);  /* spin lock related to wait qu
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
 {
-       return t->rq.node.leaf_p != NULL;
+       /* Check that leaf_p is not NULL (the task would not be in the run
+        * queue) and that it is not 0x1 (the task would be in the tasklet list).
+        */
+       return t->rq.node.leaf_p != NULL && t->rq.node.leaf_p != (void *)0x1;
 }
 
 /* return 0 if task is in wait queue, otherwise non-zero */
@@ -122,7 +128,7 @@ static inline void task_wakeup(struct task *t, unsigned int f)
 #ifdef USE_THREAD
        struct eb_root *root;
 
-       if (t->thread_mask == tid_bit && global.nbthread > 1)
+       if (t->thread_mask == tid_bit || global.nbthread == 1)
                root = &rqueue_local[tid];
        else
                root = &rqueue;
@@ -172,7 +178,6 @@ static inline struct task *task_unlink_wq(struct task *t)
 static inline struct task *__task_unlink_rq(struct task *t)
 {
        eb32sc_delete(&t->rq);
-       HA_ATOMIC_SUB(&tasks_run_queue, 1);
        if (likely(t->nice))
                HA_ATOMIC_SUB(&niced_tasks, 1);
        return t;
@@ -195,6 +200,41 @@ static inline struct task *task_unlink_rq(struct task *t)
        return t;
 }
 
+static inline void tasklet_wakeup(struct tasklet *tl)
+{
+       LIST_ADDQ(&task_list[tid], &tl->list);
+       task_list_size[tid]++;
+       HA_ATOMIC_ADD(&tasks_run_queue, 1);
+
+}
+
+static inline void task_insert_into_tasklet_list(struct task *t)
+{
+       struct tasklet *tl;
+       void *expected = NULL;
+
+       /* Protect ourselves against anyone trying to insert the task into
+        * another runqueue. We set leaf_p to 0x1 to indicate that the node is
+        * not in a tree but that it's in the tasklet list. See task_in_rq().
+        */
+       if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1)))
+               return;
+       task_list_size[tid]++;
+       tl = (struct tasklet *)t;
+       LIST_ADDQ(&task_list[tid], &tl->list);
+}
+
+static inline void task_remove_from_task_list(struct task *t)
+{
+       LIST_DEL(&((struct tasklet *)t)->list);
+       task_list_size[tid]--;
+       HA_ATOMIC_SUB(&tasks_run_queue, 1);
+       if (!TASK_IS_TASKLET(t)) {
+               t->rq.node.leaf_p = NULL; // was 0x1
+               __ha_barrier_store();
+       }
+}
+
 /*
  * Unlinks the task and adjusts run queue stats.
  * A pointer to the task itself is returned.
@@ -223,6 +263,24 @@ static inline struct task *task_init(struct task *t, unsigned long thread_mask)
        return t;
 }
 
+static inline void tasklet_init(struct tasklet *t)
+{
+       t->nice = -32768;
+       t->calls = 0;
+       t->state = 0;
+       t->list.p = t->list.n = NULL;
+}
+
+static inline struct tasklet *tasklet_new(void)
+{
+       struct tasklet *t = pool_alloc(pool_head_tasklet);
+
+       if (t) {
+               tasklet_init(t);
+       }
+       return t;
+}
+
 /*
  * Allocate and initialise a new task. The new task is returned, or NULL in
  * case of lack of memory. The task count is incremented. Tasks should only
@@ -262,6 +320,13 @@ static inline void task_free(struct task *t)
 }
 
 
+static inline void tasklet_free(struct tasklet *tl)
+{
+       pool_free(pool_head_tasklet, tl);
+       if (unlikely(stopping))
+               pool_flush(pool_head_tasklet);
+}
+
 /* Place <task> into the wait queue, where it may already be. If the expiration
  * timer is infinite, do nothing and rely on wake_expired_task to clean up.
  */
index cff2351553c48918c7fb1e9495f0246fca17201c..be7b6f3ab3b3ba0806e971ef4b1d2637c84657f6 100644 (file)
@@ -60,20 +60,35 @@ struct notification {
        __decl_hathreads(HA_SPINLOCK_T lock);
 };
 
+/* This part is common between struct task and struct tasklet so that tasks
+ * can be used as-is as tasklets.
+ */
+#define TASK_COMMON                                                    \
+       struct {                                                        \
+               unsigned short state; /* task state : bitfield of TASK_ */ \
+               short nice; /* task prio from -1024 to +1024, or -32768 for tasklets */ \
+               unsigned int calls; /* number of times process was called */ \
+               struct task *(*process)(struct task *t, void *ctx, unsigned short state); /* the function which processes the task */ \
+               void *context; /* the task's context */                 \
+       }
+
 /* The base for all tasks */
 struct task {
+       TASK_COMMON;                    /* must be at the beginning! */
        struct eb32sc_node rq;          /* ebtree node used to hold the task in the run queue */
-       unsigned short state;           /* task state : bit field of TASK_* */
-       unsigned short pending_state;   /* pending states for running talk */
-       short nice;                     /* the task's current nice value from -1024 to +1024 */
-       unsigned int calls;             /* number of times ->process() was called */
-       struct task * (*process)(struct task *t, void *ctx, unsigned short state);  /* the function which processes the task */
-       void *context;                  /* the task's context */
        struct eb32_node wq;            /* ebtree node used to hold the task in the wait queue */
        int expire;                     /* next expiration date for this task, in ticks */
        unsigned long thread_mask;      /* mask of thread IDs authorized to process the task */
 };
 
+/* lightweight tasks, without priority, mainly used for I/Os */
+struct tasklet {
+       TASK_COMMON;                    /* must be at the beginning! */
+       struct list list;
+};
+
+#define TASK_IS_TASKLET(t) ((t)->nice == -32768)
+
 /*
  * The task callback (->process) is responsible for updating ->expire. It must
  * return a pointer to the task itself, except if the task has been deleted, in
index 876b837e8bc33ee74b974ada123aded7e63eb7cb..3032010ba04c3e4693e78f089b7d9a62e2774425 100644 (file)
@@ -25,6 +25,7 @@
 #include <proto/task.h>
 
 struct pool_head *pool_head_task;
+struct pool_head *pool_head_tasklet;
 
 /* This is the memory pool containing all the signal structs. These
  * struct are used to store each requiered signal between two tasks.
@@ -41,6 +42,9 @@ unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
 THREAD_LOCAL struct task *curr_task = NULL; /* task currently running or NULL */
 THREAD_LOCAL struct eb32sc_node *rq_next = NULL; /* Next task to be potentially run */
 
+struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
+int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
+
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
 
@@ -240,20 +244,32 @@ void process_runnable_tasks()
        tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
        nb_tasks_cur = nb_tasks;
        max_processed = 200;
-       if (unlikely(global.nbthread <= 1)) {
-               /* when no lock is needed, this loop is much faster */
+
+       if (likely(global.nbthread > 1)) {
+               HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
                if (!(active_tasks_mask & tid_bit)) {
+                       HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
                        activity[tid].empty_rq++;
                        return;
                }
 
-               active_tasks_mask &= ~tid_bit;
-               rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-               while (1) {
-                       if (!rq_next) {
-                               /* we might have reached the end of the tree, typically because
-                                * <rqueue_ticks> is in the first half and we're first scanning
-                                * the last half. Let's loop back to the beginning of the tree now.
+               average = tasks_run_queue / global.nbthread;
+
+               /* Get some elements from the global run queue and put them in
+                * the local run queue. To try to keep a bit of fairness, take
+                * just enough elements from the global list to make the local
+                * queue bigger than the average.
+                */
+               while (rqueue_size[tid] <= average) {
+
+                       /* we have to restart looking up after every batch */
+                       rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+                       if (unlikely(!rq_next)) {
+                               /* either we just started or we reached the end
+                                * of the tree, typically because <rqueue_ticks>
+                                * is in the first half and we're first scanning
+                                * the last half. Let's loop back to the beginning
+                                * of the tree now.
                                 */
                                rq_next = eb32sc_first(&rqueue, tid_bit);
                                if (!rq_next)
@@ -266,90 +282,22 @@ void process_runnable_tasks()
 
                        /* detach the task from the queue */
                        __task_unlink_rq(t);
-                       t->state |= TASK_RUNNING;
-
-                       t->calls++;
-                       curr_task = t;
-                       /* This is an optimisation to help the processor's branch
-                        * predictor take this most common call.
-                        */
-                       if (likely(t->process == process_stream))
-                               t = process_stream(t, t->context, t->state);
-                       else {
-                               if (t->process != NULL)
-                                       t = t->process(t, t->context, t->state);
-                               else {
-                                       __task_free(t);
-                                       t = NULL;
-                               }
-                       }
-                       curr_task = NULL;
-
-                       if (likely(t != NULL)) {
-                               t->state &= ~TASK_RUNNING;
-                               /* If there is a pending state
-                                * we have to wake up the task
-                                * immediatly, else we defer
-                                * it into wait queue
-                                */
-                               if (t->state)
-                                       __task_wakeup(t, &rqueue);
-                               else
-                                       task_queue(t);
-                       }
-
-                       max_processed--;
-                       if (max_processed <= 0) {
-                               active_tasks_mask |= tid_bit;
-                               activity[tid].long_rq++;
-                               break;
-                       }
+                       __task_wakeup(t, &rqueue_local[tid]);
                }
-               return;
-       }
 
-       HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
-       if (!(active_tasks_mask & tid_bit)) {
                HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-               activity[tid].empty_rq++;
-               return;
-       }
-
-       average = tasks_run_queue / global.nbthread;
-
-       /* Get some elements from the global run queue and put it in the
-        * local run queue. To try to keep a bit of fairness, just get as
-        * much elements from the global list as to have a bigger local queue
-        * than the average.
-        */
-       while (rqueue_size[tid] <= average) {
-
-               /* we have to restart looking up after every batch */
-               rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-               if (unlikely(!rq_next)) {
-                       /* either we just started or we reached the end
-                        * of the tree, typically because <rqueue_ticks>
-                        * is in the first half and we're first scanning
-                        * the last half. Let's loop back to the beginning
-                        * of the tree now.
-                        */
-                       rq_next = eb32sc_first(&rqueue, tid_bit);
-                       if (!rq_next)
-                               break;
+       } else {
+               if (!(active_tasks_mask & tid_bit)) {
+                       activity[tid].empty_rq++;
+                       return;
                }
-
-               t = eb32sc_entry(rq_next, struct task, rq);
-               rq_next = eb32sc_next(rq_next, tid_bit);
-
-               /* detach the task from the queue */
-               __task_unlink_rq(t);
-               __task_wakeup(t, &rqueue_local[tid]);
        }
-
-       HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
        active_tasks_mask &= ~tid_bit;
-       while (1) {
-               unsigned short state;
+       /* Get some tasks from the run queue; make sure we don't put
+        * too many in the task list, but queue a bit more than the max
+        * that will be run, to give a bit more fairness
+        */
+       while (max_processed + 20 > task_list_size[tid]) {
                /* Note: this loop is one of the fastest code path in
                 * the whole program. It should not be re-arranged
                 * without a good reason.
@@ -370,18 +318,42 @@ void process_runnable_tasks()
                }
                t = eb32sc_entry(rq_next, struct task, rq);
                rq_next = eb32sc_next(rq_next, tid_bit);
+               /* Make sure nobody re-adds the task to the run queue */
+               HA_ATOMIC_OR(&t->state, TASK_RUNNING);
 
-               state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
                /* detach the task from the queue */
                __task_unlink_rq(t);
-               t->calls++;
-               max_processed--;
+               /* And add it to the local task list */
+               task_insert_into_tasklet_list(t);
+       }
+       while (max_processed > 0 && !LIST_ISEMPTY(&task_list[tid])) {
+               struct task *t;
+               unsigned short state;
+               void *ctx;
+               struct task *(*process)(struct task *t, void *ctx, unsigned short state);
+
+               t = (struct task *)LIST_ELEM(task_list[tid].n, struct tasklet *, list);
+               state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
+               __ha_barrier_store();
+               task_remove_from_task_list(t);
+
+               ctx = t->context;
+               process = t->process;
                rqueue_size[tid]--;
-               curr_task = t;
-               if (likely(t->process == process_stream))
-                       t = process_stream(t, t->context, state);
-               else
-                       t = t->process(t, t->context, state);
+               t->calls++;
+               curr_task = (struct task *)t;
+               if (TASK_IS_TASKLET(t))
+                       t = NULL;
+               if (likely(process == process_stream))
+                       t = process_stream(t, ctx, state);
+               else {
+                       if (process != NULL)
+                               t = process(t, ctx, state);
+                       else {
+                               __task_free(t);
+                               t = NULL;
+                       }
+               }
                curr_task = NULL;
                /* If there is a pending state  we have to wake up the task
                 * immediatly, else we defer it into wait queue
@@ -412,11 +384,17 @@ int init_task()
        memset(&rqueue, 0, sizeof(rqueue));
        HA_SPIN_INIT(&wq_lock);
        HA_SPIN_INIT(&rq_lock);
-       for (i = 0; i < MAX_THREADS; i++)
+       for (i = 0; i < MAX_THREADS; i++) {
                memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
+               LIST_INIT(&task_list[i]);
+               task_list_size[i] = 0;
+       }
        pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
        if (!pool_head_task)
                return 0;
+       pool_head_tasklet = create_pool("tasklet", sizeof(struct tasklet), MEM_F_SHARED);
+       if (!pool_head_tasklet)
+               return 0;
        pool_head_notification = create_pool("notification", sizeof(struct notification), MEM_F_SHARED);
        if (!pool_head_notification)
                return 0;