]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Replace netmgr quantum with loop-preventing barrier
authorOndřej Surý <ondrej@sury.org>
Fri, 7 May 2021 10:58:40 +0000 (12:58 +0200)
committerOndřej Surý <ondrej@sury.org>
Mon, 17 May 2021 09:59:19 +0000 (11:59 +0200)
Instead of using a fixed quantum, this commit adds an atomic counter for
the number of items on each queue and uses the number of netievents
scheduled to run as the upper limit on the number of netievents
processed in a single process_queue() run.

This prevents endless loops when a netievent schedules more netievents
onto the same loop, and we no longer have to pick a "magic" number for
the quantum.

lib/isc/include/isc/result.h
lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/result.c

index 22969775dfef09e2a5616bd6545be11d19f23764..21071c7dad41a29267be60448b51a55956efb3c9 100644 (file)
@@ -64,7 +64,7 @@
 #define ISC_R_MULTICAST               43           /*%< invalid use of multicast */
 #define ISC_R_NOTFILE         44           /*%< not a file */
 #define ISC_R_NOTDIRECTORY     45          /*%< not a directory */
-#define ISC_R_QUEUEFULL               46           /*%< queue is full */
+#define ISC_R_EMPTY           46           /*%< queue is empty */
 #define ISC_R_FAMILYMISMATCH   47          /*%< address family mismatch */
 #define ISC_R_FAMILYNOSUPPORT  48          /*%< AF not supported */
 #define ISC_R_BADHEX          49           /*%< bad hex encoding */
index f131400152add9069725e16b41c4afb1a6e9ebb7..0ad9bd42c8c3bab8ca3a091367061238a89861cb 100644 (file)
@@ -40,8 +40,6 @@
 
 #include "uv-compat.h"
 
-#define ISC_NETMGR_QUANTUM_DEFAULT 1024
-
 #define ISC_NETMGR_TID_UNKNOWN -1
 
 /* Must be different from ISC_NETMGR_TID_UNKNOWN */
@@ -165,6 +163,17 @@ isc__nm_dump_active(isc_nm_t *nm);
 #define isc__nmsocket_prep_destroy(sock) isc___nmsocket_prep_destroy(sock)
 #endif
 
+/*
+ * Queue types in the order of processing priority.
+ */
+typedef enum {
+       NETIEVENT_PRIORITY = 0,   /* priority events; usable while paused */
+       NETIEVENT_PRIVILEGED = 1, /* privileged async tasks */
+       NETIEVENT_TASK = 2,       /* async tasks */
+       NETIEVENT_NORMAL = 3,     /* ordinary incoming async events */
+       NETIEVENT_MAX = 4,        /* number of queues; sizes the arrays */
+} netievent_type_t;
+
 /*
  * Single network event loop worker.
  */
