]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Separate structure for each thread/queue; 2-phase-locking for exclusive tasks
authorWitold Kręcicki <wpk@isc.org>
Mon, 22 Oct 2018 09:37:17 +0000 (09:37 +0000)
committerWitold Kręcicki <wpk@isc.org>
Wed, 24 Oct 2018 07:06:57 +0000 (07:06 +0000)
lib/isc/task.c

index e2cf9793f5710fcaafd11b7c86b427985bf9a684..d36143d32a187fad61c804de579f910244aaf50c 100644 (file)
 #include <openssl/err.h>
 #endif
 
-/*%
- * For BIND9 internal applications:
- * when built with threads we use multiple worker threads shared by the whole
- * application.
- * when built without threads we share a single global task manager and use
- * an integrated event loop for socket, timer, and other generic task events.
- * For generic library:
- * we don't use either of them: an application can have multiple task managers
- * whether or not it's threaded, and if the application is threaded each thread
- * is expected to have a separate manager; no "worker threads" are shared by
- * the application threads.
- */
 #ifdef ISC_TASK_TRACE
 #define XTRACE(m)              fprintf(stderr, "task %p thread %lu: %s\n", \
                                       task, isc_thread_self(), (m))
@@ -88,6 +76,7 @@ static const char *statenames[] = {
 
 typedef struct isc__task isc__task_t;
 typedef struct isc__taskmgr isc__taskmgr_t;
+typedef struct isc__taskqueue isc__taskqueue_t;
 
 struct isc__task {
        /* Not locked. */
@@ -124,32 +113,43 @@ struct isc__task {
 
 typedef ISC_LIST(isc__task_t)  isc__tasklist_t;
 
+struct isc__taskqueue {
+       isc__tasklist_t                 ready_tasks;
+       isc__tasklist_t                 ready_priority_tasks;
+       isc_condition_t                 work_available;
+       isc_mutex_t                     lock;
+       isc_thread_t                    thread;
+       unsigned int                    threadid;
+       isc__taskmgr_t                  *manager;
+};
+
 struct isc__taskmgr {
        /* Not locked. */
        isc_taskmgr_t                   common;
        isc_mem_t *                     mctx;
        isc_mutex_t                     lock;
-       isc_mutex_t                     **locks;
+       isc_mutex_t                     prehalt_lock;
+       isc_mutex_t                     posthalt_lock;
+       isc_condition_t                 halt_cond;
+       isc_condition_t                 halt_avail_cond;
        unsigned int                    workers;
-       unsigned int                    queues;
-       isc_thread_t *                  threads;
        atomic_uint_fast32_t            tasks_running;
        atomic_uint_fast32_t            tasks_ready;
        atomic_uint_fast32_t            curq;
+       isc__taskqueue_t                *queues;
 
        /* Locked by task manager lock. */
        unsigned int                    default_quantum;
        LIST(isc__task_t)               tasks;
-       isc__tasklist_t                 *ready_tasks;
-       isc__tasklist_t                 *ready_priority_tasks;
        isc_taskmgrmode_t               mode;
-       isc_condition_t                 *work_available;
-       isc_condition_t                 exclusive_granted;
-       isc_condition_t                 paused;
        bool                            pause_requested;
        bool                            exclusive_requested;
        bool                            exiting;
 
+       /* Locked by {pre/post}halt_lock combo */
+       unsigned int                    halted;
+
+
        /*
         * Multiple threads can read/write 'excl' at the same time, so we need
         * to protect the access.  We can't use 'lock' since isc_task_detach()
@@ -215,8 +215,8 @@ task_finished(isc__task_t *task) {
                 * any idle worker threads so they
                 * can exit.
                 */
-               for (unsigned int i=0; i<manager->queues; i++) {
-                       BROADCAST(&manager->work_available[i]);
+               for (unsigned int i=0; i<manager->workers; i++) {
+                       BROADCAST(&manager->queues[i].work_available);
                }
        }
        DESTROYLOCK(&task->lock);
@@ -242,7 +242,7 @@ isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum,
                return (ISC_R_NOMEMORY);
        XTRACE("isc_task_create");
        task->manager = manager;
-       task->threadid = -1;
+       task->threadid = atomic_fetch_add_explicit(&manager->curq, 1, memory_order_relaxed) % manager->workers;
        result = isc_mutex_init(&task->lock);
        if (result != ISC_R_SUCCESS) {
                isc_mem_put(manager->mctx, task, sizeof(*task));
@@ -354,17 +354,16 @@ static inline void
 task_ready(isc__task_t *task) {
        isc__taskmgr_t *manager = task->manager;
        bool has_privilege = isc_task_privilege((isc_task_t *) task);
-       int queue = task->threadid % manager->queues;
 
        REQUIRE(VALID_MANAGER(manager));
        REQUIRE(task->state == task_state_ready);
 
        XTRACE("task_ready");
-       LOCK(manager->locks[queue]);
-       push_readyq(manager, task, queue);
+       LOCK(&manager->queues[task->threadid].lock);
+       push_readyq(manager, task, task->threadid);
        if (manager->mode == isc_taskmgrmode_normal || has_privilege)
-               SIGNAL(&manager->work_available[queue]);
-       UNLOCK(manager->locks[queue]);
+               SIGNAL(&manager->queues[task->threadid].work_available);
+       UNLOCK(&manager->queues[task->threadid].lock);
 }
 
 static inline bool
@@ -468,9 +467,6 @@ void
 isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
        isc__task_t *task = (isc__task_t *)task0;
        bool was_idle;
-       if (c == -1) {
-               c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
-       }
 
        /*
         * Send '*event' to 'task'.
@@ -478,6 +474,12 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
 
        REQUIRE(VALID_TASK(task));
 
+       if (c == -1) {
+               c = atomic_fetch_add_explicit(&task->manager->curq, 1,
+                                             memory_order_relaxed)
+                                             % task->manager->workers;
+       }
+
        XTRACE("isc_task_send");
 
        /*
@@ -523,7 +525,9 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
        task = (isc__task_t *)*taskp;
        REQUIRE(VALID_TASK(task));
        if (c == -1) {
-               c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
+               c = atomic_fetch_add_explicit(&task->manager->curq, 1,
+                                             memory_order_relaxed)
+                                             % task->manager->workers;
        }
 
        XTRACE("isc_task_sendanddetach");
@@ -857,9 +861,9 @@ empty_readyq(isc__taskmgr_t *manager, int c) {
        isc__tasklist_t queue;
 
        if (manager->mode == isc_taskmgrmode_normal)
-               queue = manager->ready_tasks[c];
+               queue = manager->queues[c].ready_tasks;
        else
-               queue = manager->ready_priority_tasks[c];
+               queue = manager->queues[c].ready_priority_tasks;
 
        return (EMPTY(queue));
 }
@@ -877,14 +881,14 @@ pop_readyq(isc__taskmgr_t *manager, int c) {
        isc__task_t *task;
 
        if (manager->mode == isc_taskmgrmode_normal)
-               task = HEAD(manager->ready_tasks[c]);
+               task = HEAD(manager->queues[c].ready_tasks);
        else
-               task = HEAD(manager->ready_priority_tasks[c]);
+               task = HEAD(manager->queues[c].ready_priority_tasks);
 
        if (task != NULL) {
-               DEQUEUE(manager->ready_tasks[c], task, ready_link);
+               DEQUEUE(manager->queues[c].ready_tasks, task, ready_link);
                if (ISC_LINK_LINKED(task, ready_priority_link))
-                       DEQUEUE(manager->ready_priority_tasks[c], task,
+                       DEQUEUE(manager->queues[c].ready_priority_tasks, task,
                                ready_priority_link);
        }
 
@@ -899,9 +903,9 @@ pop_readyq(isc__taskmgr_t *manager, int c) {
  */
 static inline void
 push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
-       ENQUEUE(manager->ready_tasks[c], task, ready_link);
+       ENQUEUE(manager->queues[c].ready_tasks, task, ready_link);
        if ((task->flags & TASK_F_PRIVILEGED) != 0)
-               ENQUEUE(manager->ready_priority_tasks[c], task,
+               ENQUEUE(manager->queues[c].ready_priority_tasks, task,
                        ready_priority_link);
        atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed);
 }
@@ -916,8 +920,6 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
        LOCK(&manager->lock);
        UNLOCK(&manager->lock);
 
-       int queue = threadid % manager->queues;
-
        /*
         * Again we're trying to hold the lock for as short a time as possible
         * and to do as little locking and unlocking as possible.
@@ -967,7 +969,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
         * For N iterations of the loop, this code does N+1 locks and N+1
         * unlocks.  The while expression is always protected by the lock.
         */
-       LOCK(manager->locks[queue]);
+       LOCK(&manager->queues[threadid].lock);
 
        while (!FINISHED(manager)) {
                /*
@@ -980,8 +982,8 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
                 * If a pause has been requested, don't do any work
                 * until it's been released.
                 */
-               while ((empty_readyq(manager, queue) || manager->pause_requested ||
-                       manager->exclusive_requested) && !FINISHED(manager))
+               while ((empty_readyq(manager, threadid) && !manager->pause_requested &&
+                       !manager->exclusive_requested) && !FINISHED(manager))
                {
                        XTHREADTRACE(isc_msgcat_get(isc_msgcat,
                                                    ISC_MSGSET_GENERAL,
@@ -992,7 +994,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
                        XTHREADTRACE(isc_msgcat_get(isc_msgcat,
                                                    ISC_MSGSET_GENERAL,
                                                    ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq"));
-                       WAIT(&manager->work_available[queue], manager->locks[queue]);
+                       WAIT(&manager->queues[threadid].work_available, &manager->queues[threadid].lock);
                        XTHREADTRACE(isc_msgcat_get(isc_msgcat,
                                                    ISC_MSGSET_TASK,
                                                    ISC_MSG_AWAKE, "awake"));
@@ -1000,7 +1002,33 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
                XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
                                            ISC_MSG_WORKING, "working"));
 
-               task = pop_readyq(manager, queue);
+               if (manager->pause_requested || manager->exclusive_requested) {
+                       UNLOCK(&manager->queues[threadid].lock);
+                       XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
+                                           ISC_MSG_WORKING, "halting"));
+
+                       /*
+                        * First we increase 'halted' and signal the thread
+                        * that's waiting on exclusivity/pause. Then we
+                        * try locking posthalt lock, which will be locked
+                        * by exclusive/pausing thread. It is only unlocked
+                        * after exclusivity/pause is done.
+                        */
+                       LOCK(&manager->prehalt_lock);
+                       manager->halted++;
+                       SIGNAL(&manager->halt_cond);
+                       UNLOCK(&manager->prehalt_lock);
+
+                       LOCK(&manager->posthalt_lock);
+                       manager->halted--;
+                       UNLOCK(&manager->posthalt_lock);
+
+                       LOCK(&manager->queues[threadid].lock);
+                       /* Restart the loop after */
+                       continue;
+               }
+
+               task = pop_readyq(manager, threadid);
                if (task != NULL) {
                        unsigned int dispatch_count = 0;
                        bool done = false;
@@ -1015,7 +1043,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
                         * have a task to do.  We must reacquire the manager
                         * lock before exiting the 'if (task != NULL)' block.
                         */
-                       UNLOCK(manager->locks[queue]);
+                       UNLOCK(&manager->queues[threadid].lock);
                        atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed);
                        atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed);
 
@@ -1132,14 +1160,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
                                task_finished(task);
 
                        atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed);
-                       if (manager->exclusive_requested &&
-                           manager->tasks_running == 1) {
-                               SIGNAL(&manager->exclusive_granted);
-                       } else if (manager->pause_requested &&
-                                  manager->tasks_running == 0) {
-                               SIGNAL(&manager->paused);
-                       }
-                       LOCK(manager->locks[queue]);
+                       LOCK(&manager->queues[threadid].lock);
                        if (requeue) {
                                /*
                                 * We know we're awake, so we don't have
@@ -1160,7 +1181,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
                                 * were usually nonempty, the 'optimization'
                                 * might even hurt rather than help.
                                 */
-                               push_readyq(manager, task, queue);
+                               push_readyq(manager, task, threadid);
                        }
                }
 
@@ -1170,39 +1191,33 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
                 * we're stuck.  Automatically drop privileges at that
                 * point and continue with the regular ready queue.
                 */
-               if (manager->tasks_running == 0 && empty_readyq(manager, queue)) {
+               if (manager->tasks_running == 0 && empty_readyq(manager, threadid)) {
                        manager->mode = isc_taskmgrmode_normal;
                        for (unsigned i=0; i < manager->workers; i++) {
-                               BROADCAST(&manager->work_available[i]);
+                               BROADCAST(&manager->queues[i].work_available);
                        }
                }
        }
-       UNLOCK(manager->locks[queue]);
+       UNLOCK(&manager->queues[threadid].lock);
        /*
         * There might be other dispatchers waiting on empty tasks,
         * wake them up.
         */
        for (unsigned i=0; i < manager->workers; i++) {
-               LOCK(manager->locks[i]);
-               BROADCAST(&manager->work_available[i]);
-               UNLOCK(manager->locks[i]);
+               LOCK(&manager->queues[i].lock);
+               BROADCAST(&manager->queues[i].work_available);
+               UNLOCK(&manager->queues[i].lock);
        }
 }
 
-typedef struct st {
-       isc__taskmgr_t *manager;
-       int threadid;
-} stt;
-
 static isc_threadresult_t
 #ifdef _WIN32
 WINAPI
 #endif
-run(void *uap) {
-       stt *st = uap;
-       isc__taskmgr_t *manager = st->manager;
-       int threadid = st->threadid;
-       isc_mem_put(manager->mctx, st, sizeof(*st));
+run(void *queuep) {
+       isc__taskqueue_t *tq = queuep;
+       isc__taskmgr_t *manager = tq->manager;
+       int threadid = tq->threadid;
        isc_thread_setaffinity(threadid);
 
        XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
@@ -1222,19 +1237,15 @@ run(void *uap) {
 
 static void
 manager_free(isc__taskmgr_t *manager) {
-       isc_mem_t *mctx;
+       /* TODO */
 
-       (void)isc_condition_destroy(&manager->exclusive_granted);
-       (void)isc_condition_destroy(&manager->work_available[0]);
-       (void)isc_condition_destroy(&manager->paused);
-       isc_mem_free(manager->mctx, manager->threads);
        DESTROYLOCK(&manager->lock);
-       DESTROYLOCK(&manager->excl_lock);
+       DESTROYLOCK(&manager->prehalt_lock);
+       DESTROYLOCK(&manager->posthalt_lock);
+       isc_mem_put(manager->mctx, manager->queues, manager->workers * sizeof(isc__taskqueue_t));
        manager->common.impmagic = 0;
        manager->common.magic = 0;
-       mctx = manager->mctx;
-       isc_mem_put(mctx, manager, sizeof(*manager));
-       isc_mem_detach(&mctx);
+       isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
 }
 
 isc_result_t
@@ -1242,7 +1253,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
                    unsigned int default_quantum, isc_taskmgr_t **managerp)
 {
        isc_result_t result;
-       unsigned int i, started = 0;
+       unsigned int i;
        isc__taskmgr_t *manager;
 
        /*
@@ -1268,45 +1279,26 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
                goto cleanup_mgr;
        }
 
-       manager->workers = 0;
-       manager->queues = 0;
-       manager->threads = isc_mem_allocate(mctx,
-                                           workers * sizeof(isc_thread_t));
-       if (manager->threads == NULL) {
-               result = ISC_R_NOMEMORY;
-               goto cleanup_lock;
-       }
-       if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) {
-               UNEXPECTED_ERROR(__FILE__, __LINE__,
-                                "isc_condition_init() %s",
-                                isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
-                                               ISC_MSG_FAILED, "failed"));
-               result = ISC_R_UNEXPECTED;
-               goto cleanup_workavailable;
-       }
-       if (isc_condition_init(&manager->paused) != ISC_R_SUCCESS) {
-               UNEXPECTED_ERROR(__FILE__, __LINE__,
-                                "isc_condition_init() %s",
-                                isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
-                                               ISC_MSG_FAILED, "failed"));
-               result = ISC_R_UNEXPECTED;
-               goto cleanup_exclusivegranted;
-       }
+       RUNTIME_CHECK(isc_mutex_init(&manager->prehalt_lock) == ISC_R_SUCCESS);
+       RUNTIME_CHECK(isc_mutex_init(&manager->posthalt_lock) == ISC_R_SUCCESS);
+
+       RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) == ISC_R_SUCCESS);
+
+       manager->workers = workers;
+
        if (default_quantum == 0)
                default_quantum = DEFAULT_DEFAULT_QUANTUM;
        manager->default_quantum = default_quantum;
        INIT_LIST(manager->tasks);
-       manager->ready_tasks = malloc(workers * sizeof(isc__tasklist_t));
-       manager->locks = malloc(workers * sizeof(isc_mutex_t *));
-       manager->work_available = malloc(workers * sizeof(isc_condition_t));
-       manager->ready_priority_tasks = malloc(workers * sizeof(isc__tasklist_t));
+       manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
        manager->tasks_running = 0;
        manager->tasks_ready = 0;
-       manager->exclusive_requested = false;
-       manager->pause_requested = false;
        manager->curq = 0;
        manager->exiting = false;
        manager->excl = NULL;
+       manager->halted = 0;
+       manager->exclusive_requested = false;
+       manager->pause_requested = false;
 
        isc_mem_attach(mctx, &manager->mctx);
 
@@ -1315,53 +1307,30 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
         * Start workers.
         */
        for (i = 0; i < workers; i++) {
-               INIT_LIST(manager->ready_tasks[i]);
-               INIT_LIST(manager->ready_priority_tasks[i]);
-               manager->locks[i] = malloc(4096);
-               isc_mutex_init(manager->locks[i]);
-               if (isc_condition_init(&manager->work_available[i]) != ISC_R_SUCCESS) {
-                       UNEXPECTED_ERROR(__FILE__, __LINE__,
-                                        "isc_condition_init() %s",
-                                        isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
-                                                       ISC_MSG_FAILED, "failed"));
-                       result = ISC_R_UNEXPECTED;
-                       goto cleanup_threads;
-               }
-               stt *st = isc_mem_get(mctx, sizeof(stt));
-               st->manager = manager;
-               st->threadid = i;
-               if (isc_thread_create(run, st,
-                                     &manager->threads[i]) != ISC_R_SUCCESS) {
-                       goto cleanup_threads;
-               }
-               char name[16];  /* thread name limit on Linux */
+               INIT_LIST(manager->queues[i].ready_tasks);
+               INIT_LIST(manager->queues[i].ready_priority_tasks);
+               RUNTIME_CHECK(isc_mutex_init(&manager->queues[i].lock)
+                             == ISC_R_SUCCESS);
+               RUNTIME_CHECK(isc_condition_init(
+                                          &manager->queues[i].work_available)
+                             == ISC_R_SUCCESS);
+               manager->queues[i].manager = manager;
+               manager->queues[i].threadid = i;
+               RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i],
+                                               &manager->queues[i].thread)
+                             == ISC_R_SUCCESS);
+               char name[16];
                snprintf(name, sizeof(name), "isc-worker%04u", i);
-               isc_thread_setname(manager->threads[manager->workers],
-                                  name);
-               manager->workers++;
-               started++;
+               isc_thread_setname(manager->queues[i].thread, name);
        }
-       manager->queues = manager->workers;
        UNLOCK(&manager->lock);
 
-       if (started == 0) {
-               manager_free(manager);
-               return (ISC_R_NOTHREADS);
-       }
        isc_thread_setconcurrency(workers);
 
        *managerp = (isc_taskmgr_t *)manager;
 
        return (ISC_R_SUCCESS);
 
- cleanup_exclusivegranted:
-       (void)isc_condition_destroy(&manager->exclusive_granted);
- cleanup_workavailable:
-       (void)isc_condition_destroy(&manager->work_available[0]);
- cleanup_threads:
-       isc_mem_free(mctx, manager->threads);
- cleanup_lock:
-       DESTROYLOCK(&manager->lock);
  cleanup_mgr:
        isc_mem_put(mctx, manager, sizeof(*manager));
        return (result);
@@ -1429,8 +1398,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
             task = NEXT(task, link)) {
                LOCK(&task->lock);
                if (task_shutdown(task)) {
-                       int queue = task->threadid % manager->queues;
-                       push_readyq(manager, task, queue);
+                       push_readyq(manager, task, task->threadid);
                }
                UNLOCK(&task->lock);
        }
