if (likely(tl->tid < 0)) {
/* this tasklet runs on the caller thread */
if (LIST_ISEMPTY(&tl->list)) {
- LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list);
+ LIST_ADDQ(&task_per_thread[tid].tasklets[TL_URGENT], &tl->list);
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
}
} else {
/* Insert a tasklet into the tasklet list. If used with a plain task instead,
* the caller must update the task_list_size.
*/
-static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl)
+static inline void tasklet_insert_into_tasklet_list(struct list *list, struct tasklet *tl)
{
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
- LIST_ADDQ(&sched->task_list, &tl->list);
+ LIST_ADDQ(list, &tl->list);
}
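For readers less familiar with HAProxy's intrusive lists, here is a minimal standalone sketch of the same pattern: a circular doubly-linked list, a tail append in the spirit of LIST_ADDQ(), and a run-queue counter bumped by the insert helper. All names below are illustrative, not HAProxy's.

/* Illustrative sketch only: a tiny circular intrusive list mimicking the
 * LIST_ADDQ()-plus-counter pattern above. Names are made up for the example. */
#include <stdio.h>

struct node { struct node *n, *p; };

static void list_init(struct node *h) { h->n = h->p = h; }

/* append <el> at the tail of <h>, like LIST_ADDQ() */
static void list_addq(struct node *h, struct node *el)
{
	el->p = h->p;
	el->n = h;
	h->p->n = el;
	h->p = el;
}

static int run_queue; /* stand-in for tasks_run_queue */

/* stand-in for tasklet_insert_into_tasklet_list(): the caller picks the queue */
static void insert_into(struct node *queue, struct node *item)
{
	run_queue++;
	list_addq(queue, item);
}

int main(void)
{
	struct node urgent, item;

	list_init(&urgent);
	insert_into(&urgent, &item);
	printf("queued=%d, urgent empty=%d\n", run_queue, urgent.n == &urgent);
	return 0;
}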
/* Remove the tasklet from the tasklet list. The tasklet MUST already be there.
{
return (!!(global_tasks_mask & tid_bit) |
(sched->rqueue_size > 0) |
- !LIST_ISEMPTY(&sched->task_list) | !MT_LIST_ISEMPTY(&sched->shared_tasklet_list));
+ !LIST_ISEMPTY(&sched->tasklets[TL_URGENT]) |
+ !LIST_ISEMPTY(&sched->tasklets[TL_NORMAL]) |
+ !LIST_ISEMPTY(&sched->tasklets[TL_BULK]) |
+ !MT_LIST_ISEMPTY(&sched->shared_tasklet_list));
}
/* adds list item <item> to work list <work> and wake up the associated task */
TASK_WOKEN_IO|TASK_WOKEN_SIGNAL|TASK_WOKEN_MSG| \
TASK_WOKEN_RES)
+enum {
+ TL_URGENT = 0, /* urgent tasklets (I/O callbacks) */
+ TL_NORMAL = 1, /* normal tasks */
+ TL_BULK = 2, /* bulk task/tasklets, streaming I/Os */
+ TL_CLASSES /* must be last */
+};
+
struct notification {
struct list purge_me; /* Part of the list of signals to be purged in the
case of the LUA execution stack crash. */
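The class index doubles as a service order: the scheduler changes further down drain TL_URGENT first (including tasklets spliced in from other threads), then TL_NORMAL (plain tasks picked from the run queues), then TL_BULK. A tiny standalone illustration of that ordering, with made-up strings rather than real work items:

/* Illustrative only: ascending class index means higher service priority. */
#include <stdio.h>

enum { TL_URGENT = 0, TL_NORMAL = 1, TL_BULK = 2, TL_CLASSES };

static const char *tl_name[TL_CLASSES] = {
	[TL_URGENT] = "urgent: I/O callbacks, cross-thread wakeups",
	[TL_NORMAL] = "normal: plain tasks from the run queues",
	[TL_BULK]   = "bulk: streaming I/O work",
};

int main(void)
{
	int c;

	for (c = 0; c < TL_CLASSES; c++)      /* same order the scheduler uses */
		printf("class %d -> %s\n", c, tl_name[c]);
	return 0;
}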
struct task_per_thread {
struct eb_root timers; /* tree constituting the per-thread wait queue */
struct eb_root rqueue; /* tree constituting the per-thread run queue */
- struct list task_list; /* List of tasks to be run, mixing tasks and tasklets */
struct mt_list shared_tasklet_list; /* Tasklet to be run, woken up by other threads */
- int task_list_size; /* Number of tasks in the task_list */
+ struct list tasklets[TL_CLASSES]; /* tasklets (and/or tasks) to run, by class */
+ int task_list_size; /* Number of tasks among the tasklets */
int rqueue_size; /* Number of elements in the per-thread run queue */
struct task *current; /* current task (not tasklet) */
__attribute__((aligned(64))) char end[0];
!!(global_tasks_mask & thr_bit),
!eb_is_empty(&task_per_thread[thr].timers),
!eb_is_empty(&task_per_thread[thr].rqueue),
- !(LIST_ISEMPTY(&task_per_thread[thr].task_list) |
- MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
+ !(LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_URGENT]) &&
+ LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_NORMAL]) &&
+ LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) &&
+ MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
task_per_thread[thr].task_list_size,
task_per_thread[thr].rqueue_size,
stuck,
*/
tmp_list = MT_LIST_BEHEAD(&sched->shared_tasklet_list);
if (tmp_list)
- LIST_SPLICE_END_DETACHED(&sched->task_list, (struct list *)tmp_list);
+ LIST_SPLICE_END_DETACHED(&sched->tasklets[TL_URGENT], (struct list *)tmp_list);
tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
nb_tasks_cur = nb_tasks;
if (likely(niced_tasks))
max_processed = (max_processed + 3) / 4;
+ /* run up to 3*max_processed/4 urgent tasklets */
+ done = run_tasks_from_list(&tt->tasklets[TL_URGENT], 3*(max_processed + 1) / 4);
+ max_processed -= done;
+
+ /* pick up to (max_processed-done+1)/2 regular tasks from prio-ordered run queues */
+
/* Note: the grq lock is always held when grq is not null */
- while (tt->task_list_size < max_processed) {
+ while (tt->task_list_size < (max_processed + 1) / 2) {
if ((global_tasks_mask & tid_bit) && !grq) {
#ifdef USE_THREAD
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
/* Make sure the entry doesn't appear to be in a list */
LIST_INIT(&((struct tasklet *)t)->list);
/* And add it to the local task list */
- tasklet_insert_into_tasklet_list((struct tasklet *)t);
+ tasklet_insert_into_tasklet_list(&tt->tasklets[TL_NORMAL], (struct tasklet *)t);
tt->task_list_size++;
activity[tid].tasksw++;
}
grq = NULL;
}
- done = run_tasks_from_list(&tt->task_list, max_processed);
+ /* run between max_processed/8 and max_processed/2 regular tasks */
+ done = run_tasks_from_list(&tt->tasklets[TL_NORMAL], (max_processed + 1) / 2);
+ max_processed -= done;
+
+ /* run between max_processed/8 and max_processed bulk tasklets */
+ done = run_tasks_from_list(&tt->tasklets[TL_BULK], max_processed);
max_processed -= done;
- if (!LIST_ISEMPTY(&tt->task_list))
+ if (!LIST_ISEMPTY(&sched->tasklets[TL_URGENT]) |
+ !LIST_ISEMPTY(&sched->tasklets[TL_NORMAL]) |
+ !LIST_ISEMPTY(&sched->tasklets[TL_BULK]))
activity[tid].long_rq++;
}
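To make the budget fractions in the comments above concrete, the following standalone computation walks one worst-case pass with a sample budget of 200 (the starting value is only an example, not something this patch defines):

/* Standalone arithmetic sketch: how the per-class budgets split a sample
 * max_processed of 200 when every class consumes its full share. The
 * integer math mirrors the scheduler code above; 200 is just an example. */
#include <stdio.h>

int main(void)
{
	int max_processed = 200;
	int done;

	/* urgent class: up to 3/4 of the whole budget */
	done = 3 * (max_processed + 1) / 4;    /* 150 */
	max_processed -= done;                 /* 50 left */
	printf("urgent: %d\n", done);

	/* normal class: up to half of what remains */
	done = (max_processed + 1) / 2;        /* 25 */
	max_processed -= done;                 /* 25 left */
	printf("normal: %d\n", done);

	/* bulk class: whatever is still left */
	printf("bulk:   %d\n", max_processed); /* 25 */
	return 0;
}

When the earlier classes do not use up their share, the later ones inherit the remainder, which is where the "between max_processed/8 and ..." ranges in the comments above come from.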
#endif
memset(&task_per_thread, 0, sizeof(task_per_thread));
for (i = 0; i < MAX_THREADS; i++) {
- LIST_INIT(&task_per_thread[i].task_list);
+ LIST_INIT(&task_per_thread[i].tasklets[TL_URGENT]);
+ LIST_INIT(&task_per_thread[i].tasklets[TL_NORMAL]);
+ LIST_INIT(&task_per_thread[i].tasklets[TL_BULK]);
MT_LIST_INIT(&task_per_thread[i].shared_tasklet_list);
}
}
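Putting the pieces together, here is a self-contained toy (illustrative only, no HAProxy types) of a per-thread structure holding one queue per class, initialized the way the loop above initializes the real lists, and drained in ascending class order. Plain arrays stand in for the intrusive lists and the shared MT list:

/* Toy model of the per-class queues; arrays replace struct list / mt_list. */
#include <stdio.h>
#include <string.h>

enum { TL_URGENT = 0, TL_NORMAL = 1, TL_BULK = 2, TL_CLASSES };

#define TOY_THREADS 4
#define TOY_QSZ     8

struct toy_per_thread {
	const char *q[TL_CLASSES][TOY_QSZ]; /* stand-in for tasklets[TL_CLASSES] */
	int qlen[TL_CLASSES];
};

static struct toy_per_thread toy_per_thread[TOY_THREADS];

static void toy_init(void)
{
	/* counterpart of the memset() + LIST_INIT() loop above */
	memset(toy_per_thread, 0, sizeof(toy_per_thread));
}

static void toy_queue(int thr, int class, const char *what)
{
	struct toy_per_thread *tt = &toy_per_thread[thr];

	if (tt->qlen[class] < TOY_QSZ)
		tt->q[class][tt->qlen[class]++] = what;
}

static void toy_run(int thr)
{
	struct toy_per_thread *tt = &toy_per_thread[thr];
	int c, i;

	/* urgent first, then normal, then bulk, as the scheduler code above does */
	for (c = 0; c < TL_CLASSES; c++)
		for (i = 0; i < tt->qlen[c]; i++)
			printf("thread %d, class %d: %s\n", thr, c, tt->q[c][i]);
}

int main(void)
{
	toy_init();
	toy_queue(0, TL_BULK,   "stream chunk");
	toy_queue(0, TL_URGENT, "I/O callback");
	toy_queue(0, TL_NORMAL, "plain task");
	toy_run(0); /* the I/O callback is reported before the task and the chunk */
	return 0;
}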