#include <stdbool.h>
#include <isc/app.h>
+#include <isc/atomic.h>
#include <isc/condition.h>
#include <isc/event.h>
#include <isc/json.h>
#include <isc/platform.h>
#include <isc/print.h>
#include <isc/string.h>
+#include <isc/random.h>
#include <isc/task.h>
#include <isc/thread.h>
#include <isc/time.h>
/*
 * NOTE(review): collapsed diff converting the task manager from one global
 * ready queue to one ready queue + lock + condvar per worker.  The fields
 * below belong to two structs whose opening braces are outside this view
 * (presumably isc__task_t, then isc__taskmgr_t) -- confirm against full file.
 */
isc_time_t tnow;
char name[16];
void * tag;
/* Worker thread this task last became ready on; -1 means "not yet bound". */
+ int threadid;
/* Locked by task manager lock. */
LINK(isc__task_t) link;
LINK(isc__task_t) ready_link;
isc_taskmgr_t common;
isc_mem_t * mctx;
isc_mutex_t lock;
/* One lock per ready queue; guards ready_tasks[i]/ready_priority_tasks[i]. */
+ isc_mutex_t **locks;
unsigned int workers;
/* Number of per-worker queues; set to 'workers' once startup completes. */
+ unsigned int queues;
isc_thread_t * threads;
/* Counters formerly guarded by 'lock'; now lock-free atomics. */
+ atomic_uint_fast32_t tasks_running;
+ atomic_uint_fast32_t tasks_ready;
/* Round-robin cursor for picking a queue when the sender names none. */
+ atomic_uint_fast32_t curq;
+
/* Locked by task manager lock. */
unsigned int default_quantum;
LIST(isc__task_t) tasks;
- isc__tasklist_t ready_tasks;
- isc__tasklist_t ready_priority_tasks;
/* Arrays of 'queues' entries; entry i is guarded by locks[i]. */
+ isc__tasklist_t *ready_tasks;
+ isc__tasklist_t *ready_priority_tasks;
isc_taskmgrmode_t mode;
- isc_condition_t work_available;
/* Array of 'queues' condvars, paired with locks[i]. */
+ isc_condition_t *work_available;
isc_condition_t exclusive_granted;
isc_condition_t paused;
- unsigned int tasks_running;
- unsigned int tasks_ready;
- bool pause_requested;
- bool exclusive_requested;
- bool exiting;
/*
 * NOTE(review): these flags are still written under manager->lock but are
 * now also read under the per-queue locks in dispatch() -- TODO confirm the
 * cross-lock visibility is acceptable or make them atomic.
 */
+ bool pause_requested;
+ bool exclusive_requested;
+ bool exiting;
/*
* Multiple threads can read/write 'excl' at the same time, so we need
isc_result_t
isc_taskmgr_excltask(isc_taskmgr_t *mgr0, isc_task_t **taskp);
/*
 * Ready-queue helpers now take a queue index 'c'.  Caller must hold the
 * corresponding per-queue lock (manager->locks[c]), not the manager lock.
 */
static inline bool
-empty_readyq(isc__taskmgr_t *manager);
+empty_readyq(isc__taskmgr_t *manager, int c);
static inline isc__task_t *
-pop_readyq(isc__taskmgr_t *manager);
+pop_readyq(isc__taskmgr_t *manager, int c);
static inline void
-push_readyq(isc__taskmgr_t *manager, isc__task_t *task);
+push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c);
/***
 *** Tasks.
/*
 * Final teardown of a task: unlink it from the manager and, if the whole
 * manager is done, wake every worker so it can exit.
 */
