]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
PoC: a separate worker for greedy tasks in taskmgr wpk-taskmgr-greedy-worker
authorWitold Kręcicki <wpk@isc.org>
Mon, 26 Nov 2018 13:11:48 +0000 (13:11 +0000)
committerWitold Kręcicki <wpk@isc.org>
Mon, 26 Nov 2018 13:23:36 +0000 (13:23 +0000)
lib/isc/include/isc/task.h
lib/isc/task.c

index b2d9c4838460b256481932db37056ac598bdd6d0..1434c59d4df431720e2f52367a22343ea28bd961 100644 (file)
@@ -87,6 +87,8 @@
 #define ISC_TASKEVENT_TEST             (ISC_EVENTCLASS_TASK + 1)
 #define ISC_TASKEVENT_LASTEVENT                (ISC_EVENTCLASS_TASK + 65535)
 
+#define ISC_TASKCPU_NONE               -1
+#define ISC_TASKCPU_GREEDY             -2
 /*****
  ***** Tasks.
  *****/
index a51d3359abfcc42137126362420c4bc8ea46b898..dd88712c4617738f2f7a1bcbab591190a7c3c966 100644 (file)
@@ -145,6 +145,7 @@ struct isc__taskmgr {
        isc_mutex_t                     halt_lock;
        isc_condition_t                 halt_cond;
        unsigned int                    workers;
+       unsigned int                    greedy_worker;
        atomic_uint_fast32_t            tasks_running;
        atomic_uint_fast32_t            tasks_ready;
        atomic_uint_fast32_t            curq;
@@ -209,7 +210,7 @@ wake_all_queues(isc__taskmgr_t *manager);
 
 static inline void
 wake_all_queues(isc__taskmgr_t *manager) {
-       for (unsigned int i = 0; i < manager->workers; i++) {
+       for (unsigned int i = 0; i < manager->workers+1; i++) {
                LOCK(&manager->queues[i].lock);
                BROADCAST(&manager->queues[i].work_available);
                UNLOCK(&manager->queues[i].lock);
@@ -249,7 +250,7 @@ isc_result_t
 isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum,
                isc_task_t **taskp)
 {
-       return (isc_task_create_bound(manager0, quantum, taskp, -1));
+       return (isc_task_create_bound(manager0, quantum, taskp, ISC_TASKCPU_NONE));
 }
 
 isc_result_t
@@ -269,7 +270,7 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
        XTRACE("isc_task_create");
        task->manager = manager;
 
-       if (threadid == -1) {
+       if (threadid == ISC_TASKCPU_NONE) {
                /*
                 * Task is not pinned to a queue, it's threadid will be
                 * choosen when first task will be sent to it - either
@@ -277,6 +278,13 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
                 */
                task->bound = false;
                task->threadid = 0;
+       } else if (threadid == ISC_TASKCPU_GREEDY) {
+               /*
+                * Task is greedy - it will be pinned to a special
+                * queue for greedy tasks
+                */
+               task->bound = true;
+               task->threadid = manager->greedy_worker;
        } else {
                /*
                 * Task is pinned to a queue, it'll always be run
@@ -494,12 +502,12 @@ task_send(isc__task_t *task, isc_event_t **eventp, int c) {
 
 void
 isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
-       isc_task_sendto(task0, eventp, -1);
+       isc_task_sendto(task0, eventp, ISC_TASKCPU_NONE);
 }
 
 void
 isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
-       isc_task_sendtoanddetach(taskp, eventp, -1);
+       isc_task_sendtoanddetach(taskp, eventp, ISC_TASKCPU_NONE);
 }
 
 void
@@ -517,11 +525,14 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
        /* If task is bound ignore provided cpu. */
        if (task->bound) {
                c = task->threadid;
-       } else if (c < 0) {
+               c %= task->manager->workers;
+       } else if (c == ISC_TASKCPU_GREEDY) {
+               c = task->manager->greedy_worker;
+       } else if (c == ISC_TASKCPU_NONE) {
                c = atomic_fetch_add_explicit(&task->manager->curq, 1,
                                              memory_order_relaxed);
+               c %= task->manager->workers;
        }
-       c %= task->manager->workers;
 
        /*
         * We're trying hard to hold locks for as short a time as possible.
@@ -569,11 +580,14 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
 
        if (task->bound) {
                c = task->threadid;
-       } else if (c < 0) {
+               c %= task->manager->workers;
+       } else if (c == ISC_TASKCPU_GREEDY) {
+               c = task->manager->greedy_worker;
+       } else if (c == ISC_TASKCPU_NONE) {
                c = atomic_fetch_add_explicit(&task->manager->curq, 1,
                                              memory_order_relaxed);
+               c %= task->manager->workers;
        }
-       c %= task->manager->workers;
 
        LOCK(&task->lock);
        idle1 = task_send(task, eventp, c);
@@ -1279,7 +1293,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
                        {
                                bool empty = true;
                                unsigned int i;
-                               for (i = 0; i < manager->workers && empty; i++)
+                               for (i = 0; i < manager->workers+1 && empty; i++)
                                {
                                        LOCK(&manager->queues[i].lock);
                                        empty &= empty_readyq(manager, i);
@@ -1309,8 +1323,11 @@ WINAPI
 run(void *queuep) {
        isc__taskqueue_t *tq = queuep;
        isc__taskmgr_t *manager = tq->manager;
-       int threadid = tq->threadid;
-       isc_thread_setaffinity(threadid);
+       unsigned int threadid = tq->threadid;
+       /* greedy worker is not pinned */
+       if (threadid != manager->greedy_worker) {
+               isc_thread_setaffinity(threadid);
+       }
 
        XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
                                    ISC_MSG_STARTING, "starting"));
@@ -1329,13 +1346,13 @@ run(void *queuep) {
 
 static void
 manager_free(isc__taskmgr_t *manager) {
-       for (unsigned int i = 0; i < manager->workers; i++) {
+       for (unsigned int i = 0; i < manager->workers+1; i++) {
                isc_mutex_destroy(&manager->queues[i].lock);
        }
        isc_mutex_destroy(&manager->lock);
        isc_mutex_destroy(&manager->halt_lock);
        isc_mem_put(manager->mctx, manager->queues,
-                   manager->workers * sizeof(isc__taskqueue_t));
+                   (manager->workers+1) * sizeof(isc__taskqueue_t));
        manager->common.impmagic = 0;
        manager->common.magic = 0;
        isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
@@ -1374,7 +1391,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
        }
        manager->default_quantum = default_quantum;
        INIT_LIST(manager->tasks);
-       manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
+       manager->queues = isc_mem_get(mctx, (workers+1) * sizeof(isc__taskqueue_t));
        RUNTIME_CHECK(manager->queues != NULL);
 
        manager->tasks_running = 0;
@@ -1392,24 +1409,31 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
        /*
         * Start workers.
         */
-       for (i = 0; i < workers; i++) {
+       for (i = 0; i < workers+1; i++) {
+               char name[16];
                INIT_LIST(manager->queues[i].ready_tasks);
                INIT_LIST(manager->queues[i].ready_priority_tasks);
                isc_mutex_init(&manager->queues[i].lock);
                isc_condition_init(&manager->queues[i].work_available);
+               if (i == workers) {
+                       snprintf(name, sizeof(name), "isc-greedywrkr");
+                       manager->greedy_worker = i;
+               } else {
+                       snprintf(name, sizeof(name), "isc-worker%04u", i);
+               }
 
                manager->queues[i].manager = manager;
                manager->queues[i].threadid = i;
                RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i],
                                                &manager->queues[i].thread)
                              == ISC_R_SUCCESS);
-               char name[16];
+
                snprintf(name, sizeof(name), "isc-worker%04u", i);
-               isc_thread_setname(manager->queues[i].thread, name);
        }
+
        UNLOCK(&manager->lock);
 
-       isc_thread_setconcurrency(workers);
+       isc_thread_setconcurrency(workers+1);
 
        *managerp = (isc_taskmgr_t *)manager;
 
@@ -1497,7 +1521,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
        /*
         * Wait for all the worker threads to exit.
         */
-       for (i = 0; i < manager->workers; i++)
+       for (i = 0; i < manager->workers+1; i++)
                (void)isc_thread_join(manager->queues[i].thread, NULL);
 
        manager_free(manager);
@@ -1537,7 +1561,7 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) {
        }
 
        manager->pause_requested = true;
-       while (manager->halted < manager->workers) {
+       while (manager->halted < manager->workers+1) {
                wake_all_queues(manager);
                WAIT(&manager->halt_cond, &manager->halt_lock);
        }
@@ -1611,7 +1635,7 @@ isc_task_beginexclusive(isc_task_t *task0) {
        LOCK(&manager->halt_lock);
        INSIST(!manager->exclusive_requested && !manager->pause_requested);
        manager->exclusive_requested = true;
-       while (manager->halted + 1 < manager->workers) {
+       while (manager->halted + 1 < manager->workers+1) {
                wake_all_queues(manager);
                WAIT(&manager->halt_cond, &manager->halt_lock);
        }