int rqueue_size; /* Number of elements in the per-thread run queue */
int current_queue; /* points to current tasklet list being run, -1 if none */
struct task *current; /* current task (not tasklet) */
+ unsigned int rq_total; /* total size of the run queue, prio_tree + tasklets */
uint8_t tl_class_mask; /* bit mask of non-empty tasklets classes */
__attribute__((aligned(64))) char end[0];
};
/* a few exported variables */
extern unsigned int nb_tasks; /* total number of tasks */
extern volatile unsigned long global_tasks_mask; /* Mask of threads with tasks in the global runqueue */
-extern unsigned int tasks_run_queue; /* run queue size */
-extern unsigned int tasks_run_queue_cur;
+extern unsigned int grq_total; /* total number of entries in the global run queue */
extern unsigned int nb_tasks_cur;
extern unsigned int niced_tasks; /* number of niced tasks in the run queue */
extern struct pool_head *pool_head_task;
*/
void mworker_cleantasks();
+/* returns the number of running tasks+tasklets on the whole process. Note
+ * that this *is* racy since a task may, for example, move from the global
+ * to a local queue and be counted twice. This is only for statistics
+ * reporting.
+ */
+static inline int total_run_queues()
+{
+ int thr, ret = 0;
+#ifdef USE_THREAD
+ ret = _HA_ATOMIC_LOAD(&grq_total);
+#endif
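+ /* add each thread's local count (priority tree entries plus queued tasklets) */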
+ for (thr = 0; thr < global.nbthread; thr++)
+ ret += _HA_ATOMIC_LOAD(&task_per_thread[thr].rq_total);
+ return ret;
+}
/* return 0 if task is in run queue, otherwise non-zero */
static inline int task_in_rq(struct task *t)
}
/*
- * Unlink the task from the run queue. The tasks_run_queue size and number of
- * niced tasks are updated too. A pointer to the task itself is returned. The
- * task *must* already be in the run queue before calling this function. If
- * unsure, use the safer task_unlink_rq() function. Note that the pointer to the
- * next run queue entry is neither checked nor updated.
+ * Unlink the task from the run queue. The run queue size and number of niced
+ * tasks are updated too. A pointer to the task itself is returned. The task
+ * *must* already be in the run queue before calling this function. If the task
+ * is in the global run queue, the global run queue's lock must already be held.
+ * If unsure, use the safer task_unlink_rq() function. Note that the pointer to
+ * the next run queue entry is neither checked nor updated.
*/
static inline struct task *__task_unlink_rq(struct task *t)
{
- _HA_ATOMIC_SUB(&tasks_run_queue, 1);
#ifdef USE_THREAD
- if (t->state & TASK_GLOBAL)
+ if (t->state & TASK_GLOBAL) {
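+ /* the caller already holds the global run queue lock (see the comment above), so a plain decrement is enough */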
+ grq_total--;
_HA_ATOMIC_AND(&t->state, ~TASK_GLOBAL);
+ }
else
#endif
+ {
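+ /* local run queue: rqueue_size is only used by this thread, but rq_total may be read by other threads (e.g. total_run_queues()), hence the atomic op */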
sched->rqueue_size--;
+ _HA_ATOMIC_SUB(&sched->rq_total, 1);
+ }
eb32sc_delete(&t->rq);
if (likely(t->nice))
_HA_ATOMIC_SUB(&niced_tasks, 1);
LIST_ADDQ(&sched->tasklets[sched->current_queue], &tl->list);
sched->tl_class_mask |= 1 << sched->current_queue;
}
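+ /* the tasklet was queued on one of the local lists above, account for it in this thread's total */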
+ _HA_ATOMIC_ADD(&sched->rq_total, 1);
} else {
/* this tasklet runs on a specific thread. */
MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
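+ /* the counter belongs to the target thread and may be accessed concurrently, so update it atomically */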
+ _HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1);
if (sleeping_thread_mask & (1UL << thr)) {
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
wake_thread(thr);
}
}
- _HA_ATOMIC_ADD(&tasks_run_queue, 1);
}
/* schedules tasklet <tl> to run onto the thread designated by tl->tid, which
{
if (MT_LIST_DEL((struct mt_list *)&t->list)) {
_HA_ATOMIC_AND(&t->state, ~TASK_IN_LIST);
- _HA_ATOMIC_SUB(&tasks_run_queue, 1);
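+ /* use the counter of the thread the tasklet is bound to, or the current thread's when the tid is negative */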
+ _HA_ATOMIC_SUB(&task_per_thread[t->tid >= 0 ? t->tid : tid].rq_total, 1);
}
}
static inline void tasklet_free(struct tasklet *tl)
{
if (MT_LIST_DEL((struct mt_list *)&tl->list))
- _HA_ATOMIC_SUB(&tasks_run_queue, 1);
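+ /* same accounting as above: the bound thread's counter, or the current thread's for an unbound tasklet */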
+ _HA_ATOMIC_SUB(&task_per_thread[tl->tid >= 0 ? tl->tid : tid].rq_total, 1);
#ifdef DEBUG_TASK
if ((unsigned int)tl->debug.caller_idx > 1)
LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) &&
MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
task_per_thread[thr].task_list_size,
- task_per_thread[thr].rqueue_size,
+ task_per_thread[thr].rq_total,
stuck,
!!(task_profiling_mask & thr_bit));
actconn, pipes_used, pipes_used+pipes_free, read_freq_ctr(&global.conn_per_sec),
bps >= 1000000000UL ? (bps / 1000000000.0) : bps >= 1000000UL ? (bps / 1000000.0) : (bps / 1000.0),
bps >= 1000000000UL ? 'G' : bps >= 1000000UL ? 'M' : 'k',
- tasks_run_queue_cur, nb_tasks_cur, ti->idle_pct
+ total_run_queues(), nb_tasks_cur, ti->idle_pct
);
/* scope_txt = search query, appctx->ctx.stats.scope_len is always <= STAT_SCOPE_TXT_MAXLEN */
info[INF_MAX_ZLIB_MEM_USAGE] = mkf_u32(FO_CONFIG|FN_LIMIT, global.maxzlibmem);
#endif
info[INF_TASKS] = mkf_u32(0, nb_tasks_cur);
- info[INF_RUN_QUEUE] = mkf_u32(0, tasks_run_queue_cur);
+ info[INF_RUN_QUEUE] = mkf_u32(0, total_run_queues());
info[INF_IDLE_PCT] = mkf_u32(FN_AVG, ti->idle_pct);
info[INF_NODE] = mkf_str(FO_CONFIG|FN_OUTPUT|FS_SERVICE, global.node);
if (global.desc)
unsigned int nb_tasks = 0;
volatile unsigned long global_tasks_mask = 0; /* Mask of threads with tasks in the global runqueue */
-unsigned int tasks_run_queue = 0;
-unsigned int tasks_run_queue_cur = 0; /* copy of the run queue size */
unsigned int nb_tasks_cur = 0; /* copy of the tasks count */
unsigned int niced_tasks = 0; /* number of niced tasks in the run queue */
#ifdef USE_THREAD
struct eb_root timers; /* sorted timers tree, global, accessed under wq_lock */
struct eb_root rqueue; /* tree constituting the global run queue, accessed under rq_lock */
+unsigned int grq_total; /* total number of entries in the global run queue, use grq_lock */
static unsigned int global_rqueue_ticks; /* insertion count in the grq, use rq_lock */
#endif
/* Beware: tasks that have never run don't have their ->list empty yet! */
MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list,
(struct mt_list *)&((struct tasklet *)t)->list);
- _HA_ATOMIC_ADD(&tasks_run_queue, 1);
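+ /* the entry now sits in <thr>'s shared tasklet list, so it is accounted for on that thread */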
+ _HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1);
_HA_ATOMIC_ADD(&task_per_thread[thr].task_list_size, 1);
if (sleeping_thread_mask & (1UL << thr)) {
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
if (root == &rqueue) {
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
}
-#endif
- /* Make sure if the task isn't in the runqueue, nobody inserts it
- * in the meanwhile.
- */
- _HA_ATOMIC_ADD(&tasks_run_queue, 1);
-#ifdef USE_THREAD
+
if (root == &rqueue) {
global_tasks_mask |= t->thread_mask;
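+ /* grq_total is protected by the run queue lock taken above for the global queue, no atomic needed */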
+ grq_total++;
t->rq.key = ++global_rqueue_ticks;
__ha_barrier_store();
} else
#endif
+ {
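+ /* rq_total can also be touched by other threads waking tasklets on this thread, so it must be updated atomically */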
+ _HA_ATOMIC_ADD(&sched->rq_total, 1);
t->rq.key = ++sched->rqueue_ticks;
+ }
if (likely(t->nice)) {
int offset;
t->calls++;
sched->current = t;
- _HA_ATOMIC_SUB(&tasks_run_queue, 1);
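+ /* the entry is about to run and thus leaves this thread's run queue accounting */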
+ _HA_ATOMIC_SUB(&sched->rq_total, 1);
if (TASK_IS_TASKLET(t)) {
LIST_DEL_INIT(&((struct tasklet *)t)->list);
return;
}
- tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
nb_tasks_cur = nb_tasks;
max_processed = global.tune.runqueue_depth;
if (picked) {
tt->tl_class_mask |= 1 << TL_NORMAL;
_HA_ATOMIC_ADD(&tt->task_list_size, picked);
- _HA_ATOMIC_ADD(&tasks_run_queue, picked);
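+ /* the entries picked above were unlinked from their run queues, so account for them again now that they are queued in the local list */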
+ _HA_ATOMIC_ADD(&tt->rq_total, picked);
activity[tid].tasksw += picked;
}