isc_mutex_t halt_lock;
isc_condition_t halt_cond;
unsigned int workers;
+ unsigned int greedy_worker;
atomic_uint_fast32_t tasks_running;
atomic_uint_fast32_t tasks_ready;
atomic_uint_fast32_t curq;
static inline void
wake_all_queues(isc__taskmgr_t *manager) {
- for (unsigned int i = 0; i < manager->workers; i++) {
+ for (unsigned int i = 0; i < manager->workers+1; i++) {
LOCK(&manager->queues[i].lock);
BROADCAST(&manager->queues[i].work_available);
UNLOCK(&manager->queues[i].lock);
isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum,
isc_task_t **taskp)
{
- return (isc_task_create_bound(manager0, quantum, taskp, -1));
+ return (isc_task_create_bound(manager0, quantum, taskp, ISC_TASKCPU_NONE));
}
isc_result_t
XTRACE("isc_task_create");
task->manager = manager;
- if (threadid == -1) {
+ if (threadid == ISC_TASKCPU_NONE) {
/*
* Task is not pinned to a queue, it's threadid will be
* choosen when first task will be sent to it - either
*/
task->bound = false;
task->threadid = 0;
+ } else if (threadid == ISC_TASKCPU_GREEDY) {
+ /*
+ * Task is greedy - it will be pinned to a special
+ * queue for greedy tasks
+ */
+ task->bound = true;
+ task->threadid = manager->greedy_worker;
} else {
/*
* Task is pinned to a queue, it'll always be run
void
isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
- isc_task_sendto(task0, eventp, -1);
+ isc_task_sendto(task0, eventp, ISC_TASKCPU_NONE);
}
void
isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
- isc_task_sendtoanddetach(taskp, eventp, -1);
+ isc_task_sendtoanddetach(taskp, eventp, ISC_TASKCPU_NONE);
}
void
/* If task is bound ignore provided cpu. */
if (task->bound) {
c = task->threadid;
- } else if (c < 0) {
+ c %= task->manager->workers;
+ } else if (c == ISC_TASKCPU_GREEDY) {
+ c = task->manager->greedy_worker;
+ } else if (c == ISC_TASKCPU_NONE) {
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
+ c %= task->manager->workers;
}
- c %= task->manager->workers;
/*
* We're trying hard to hold locks for as short a time as possible.
if (task->bound) {
c = task->threadid;
- } else if (c < 0) {
+ c %= task->manager->workers;
+ } else if (c == ISC_TASKCPU_GREEDY) {
+ c = task->manager->greedy_worker;
+ } else if (c == ISC_TASKCPU_NONE) {
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
+ c %= task->manager->workers;
}
- c %= task->manager->workers;
LOCK(&task->lock);
idle1 = task_send(task, eventp, c);
{
bool empty = true;
unsigned int i;
- for (i = 0; i < manager->workers && empty; i++)
+ for (i = 0; i < manager->workers+1 && empty; i++)
{
LOCK(&manager->queues[i].lock);
empty &= empty_readyq(manager, i);
run(void *queuep) {
isc__taskqueue_t *tq = queuep;
isc__taskmgr_t *manager = tq->manager;
- int threadid = tq->threadid;
- isc_thread_setaffinity(threadid);
+ unsigned int threadid = tq->threadid;
+ /* greedy worker is not pinned */
+ if (threadid != manager->greedy_worker) {
+ isc_thread_setaffinity(threadid);
+ }
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
ISC_MSG_STARTING, "starting"));
static void
manager_free(isc__taskmgr_t *manager) {
- for (unsigned int i = 0; i < manager->workers; i++) {
+ for (unsigned int i = 0; i < manager->workers+1; i++) {
isc_mutex_destroy(&manager->queues[i].lock);
}
isc_mutex_destroy(&manager->lock);
isc_mutex_destroy(&manager->halt_lock);
isc_mem_put(manager->mctx, manager->queues,
- manager->workers * sizeof(isc__taskqueue_t));
+ (manager->workers+1) * sizeof(isc__taskqueue_t));
manager->common.impmagic = 0;
manager->common.magic = 0;
isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
}
manager->default_quantum = default_quantum;
INIT_LIST(manager->tasks);
- manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
+ manager->queues = isc_mem_get(mctx, (workers+1) * sizeof(isc__taskqueue_t));
RUNTIME_CHECK(manager->queues != NULL);
manager->tasks_running = 0;
/*
* Start workers.
*/
- for (i = 0; i < workers; i++) {
+ for (i = 0; i < workers+1; i++) {
+ char name[16];
INIT_LIST(manager->queues[i].ready_tasks);
INIT_LIST(manager->queues[i].ready_priority_tasks);
isc_mutex_init(&manager->queues[i].lock);
isc_condition_init(&manager->queues[i].work_available);
+ if (i == workers) {
+ snprintf(name, sizeof(name), "isc-greedywrkr");
+ manager->greedy_worker = i;
+ } else {
+ snprintf(name, sizeof(name), "isc-worker%04u", i);
+ }
manager->queues[i].manager = manager;
manager->queues[i].threadid = i;
RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i],
&manager->queues[i].thread)
== ISC_R_SUCCESS);
- char name[16];
+
snprintf(name, sizeof(name), "isc-worker%04u", i);
- isc_thread_setname(manager->queues[i].thread, name);
}
+
UNLOCK(&manager->lock);
- isc_thread_setconcurrency(workers);
+ isc_thread_setconcurrency(workers+1);
*managerp = (isc_taskmgr_t *)manager;
/*
* Wait for all the worker threads to exit.
*/
- for (i = 0; i < manager->workers; i++)
+ for (i = 0; i < manager->workers+1; i++)
(void)isc_thread_join(manager->queues[i].thread, NULL);
manager_free(manager);
}
manager->pause_requested = true;
- while (manager->halted < manager->workers) {
+ while (manager->halted < manager->workers+1) {
wake_all_queues(manager);
WAIT(&manager->halt_cond, &manager->halt_lock);
}
LOCK(&manager->halt_lock);
INSIST(!manager->exclusive_requested && !manager->pause_requested);
manager->exclusive_requested = true;
- while (manager->halted + 1 < manager->workers) {
+ while (manager->halted + 1 < manager->workers+1) {
wake_all_queues(manager);
WAIT(&manager->halt_cond, &manager->halt_lock);
}