@@ -1439,10 +1407,10 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
         * there's work left to do, and if there are already no tasks left
         * it will cause the workers to see manager->exiting.
         */
-       for (i = 0; i < manager->queues; i++) {
-               LOCK(manager->locks[i]);
-               BROADCAST(&manager->work_available[i]);
-               UNLOCK(manager->locks[i]);
+       for (i = 0; i < manager->workers; i++) {
+               LOCK(&manager->queues[i].lock);
+               BROADCAST(&manager->queues[i].work_available);
+               UNLOCK(&manager->queues[i].lock);
        }
        UNLOCK(&manager->lock);
 
@@ -1450,7 +1418,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
         * Wait for all the worker threads to exit.
         */
        for (i = 0; i < manager->workers; i++)
-               (void)isc_thread_join(manager->threads[i], NULL);
+               (void)isc_thread_join(manager->queues[i].thread, NULL);
 
        manager_free(manager);
 
@@ -1479,24 +1447,33 @@ isc_taskmgr_mode(isc_taskmgr_t *manager0) {
 void
 isc__taskmgr_pause(isc_taskmgr_t *manager0) {
        isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
+       unsigned int i;
+
+       LOCK(&manager->posthalt_lock);
+       while (manager->exclusive_requested || manager->pause_requested) {
+               UNLOCK(&manager->posthalt_lock);
+               /* This is ugly but pause is used EXCLUSIVELY in tests */
+               isc_thread_yield();
+               LOCK(&manager->posthalt_lock);
+       }
        manager->pause_requested = true;
-       LOCK(&manager->lock);
-       while (manager->tasks_running > 0) {
-               WAIT(&manager->paused, &manager->lock);
+       LOCK(&manager->prehalt_lock);
+       while (manager->halted < manager->workers) {
+               for (i = 0; i < manager->workers; i++) {
+                       BROADCAST(&manager->queues[i].work_available);
+               }
+               WAIT(&manager->halt_cond, &manager->prehalt_lock);
        }
-       UNLOCK(&manager->lock);
+       UNLOCK(&manager->prehalt_lock);
 }
 
 void
 isc__taskmgr_resume(isc_taskmgr_t *manager0) {
        isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
-
-       LOCK(&manager->lock);
        if (manager->pause_requested) {
                manager->pause_requested = false;
-               BROADCAST(&manager->work_available[0]);
+               UNLOCK(&manager->posthalt_lock);
        }
-       UNLOCK(&manager->lock);
 }
 
 void
@@ -1535,24 +1512,32 @@ isc_result_t
 isc_task_beginexclusive(isc_task_t *task0) {
        isc__task_t *task = (isc__task_t *)task0;
        isc__taskmgr_t *manager = task->manager;
+       unsigned int i;
+
        REQUIRE(VALID_TASK(task));
 
        REQUIRE(task->state == task_state_running);
+
 /*
  *  TODO REQUIRE(task == task->manager->excl);
  *  it should be here, it fails on shutdown server->task
  */
 
-       LOCK(&manager->lock);
-       if (manager->exclusive_requested) {
-               UNLOCK(&manager->lock);
+       if (manager->exclusive_requested || manager->pause_requested) {
                return (ISC_R_LOCKBUSY);
        }
+
+       LOCK(&manager->posthalt_lock);
+       INSIST(!manager->exclusive_requested && !manager->pause_requested);
        manager->exclusive_requested = true;
-       while (manager->tasks_running > 1) {
-               WAIT(&manager->exclusive_granted, &manager->lock);
+       LOCK(&manager->prehalt_lock);
+       while (manager->halted + 1 < manager->workers) {
+               for (i = 0; i < manager->workers; i++) {
+                       BROADCAST(&manager->queues[i].work_available);
+               }
+               WAIT(&manager->halt_cond, &manager->prehalt_lock);
        }
-       UNLOCK(&manager->lock);
+       UNLOCK(&manager->prehalt_lock);
        return (ISC_R_SUCCESS);
 }
 
@@ -1563,13 +1548,9 @@ isc_task_endexclusive(isc_task_t *task0) {
 
        REQUIRE(VALID_TASK(task));
        REQUIRE(task->state == task_state_running);
-       LOCK(&manager->lock);
        REQUIRE(manager->exclusive_requested);
        manager->exclusive_requested = false;
-       for (unsigned int i=0; i < manager->workers; i++) {
-               BROADCAST(&manager->work_available[i]);
-       }
-       UNLOCK(&manager->lock);
+       UNLOCK(&manager->posthalt_lock);
 }
 
 void
@@ -1577,7 +1558,6 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) {
        REQUIRE(ISCAPI_TASK_VALID(task0));
        isc__task_t *task = (isc__task_t *)task0;
        isc__taskmgr_t *manager = task->manager;
-       int queue = task->threadid % manager->queues;
        bool oldpriv;
 
        LOCK(&task->lock);
@@ -1591,14 +1571,14 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) {
        if (priv == oldpriv)
                return;
 
-       LOCK(manager->locks[queue]);
+       LOCK(&manager->queues[task->threadid].lock);
        if (priv && ISC_LINK_LINKED(task, ready_link))
-               ENQUEUE(manager->ready_priority_tasks[queue], task,
-                       ready_priority_link);
+               ENQUEUE(manager->queues[task->threadid].ready_priority_tasks,
+                       task, ready_priority_link);
        else if (!priv && ISC_LINK_LINKED(task, ready_priority_link))
-               DEQUEUE(manager->ready_priority_tasks[queue], task,
-                       ready_priority_link);
-       UNLOCK(manager->locks[queue]);
+               DEQUEUE(manager->queues[task->threadid].ready_priority_tasks,
+                       task, ready_priority_link);
+       UNLOCK(&manager->queues[task->threadid].lock);
 }
 
 bool