extern unsigned int nb_tasks_cur;
extern unsigned int niced_tasks; /* number of niced tasks in the run queue */
extern struct pool_head *pool_head_task;
+extern struct pool_head *pool_head_tasklet;
extern struct pool_head *pool_head_notification;
extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
extern struct eb_root rqueue; /* tree constituting the run queue */
extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
+extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
+extern int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
__decl_hathreads(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */
__decl_hathreads(extern HA_SPINLOCK_T wq_lock); /* spin lock related to wait queue */
/* return 0 if task is in run queue, otherwise non-zero */
static inline int task_in_rq(struct task *t)
{
- return t->rq.node.leaf_p != NULL;
+	/* leaf_p is NULL when the task is not queued at all, and 0x1 when it
+	 * is in the tasklet list rather than in the run queue. Any other
+	 * value means it is in the run queue.
+	 */
+ return t->rq.node.leaf_p != NULL && t->rq.node.leaf_p != (void *)0x1;
}
/* return 0 if task is in wait queue, otherwise non-zero */
#ifdef USE_THREAD
struct eb_root *root;
- if (t->thread_mask == tid_bit && global.nbthread > 1)
+ if (t->thread_mask == tid_bit || global.nbthread == 1)
root = &rqueue_local[tid];
else
root = &rqueue;
static inline struct task *__task_unlink_rq(struct task *t)
{
eb32sc_delete(&t->rq);
- HA_ATOMIC_SUB(&tasks_run_queue, 1);
if (likely(t->nice))
HA_ATOMIC_SUB(&niced_tasks, 1);
return t;
}
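+/* Schedule a tasklet to run: append it to the current thread's task list and
+ * update the run queue counters accordingly.
+ */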
+static inline void tasklet_wakeup(struct tasklet *tl)
+{
+ LIST_ADDQ(&task_list[tid], &tl->list);
+ task_list_size[tid]++;
+ HA_ATOMIC_ADD(&tasks_run_queue, 1);
+}
+
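+/* Move a regular task into the current thread's task list so that it is
+ * processed together with the tasklets. leaf_p is atomically set to 0x1 to
+ * mark that the task now belongs to the tasklet list and to prevent any
+ * concurrent insertion into a run queue.
+ */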
+static inline void task_insert_into_tasklet_list(struct task *t)
+{
+ struct tasklet *tl;
+ void *expected = NULL;
+
+	/* Protect ourselves against anybody trying to insert the task into
+ * another runqueue. We set leaf_p to 0x1 to indicate that the node is
+ * not in a tree but that it's in the tasklet list. See task_in_rq().
+ */
+ if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1)))
+ return;
+ task_list_size[tid]++;
+ tl = (struct tasklet *)t;
+ LIST_ADDQ(&task_list[tid], &tl->list);
+}
+
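+/* Remove a task or tasklet from the current thread's task list and update
+ * the counters. For plain tasks, leaf_p is reset to NULL so that the task
+ * may be queued again.
+ */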
+static inline void task_remove_from_task_list(struct task *t)
+{
+ LIST_DEL(&((struct tasklet *)t)->list);
+ task_list_size[tid]--;
+ HA_ATOMIC_SUB(&tasks_run_queue, 1);
+ if (!TASK_IS_TASKLET(t)) {
+ t->rq.node.leaf_p = NULL; // was 0x1
+ __ha_barrier_store();
+ }
+}
+
/*
* Unlinks the task and adjusts run queue stats.
* A pointer to the task itself is returned.
return t;
}
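+/* Initialize a tasklet: the nice field is set to the reserved value -32768
+ * so that the scheduler can tell it apart from a plain task (see
+ * TASK_IS_TASKLET()).
+ */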
+static inline void tasklet_init(struct tasklet *t)
+{
+ t->nice = -32768;
+ t->calls = 0;
+ t->state = 0;
+ t->list.p = t->list.n = NULL;
+}
+
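+/* Allocate and initialize a new tasklet. The new tasklet is returned, or
+ * NULL in case of lack of memory.
+ */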
+static inline struct tasklet *tasklet_new(void)
+{
+ struct tasklet *t = pool_alloc(pool_head_tasklet);
+
+ if (t) {
+ tasklet_init(t);
+ }
+ return t;
+}
+
/*
* Allocate and initialise a new task. The new task is returned, or NULL in
* case of lack of memory. The task count is incremented. Tasks should only
}
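+/* Free a tasklet and return its memory to the tasklet pool. When the process
+ * is stopping, the pool is flushed so memory is released faster.
+ */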
+static inline void tasklet_free(struct tasklet *tl)
+{
+ pool_free(pool_head_tasklet, tl);
+ if (unlikely(stopping))
+ pool_flush(pool_head_tasklet);
+}
+
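+/* Usage sketch (illustrative only; my_handler and my_ctx are hypothetical):
+ * a caller typically allocates a tasklet, attaches its own handler and
+ * context, then wakes it up from its own thread whenever work is pending:
+ *
+ *	struct tasklet *tl = tasklet_new();
+ *	if (tl) {
+ *		tl->process = my_handler;
+ *		tl->context = my_ctx;
+ *		tasklet_wakeup(tl);
+ *	}
+ */
+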
/* Place <task> into the wait queue, where it may already be. If the expiration
* timer is infinite, do nothing and rely on wake_expired_task to clean up.
*/
__decl_hathreads(HA_SPINLOCK_T lock);
};
+/* This part is common between struct task and struct tasklet so that tasks
+ * can be used as-is as tasklets.
+ */
+#define TASK_COMMON \
+ struct { \
+ unsigned short state; /* task state : bitfield of TASK_ */ \
+ short nice; /* task prio from -1024 to +1024, or -32768 for tasklets */ \
+ unsigned int calls; /* number of times process was called */ \
+ struct task *(*process)(struct task *t, void *ctx, unsigned short state); /* the function which processes the task */ \
+ void *context; /* the task's context */ \
+ }
+
/* The base for all tasks */
struct task {
+ TASK_COMMON; /* must be at the beginning! */
struct eb32sc_node rq; /* ebtree node used to hold the task in the run queue */
- unsigned short state; /* task state : bit field of TASK_* */
- unsigned short pending_state; /* pending states for running talk */
- short nice; /* the task's current nice value from -1024 to +1024 */
- unsigned int calls; /* number of times ->process() was called */
- struct task * (*process)(struct task *t, void *ctx, unsigned short state); /* the function which processes the task */
- void *context; /* the task's context */
struct eb32_node wq; /* ebtree node used to hold the task in the wait queue */
int expire; /* next expiration date for this task, in ticks */
unsigned long thread_mask; /* mask of thread IDs authorized to process the task */
};
+/* lightweight tasks, without priority, mainly used for I/Os */
+struct tasklet {
+ TASK_COMMON; /* must be at the beginning! */
+ struct list list;
+};
+
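+/* Tasklets are told apart from tasks by their reserved nice value of -32768,
+ * which is outside the -1024..+1024 range used by plain tasks.
+ */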
+#define TASK_IS_TASKLET(t) ((t)->nice == -32768)
+
/*
* The task callback (->process) is responsible for updating ->expire. It must
* return a pointer to the task itself, except if the task has been deleted, in
#include <proto/task.h>
struct pool_head *pool_head_task;
+struct pool_head *pool_head_tasklet;
/* This is the memory pool containing all the signal structs. These
* struct are used to store each requiered signal between two tasks.
THREAD_LOCAL struct task *curr_task = NULL; /* task currently running or NULL */
THREAD_LOCAL struct eb32sc_node *rq_next = NULL; /* Next task to be potentially run */
+struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
+int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
+
__decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
__decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
nb_tasks_cur = nb_tasks;
max_processed = 200;
- if (unlikely(global.nbthread <= 1)) {
- /* when no lock is needed, this loop is much faster */
+
+ if (likely(global.nbthread > 1)) {
+ HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
if (!(active_tasks_mask & tid_bit)) {
+ HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
activity[tid].empty_rq++;
return;
}
- active_tasks_mask &= ~tid_bit;
- rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
- while (1) {
- if (!rq_next) {
- /* we might have reached the end of the tree, typically because
- * <rqueue_ticks> is in the first half and we're first scanning
- * the last half. Let's loop back to the beginning of the tree now.
+ average = tasks_run_queue / global.nbthread;
+
+	/* Get some elements from the global run queue and put them in the
+	 * local run queue. To try to keep a bit of fairness, only take as
+	 * many elements from the global list as needed to make the local
+	 * queue bigger than the average.
+	 */
+ while (rqueue_size[tid] <= average) {
+
+ /* we have to restart looking up after every batch */
+ rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+ if (unlikely(!rq_next)) {
+ /* either we just started or we reached the end
+ * of the tree, typically because <rqueue_ticks>
+ * is in the first half and we're first scanning
+ * the last half. Let's loop back to the beginning
+ * of the tree now.
*/
rq_next = eb32sc_first(&rqueue, tid_bit);
if (!rq_next)
/* detach the task from the queue */
__task_unlink_rq(t);
- t->state |= TASK_RUNNING;
-
- t->calls++;
- curr_task = t;
- /* This is an optimisation to help the processor's branch
- * predictor take this most common call.
- */
- if (likely(t->process == process_stream))
- t = process_stream(t, t->context, t->state);
- else {
- if (t->process != NULL)
- t = t->process(t, t->context, t->state);
- else {
- __task_free(t);
- t = NULL;
- }
- }
- curr_task = NULL;
-
- if (likely(t != NULL)) {
- t->state &= ~TASK_RUNNING;
- /* If there is a pending state
- * we have to wake up the task
- * immediatly, else we defer
- * it into wait queue
- */
- if (t->state)
- __task_wakeup(t, &rqueue);
- else
- task_queue(t);
- }
-
- max_processed--;
- if (max_processed <= 0) {
- active_tasks_mask |= tid_bit;
- activity[tid].long_rq++;
- break;
- }
+ __task_wakeup(t, &rqueue_local[tid]);
}
- return;
- }
- HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
- if (!(active_tasks_mask & tid_bit)) {
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
- activity[tid].empty_rq++;
- return;
- }
-
- average = tasks_run_queue / global.nbthread;
-
- /* Get some elements from the global run queue and put it in the
- * local run queue. To try to keep a bit of fairness, just get as
- * much elements from the global list as to have a bigger local queue
- * than the average.
- */
- while (rqueue_size[tid] <= average) {
-
- /* we have to restart looking up after every batch */
- rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
- if (unlikely(!rq_next)) {
- /* either we just started or we reached the end
- * of the tree, typically because <rqueue_ticks>
- * is in the first half and we're first scanning
- * the last half. Let's loop back to the beginning
- * of the tree now.
- */
- rq_next = eb32sc_first(&rqueue, tid_bit);
- if (!rq_next)
- break;
+ } else {
+ if (!(active_tasks_mask & tid_bit)) {
+ activity[tid].empty_rq++;
+ return;
}
-
- t = eb32sc_entry(rq_next, struct task, rq);
- rq_next = eb32sc_next(rq_next, tid_bit);
-
- /* detach the task from the queue */
- __task_unlink_rq(t);
- __task_wakeup(t, &rqueue_local[tid]);
}
-
- HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
active_tasks_mask &= ~tid_bit;
- while (1) {
- unsigned short state;
+	/* Get some tasks from the run queue. Make sure we don't put too
+	 * many in the task list, but queue a bit more than the max that
+	 * will be run, to give a bit more fairness.
+	 */
+ while (max_processed + 20 > task_list_size[tid]) {
/* Note: this loop is one of the fastest code path in
* the whole program. It should not be re-arranged
* without a good reason.
}
t = eb32sc_entry(rq_next, struct task, rq);
rq_next = eb32sc_next(rq_next, tid_bit);
+		/* Make sure nobody re-adds the task to the runqueue */
+ HA_ATOMIC_OR(&t->state, TASK_RUNNING);
- state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
/* detach the task from the queue */
__task_unlink_rq(t);
- t->calls++;
- max_processed--;
+ /* And add it to the local task list */
+ task_insert_into_tasklet_list(t);
+ }
+ while (max_processed > 0 && !LIST_ISEMPTY(&task_list[tid])) {
+ struct task *t;
+ unsigned short state;
+ void *ctx;
+ struct task *(*process)(struct task *t, void *ctx, unsigned short state);
+
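+		/* The task list mixes tasks and tasklets; since both types
+		 * share the same TASK_COMMON header, the element can be
+		 * handled as a task as long as only common fields are used.
+		 */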
+ t = (struct task *)LIST_ELEM(task_list[tid].n, struct tasklet *, list);
+ state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
+ __ha_barrier_store();
+ task_remove_from_task_list(t);
+
+ ctx = t->context;
+ process = t->process;
rqueue_size[tid]--;
- curr_task = t;
- if (likely(t->process == process_stream))
- t = process_stream(t, t->context, state);
- else
- t = t->process(t, t->context, state);
+ t->calls++;
+ curr_task = (struct task *)t;
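+	/* Tasklet handlers are called with a NULL task pointer; they only
+	 * receive the <ctx> and <state> captured above.
+	 */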
+ if (TASK_IS_TASKLET(t))
+ t = NULL;
+ if (likely(process == process_stream))
+ t = process_stream(t, ctx, state);
+ else {
+		if (process != NULL)
+ t = process(t, ctx, state);
+ else {
+ __task_free(t);
+ t = NULL;
+ }
+ }
curr_task = NULL;
/* If there is a pending state we have to wake up the task
* immediatly, else we defer it into wait queue
memset(&rqueue, 0, sizeof(rqueue));
HA_SPIN_INIT(&wq_lock);
HA_SPIN_INIT(&rq_lock);
- for (i = 0; i < MAX_THREADS; i++)
+ for (i = 0; i < MAX_THREADS; i++) {
memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
+ LIST_INIT(&task_list[i]);
+ task_list_size[i] = 0;
+ }
pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
if (!pool_head_task)
return 0;
+ pool_head_tasklet = create_pool("tasklet", sizeof(struct tasklet), MEM_F_SHARED);
+ if (!pool_head_tasklet)
+ return 0;
pool_head_notification = create_pool("notification", sizeof(struct notification), MEM_F_SHARED);
if (!pool_head_notification)
return 0;