#include <openssl/err.h>
#endif
-/*%
- * For BIND9 internal applications:
- * when built with threads we use multiple worker threads shared by the whole
- * application.
- * when built without threads we share a single global task manager and use
- * an integrated event loop for socket, timer, and other generic task events.
- * For generic library:
- * we don't use either of them: an application can have multiple task managers
- * whether or not it's threaded, and if the application is threaded each thread
- * is expected to have a separate manager; no "worker threads" are shared by
- * the application threads.
- */
#ifdef ISC_TASK_TRACE
#define XTRACE(m) fprintf(stderr, "task %p thread %lu: %s\n", \
task, isc_thread_self(), (m))
typedef struct isc__task isc__task_t;
typedef struct isc__taskmgr isc__taskmgr_t;
+typedef struct isc__taskqueue isc__taskqueue_t;
struct isc__task {
/* Not locked. */
typedef ISC_LIST(isc__task_t) isc__tasklist_t;
+struct isc__taskqueue { /* Per-worker state: one element of manager->queues[] for each worker thread. */
+ isc__tasklist_t ready_tasks; /* tasks queued by push_readyq() */
+ isc__tasklist_t ready_priority_tasks; /* ready tasks that also have TASK_F_PRIVILEGED set */
+ isc_condition_t work_available; /* signalled to wake this queue's worker; waited on together with 'lock' */
+ isc_mutex_t lock; /* taken by task_ready() and by the worker loop around queue access */
+ isc_thread_t thread; /* worker thread started with run() on this queue */
+ unsigned int threadid; /* index of this queue in manager->queues[] */
+ isc__taskmgr_t *manager; /* owning task manager */
+};
+
struct isc__taskmgr {
/* Not locked. */
isc_taskmgr_t common;
isc_mem_t * mctx;
isc_mutex_t lock;
- isc_mutex_t **locks;
+ isc_mutex_t prehalt_lock;
+ isc_mutex_t posthalt_lock;
+ isc_condition_t halt_cond;
+ isc_condition_t halt_avail_cond;
unsigned int workers;
- unsigned int queues;
- isc_thread_t * threads;
atomic_uint_fast32_t tasks_running;
atomic_uint_fast32_t tasks_ready;
atomic_uint_fast32_t curq;
+ isc__taskqueue_t *queues;
/* Locked by task manager lock. */
unsigned int default_quantum;
LIST(isc__task_t) tasks;
- isc__tasklist_t *ready_tasks;
- isc__tasklist_t *ready_priority_tasks;
isc_taskmgrmode_t mode;
- isc_condition_t *work_available;
- isc_condition_t exclusive_granted;
- isc_condition_t paused;
bool pause_requested;
bool exclusive_requested;
bool exiting;
+ /* Incremented under prehalt_lock, decremented under posthalt_lock. */
+ unsigned int halted;
+
+
/*
* Multiple threads can read/write 'excl' at the same time, so we need
* to protect the access. We can't use 'lock' since isc_task_detach()
* any idle worker threads so they
* can exit.
*/
- for (unsigned int i=0; i<manager->queues; i++) {
- BROADCAST(&manager->work_available[i]);
+ for (unsigned int i=0; i<manager->workers; i++) {
+ BROADCAST(&manager->queues[i].work_available);
}
}
DESTROYLOCK(&task->lock);
return (ISC_R_NOMEMORY);
XTRACE("isc_task_create");
task->manager = manager;
- task->threadid = -1;
+ task->threadid = atomic_fetch_add_explicit(&manager->curq, 1, memory_order_relaxed) % manager->workers;
result = isc_mutex_init(&task->lock);
if (result != ISC_R_SUCCESS) {
isc_mem_put(manager->mctx, task, sizeof(*task));
task_ready(isc__task_t *task) {
isc__taskmgr_t *manager = task->manager;
bool has_privilege = isc_task_privilege((isc_task_t *) task);
- int queue = task->threadid % manager->queues;
REQUIRE(VALID_MANAGER(manager));
REQUIRE(task->state == task_state_ready);
XTRACE("task_ready");
- LOCK(manager->locks[queue]);
- push_readyq(manager, task, queue);
+ LOCK(&manager->queues[task->threadid].lock); /* threadid now indexes queues[] directly, no modulo */
+ push_readyq(manager, task, task->threadid); /* queue the task on its own worker's lists */
if (manager->mode == isc_taskmgrmode_normal || has_privilege)
- SIGNAL(&manager->work_available[queue]);
- UNLOCK(manager->locks[queue]);
+ SIGNAL(&manager->queues[task->threadid].work_available); /* wake only the worker waiting on this queue */
+ UNLOCK(&manager->queues[task->threadid].lock);
}
static inline bool
isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
isc__task_t *task = (isc__task_t *)task0;
bool was_idle;
- if (c == -1) {
- c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
- }
/*
* Send '*event' to 'task'.
REQUIRE(VALID_TASK(task));
+ if (c == -1) {
+ c = atomic_fetch_add_explicit(&task->manager->curq, 1,
+ memory_order_relaxed)
+ % task->manager->workers;
+ }
+
XTRACE("isc_task_send");
/*
task = (isc__task_t *)*taskp;
REQUIRE(VALID_TASK(task));
if (c == -1) {
- c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
+ c = atomic_fetch_add_explicit(&task->manager->curq, 1,
+ memory_order_relaxed)
+ % task->manager->workers;
}
XTRACE("isc_task_sendanddetach");
isc__tasklist_t queue;
if (manager->mode == isc_taskmgrmode_normal)
- queue = manager->ready_tasks[c];
+ queue = manager->queues[c].ready_tasks;
else
- queue = manager->ready_priority_tasks[c];
+ queue = manager->queues[c].ready_priority_tasks;
return (EMPTY(queue));
}
isc__task_t *task;
if (manager->mode == isc_taskmgrmode_normal)
- task = HEAD(manager->ready_tasks[c]);
+ task = HEAD(manager->queues[c].ready_tasks);
else
- task = HEAD(manager->ready_priority_tasks[c]);
+ task = HEAD(manager->queues[c].ready_priority_tasks);
if (task != NULL) {
- DEQUEUE(manager->ready_tasks[c], task, ready_link);
+ DEQUEUE(manager->queues[c].ready_tasks, task, ready_link);
if (ISC_LINK_LINKED(task, ready_priority_link))
- DEQUEUE(manager->ready_priority_tasks[c], task,
+ DEQUEUE(manager->queues[c].ready_priority_tasks, task,
ready_priority_link);
}
*/
static inline void
push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
- ENQUEUE(manager->ready_tasks[c], task, ready_link);
+ ENQUEUE(manager->queues[c].ready_tasks, task, ready_link); /* 'c' indexes manager->queues[]; every ready task goes on ready_tasks */
if ((task->flags & TASK_F_PRIVILEGED) != 0)
- ENQUEUE(manager->ready_priority_tasks[c], task,
+ ENQUEUE(manager->queues[c].ready_priority_tasks, task, /* privileged tasks are linked on both lists */
ready_priority_link);
atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed);
}
LOCK(&manager->lock);
UNLOCK(&manager->lock);
- int queue = threadid % manager->queues;
-
/*
* Again we're trying to hold the lock for as short a time as possible
* and to do as little locking and unlocking as possible.
* For N iterations of the loop, this code does N+1 locks and N+1
* unlocks. The while expression is always protected by the lock.
*/
- LOCK(manager->locks[queue]);
+ LOCK(&manager->queues[threadid].lock);
while (!FINISHED(manager)) {
/*
* If a pause has been requested, don't do any work
* until it's been released.
*/
- while ((empty_readyq(manager, queue) || manager->pause_requested ||
- manager->exclusive_requested) && !FINISHED(manager))
+ while ((empty_readyq(manager, threadid) && !manager->pause_requested &&
+ !manager->exclusive_requested) && !FINISHED(manager))
{
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
ISC_MSGSET_GENERAL,
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
ISC_MSGSET_GENERAL,
ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq"));
- WAIT(&manager->work_available[queue], manager->locks[queue]);
+ WAIT(&manager->queues[threadid].work_available, &manager->queues[threadid].lock);
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
ISC_MSGSET_TASK,
ISC_MSG_AWAKE, "awake"));
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
ISC_MSG_WORKING, "working"));
- task = pop_readyq(manager, queue);
+ if (manager->pause_requested || manager->exclusive_requested) {
+ UNLOCK(&manager->queues[threadid].lock);
+ XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
+ ISC_MSG_WORKING, "halting"));
+
+ /*
+ * First we increase 'halted' and signal the thread
+ * that's waiting on exclusivity/pause. Then we
+ * try locking posthalt lock, which will be locked
+ * by exclusive/pausing thread. It is only unlocked
+ * after exclusivity/pause is done.
+ */
+ LOCK(&manager->prehalt_lock);
+ manager->halted++;
+ SIGNAL(&manager->halt_cond);
+ UNLOCK(&manager->prehalt_lock);
+
+ LOCK(&manager->posthalt_lock);
+ manager->halted--;
+ UNLOCK(&manager->posthalt_lock);
+
+ LOCK(&manager->queues[threadid].lock);
+ /* Restart the loop once the halt has been released. */
+ continue;
+ }
+
+ task = pop_readyq(manager, threadid);
if (task != NULL) {
unsigned int dispatch_count = 0;
bool done = false;
* have a task to do. We must reacquire the manager
* lock before exiting the 'if (task != NULL)' block.
*/
- UNLOCK(manager->locks[queue]);
+ UNLOCK(&manager->queues[threadid].lock);
atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed);
atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed);
task_finished(task);
atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed);
- if (manager->exclusive_requested &&
- manager->tasks_running == 1) {
- SIGNAL(&manager->exclusive_granted);
- } else if (manager->pause_requested &&
- manager->tasks_running == 0) {
- SIGNAL(&manager->paused);
- }
- LOCK(manager->locks[queue]);
+ LOCK(&manager->queues[threadid].lock);
if (requeue) {
/*
* We know we're awake, so we don't have
* were usually nonempty, the 'optimization'
* might even hurt rather than help.
*/
- push_readyq(manager, task, queue);
+ push_readyq(manager, task, threadid);
}
}
* we're stuck. Automatically drop privileges at that
* point and continue with the regular ready queue.
*/
- if (manager->tasks_running == 0 && empty_readyq(manager, queue)) {
+ if (manager->tasks_running == 0 && empty_readyq(manager, threadid)) {
manager->mode = isc_taskmgrmode_normal;
for (unsigned i=0; i < manager->workers; i++) {
- BROADCAST(&manager->work_available[i]);
+ BROADCAST(&manager->queues[i].work_available);
}
}
}
- UNLOCK(manager->locks[queue]);
+ UNLOCK(&manager->queues[threadid].lock);
/*
* There might be other dispatchers waiting on empty tasks,
* wake them up.
*/
for (unsigned i=0; i < manager->workers; i++) {
- LOCK(manager->locks[i]);
- BROADCAST(&manager->work_available[i]);
- UNLOCK(manager->locks[i]);
+ LOCK(&manager->queues[i].lock);
+ BROADCAST(&manager->queues[i].work_available);
+ UNLOCK(&manager->queues[i].lock);
}
}
-typedef struct st {
- isc__taskmgr_t *manager;
- int threadid;
-} stt;
-
static isc_threadresult_t
#ifdef _WIN32
WINAPI
#endif
-run(void *uap) {
- stt *st = uap;
- isc__taskmgr_t *manager = st->manager;
- int threadid = st->threadid;
- isc_mem_put(manager->mctx, st, sizeof(*st));
+run(void *queuep) {
+ isc__taskqueue_t *tq = queuep;
+ isc__taskmgr_t *manager = tq->manager;
+ int threadid = tq->threadid;
isc_thread_setaffinity(threadid);
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
static void
manager_free(isc__taskmgr_t *manager) {
- isc_mem_t *mctx;
+ /* TODO: halt_cond and each queue's lock and work_available condition
+ * are initialized when the manager is created but are not destroyed
+ * here yet. NOTE(review): the DESTROYLOCK of excl_lock is dropped by
+ * this change -- confirm excl_lock itself is removed elsewhere.
+ */
- (void)isc_condition_destroy(&manager->exclusive_granted);
- (void)isc_condition_destroy(&manager->work_available[0]);
- (void)isc_condition_destroy(&manager->paused);
- isc_mem_free(manager->mctx, manager->threads);
DESTROYLOCK(&manager->lock);
- DESTROYLOCK(&manager->excl_lock);
+ DESTROYLOCK(&manager->prehalt_lock);
+ DESTROYLOCK(&manager->posthalt_lock);
+ isc_mem_put(manager->mctx, manager->queues, manager->workers * sizeof(isc__taskqueue_t)); /* size mirrors the isc_mem_get() that allocated 'queues' */
manager->common.impmagic = 0;
manager->common.magic = 0;
- mctx = manager->mctx;
- isc_mem_put(mctx, manager, sizeof(*manager));
- isc_mem_detach(&mctx);
+ isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager)); /* replaces the separate put + detach sequence removed above */
}
isc_result_t
unsigned int default_quantum, isc_taskmgr_t **managerp)
{
isc_result_t result;
- unsigned int i, started = 0;
+ unsigned int i;
isc__taskmgr_t *manager;
/*
goto cleanup_mgr;
}
- manager->workers = 0;
- manager->queues = 0;
- manager->threads = isc_mem_allocate(mctx,
- workers * sizeof(isc_thread_t));
- if (manager->threads == NULL) {
- result = ISC_R_NOMEMORY;
- goto cleanup_lock;
- }
- if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) {
- UNEXPECTED_ERROR(__FILE__, __LINE__,
- "isc_condition_init() %s",
- isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
- ISC_MSG_FAILED, "failed"));
- result = ISC_R_UNEXPECTED;
- goto cleanup_workavailable;
- }
- if (isc_condition_init(&manager->paused) != ISC_R_SUCCESS) {
- UNEXPECTED_ERROR(__FILE__, __LINE__,
- "isc_condition_init() %s",
- isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
- ISC_MSG_FAILED, "failed"));
- result = ISC_R_UNEXPECTED;
- goto cleanup_exclusivegranted;
- }
+ RUNTIME_CHECK(isc_mutex_init(&manager->prehalt_lock) == ISC_R_SUCCESS);
+ RUNTIME_CHECK(isc_mutex_init(&manager->posthalt_lock) == ISC_R_SUCCESS);
+
+ RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) == ISC_R_SUCCESS);
+
+ manager->workers = workers;
+
if (default_quantum == 0)
default_quantum = DEFAULT_DEFAULT_QUANTUM;
manager->default_quantum = default_quantum;
INIT_LIST(manager->tasks);
- manager->ready_tasks = malloc(workers * sizeof(isc__tasklist_t));
- manager->locks = malloc(workers * sizeof(isc_mutex_t *));
- manager->work_available = malloc(workers * sizeof(isc_condition_t));
- manager->ready_priority_tasks = malloc(workers * sizeof(isc__tasklist_t));
+ manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
manager->tasks_running = 0;
manager->tasks_ready = 0;
- manager->exclusive_requested = false;
- manager->pause_requested = false;
manager->curq = 0;
manager->exiting = false;
manager->excl = NULL;
+ manager->halted = 0;
+ manager->exclusive_requested = false;
+ manager->pause_requested = false;
isc_mem_attach(mctx, &manager->mctx);
* Start workers.
*/
for (i = 0; i < workers; i++) {
- INIT_LIST(manager->ready_tasks[i]);
- INIT_LIST(manager->ready_priority_tasks[i]);
- manager->locks[i] = malloc(4096);
- isc_mutex_init(manager->locks[i]);
- if (isc_condition_init(&manager->work_available[i]) != ISC_R_SUCCESS) {
- UNEXPECTED_ERROR(__FILE__, __LINE__,
- "isc_condition_init() %s",
- isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
- ISC_MSG_FAILED, "failed"));
- result = ISC_R_UNEXPECTED;
- goto cleanup_threads;
- }
- stt *st = isc_mem_get(mctx, sizeof(stt));
- st->manager = manager;
- st->threadid = i;
- if (isc_thread_create(run, st,
- &manager->threads[i]) != ISC_R_SUCCESS) {
- goto cleanup_threads;
- }
- char name[16]; /* thread name limit on Linux */
+ INIT_LIST(manager->queues[i].ready_tasks);
+ INIT_LIST(manager->queues[i].ready_priority_tasks);
+ RUNTIME_CHECK(isc_mutex_init(&manager->queues[i].lock)
+ == ISC_R_SUCCESS);
+ RUNTIME_CHECK(isc_condition_init(
+ &manager->queues[i].work_available)
+ == ISC_R_SUCCESS);
+ manager->queues[i].manager = manager;
+ manager->queues[i].threadid = i;
+ RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i],
+ &manager->queues[i].thread)
+ == ISC_R_SUCCESS);
+ char name[16];
snprintf(name, sizeof(name), "isc-worker%04u", i);
- isc_thread_setname(manager->threads[manager->workers],
- name);
- manager->workers++;
- started++;
+ isc_thread_setname(manager->queues[i].thread, name);
}
- manager->queues = manager->workers;
UNLOCK(&manager->lock);
- if (started == 0) {
- manager_free(manager);
- return (ISC_R_NOTHREADS);
- }
isc_thread_setconcurrency(workers);
*managerp = (isc_taskmgr_t *)manager;
return (ISC_R_SUCCESS);
- cleanup_exclusivegranted:
- (void)isc_condition_destroy(&manager->exclusive_granted);
- cleanup_workavailable:
- (void)isc_condition_destroy(&manager->work_available[0]);
- cleanup_threads:
- isc_mem_free(mctx, manager->threads);
- cleanup_lock:
- DESTROYLOCK(&manager->lock);
cleanup_mgr:
isc_mem_put(mctx, manager, sizeof(*manager));
return (result);
task = NEXT(task, link)) {
LOCK(&task->lock);
if (task_shutdown(task)) {
- int queue = task->threadid % manager->queues;
- push_readyq(manager, task, queue);
+ push_readyq(manager, task, task->threadid);
}
UNLOCK(&task->lock);
}
* there's work left to do, and if there are already no tasks left
* it will cause the workers to see manager->exiting.
*/
- for (i = 0; i < manager->queues; i++) {
- LOCK(manager->locks[i]);
- BROADCAST(&manager->work_available[i]);
- UNLOCK(manager->locks[i]);
+ for (i = 0; i < manager->workers; i++) {
+ LOCK(&manager->queues[i].lock);
+ BROADCAST(&manager->queues[i].work_available);
+ UNLOCK(&manager->queues[i].lock);
}
UNLOCK(&manager->lock);
* Wait for all the worker threads to exit.
*/
for (i = 0; i < manager->workers; i++)
- (void)isc_thread_join(manager->threads[i], NULL);
+ (void)isc_thread_join(manager->queues[i].thread, NULL);
manager_free(manager);
void
isc__taskmgr_pause(isc_taskmgr_t *manager0) {
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
+ unsigned int i;
+
+ LOCK(&manager->posthalt_lock); /* still held on return; halting workers block on this lock */
+ while (manager->exclusive_requested || manager->pause_requested) {
+ UNLOCK(&manager->posthalt_lock);
+ /* Yield and retry until no other pause/exclusive request is pending.
+ * This is ugly but pause is used EXCLUSIVELY in tests */
+ isc_thread_yield();
+ LOCK(&manager->posthalt_lock);
+ }
manager->pause_requested = true;
- LOCK(&manager->lock);
- while (manager->tasks_running > 0) {
- WAIT(&manager->paused, &manager->lock);
+ LOCK(&manager->prehalt_lock);
+ while (manager->halted < manager->workers) { /* wait until every worker has counted itself as halted */
+ for (i = 0; i < manager->workers; i++) {
+ BROADCAST(&manager->queues[i].work_available); /* wake idle workers; NOTE(review): sent without queues[i].lock -- confirm a worker cannot miss it between its check and WAIT */
+ }
+ WAIT(&manager->halt_cond, &manager->prehalt_lock); /* halting workers signal halt_cond after incrementing 'halted' */
}
- UNLOCK(&manager->lock);
+ UNLOCK(&manager->prehalt_lock);
}
void
isc__taskmgr_resume(isc_taskmgr_t *manager0) {
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
-
- LOCK(&manager->lock);
if (manager->pause_requested) {
manager->pause_requested = false;
- BROADCAST(&manager->work_available[0]);
+ UNLOCK(&manager->posthalt_lock); /* releases the lock left held by isc__taskmgr_pause(); NOTE(review): confirm resume always runs on the thread that paused */
}
- UNLOCK(&manager->lock);
}
void
isc_task_beginexclusive(isc_task_t *task0) {
isc__task_t *task = (isc__task_t *)task0;
isc__taskmgr_t *manager = task->manager;
+ unsigned int i;
+
REQUIRE(VALID_TASK(task));
REQUIRE(task->state == task_state_running);
+
/*
* TODO REQUIRE(task == task->manager->excl);
* it should be here, it fails on shutdown server->task
*/
- LOCK(&manager->lock);
- if (manager->exclusive_requested) {
- UNLOCK(&manager->lock);
+ if (manager->exclusive_requested || manager->pause_requested) { /* NOTE(review): checked without holding any lock -- confirm a concurrent request cannot slip past */
return (ISC_R_LOCKBUSY);
}
+
+ LOCK(&manager->posthalt_lock); /* still held on return; halting workers block on this lock */
+ INSIST(!manager->exclusive_requested && !manager->pause_requested);
manager->exclusive_requested = true;
- while (manager->tasks_running > 1) {
- WAIT(&manager->exclusive_granted, &manager->lock);
+ LOCK(&manager->prehalt_lock);
+ while (manager->halted + 1 < manager->workers) { /* presumably the extra one is the caller's own worker, which does not halt -- TODO confirm */
+ for (i = 0; i < manager->workers; i++) {
+ BROADCAST(&manager->queues[i].work_available); /* wake idle workers so they see exclusive_requested */
+ }
+ WAIT(&manager->halt_cond, &manager->prehalt_lock); /* halting workers signal halt_cond after incrementing 'halted' */
}
- UNLOCK(&manager->lock);
+ UNLOCK(&manager->prehalt_lock);
return (ISC_R_SUCCESS);
}
REQUIRE(VALID_TASK(task));
REQUIRE(task->state == task_state_running);
- LOCK(&manager->lock);
REQUIRE(manager->exclusive_requested);
manager->exclusive_requested = false;
- for (unsigned int i=0; i < manager->workers; i++) {
- BROADCAST(&manager->work_available[i]);
- }
- UNLOCK(&manager->lock);
+ UNLOCK(&manager->posthalt_lock);
}
void
REQUIRE(ISCAPI_TASK_VALID(task0));
isc__task_t *task = (isc__task_t *)task0;
isc__taskmgr_t *manager = task->manager;
- int queue = task->threadid % manager->queues;
bool oldpriv;
LOCK(&task->lock);
if (priv == oldpriv)
return;
- LOCK(manager->locks[queue]);
+ LOCK(&manager->queues[task->threadid].lock);
if (priv && ISC_LINK_LINKED(task, ready_link))
- ENQUEUE(manager->ready_priority_tasks[queue], task,
- ready_priority_link);
+ ENQUEUE(manager->queues[task->threadid].ready_priority_tasks,
+ task, ready_priority_link);
else if (!priv && ISC_LINK_LINKED(task, ready_priority_link))
- DEQUEUE(manager->ready_priority_tasks[queue], task,
- ready_priority_link);
- UNLOCK(manager->locks[queue]);
+ DEQUEUE(manager->queues[task->threadid].ready_priority_tasks,
+ task, ready_priority_link);
+ UNLOCK(&manager->queues[task->threadid].lock);
}
bool