static void
task_finished(isc__task_t *task) {
isc__taskmgr_t *manager = task->manager;
-
REQUIRE(EMPTY(task->events));
REQUIRE(task->nevents == 0);
REQUIRE(EMPTY(task->on_shutdown));
LOCK(&manager->lock);
UNLINK(manager->tasks, task, link);
+ UNLOCK(&manager->lock);
/*
 * NOTE(review): the diff moves UNLOCK above this check, so FINISHED()
 * now reads manager state (presumably 'exiting' and the task list)
 * without the manager lock -- TODO confirm this is race-free.
 */
if (FINISHED(manager)) {
/*
* All tasks have completed and the
* any idle worker threads so they
* can exit.
*/
- BROADCAST(&manager->work_available);
/* One condvar per queue now, so broadcast to all of them. */
+ for (unsigned int i=0; i<manager->queues; i++) {
+ BROADCAST(&manager->work_available[i]);
+ }
/*
 * NOTE(review): BROADCAST without holding the matching locks[i] can
 * miss a worker between its empty check and its WAIT -- verify.
 */
}
- UNLOCK(&manager->lock);
-
DESTROYLOCK(&task->lock);
task->common.impmagic = 0;
task->common.magic = 0;
return (ISC_R_NOMEMORY);
XTRACE("isc_task_create");
task->manager = manager;
/* New tasks start unbound; a queue is chosen on first send. */
+ task->threadid = -1;
result = isc_mutex_init(&task->lock);
if (result != ISC_R_SUCCESS) {
isc_mem_put(manager->mctx, task, sizeof(*task));
task_ready(isc__task_t *task) {
isc__taskmgr_t *manager = task->manager;
bool has_privilege = isc_task_privilege((isc_task_t *) task);
/*
 * NOTE(review): threadid is int and queues is unsigned, so a threadid of
 * -1 is converted to a huge unsigned value before the modulo; the result
 * is in range but effectively arbitrary -- confirm threadid is always
 * assigned (see task_send) before a task reaches here.
 */
+ int queue = task->threadid % manager->queues;
REQUIRE(VALID_MANAGER(manager));
REQUIRE(task->state == task_state_ready);
XTRACE("task_ready");
-
- LOCK(&manager->lock);
- push_readyq(manager, task);
/* Queue-local locking replaces the global manager lock. */
+ LOCK(manager->locks[queue]);
+ push_readyq(manager, task, queue);
if (manager->mode == isc_taskmgrmode_normal || has_privilege)
- SIGNAL(&manager->work_available);
+ SIGNAL(&manager->work_available[queue]);
+ UNLOCK(manager->locks[queue]);
}
static inline bool
}
static inline bool
/*
 * 'c' is the queue the sender chose; an idle task gets bound to it so that
 * task_ready() later pushes onto the same queue.
 */
-task_send(isc__task_t *task, isc_event_t **eventp) {
+task_send(isc__task_t *task, isc_event_t **eventp, int c) {
bool was_idle = false;
isc_event_t *event;
if (task->state == task_state_idle) {
was_idle = true;
/* Bind (or re-bind) the task to the sender's queue while idle. */
+ task->threadid = c;
INSIST(EMPTY(task->events));
task->state = task_state_ready;
}
void
/* Back-compat wrapper: -1 means "let the manager pick a queue". */
isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
+ isc_task_sendto(task0, eventp, -1);
+}
+
+void
+isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
+ isc_task_sendtoanddetach(taskp, eventp, -1);
+}
+
+void
+isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
isc__task_t *task = (isc__task_t *)task0;
bool was_idle;
+ if (c == -1) {
/*
 * Round-robin queue choice; curq grows without bound and is reduced
 * modulo 'queues' downstream (unsigned wraparound is well defined).
 */
+ c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
+ }
/*
 * Send '*event' to 'task'.
 * some processing is deferred until after the lock is released.
 */
LOCK(&task->lock);
- was_idle = task_send(task, eventp);
+ was_idle = task_send(task, eventp, c);
UNLOCK(&task->lock);
if (was_idle) {
}
void
/* As isc_task_sendto(), but also drops the caller's task reference. */
-isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
+isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
bool idle1, idle2;
isc__task_t *task;
REQUIRE(taskp != NULL);
task = (isc__task_t *)*taskp;
REQUIRE(VALID_TASK(task));
+ if (c == -1) {
/* Same round-robin pick as isc_task_sendto(). */
+ c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
+ }
XTRACE("isc_task_sendanddetach");
-
LOCK(&task->lock);
- idle1 = task_send(task, eventp);
+ idle1 = task_send(task, eventp, c);
idle2 = task_detach(task);
UNLOCK(&task->lock);
 * Caller must hold the per-queue lock manager->locks[c]
 * (this replaced the old task manager lock).
 */
static inline bool
-empty_readyq(isc__taskmgr_t *manager) {
+empty_readyq(isc__taskmgr_t *manager, int c) {
isc__tasklist_t queue;
/* In privileged mode only the priority queue counts as work. */
if (manager->mode == isc_taskmgrmode_normal)
- queue = manager->ready_tasks;
+ queue = manager->ready_tasks[c];
else
- queue = manager->ready_priority_tasks;
+ queue = manager->ready_priority_tasks[c];
return (EMPTY(queue));
}
 * Caller must hold the per-queue lock manager->locks[c]
 * (this replaced the old task manager lock).
 */
static inline isc__task_t *
-pop_readyq(isc__taskmgr_t *manager) {
+pop_readyq(isc__taskmgr_t *manager, int c) {
isc__task_t *task;
if (manager->mode == isc_taskmgrmode_normal)
- task = HEAD(manager->ready_tasks);
+ task = HEAD(manager->ready_tasks[c]);
else
- task = HEAD(manager->ready_priority_tasks);
+ task = HEAD(manager->ready_priority_tasks[c]);
if (task != NULL) {
/* A popped task leaves both the normal and the priority list. */
- DEQUEUE(manager->ready_tasks, task, ready_link);
+ DEQUEUE(manager->ready_tasks[c], task, ready_link);
if (ISC_LINK_LINKED(task, ready_priority_link))
- DEQUEUE(manager->ready_priority_tasks, task,
+ DEQUEUE(manager->ready_priority_tasks[c], task,
ready_priority_link);
}
 * Caller must hold the per-queue lock manager->locks[c]
 * (this replaced the old task manager lock).
 */
static inline void
-push_readyq(isc__taskmgr_t *manager, isc__task_t *task) {
- ENQUEUE(manager->ready_tasks, task, ready_link);
+push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
+ ENQUEUE(manager->ready_tasks[c], task, ready_link);
if ((task->flags & TASK_F_PRIVILEGED) != 0)
- ENQUEUE(manager->ready_priority_tasks, task,
+ ENQUEUE(manager->ready_priority_tasks[c], task,
ready_priority_link);
- manager->tasks_ready++;
/* tasks_ready is now a cross-queue atomic, no longer lock-protected. */
+ atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed);
}
static void
/*
 * Worker main loop: each thread services the single queue
 * 'threadid % manager->queues' under that queue's lock.
 */
-dispatch(isc__taskmgr_t *manager) {
+dispatch(isc__taskmgr_t *manager, int threadid) {
isc__task_t *task;
REQUIRE(VALID_MANAGER(manager));
+ /* Wait for everything to initialize */
/*
 * The creator holds manager->lock until all workers are started and
 * manager->queues is set; this lock/unlock pair is the startup barrier
 * that makes the division below safe.
 */
+ LOCK(&manager->lock);
+ UNLOCK(&manager->lock);
+
+ int queue = threadid % manager->queues;
+
/*
* Again we're trying to hold the lock for as short a time as possible
* and to do as little locking and unlocking as possible.
* For N iterations of the loop, this code does N+1 locks and N+1
* unlocks. The while expression is always protected by the lock.
*/
-
- LOCK(&manager->lock);
+ LOCK(manager->locks[queue]);
while (!FINISHED(manager)) {
/*
* If a pause has been requested, don't do any work
* until it's been released.
*/
/*
 * NOTE(review): pause_requested/exclusive_requested are written under
 * manager->lock but read here under locks[queue]; correctness relies
 * on the waker broadcasting to every queue's condvar -- see the
 * resume path, which appears to broadcast only to queue 0.
 */
- while ((empty_readyq(manager) || manager->pause_requested ||
+ while ((empty_readyq(manager, queue) || manager->pause_requested ||
manager->exclusive_requested) && !FINISHED(manager))
{
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
ISC_MSGSET_GENERAL,
ISC_MSG_WAIT, "wait"));
- WAIT(&manager->work_available, &manager->lock);
+ XTHREADTRACE(isc_msgcat_get(isc_msgcat,
+ ISC_MSGSET_GENERAL,
+ ISC_MSG_WAIT, manager->pause_requested ? "paused" : "notpaused"));
+ XTHREADTRACE(isc_msgcat_get(isc_msgcat,
+ ISC_MSGSET_GENERAL,
+ ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq"));
+ WAIT(&manager->work_available[queue], manager->locks[queue]);
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
ISC_MSGSET_TASK,
ISC_MSG_AWAKE, "awake"));
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
ISC_MSG_WORKING, "working"));
- task = pop_readyq(manager);
+ task = pop_readyq(manager, queue);
if (task != NULL) {
unsigned int dispatch_count = 0;
bool done = false;
* have a task to do. We must reacquire the manager
* lock before exiting the 'if (task != NULL)' block.
*/
- manager->tasks_ready--;
- manager->tasks_running++;
- UNLOCK(&manager->lock);
+ UNLOCK(manager->locks[queue]);
/*
 * NOTE(review): fetch_add of -1 on an unsigned atomic relies on
 * well-defined wraparound; atomic_fetch_sub_explicit would read
 * better.  Also the two counters no longer change atomically as a
 * pair -- observers can see both updated or neither.
 */
+ atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed);
+ atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed);
LOCK(&task->lock);
INSIST(task->state == task_state_ready);
task->state = task_state_running;
XTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
ISC_MSG_RUNNING, "running"));
+ XTRACE(task->name);
TIME_NOW(&task->tnow);
task->now = isc_time_seconds(&task->tnow);
do {
ISC_MSGSET_TASK,
ISC_MSG_EXECUTE,
"execute action"));
+ XTRACE(task->name);
if (event->ev_action != NULL) {
UNLOCK(&task->lock);
(event->ev_action)(
if (finished)
task_finished(task);
- LOCK(&manager->lock);
- manager->tasks_running--;
+ atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed);
/*
 * NOTE(review): the decrement and the '== 1' test below are no
 * longer one critical section; two workers decrementing
 * concurrently can both skip the SIGNAL and leave the exclusive
 * waiter stuck -- TODO verify against the exclusive-mode protocol.
 */
if (manager->exclusive_requested &&
manager->tasks_running == 1) {
SIGNAL(&manager->exclusive_granted);
manager->tasks_running == 0) {
SIGNAL(&manager->paused);
}
+ LOCK(manager->locks[queue]);
if (requeue) {
/*
* We know we're awake, so we don't have
* were usually nonempty, the 'optimization'
* might even hurt rather than help.
*/
- push_readyq(manager, task);
+ push_readyq(manager, task, queue);
}
}
* we're stuck. Automatically drop privileges at that
* point and continue with the regular ready queue.
*/
/*
 * NOTE(review): only this worker's queue is checked for emptiness, and
 * 'mode' is written under a per-queue lock while others read it --
 * presumably benign (single transition to normal), but confirm.
 */
- if (manager->tasks_running == 0 && empty_readyq(manager)) {
- manager->mode = isc_taskmgrmode_normal;
- if (!empty_readyq(manager))
- BROADCAST(&manager->work_available);
+ if (manager->tasks_running == 0 && empty_readyq(manager, queue)) {
+ if (manager->mode != isc_taskmgrmode_normal) {
+ manager->mode = isc_taskmgrmode_normal;
+ for (unsigned i=0; i < manager->workers; i++) {
+ BROADCAST(&manager->work_available[i]);
+ }
+ }
}
}
-
- UNLOCK(&manager->lock);
+ UNLOCK(manager->locks[queue]);
}
/*
 * Per-thread startup argument for run(): carries the manager pointer plus
 * the worker's index.  Allocated by the creator, freed by the thread.
 */
+typedef struct st {
+ isc__taskmgr_t *manager;
+ int threadid;
+} stt;
+
static isc_threadresult_t
#ifdef _WIN32
WINAPI
#endif
run(void *uap) {
- isc__taskmgr_t *manager = uap;
/* Copy out the startup args, then release them before doing any work. */
+ stt *st = uap;
+ isc__taskmgr_t *manager = st->manager;
+ int threadid = st->threadid;
+ isc_mem_put(manager->mctx, st, sizeof(*st));
/* Pin the worker to a CPU matching its index -- TODO confirm helper. */
+ isc_thread_setaffinity(threadid);
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
ISC_MSG_STARTING, "starting"));
- dispatch(manager);
+ dispatch(manager, threadid);
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
ISC_MSG_EXITING, "exiting"));
isc_mem_t *mctx;
(void)isc_condition_destroy(&manager->exclusive_granted);
/*
 * FIXME(review): only work_available[0] is destroyed; the remaining
 * 'queues - 1' condvars, every locks[i] mutex, and the malloc'd
 * ready_tasks/ready_priority_tasks/locks/work_available arrays are leaked.
 */
