From: Witold Kręcicki Date: Thu, 11 Oct 2018 13:39:04 +0000 (+0000) Subject: Multiple worker queues X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=87e4e442d7da915ab5c99de44ab30a3158c5314d;p=thirdparty%2Fbind9.git Multiple worker queues --- diff --git a/lib/isc/include/isc/task.h b/lib/isc/include/isc/task.h index c1bc09047ac..7746f1c12dd 100644 --- a/lib/isc/include/isc/task.h +++ b/lib/isc/include/isc/task.h @@ -208,6 +208,9 @@ isc_task_detach(isc_task_t **taskp); void isc_task_send(isc_task_t *task, isc_event_t **eventp); + +void +isc_task_sendto(isc_task_t *task, isc_event_t **eventp, int c); /*%< * Send '*event' to 'task'. * @@ -221,6 +224,9 @@ isc_task_send(isc_task_t *task, isc_event_t **eventp); *\li *eventp == NULL. */ +void +isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c); + void isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp); /*%< diff --git a/lib/isc/task.c b/lib/isc/task.c index 1d03e791273..0338e0f7439 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -31,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -104,6 +106,7 @@ struct isc__task { isc_time_t tnow; char name[16]; void * tag; + int threadid; /* Locked by task manager lock. */ LINK(isc__task_t) link; LINK(isc__task_t) ready_link; @@ -126,22 +129,26 @@ struct isc__taskmgr { isc_taskmgr_t common; isc_mem_t * mctx; isc_mutex_t lock; + isc_mutex_t **locks; unsigned int workers; + unsigned int queues; isc_thread_t * threads; + atomic_uint_fast32_t tasks_running; + atomic_uint_fast32_t tasks_ready; + atomic_uint_fast32_t curq; + /* Locked by task manager lock. */ unsigned int default_quantum; LIST(isc__task_t) tasks; - isc__tasklist_t ready_tasks; - isc__tasklist_t ready_priority_tasks; + isc__tasklist_t *ready_tasks; + isc__tasklist_t *ready_priority_tasks; isc_taskmgrmode_t mode; - isc_condition_t work_available; + isc_condition_t *work_available; isc_condition_t exclusive_granted; isc_condition_t paused; - unsigned int tasks_running; - unsigned int tasks_ready; - bool pause_requested; - bool exclusive_requested; - bool exiting; + bool pause_requested; + bool exclusive_requested; + bool exiting; /* * Multiple threads can read/write 'excl' at the same time, so we need @@ -175,13 +182,13 @@ isc_taskmgr_setexcltask(isc_taskmgr_t *mgr0, isc_task_t *task0); isc_result_t isc_taskmgr_excltask(isc_taskmgr_t *mgr0, isc_task_t **taskp); static inline bool -empty_readyq(isc__taskmgr_t *manager); +empty_readyq(isc__taskmgr_t *manager, int c); static inline isc__task_t * -pop_readyq(isc__taskmgr_t *manager); +pop_readyq(isc__taskmgr_t *manager, int c); static inline void -push_readyq(isc__taskmgr_t *manager, isc__task_t *task); +push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c); /*** *** Tasks. @@ -190,7 +197,6 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task); static void task_finished(isc__task_t *task) { isc__taskmgr_t *manager = task->manager; - REQUIRE(EMPTY(task->events)); REQUIRE(task->nevents == 0); REQUIRE(EMPTY(task->on_shutdown)); @@ -201,6 +207,7 @@ task_finished(isc__task_t *task) { LOCK(&manager->lock); UNLINK(manager->tasks, task, link); + UNLOCK(&manager->lock); if (FINISHED(manager)) { /* * All tasks have completed and the @@ -208,10 +215,10 @@ task_finished(isc__task_t *task) { * any idle worker threads so they * can exit. */ - BROADCAST(&manager->work_available); + for (unsigned int i=0; iqueues; i++) { + BROADCAST(&manager->work_available[i]); + } } - UNLOCK(&manager->lock); - DESTROYLOCK(&task->lock); task->common.impmagic = 0; task->common.magic = 0; @@ -235,6 +242,7 @@ isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum, return (ISC_R_NOMEMORY); XTRACE("isc_task_create"); task->manager = manager; + task->threadid = -1; result = isc_mutex_init(&task->lock); if (result != ISC_R_SUCCESS) { isc_mem_put(manager->mctx, task, sizeof(*task)); @@ -346,17 +354,17 @@ static inline void task_ready(isc__task_t *task) { isc__taskmgr_t *manager = task->manager; bool has_privilege = isc_task_privilege((isc_task_t *) task); + int queue = task->threadid % manager->queues; REQUIRE(VALID_MANAGER(manager)); REQUIRE(task->state == task_state_ready); XTRACE("task_ready"); - - LOCK(&manager->lock); - push_readyq(manager, task); + LOCK(manager->locks[queue]); + push_readyq(manager, task, queue); if (manager->mode == isc_taskmgrmode_normal || has_privilege) - SIGNAL(&manager->work_available); - UNLOCK(&manager->lock); + SIGNAL(&manager->work_available[queue]); + UNLOCK(manager->locks[queue]); } static inline bool @@ -414,7 +422,7 @@ isc_task_detach(isc_task_t **taskp) { } static inline bool -task_send(isc__task_t *task, isc_event_t **eventp) { +task_send(isc__task_t *task, isc_event_t **eventp, int c) { bool was_idle = false; isc_event_t *event; @@ -433,6 +441,7 @@ task_send(isc__task_t *task, isc_event_t **eventp) { if (task->state == task_state_idle) { was_idle = true; + task->threadid = c; INSIST(EMPTY(task->events)); task->state = task_state_ready; } @@ -447,8 +456,21 @@ task_send(isc__task_t *task, isc_event_t **eventp) { void isc_task_send(isc_task_t *task0, isc_event_t **eventp) { + isc_task_sendto(task0, eventp, -1); +} + +void +isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { + isc_task_sendtoanddetach(taskp, eventp, -1); +} + +void +isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) { isc__task_t *task = (isc__task_t *)task0; bool was_idle; + if (c == -1) { + c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); + } /* * Send '*event' to 'task'. @@ -464,7 +486,7 @@ isc_task_send(isc_task_t *task0, isc_event_t **eventp) { * some processing is deferred until after the lock is released. */ LOCK(&task->lock); - was_idle = task_send(task, eventp); + was_idle = task_send(task, eventp, c); UNLOCK(&task->lock); if (was_idle) { @@ -488,7 +510,7 @@ isc_task_send(isc_task_t *task0, isc_event_t **eventp) { } void -isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { +isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { bool idle1, idle2; isc__task_t *task; @@ -500,11 +522,13 @@ isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { REQUIRE(taskp != NULL); task = (isc__task_t *)*taskp; REQUIRE(VALID_TASK(task)); + if (c == -1) { + c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); + } XTRACE("isc_task_sendanddetach"); - LOCK(&task->lock); - idle1 = task_send(task, eventp); + idle1 = task_send(task, eventp, c); idle2 = task_detach(task); UNLOCK(&task->lock); @@ -829,13 +853,13 @@ isc_task_getcurrenttimex(isc_task_t *task0, isc_time_t *t) { * Caller must hold the task manager lock. */ static inline bool -empty_readyq(isc__taskmgr_t *manager) { +empty_readyq(isc__taskmgr_t *manager, int c) { isc__tasklist_t queue; if (manager->mode == isc_taskmgrmode_normal) - queue = manager->ready_tasks; + queue = manager->ready_tasks[c]; else - queue = manager->ready_priority_tasks; + queue = manager->ready_priority_tasks[c]; return (EMPTY(queue)); } @@ -849,18 +873,18 @@ empty_readyq(isc__taskmgr_t *manager) { * Caller must hold the task manager lock. */ static inline isc__task_t * -pop_readyq(isc__taskmgr_t *manager) { +pop_readyq(isc__taskmgr_t *manager, int c) { isc__task_t *task; if (manager->mode == isc_taskmgrmode_normal) - task = HEAD(manager->ready_tasks); + task = HEAD(manager->ready_tasks[c]); else - task = HEAD(manager->ready_priority_tasks); + task = HEAD(manager->ready_priority_tasks[c]); if (task != NULL) { - DEQUEUE(manager->ready_tasks, task, ready_link); + DEQUEUE(manager->ready_tasks[c], task, ready_link); if (ISC_LINK_LINKED(task, ready_priority_link)) - DEQUEUE(manager->ready_priority_tasks, task, + DEQUEUE(manager->ready_priority_tasks[c], task, ready_priority_link); } @@ -874,20 +898,26 @@ pop_readyq(isc__taskmgr_t *manager) { * Caller must hold the task manager lock. */ static inline void -push_readyq(isc__taskmgr_t *manager, isc__task_t *task) { - ENQUEUE(manager->ready_tasks, task, ready_link); +push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) { + ENQUEUE(manager->ready_tasks[c], task, ready_link); if ((task->flags & TASK_F_PRIVILEGED) != 0) - ENQUEUE(manager->ready_priority_tasks, task, + ENQUEUE(manager->ready_priority_tasks[c], task, ready_priority_link); - manager->tasks_ready++; + atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed); } static void -dispatch(isc__taskmgr_t *manager) { +dispatch(isc__taskmgr_t *manager, int threadid) { isc__task_t *task; REQUIRE(VALID_MANAGER(manager)); + /* Wait for everything to initialize */ + LOCK(&manager->lock); + UNLOCK(&manager->lock); + + int queue = threadid % manager->queues; + /* * Again we're trying to hold the lock for as short a time as possible * and to do as little locking and unlocking as possible. @@ -937,8 +967,7 @@ dispatch(isc__taskmgr_t *manager) { * For N iterations of the loop, this code does N+1 locks and N+1 * unlocks. The while expression is always protected by the lock. */ - - LOCK(&manager->lock); + LOCK(manager->locks[queue]); while (!FINISHED(manager)) { /* @@ -951,13 +980,19 @@ dispatch(isc__taskmgr_t *manager) { * If a pause has been requested, don't do any work * until it's been released. */ - while ((empty_readyq(manager) || manager->pause_requested || + while ((empty_readyq(manager, queue) || manager->pause_requested || manager->exclusive_requested) && !FINISHED(manager)) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_WAIT, "wait")); - WAIT(&manager->work_available, &manager->lock); + XTHREADTRACE(isc_msgcat_get(isc_msgcat, + ISC_MSGSET_GENERAL, + ISC_MSG_WAIT, manager->pause_requested ? "paused" : "notpaused")); + XTHREADTRACE(isc_msgcat_get(isc_msgcat, + ISC_MSGSET_GENERAL, + ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq")); + WAIT(&manager->work_available[queue], manager->locks[queue]); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_AWAKE, "awake")); @@ -965,7 +1000,7 @@ dispatch(isc__taskmgr_t *manager) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_WORKING, "working")); - task = pop_readyq(manager); + task = pop_readyq(manager, queue); if (task != NULL) { unsigned int dispatch_count = 0; bool done = false; @@ -980,15 +1015,16 @@ dispatch(isc__taskmgr_t *manager) { * have a task to do. We must reacquire the manager * lock before exiting the 'if (task != NULL)' block. */ - manager->tasks_ready--; - manager->tasks_running++; - UNLOCK(&manager->lock); + UNLOCK(manager->locks[queue]); + atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed); + atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed); LOCK(&task->lock); INSIST(task->state == task_state_ready); task->state = task_state_running; XTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_RUNNING, "running")); + XTRACE(task->name); TIME_NOW(&task->tnow); task->now = isc_time_seconds(&task->tnow); do { @@ -1004,6 +1040,7 @@ dispatch(isc__taskmgr_t *manager) { ISC_MSGSET_TASK, ISC_MSG_EXECUTE, "execute action")); + XTRACE(task->name); if (event->ev_action != NULL) { UNLOCK(&task->lock); (event->ev_action)( @@ -1094,8 +1131,7 @@ dispatch(isc__taskmgr_t *manager) { if (finished) task_finished(task); - LOCK(&manager->lock); - manager->tasks_running--; + atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed); if (manager->exclusive_requested && manager->tasks_running == 1) { SIGNAL(&manager->exclusive_granted); @@ -1103,6 +1139,7 @@ dispatch(isc__taskmgr_t *manager) { manager->tasks_running == 0) { SIGNAL(&manager->paused); } + LOCK(manager->locks[queue]); if (requeue) { /* * We know we're awake, so we don't have @@ -1123,7 +1160,7 @@ dispatch(isc__taskmgr_t *manager) { * were usually nonempty, the 'optimization' * might even hurt rather than help. */ - push_readyq(manager, task); + push_readyq(manager, task, queue); } } @@ -1133,27 +1170,38 @@ dispatch(isc__taskmgr_t *manager) { * we're stuck. Automatically drop privileges at that * point and continue with the regular ready queue. */ - if (manager->tasks_running == 0 && empty_readyq(manager)) { - manager->mode = isc_taskmgrmode_normal; - if (!empty_readyq(manager)) - BROADCAST(&manager->work_available); + if (manager->tasks_running == 0 && empty_readyq(manager, queue)) { + if (manager->mode != isc_taskmgrmode_normal) { + manager->mode = isc_taskmgrmode_normal; + for (unsigned i=0; i < manager->workers; i++) { + BROADCAST(&manager->work_available[i]); + } + } } } - - UNLOCK(&manager->lock); + UNLOCK(manager->locks[queue]); } +typedef struct st { + isc__taskmgr_t *manager; + int threadid; +} stt; + static isc_threadresult_t #ifdef _WIN32 WINAPI #endif run(void *uap) { - isc__taskmgr_t *manager = uap; + stt *st = uap; + isc__taskmgr_t *manager = st->manager; + int threadid = st->threadid; + isc_mem_put(manager->mctx, st, sizeof(*st)); + isc_thread_setaffinity(threadid); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_STARTING, "starting")); - dispatch(manager); + dispatch(manager, threadid); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_EXITING, "exiting")); @@ -1170,7 +1218,7 @@ manager_free(isc__taskmgr_t *manager) { isc_mem_t *mctx; (void)isc_condition_destroy(&manager->exclusive_granted); - (void)isc_condition_destroy(&manager->work_available); + (void)isc_condition_destroy(&manager->work_available[0]); (void)isc_condition_destroy(&manager->paused); isc_mem_free(manager->mctx, manager->threads); DESTROYLOCK(&manager->lock); @@ -1214,20 +1262,13 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, } manager->workers = 0; + manager->queues = 0; manager->threads = isc_mem_allocate(mctx, workers * sizeof(isc_thread_t)); if (manager->threads == NULL) { result = ISC_R_NOMEMORY; goto cleanup_lock; } - if (isc_condition_init(&manager->work_available) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_condition_init() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto cleanup_threads; - } if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) { UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_condition_init() %s", @@ -1248,12 +1289,15 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, default_quantum = DEFAULT_DEFAULT_QUANTUM; manager->default_quantum = default_quantum; INIT_LIST(manager->tasks); - INIT_LIST(manager->ready_tasks); - INIT_LIST(manager->ready_priority_tasks); + manager->ready_tasks = malloc(workers * sizeof(isc__tasklist_t)); + manager->locks = malloc(workers * sizeof(isc_mutex_t *)); + manager->work_available = malloc(workers * sizeof(isc_condition_t)); + manager->ready_priority_tasks = malloc(workers * sizeof(isc__tasklist_t)); manager->tasks_running = 0; manager->tasks_ready = 0; manager->exclusive_requested = false; manager->pause_requested = false; + manager->curq = 0; manager->exiting = false; manager->excl = NULL; @@ -1264,17 +1308,33 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, * Start workers. */ for (i = 0; i < workers; i++) { - if (isc_thread_create(run, manager, - &manager->threads[manager->workers]) == - ISC_R_SUCCESS) { - char name[16]; /* thread name limit on Linux */ - snprintf(name, sizeof(name), "isc-worker%04u", i); - isc_thread_setname(manager->threads[manager->workers], - name); - manager->workers++; - started++; + INIT_LIST(manager->ready_tasks[i]); + INIT_LIST(manager->ready_priority_tasks[i]); + manager->locks[i] = malloc(4096); + isc_mutex_init(manager->locks[i]); + if (isc_condition_init(&manager->work_available[i]) != ISC_R_SUCCESS) { + UNEXPECTED_ERROR(__FILE__, __LINE__, + "isc_condition_init() %s", + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_FAILED, "failed")); + result = ISC_R_UNEXPECTED; + goto cleanup_threads; + } + stt *st = isc_mem_get(mctx, sizeof(stt)); + st->manager = manager; + st->threadid = i; + if (isc_thread_create(run, st, + &manager->threads[i]) != ISC_R_SUCCESS) { + goto cleanup_threads; } + char name[16]; /* thread name limit on Linux */ + snprintf(name, sizeof(name), "isc-worker%04u", i); + isc_thread_setname(manager->threads[manager->workers], + name); + manager->workers++; + started++; } + manager->queues = manager->workers; UNLOCK(&manager->lock); if (started == 0) { @@ -1290,7 +1350,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, cleanup_exclusivegranted: (void)isc_condition_destroy(&manager->exclusive_granted); cleanup_workavailable: - (void)isc_condition_destroy(&manager->work_available); + (void)isc_condition_destroy(&manager->work_available[0]); cleanup_threads: isc_mem_free(mctx, manager->threads); cleanup_lock: @@ -1362,7 +1422,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { task = NEXT(task, link)) { LOCK(&task->lock); if (task_shutdown(task)) - push_readyq(manager, task); + push_readyq(manager, task, 0); UNLOCK(&task->lock); } /* @@ -1370,7 +1430,11 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { * there's work left to do, and if there are already no tasks left * it will cause the workers to see manager->exiting. */ - BROADCAST(&manager->work_available); + for (i = 0; i < manager->queues; i++) { + LOCK(manager->locks[i]); + BROADCAST(&manager->work_available[i]); + UNLOCK(manager->locks[i]); + } UNLOCK(&manager->lock); /* @@ -1421,7 +1485,7 @@ isc__taskmgr_resume(isc_taskmgr_t *manager0) { LOCK(&manager->lock); if (manager->pause_requested) { manager->pause_requested = false; - BROADCAST(&manager->work_available); + BROADCAST(&manager->work_available[0]); } UNLOCK(&manager->lock); } @@ -1493,7 +1557,9 @@ isc_task_endexclusive(isc_task_t *task0) { LOCK(&manager->lock); REQUIRE(manager->exclusive_requested); manager->exclusive_requested = false; - BROADCAST(&manager->work_available); + for (unsigned int i=0; i < manager->workers; i++) { + BROADCAST(&manager->work_available[i]); + } UNLOCK(&manager->lock); } @@ -1502,6 +1568,7 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) { REQUIRE(ISCAPI_TASK_VALID(task0)); isc__task_t *task = (isc__task_t *)task0; isc__taskmgr_t *manager = task->manager; + int queue = task->threadid % manager->queues; bool oldpriv; LOCK(&task->lock); @@ -1515,14 +1582,14 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) { if (priv == oldpriv) return; - LOCK(&manager->lock); + LOCK(manager->locks[queue]); if (priv && ISC_LINK_LINKED(task, ready_link)) - ENQUEUE(manager->ready_priority_tasks, task, + ENQUEUE(manager->ready_priority_tasks[queue], task, ready_priority_link); else if (!priv && ISC_LINK_LINKED(task, ready_priority_link)) - DEQUEUE(manager->ready_priority_tasks, task, + DEQUEUE(manager->ready_priority_tasks[queue], task, ready_priority_link); - UNLOCK(&manager->lock); + UNLOCK(manager->locks[queue]); } bool @@ -1575,11 +1642,13 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, xmlTextWriterPtr writer) { TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */ TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running")); - TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_running)); + TRY0(xmlTextWriterWriteFormatString(writer, "%d", + (int) mgr->tasks_running)); TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */ TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready")); - TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_ready)); + TRY0(xmlTextWriterWriteFormatString(writer, "%d", + (int) mgr->tasks_ready)); TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */ TRY0(xmlTextWriterEndElement(writer)); /* thread-model */ diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index ea25fc3f2d3..2761affad60 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -629,6 +629,8 @@ isc_task_purgeevent isc_task_purgerange isc_task_send isc_task_sendanddetach +isc_task_sendto +isc_task_sendtoanddetach isc_task_setname isc_task_setprivilege isc_task_shutdown