/* use this to check a task state or to clean it up before queueing */
#define TASK_WOKEN_ANY (TASK_WOKEN_OTHER|TASK_WOKEN_INIT|TASK_WOKEN_TIMER| \
TASK_WOKEN_IO|TASK_WOKEN_SIGNAL|TASK_WOKEN_MSG| \
- TASK_WOKEN_RES)
+ TASK_WOKEN_RES|TASK_WOKEN_WQ)
#define TASK_F_TASKLET 0x00008000 /* nature of this task: 0=task 1=tasklet */
#define TASK_F_USR1 0x00010000 /* preserved user flag 1, application-specific, def:0 */
#define TASK_F_UEVT2 0x00040000 /* one-shot user event type 2, application specific, def:0 */
#define TASK_F_WANTS_TIME 0x00080000 /* task/tasklet wants th_ctx->sched_call_date to be set */
#define TASK_F_UEVT3 0x00100000 /* one-shot user event type 3, application specific, def:0 */
-/* unused: 0x200000..0x80000000 */
+#define TASK_WOKEN_WQ 0x00200000 /* The task has been waken up only to be put in the wait queue, because its expire changed */
+/* unused: 0x400000..0x80000000 */
/* These flags are persistent across scheduler calls */
#define TASK_PERSISTENT (TASK_SELF_WAKING | TASK_KILLED | \
_(TASK_KILLED, _(TASK_HEAVY, _(TASK_WOKEN_INIT,
_(TASK_WOKEN_TIMER, _(TASK_WOKEN_IO, _(TASK_WOKEN_SIGNAL,
_(TASK_WOKEN_MSG, _(TASK_WOKEN_RES, _(TASK_WOKEN_OTHER,
- _(TASK_F_TASKLET, _(TASK_F_USR1))))))))))))));
+ _(TASK_F_TASKLET, _(TASK_F_USR1, _(TASK_WOKEN_WQ)))))))))))))));
/* epilogue */
_(~0U);
return buf;
void task_kill(struct task *t);
void tasklet_kill(struct tasklet *t);
void __task_wakeup(struct task *t);
-void __task_queue(struct task *task, struct eb_root *wq);
+void __task_queue(struct task *task);
+static inline void _task_queue(struct task *task, const struct ha_caller *caller);
unsigned int run_tasks_from_lists(unsigned int budgets[]);
state = _HA_ATOMIC_LOAD(&t->state);
new_state = (state | f) &~ TASK_RUNNING;
+ cur_tid = t->tid;
+ if ((new_state & TASK_WOKEN_WQ) && __task_get_current_owner(cur_tid) == tid) {
+ _task_queue(t, NULL);
+ new_state &= ~TASK_WOKEN_WQ;
+ }
if (new_state & TASK_WOKEN_ANY)
new_state |= TASK_QUEUED;
- cur_tid = t->tid;
- if ((new_state & TASK_QUEUED) || cur_tid >= 0)
+ if ((new_state & TASK_QUEUED) || cur_tid >= 0 || task_in_wq(t))
new_tid = cur_tid;
else
new_tid = -1;
if (!tick_isset(task->expire))
return;
-#ifdef USE_THREAD
- if (task->tid < 0) {
- HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
- if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
- if (likely(caller)) {
- caller = HA_ATOMIC_XCHG(&task->caller, caller);
- BUG_ON((ulong)caller & 1);
-#ifdef DEBUG_TASK
- HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
-#endif
- }
- __task_queue(task, &tg_ctx->timers);
- }
- HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
- } else
-#endif
- {
- BUG_ON(task->tid != tid);
- if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
- if (likely(caller)) {
- caller = HA_ATOMIC_XCHG(&task->caller, caller);
- BUG_ON((ulong)caller & 1);
+ BUG_ON(task->tid >= 0 && task->tid != tid);
+
+ if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
+ if (likely(caller)) {
+ caller = HA_ATOMIC_XCHG(&task->caller, caller);
+ BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
- HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
+ HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
- }
- __task_queue(task, &th_ctx->timers);
}
+ __task_queue(task);
}
}
/* no shared queue without threads */
thr = 0;
#endif
+ /*
+ * Nothing to do, the task is only temporarily owned
+ */
+ if (thr == -1 && t->tid == -2 - tid)
+ return;
if (unlikely(task_in_wq(t))) {
task_unlink_wq(t);
t->tid = thr;
static inline void _task_schedule(struct task *task, int when, const struct ha_caller *caller)
{
+ int did_lock = 0;
/* TODO: mthread, check if there is no task with this test */
if (task_in_rq(task))
return;
-#ifdef USE_THREAD
if (task->tid < 0) {
/*
* If the task is already running, then just wake it up, just
task_wakeup(task, TASK_WOKEN_OTHER);
return;
}
-
- /* FIXME: is it really needed to lock the WQ during the check ? */
- HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
- if (task_in_wq(task))
- when = tick_first(when, task->expire);
-
- task->expire = when;
- if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
- if (likely(caller)) {
- caller = HA_ATOMIC_XCHG(&task->caller, caller);
- BUG_ON((ulong)caller & 1);
-#ifdef DEBUG_TASK
- HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
-#endif
- }
- __task_queue(task, &tg_ctx->timers);
- }
- task_drop_running(task, 0);
- HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+ did_lock = 1;
} else
-#endif
- {
BUG_ON(task->tid != tid);
- if (task_in_wq(task))
- when = tick_first(when, task->expire);
- task->expire = when;
- if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
- if (likely(caller)) {
- caller = HA_ATOMIC_XCHG(&task->caller, caller);
- BUG_ON((ulong)caller & 1);
+ if (task_in_wq(task))
+ when = tick_first(when, task->expire);
+
+ task->expire = when;
+ if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
+ if (likely(caller)) {
+ caller = HA_ATOMIC_XCHG(&task->caller, caller);
+ BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
- HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
+ HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
- }
- __task_queue(task, &th_ctx->timers);
}
+ __task_queue(task);
}
+ if (did_lock)
+ task_drop_running(task, 0);
}
/* returns the string corresponding to a task type as found in the task caller
void __task_wakeup(struct task *t)
{
struct eb_root *root = &th_ctx->rqueue;
- int thr __maybe_unused = t->tid >= 0 ? t->tid : tid;
+ /*
+ * At this point the task tid should always be set to the relevant
+ * thread, so we can just use __task_get_current_owner();
+ */
+ int thr __maybe_unused = __task_get_current_owner(t->tid);
+
+ BUG_ON(t->tid == -1);
#ifdef USE_THREAD
if (thr != tid) {
* at all about locking so the caller must be careful when deciding whether to
* lock or not around this call.
*/
-void __task_queue(struct task *task, struct eb_root *wq)
+void __task_queue(struct task *task)
{
-#ifdef USE_THREAD
- BUG_ON((wq == &tg_ctx->timers && task->tid >= 0) ||
- (wq == &th_ctx->timers && task->tid < 0) ||
- (wq != &tg_ctx->timers && wq != &th_ctx->timers));
-#endif
+ int old_state, new_state;
+ int old_tid;
+ int cur_owner;
+
/* if this happens the process is doomed anyway, so better catch it now
* so that we have the caller in the stack.
*/
BUG_ON(task->expire == TICK_ETERNITY);
+ do {
+ new_state = old_state = _HA_ATOMIC_LOAD(&task->state);
+ if (old_state & TASK_KILLED)
+ return;
+ old_tid = _HA_ATOMIC_LOAD(&task->tid);
+ cur_owner = __task_get_current_owner(old_tid);
+ if (old_tid != -1 && cur_owner != tid)
+ new_state |= TASK_WOKEN_WQ;
+ } while (!(__task_set_state_and_tid(task, old_tid, __task_get_new_tid_field(old_tid), old_state, new_state)));
+
+ if (cur_owner != tid && cur_owner != -1) {
+ /*
+ * If the task has already been woken up to be added in the
+ * wait queue, nothing left to do, the target thread will
+ * eventually do the right thing.
+ */
+ if (!(old_state & TASK_WOKEN_WQ))
+ _task_wakeup(task, 0, NULL);
+ return;
+ }
+
if (likely(task_in_wq(task)))
__task_unlink_wq(task);
return;
#endif
- eb32_insert(wq, &task->wq);
+ eb32_insert(&th_ctx->timers, &task->wq);
}
/*
int max_processed = global.tune.runqueue_depth;
struct task *task;
struct eb32_node *eb;
- __decl_thread(int key);
while (1) {
if (max_processed-- <= 0)
*/
__task_unlink_wq(task);
if (tick_isset(task->expire))
- __task_queue(task, &tt->timers);
- }
- else {
- /* task not expired and correctly placed. It may not be eternal. */
- BUG_ON(task->expire == TICK_ETERNITY);
- break;
- }
- }
-
-#ifdef USE_THREAD
- if (eb_is_empty(&tg_ctx->timers))
- goto leave;
-
- HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
- eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
- if (!eb) {
- eb = eb32_first(&tg_ctx->timers);
- if (likely(!eb)) {
- HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
- goto leave;
- }
- }
- key = eb->key;
-
- if (tick_is_lt(now_ms, key)) {
- HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
- goto leave;
- }
-
- /* There's really something of interest here, let's visit the queue */
-
- if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &wq_lock)) {
- /* if we failed to grab the lock it means another thread is
- * already doing the same here, so let it do the job.
- */
- HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
- goto leave;
- }
-
- while (1) {
- lookup_next:
- if (max_processed-- <= 0)
- break;
- eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
- if (!eb) {
- /* we might have reached the end of the tree, typically because
- * <now_ms> is in the first half and we're first scanning the last
- * half. Let's loop back to the beginning of the tree now.
- */
- eb = eb32_first(&tg_ctx->timers);
- if (likely(!eb))
- break;
- }
-
- task = eb32_entry(eb, struct task, wq);
-
- /* Check for any competing run of the task (quite rare but may
- * involve a dangerous concurrent access on task->expire). In
- * order to protect against this, we'll take an exclusive access
- * on TASK_RUNNING before checking/touching task->expire. If the
- * task is already RUNNING on another thread, it will deal by
- * itself with the requeuing so we must not do anything and
- * simply quit the loop for now, because we cannot wait with the
- * WQ lock held as this would prevent the running thread from
- * requeuing the task. One annoying effect of holding RUNNING
- * here is that a concurrent task_wakeup() will refrain from
- * waking it up. This forces us to check for a wakeup after
- * releasing the flag.
- */
- if (HA_ATOMIC_FETCH_OR(&task->state, TASK_RUNNING) & TASK_RUNNING)
- break;
-
- if (tick_is_expired(task->expire, now_ms)) {
- /* expired task, wake it up */
- HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
- __task_unlink_wq(task);
- HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
- task_drop_running(task, TASK_WOKEN_TIMER);
- }
- else if (task->expire != eb->key) {
- /* task is not expired but its key doesn't match so let's
- * update it and skip to next apparently expired task.
- */
- HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
- __task_unlink_wq(task);
- if (tick_isset(task->expire))
- __task_queue(task, &tg_ctx->timers);
- HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
- task_drop_running(task, 0);
- goto lookup_next;
+ __task_queue(task);
}
else {
/* task not expired and correctly placed. It may not be eternal. */
BUG_ON(task->expire == TICK_ETERNITY);
- task_drop_running(task, 0);
break;
}
}
-
- HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &wq_lock);
-#endif
leave:
return;
}
goto next;
}
+ if (state & TASK_WOKEN_WQ) {
+ /* We should add this task to our wait queue */
+ task_queue(t);
+ /*
+ * If this is the only reason the task got scheduled,
+ * then we don't actually have ot run it.
+ */
+ if ((state & TASK_WOKEN_ANY) == TASK_WOKEN_WQ) {
+ task_drop_running(t, 0);
+ goto next;
+ }
+ state &= ~TASK_WOKEN_WQ;
+ }
/* OK now the task or tasklet is well alive and is going to be run */
if (state & TASK_F_TASKLET) {
/* this is a tasklet */
__task_free(t);
}
else {
- task_queue(t);
+ if (__task_get_current_owner(t->tid) == tid)
+ task_queue(t);
task_drop_running(t, 0);
}
}