- (void)isc_condition_destroy(&manager->work_available);
+ (void)isc_condition_destroy(&manager->work_available[0]);
(void)isc_condition_destroy(&manager->paused);
isc_mem_free(manager->mctx, manager->threads);
DESTROYLOCK(&manager->lock);
}
manager->workers = 0;
/* queues stays 0 until all workers are started (see startup barrier). */
+ manager->queues = 0;
manager->threads = isc_mem_allocate(mctx,
workers * sizeof(isc_thread_t));
if (manager->threads == NULL) {
result = ISC_R_NOMEMORY;
goto cleanup_lock;
}
/* Single work_available condvar replaced by the per-queue array below. */
- if (isc_condition_init(&manager->work_available) != ISC_R_SUCCESS) {
- UNEXPECTED_ERROR(__FILE__, __LINE__,
- "isc_condition_init() %s",
- isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
- ISC_MSG_FAILED, "failed"));
- result = ISC_R_UNEXPECTED;
- goto cleanup_threads;
- }
if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) {
UNEXPECTED_ERROR(__FILE__, __LINE__,
"isc_condition_init() %s",
default_quantum = DEFAULT_DEFAULT_QUANTUM;
manager->default_quantum = default_quantum;
INIT_LIST(manager->tasks);
- INIT_LIST(manager->ready_tasks);
- INIT_LIST(manager->ready_priority_tasks);
/*
 * FIXME(review): bare malloc() with no NULL checks, and it bypasses the
 * isc_mem allocator used everywhere else in this file (mctx accounting,
 * matching isc_mem_free on teardown) -- use isc_mem_allocate and check.
 */
+ manager->ready_tasks = malloc(workers * sizeof(isc__tasklist_t));
+ manager->locks = malloc(workers * sizeof(isc_mutex_t *));
+ manager->work_available = malloc(workers * sizeof(isc_condition_t));
+ manager->ready_priority_tasks = malloc(workers * sizeof(isc__tasklist_t));
manager->tasks_running = 0;
manager->tasks_ready = 0;
manager->exclusive_requested = false;
manager->pause_requested = false;
+ manager->curq = 0;
manager->exiting = false;
manager->excl = NULL;
* Start workers.
*/
for (i = 0; i < workers; i++) {
- if (isc_thread_create(run, manager,
- &manager->threads[manager->workers]) ==
- ISC_R_SUCCESS) {
- char name[16]; /* thread name limit on Linux */
- snprintf(name, sizeof(name), "isc-worker%04u", i);
- isc_thread_setname(manager->threads[manager->workers],
- name);
- manager->workers++;
- started++;
+ INIT_LIST(manager->ready_tasks[i]);
+ INIT_LIST(manager->ready_priority_tasks[i]);
/*
 * FIXME(review): malloc(4096) is a magic size -- presumably page-sized
 * padding to keep each mutex on its own cache line, but at minimum it
 * must be >= sizeof(isc_mutex_t) and should say so; NULL and the
 * isc_mutex_init() result are both unchecked.
 */
+ manager->locks[i] = malloc(4096);
+ isc_mutex_init(manager->locks[i]);
+ if (isc_condition_init(&manager->work_available[i]) != ISC_R_SUCCESS) {
+ UNEXPECTED_ERROR(__FILE__, __LINE__,
+ "isc_condition_init() %s",
+ isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
+ ISC_MSG_FAILED, "failed"));
+ result = ISC_R_UNEXPECTED;
+ goto cleanup_threads;
+ }
+ stt *st = isc_mem_get(mctx, sizeof(stt));
+ st->manager = manager;
+ st->threadid = i;
/*
 * NOTE(review): on thread-create failure 'st' leaks and already-started
 * workers are abandoned on the manager lock; 'result' is also not set
 * before the goto -- verify the cleanup path.
 */
+ if (isc_thread_create(run, st,
+ &manager->threads[i]) != ISC_R_SUCCESS) {
+ goto cleanup_threads;
+ }
+ char name[16]; /* thread name limit on Linux */
+ snprintf(name, sizeof(name), "isc-worker%04u", i);
/* workers == i here (incremented below), so this indexes the new thread. */
+ isc_thread_setname(manager->threads[manager->workers],
+ name);
+ manager->workers++;
+ started++;
}
/* Publishing 'queues' under the lock releases workers past their barrier. */
+ manager->queues = manager->workers;
UNLOCK(&manager->lock);
if (started == 0) {
cleanup_exclusivegranted:
(void)isc_condition_destroy(&manager->exclusive_granted);
cleanup_workavailable:
/*
 * NOTE(review): only element [0] is destroyed; condvars/mutexes
 * initialized for later queues before a failure are leaked.
 */
- (void)isc_condition_destroy(&manager->work_available);
+ (void)isc_condition_destroy(&manager->work_available[0]);
cleanup_threads:
isc_mem_free(mctx, manager->threads);
cleanup_lock:
task = NEXT(task, link)) {
LOCK(&task->lock);
if (task_shutdown(task))
/*
 * NOTE(review): all shutdown work is funneled to queue 0, but this
 * caller appears to hold manager->lock, not locks[0], which
 * push_readyq's contract now requires -- verify locking here.
 */
- push_readyq(manager, task);
+ push_readyq(manager, task, 0);
UNLOCK(&task->lock);
}
/*
* there's work left to do, and if there are already no tasks left
* it will cause the workers to see manager->exiting.
*/
- BROADCAST(&manager->work_available);
/* Wake every queue, taking each queue lock so no wakeup is lost. */
+ for (i = 0; i < manager->queues; i++) {
+ LOCK(manager->locks[i]);
+ BROADCAST(&manager->work_available[i]);
+ UNLOCK(manager->locks[i]);
+ }
UNLOCK(&manager->lock);
/*
LOCK(&manager->lock);
if (manager->pause_requested) {
manager->pause_requested = false;
/*
 * FIXME(review): only queue 0 is woken, but every worker waits on its
 * own work_available[queue] while pause_requested is set -- workers on
 * queues 1..N-1 stay asleep after a resume.  Broadcast to all queues
 * (as the exclusive-release path below does).
 */
- BROADCAST(&manager->work_available);
+ BROADCAST(&manager->work_available[0]);
}
UNLOCK(&manager->lock);
}
LOCK(&manager->lock);
REQUIRE(manager->exclusive_requested);
manager->exclusive_requested = false;
- BROADCAST(&manager->work_available);
/* Ending exclusive mode must wake workers on every queue. */
+ for (unsigned int i=0; i < manager->workers; i++) {
+ BROADCAST(&manager->work_available[i]);
+ }
UNLOCK(&manager->lock);
}
REQUIRE(ISCAPI_TASK_VALID(task0));
isc__task_t *task = (isc__task_t *)task0;
isc__taskmgr_t *manager = task->manager;
/*
 * NOTE(review): threadid may still be -1 here (never sent to); the
 * int-mod-unsigned conversion keeps the index in range but picks an
 * arbitrary queue -- confirm that matches where the task is enqueued.
 */
