extern struct pool_head *pool_head_notification;
extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
+extern struct eb_root rqueue; /* tree constituting the run queue */
+extern struct eb_root rqueue_local[MAX_THREADS]; /* trees constituting the per-thread run queues */
__decl_hathreads(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */
__decl_hathreads(extern HA_SPINLOCK_T wq_lock); /* spin lock related to wait queue */
}
/* puts the task <t> in run queue with reason flags <f>, and returns <t> */
-struct task *__task_wakeup(struct task *t);
-static inline struct task *task_wakeup(struct task *t, unsigned int f)
+/* This will put the task in the local runqueue if the task is only runnable
+ * by the current thread, and in the global runqueue otherwise.
+ */
+void __task_wakeup(struct task *t, struct eb_root *);
+static inline void task_wakeup(struct task *t, unsigned int f)
{
- HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+ unsigned short state;
- /* If task is running, we postpone the call
- * and backup the state.
- */
- if (unlikely(t->state & TASK_RUNNING)) {
- t->pending_state |= f;
- HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
- return t;
- }
- if (likely(!task_in_rq(t)))
- __task_wakeup(t);
- t->state |= f;
- HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+#ifdef USE_THREAD
+ struct eb_root *root;
- return t;
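+	/* pick the per-thread run queue when the task may only run on the
+	 * current thread (and we actually have several threads), otherwise
+	 * fall back to the shared global run queue.
+	 */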
+ if (t->thread_mask == tid_bit && global.nbthread > 1)
+ root = &rqueue_local[tid];
+ else
+ root = &rqueue;
+#else
+ struct eb_root *root = &rqueue;
+#endif
+
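+	/* atomically merge the wakeup reason flags into the task's state; if
+	 * the task was already marked TASK_RUNNING, the thread running it
+	 * will requeue it itself once done, so we only queue it here when it
+	 * was not running.
+	 */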
+ state = HA_ATOMIC_OR(&t->state, f);
+ if (!(state & TASK_RUNNING))
+ __task_wakeup(t, root);
}
/* change the thread affinity of a task to <thread_mask> */
static inline struct task *__task_unlink_rq(struct task *t)
{
eb32sc_delete(&t->rq);
- tasks_run_queue--;
+ HA_ATOMIC_SUB(&tasks_run_queue, 1);
if (likely(t->nice))
- niced_tasks--;
+ HA_ATOMIC_SUB(&niced_tasks, 1);
return t;
}
*/
static inline struct task *task_unlink_rq(struct task *t)
{
- HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
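+	/* a task bound to the current thread only cannot be manipulated by
+	 * another thread, so the run queue lock may be skipped for it.
+	 */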
+ if (t->thread_mask != tid_bit)
+ HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
if (likely(task_in_rq(t))) {
if (&t->rq == rq_next)
rq_next = eb32sc_next(rq_next, tid_bit);
__task_unlink_rq(t);
}
- HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+ if (t->thread_mask != tid_bit)
+ HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
return t;
}
{
t->wq.node.leaf_p = NULL;
t->rq.node.leaf_p = NULL;
- t->pending_state = t->state = TASK_SLEEPING;
+ t->state = TASK_SLEEPING;
t->thread_mask = thread_mask;
t->nice = 0;
t->calls = 0;
__decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
static struct eb_root timers; /* sorted timers tree */
-static struct eb_root rqueue; /* tree constituting the run queue */
+struct eb_root rqueue; /* tree constituting the run queue */
+struct eb_root rqueue_local[MAX_THREADS]; /* trees constituting the per-thread run queues */
+static int global_rqueue_size; /* Number of elements in the global runqueue */
+static int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
static unsigned int rqueue_ticks; /* insertion count */
/* Puts the task <t> in run queue at a position depending on t->nice. <t> is
* The task must not already be in the run queue. If unsure, use the safer
* task_wakeup() function.
*/
-struct task *__task_wakeup(struct task *t)
+void __task_wakeup(struct task *t, struct eb_root *root)
{
- tasks_run_queue++;
+ void *expected = NULL;
+ int *rq_size;
+
+ if (root == &rqueue) {
+ rq_size = &global_rqueue_size;
+ HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+ } else {
+ int nb = root - &rqueue_local[0];
+ rq_size = &rqueue_size[nb];
+ }
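+	/* only the global run queue is shared between threads and thus needs
+	 * the run queue lock; a per-thread queue is only ever filled by its
+	 * owner.
+	 */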
+	/* Make sure that, if the task isn't already in the run queue, nobody
+	 * else inserts it in the meantime: the CAS below claims the node by
+	 * swapping a NULL leaf_p for the 0x1 sentinel, so a concurrent waker
+	 * sees the node as taken and gives up.
+	 */
+redo:
+ if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1))) {
+ if (root == &rqueue)
+ HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+ return;
+ }
+	/* There's a small race condition: when running a task, the thread
+	 * first sets TASK_RUNNING, and then unlinks the task.
+	 * If another thread calls task_wakeup() for the same task, it may
+	 * set t->state before TASK_RUNNING was set, and then try to set
+	 * t->rq.node.leaf_p after the task was unlinked.
+	 * To make sure this is not a problem, we check if TASK_RUNNING is
+	 * set again. If it is, we unset t->rq.node.leaf_p.
+	 * We then check TASK_RUNNING a third time. If it is still set, we
+	 * can give up: the task will be re-queued later if it needs to be.
+	 * If it's not set, and there is still something in t->state, then
+	 * we have to requeue the task.
+	 */
+ if (((volatile unsigned short)(t->state)) & TASK_RUNNING) {
+ unsigned short state;
+ t->rq.node.leaf_p = NULL;
+ __ha_barrier_store();
+
+ state = (volatile unsigned short)(t->state);
+ if (unlikely(state != 0 && !(state & TASK_RUNNING)))
+ goto redo;
+ if (root == &rqueue)
+ HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+ return;
+ }
+ HA_ATOMIC_ADD(&tasks_run_queue, 1);
active_tasks_mask |= t->thread_mask;
- t->rq.key = ++rqueue_ticks;
+ t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1);
if (likely(t->nice)) {
int offset;
- niced_tasks++;
+ HA_ATOMIC_ADD(&niced_tasks, 1);
if (likely(t->nice > 0))
- offset = (unsigned)((tasks_run_queue * (unsigned int)t->nice) / 32U);
+ offset = (unsigned)((*rq_size * (unsigned int)t->nice) / 32U);
else
- offset = -(unsigned)((tasks_run_queue * (unsigned int)-t->nice) / 32U);
+ offset = -(unsigned)((*rq_size * (unsigned int)-t->nice) / 32U);
t->rq.key += offset;
}
- /* reset flag to pending ones
- * Note: __task_wakeup must not be called
- * if task is running
- */
- t->state = t->pending_state;
- eb32sc_insert(&rqueue, &t->rq, t->thread_mask);
- return t;
+ eb32sc_insert(root, &t->rq, t->thread_mask);
+ if (root == &rqueue) {
+ global_rqueue_size++;
+ HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+ } else {
+ int nb = root - &rqueue_local[0];
+
+ rqueue_size[nb]++;
+ }
+ return;
}
/*
void process_runnable_tasks()
{
struct task *t;
- int i;
int max_processed;
- struct task *local_tasks[16];
- int local_tasks_count;
- int final_tasks_count;
+ uint64_t average = 0;
tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
nb_tasks_cur = nb_tasks;
t = eb32sc_entry(rq_next, struct task, rq);
rq_next = eb32sc_next(rq_next, tid_bit);
+ global_rqueue_size--;
+
+ /* detach the task from the queue */
__task_unlink_rq(t);
t->state |= TASK_RUNNING;
- t->pending_state = 0;
t->calls++;
curr_task = t;
	 * immediately, else we defer
* it into wait queue
*/
- if (t->pending_state)
- __task_wakeup(t);
+ if (t->state)
+ __task_wakeup(t, &rqueue);
else
task_queue(t);
}
return;
}
+ average = tasks_run_queue / global.nbthread;
+
+	/* Get some elements from the global run queue and put them in the
+	 * local run queue. To try to keep a bit of fairness, just take as
+	 * many elements from the global list as needed to make the local
+	 * queue bigger than the average.
+	 */
+ while (rqueue_size[tid] <= average) {
+
+ /* we have to restart looking up after every batch */
+ rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+ if (unlikely(!rq_next)) {
+ /* either we just started or we reached the end
+ * of the tree, typically because <rqueue_ticks>
+ * is in the first half and we're first scanning
+ * the last half. Let's loop back to the beginning
+ * of the tree now.
+ */
+ rq_next = eb32sc_first(&rqueue, tid_bit);
+ if (!rq_next)
+ break;
+ }
+
+ t = eb32sc_entry(rq_next, struct task, rq);
+ rq_next = eb32sc_next(rq_next, tid_bit);
+
+ /* detach the task from the queue */
+ __task_unlink_rq(t);
+ __task_wakeup(t, &rqueue_local[tid]);
+ }
+
+ HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
active_tasks_mask &= ~tid_bit;
while (1) {
+ unsigned short state;
/* Note: this loop is one of the fastest code path in
* the whole program. It should not be re-arranged
* without a good reason.
*/
/* we have to restart looking up after every batch */
- rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
- for (local_tasks_count = 0; local_tasks_count < 16; local_tasks_count++) {
- if (unlikely(!rq_next)) {
- /* either we just started or we reached the end
- * of the tree, typically because <rqueue_ticks>
- * is in the first half and we're first scanning
- * the last half. Let's loop back to the beginning
- * of the tree now.
- */
- rq_next = eb32sc_first(&rqueue, tid_bit);
- if (!rq_next)
- break;
- }
-
- t = eb32sc_entry(rq_next, struct task, rq);
- rq_next = eb32sc_next(rq_next, tid_bit);
-
- /* detach the task from the queue */
- __task_unlink_rq(t);
- local_tasks[local_tasks_count] = t;
- t->state |= TASK_RUNNING;
- t->pending_state = 0;
- t->calls++;
- max_processed--;
- }
-
- if (!local_tasks_count)
- break;
-
- HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-
- final_tasks_count = 0;
- for (i = 0; i < local_tasks_count ; i++) {
- t = local_tasks[i];
- /* This is an optimisation to help the processor's branch
- * predictor take this most common call.
+ rq_next = eb32sc_lookup_ge(&rqueue_local[tid], rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+ if (unlikely(!rq_next)) {
+ /* either we just started or we reached the end
+ * of the tree, typically because <rqueue_ticks>
+ * is in the first half and we're first scanning
+ * the last half. Let's loop back to the beginning
+ * of the tree now.
*/
- curr_task = t;
- if (likely(t->process == process_stream))
- t = process_stream(t, t->context, t->state);
- else {
- if (t->process != NULL)
- t = t->process(t, t->context, t->state);
- else {
- __task_free(t);
- t = NULL;
- }
- }
- curr_task = NULL;
- if (t)
- local_tasks[final_tasks_count++] = t;
+ rq_next = eb32sc_first(&rqueue_local[tid], tid_bit);
+ if (!rq_next)
+ break;
}
-
- for (i = 0; i < final_tasks_count ; i++) {
- t = local_tasks[i];
- /* If there is a pending state
- * we have to wake up the task
- * immediatly, else we defer
- * it into wait queue
- */
- HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
- t->state &= ~TASK_RUNNING;
- if (t->pending_state) {
- __task_wakeup(t);
- HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
- }
- else {
- /* we must never hold the RQ lock before the WQ lock */
- HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+ t = eb32sc_entry(rq_next, struct task, rq);
+ rq_next = eb32sc_next(rq_next, tid_bit);
+
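+		/* atomically grab the accumulated wakeup flags and mark the
+		 * task as running; flags set by other threads from now on
+		 * will be seen again when TASK_RUNNING is cleared below.
+		 */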
+ state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
+ /* detach the task from the queue */
+ __task_unlink_rq(t);
+ t->calls++;
+ max_processed--;
+ rqueue_size[tid]--;
+ curr_task = t;
+ if (likely(t->process == process_stream))
+ t = process_stream(t, t->context, state);
+ else
+ t = t->process(t, t->context, state);
+ curr_task = NULL;
+		/* If there is a pending state we have to wake up the task
+		 * immediately, else we defer it into the wait queue.
+		 */
+ if (t != NULL) {
+ state = HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
+ if (state)
+ __task_wakeup(t, (t->thread_mask == tid_bit) ?
+ &rqueue_local[tid] : &rqueue);
+ else
task_queue(t);
- }
}
- HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
if (max_processed <= 0) {
active_tasks_mask |= tid_bit;
activity[tid].long_rq++;
break;
}
}
- HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
}
/* perform minimal initializations, report 0 in case of error, 1 if OK. */
int init_task()
{
+ int i;
+
memset(&timers, 0, sizeof(timers));
memset(&rqueue, 0, sizeof(rqueue));
HA_SPIN_INIT(&wq_lock);
HA_SPIN_INIT(&rq_lock);
+ for (i = 0; i < MAX_THREADS; i++)
+ memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
if (!pool_head_task)
return 0;