@@ -178,13 +187,8 @@ typedef struct isc__networker {
        bool paused;
        bool finished;
        isc_thread_t thread;
-       isc_queue_t *ievents;      /* incoming async events */
-       isc_queue_t *ievents_priv; /* privileged async tasks */
-       isc_queue_t *ievents_task; /* async tasks */
-       isc_queue_t *ievents_prio; /* priority async events
-                                   * used for listening etc.
-                                   * can be processed while
-                                   * worker is paused */
+       isc_queue_t *ievents[NETIEVENT_MAX];
+       atomic_uint_fast32_t nievents[NETIEVENT_MAX];
        isc_condition_t cond_prio;
 
        isc_refcount_t references;
@@ -192,7 +196,6 @@ typedef struct isc__networker {
        char *recvbuf;
        char *sendbuf;
        bool recvbuf_inuse;
-       unsigned int quantum;
 } isc__networker_t;
 
 /*
index ac2f38992a641113ac995343e8bb581fccffaa0f..f9affb2c040638b7a3e8c708c7ff45d78a69fc06 100644 (file)
@@ -137,28 +137,57 @@ static void
 async_cb(uv_async_t *handle);
 static bool
 process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
-static bool
-process_queue(isc__networker_t *worker, isc_queue_t *queue,
-             unsigned int *quantump);
+static isc_result_t
+process_queue(isc__networker_t *worker, netievent_type_t type);
 static void
 wait_for_priority_queue(isc__networker_t *worker);
-static bool
-process_priority_queue(isc__networker_t *worker, unsigned int *quantump);
-static bool
-process_privilege_queue(isc__networker_t *worker, unsigned int *quantump);
-static bool
-process_task_queue(isc__networker_t *worker, unsigned int *quantump);
-static bool
-process_normal_queue(isc__networker_t *worker, unsigned int *quantump);
-
-#define drain_priority_queue(worker) \
-       (void)process_priority_queue(worker, &(unsigned int){ UINT_MAX })
-#define drain_privilege_queue(worker) \
-       (void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX })
-#define drain_task_queue(worker) \
-       (void)process_task_queue(worker, &(unsigned int){ UINT_MAX })
-#define drain_normal_queue(worker) \
-       (void)process_normal_queue(worker, &(unsigned int){ UINT_MAX })
+static void
+drain_queue(isc__networker_t *worker, netievent_type_t type);
+
+#define ENQUEUE_NETIEVENT(worker, queue, event) \
+       isc_queue_enqueue((worker)->ievents[(queue)], (uintptr_t)(event))
+#define DEQUEUE_NETIEVENT(worker, queue) \
+       ((isc__netievent_t *)isc_queue_dequeue((worker)->ievents[(queue)]))
+
+#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \
+       ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event)
+#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \
+       ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event)
+#define ENQUEUE_TASK_NETIEVENT(worker, event) \
+       ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event)
+#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \
+       ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event)
+
+#define DEQUEUE_PRIORITY_NETIEVENT(worker) \
+       DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY)
+#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \
+       DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
+#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK)
+#define DEQUEUE_NORMAL_NETIEVENT(worker) \
+       DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL)
+
+#define INCREMENT_NETIEVENT(worker, queue) \
+       atomic_fetch_add_release(&(worker)->nievents[(queue)], 1)
+#define DECREMENT_NETIEVENT(worker, queue) \
+       atomic_fetch_sub_release(&(worker)->nievents[(queue)], 1)
+
+#define INCREMENT_PRIORITY_NETIEVENT(worker) \
+       INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
+#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \
+       INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
+#define INCREMENT_TASK_NETIEVENT(worker) \
+       INCREMENT_NETIEVENT(worker, NETIEVENT_TASK)
+#define INCREMENT_NORMAL_NETIEVENT(worker) \
+       INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
+
+#define DECREMENT_PRIORITY_NETIEVENT(worker) \
+       DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
+#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \
+       DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
+#define DECREMENT_TASK_NETIEVENT(worker) \
+       DECREMENT_NETIEVENT(worker, NETIEVENT_TASK)
+#define DECREMENT_NORMAL_NETIEVENT(worker) \
+       DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
 
 static void
 isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
@@ -283,7 +312,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
                *worker = (isc__networker_t){
                        .mgr = mgr,
                        .id = i,
-                       .quantum = ISC_NETMGR_QUANTUM_DEFAULT,
                };
 
                r = uv_loop_init(&worker->loop);
@@ -296,11 +324,11 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
 
                isc_mutex_init(&worker->lock);
 
-               worker->ievents = isc_queue_new(mgr->mctx, 128);
-               worker->ievents_task = isc_queue_new(mgr->mctx, 128);
-               worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
-               worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
                isc_condition_init(&worker->cond_prio);
+               for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+                       worker->ievents[type] = isc_queue_new(mgr->mctx, 128);
+                       atomic_init(&worker->nievents[type], 0);
+               }
 
                worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
                worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
@@ -354,20 +381,14 @@ nm_destroy(isc_nm_t **mgr0) {
                int r;
 
                /* Empty the async event queues */
-               while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
-                               worker->ievents)) != NULL)
-               {
+               while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
                        isc_mempool_put(mgr->evpool, ievent);
                }
 
-               INSIST(isc_queue_dequeue(worker->ievents_priv) ==
-                      (uintptr_t)NULL);
-               INSIST(isc_queue_dequeue(worker->ievents_task) ==
-                      (uintptr_t)NULL);
+               INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL);
+               INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL);
 
-               while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
-                               worker->ievents_prio)) != NULL)
-               {
+               while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
                        isc_mempool_put(mgr->evpool, ievent);
                }
                isc_condition_destroy(&worker->cond_prio);
@@ -375,11 +396,10 @@ nm_destroy(isc_nm_t **mgr0) {
                r = uv_loop_close(&worker->loop);
                INSIST(r == 0);
 
-               isc_queue_destroy(worker->ievents);
-               isc_queue_destroy(worker->ievents_priv);
-               isc_queue_destroy(worker->ievents_task);
-               isc_queue_destroy(worker->ievents_prio);
                isc_mutex_destroy(&worker->lock);
+               for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+                       isc_queue_destroy(worker->ievents[type]);
+               }
 
                isc_mem_put(mgr->mctx, worker->sendbuf,
                            ISC_NETMGR_SENDBUF_SIZE);