+ int queue = task->threadid % manager->queues;
bool oldpriv;
LOCK(&task->lock);
if (priv == oldpriv)
return;
/* locks[queue] is acquired while task->lock is held -- keep this ordering
 * consistent with dispatch() to avoid deadlock (TODO verify). */
- LOCK(&manager->lock);
+ LOCK(manager->locks[queue]);
if (priv && ISC_LINK_LINKED(task, ready_link))
- ENQUEUE(manager->ready_priority_tasks, task,
+ ENQUEUE(manager->ready_priority_tasks[queue], task,
ready_priority_link);
else if (!priv && ISC_LINK_LINKED(task, ready_priority_link))
- DEQUEUE(manager->ready_priority_tasks, task,
+ DEQUEUE(manager->ready_priority_tasks[queue], task,
ready_priority_link);
- UNLOCK(&manager->lock);
+ UNLOCK(manager->locks[queue]);
}
bool
TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running"));
/*
 * Reading an atomic by value is a seq_cst load in C11; the (int) cast
 * keeps the %d format specifier valid for atomic_uint_fast32_t.
 */
- TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_running));
+ TRY0(xmlTextWriterWriteFormatString(writer, "%d",
+ (int) mgr->tasks_running));
TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready"));
- TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_ready));
+ TRY0(xmlTextWriterWriteFormatString(writer, "%d",
+ (int) mgr->tasks_ready));
TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */
TRY0(xmlTextWriterEndElement(writer)); /* thread-model */