@@ -487,7 +506,7 @@ isc_nm_resume(isc_nm_t *mgr) {
 
        if (isc__nm_in_netthread()) {
                REQUIRE(isc_nm_tid() == 0);
-               drain_priority_queue(&mgr->workers[isc_nm_tid()]);
+               drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIORITY);
        }
 
        for (int i = 0; i < mgr->nworkers; i++) {
@@ -500,7 +519,7 @@ isc_nm_resume(isc_nm_t *mgr) {
        }
 
        if (isc__nm_in_netthread()) {
-               drain_privilege_queue(&mgr->workers[isc_nm_tid()]);
+               drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIVILEGED);
 
                atomic_fetch_sub(&mgr->workers_paused, 1);
                isc_barrier_wait(&mgr->resuming);
@@ -711,7 +730,7 @@ nm_thread(isc_threadarg_t worker0) {
                         * All workers must drain the privileged event
                         * queue before we resume from pause.
                         */
-                       drain_privilege_queue(worker);
+                       drain_queue(worker, NETIEVENT_PRIVILEGED);
 
                        atomic_fetch_sub(&mgr->workers_paused, 1);
                        if (isc_barrier_wait(&mgr->resuming) != 0) {
@@ -734,8 +753,8 @@ nm_thread(isc_threadarg_t worker0) {
         * (they may include shutdown events) but do not process
         * the netmgr event queue.
         */
-       drain_privilege_queue(worker);
-       drain_task_queue(worker);
+       drain_queue(worker, NETIEVENT_PRIVILEGED);
+       drain_queue(worker, NETIEVENT_TASK);
 
        LOCK(&mgr->lock);
        mgr->workers_running--;
@@ -746,20 +765,32 @@ nm_thread(isc_threadarg_t worker0) {
 }
 
 static bool
-process_all_queues(isc__networker_t *worker, unsigned int quantum) {
+process_all_queues(isc__networker_t *worker) {
+       bool reschedule = false; /* set once any queue reports success */
        /*
         * The queue processing functions will return false when the
-        * system is pausing or stopping, or if we have completed
-        * 'quantum' events.
-        *
-        * We don't want to proceed to a new queue until the previous one
-        * has been fully drained, so whenever one queue is interrupted,
-        * we skip all the later ones.
+        * system is pausing or stopping and we don't want to process
+        * the other queues in such a case, but we need the async event
+        * to be rescheduled in the next uv_run().
         */
-       return (process_priority_queue(worker, &quantum) &&
-               process_privilege_queue(worker, &quantum) &&
-               process_task_queue(worker, &quantum) &&
-               process_normal_queue(worker, &quantum));
+       for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+               isc_result_t result = process_queue(worker, type);
+               switch (result) {
+               case ISC_R_SUSPEND:
+                       return (true); /* skip later queues, run again */
+               case ISC_R_EMPTY:
+                       /* empty queue */
+                       break;
+               case ISC_R_SUCCESS:
+                       reschedule = true; /* queue may still hold events */
+                       break;
+               default:
+                       INSIST(0);
+                       ISC_UNREACHABLE();
+               }
+       }
+
+       return (reschedule); /* true makes async_cb() schedule itself again */
 }
 
 /*
@@ -771,9 +802,8 @@ process_all_queues(isc__networker_t *worker, unsigned int quantum) {
 static void
 async_cb(uv_async_t *handle) {
        isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
-       unsigned int quantum = worker->quantum;
 
-       if (!process_all_queues(worker, quantum)) {
+       if (process_all_queues(worker)) {
                /* If we didn't process all the events, we need to enqueue
                 * async_cb to be run in the next iteration of the uv_loop
                 */
@@ -840,19 +870,17 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
 
 static void
 wait_for_priority_queue(isc__networker_t *worker) {
-       isc_queue_t *queue = worker->ievents_prio;
        isc_condition_t *cond = &worker->cond_prio;
        bool wait_for_work = true;
 
        while (true) {
                isc__netievent_t *ievent;
                LOCK(&worker->lock);
-               ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
+               ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
                if (wait_for_work) {
                        while (ievent == NULL) {
                                WAIT(cond, &worker->lock);
-                               ievent = (isc__netievent_t *)isc_queue_dequeue(
-                                       queue);
+                               ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
                        }
                }
                UNLOCK(&worker->lock);
@@ -861,29 +889,17 @@ wait_for_priority_queue(isc__networker_t *worker) {
                if (ievent == NULL) {
                        return;
                }
+               DECREMENT_PRIORITY_NETIEVENT(worker);
 
                (void)process_netievent(worker, ievent);
        }
 }
 
-static bool
-process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
-       return (process_queue(worker, worker->ievents_prio, quantump));
-}
-
-static bool
-process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) {
-       return (process_queue(worker, worker->ievents_priv, quantump));
-}
-
-static bool
-process_task_queue(isc__networker_t *worker, unsigned int *quantump) {
-       return (process_queue(worker, worker->ievents_task, quantump));
-}
-
-static bool
-process_normal_queue(isc__networker_t *worker, unsigned int *quantump) {
-       return (process_queue(worker, worker->ievents, quantump));
+static void
+drain_queue(isc__networker_t *worker, netievent_type_t type) {
+       while (process_queue(worker, type) != ISC_R_EMPTY) {
+               ; /* keep going on ISC_R_SUCCESS and ISC_R_SUSPEND alike */
+       }
 }
 
 /*
@@ -984,28 +1000,46 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
        return (true);
 }
 
-static bool
-process_queue(isc__networker_t *worker, isc_queue_t *queue,
-             unsigned int *quantump) {
-       while (*quantump > 0) {
-               isc__netievent_t *ievent =
-                       (isc__netievent_t *)isc_queue_dequeue(queue);
-
-               if (ievent == NULL) {
-                       /* We fully drained this queue */
-                       return (true);
-               }
+static isc_result_t
+process_queue(isc__networker_t *worker, netievent_type_t type) {
+       /*
+        * The nievents counter is only loosely synchronized with the queue
+        * itself.  It is incremented before the event is enqueued, so any
+        * event on the queue is always accounted for, but the counter may
+        * momentarily be higher than the number of events stored on the
+        * queue.  The value loaded here bounds the work done in this run.
+        */
+       uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]);
+       isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type);
+
+       if (ievent == NULL && waiting == 0) {
+               /* There's nothing scheduled */
+               return (ISC_R_EMPTY);
+       } else if (ievent == NULL) {
+               /* An event has been counted but is not on the queue yet;
+                * report success so that the caller checks again. */
+               return (ISC_R_SUCCESS);
+       }
 
-               (*quantump)--;
+       while (ievent != NULL) {
+               DECREMENT_NETIEVENT(worker, type);
+               bool stop = !process_netievent(worker, ievent);
 
-               if (!process_netievent(worker, ievent)) {
+               if (stop) {
                        /* Netievent told us to stop */
-                       return (false);
+                       return (ISC_R_SUSPEND);
+               }
+
+               if (waiting-- == 0) {
+                       /* Round "quota" reached: up to waiting + 1 events */
+                       break;
                }
+
+               ievent = DEQUEUE_NETIEVENT(worker, type);
        }
 
-       /* No more quantum */
-       return (false);
+       /* We processed at least one */
+       return (ISC_R_SUCCESS);
 }
 
 void *
@@ -1107,15 +1141,19 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
                 * the queue will be processed.
                 */
                LOCK(&worker->lock);
-               isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
+               INCREMENT_PRIORITY_NETIEVENT(worker);
+               ENQUEUE_PRIORITY_NETIEVENT(worker, event);
                SIGNAL(&worker->cond_prio);
                UNLOCK(&worker->lock);
        } else if (event->type == netievent_privilegedtask) {
-               isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
+               INCREMENT_PRIVILEGED_NETIEVENT(worker);
+               ENQUEUE_PRIVILEGED_NETIEVENT(worker, event);
        } else if (event->type == netievent_task) {
-               isc_queue_enqueue(worker->ievents_task, (uintptr_t)event);
+               INCREMENT_TASK_NETIEVENT(worker);
+               ENQUEUE_TASK_NETIEVENT(worker, event);
        } else {
-               isc_queue_enqueue(worker->ievents, (uintptr_t)event);
+               INCREMENT_NORMAL_NETIEVENT(worker);
+               ENQUEUE_NORMAL_NETIEVENT(worker, event);
        }
        uv_async_send(&worker->async);
 }
index 565e8406d34af1a9e54a27b337a0cb01ffd8190b..72e7a3c28e4f6dbf304118d9a4761ed13e0aaa2f 100644 (file)
@@ -77,7 +77,7 @@ static const char *description[ISC_R_NRESULTS] = {
        "invalid use of multicast address", /*%< 43 */
        "not a file",                       /*%< 44 */
        "not a directory",                  /*%< 45 */
-       "queue is full",                    /*%< 46 */
+       "queue is empty",                   /*%< 46 */
        "address family mismatch",          /*%< 47 */
        "address family not supported",     /*%< 48 */
        "bad hex encoding",                 /*%< 49 */
@@ -151,7 +151,7 @@ static const char *identifier[ISC_R_NRESULTS] = { "ISC_R_SUCCESS",
                                                  "ISC_R_MULTICAST",
                                                  "ISC_R_NOTFILE",
                                                  "ISC_R_NOTDIRECTORY",
-                                                 "ISC_R_QUEUEFULL",
+                                                 "ISC_R_EMPTY",
                                                  "ISC_R_FAMILYMISMATCH",
                                                  "ISC_R_FAMILYNOSUPPORT",
                                                  "ISC_R_BADHEX",