]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Refactor the pausing/unpausing and finishing the nm_thread
author Ondřej Surý <ondrej@sury.org>
Wed, 23 Sep 2020 19:49:46 +0000 (21:49 +0200)
committer Ondřej Surý <ondrej@sury.org>
Mon, 28 Sep 2020 09:17:11 +0000 (11:17 +0200)
The isc_nm_pause() and isc_nm_resume() functions, and the finishing of
nm_thread() from nm_destroy(), have been refactored so that they all use
netievents instead of directly touching the worker structure members.
This allows us to remove most of the locking, as the .paused and
.finished members are now always accessed from the matching nm_thread.

When shutting down the nm_thread(), instead of issuing uv_stop(), we
just close the .async handle, so that all uv_loop_t events are properly
finished first and uv_run() ends gracefully with no outstanding active
handles in the loop.

lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c

index 91563ad0decb7e1f5d0094445a001a8800d0fbab..1c30c6c15bd2c349cdb23215c181fa777197ca43 100644 (file)
@@ -153,6 +153,7 @@ typedef enum isc__netievent_type {
        netievent_closecb,
        netievent_shutdown,
        netievent_stop,
+       netievent_pause,
 
        netievent_prio = 0xff, /* event type values higher than this
                                * will be treated as high-priority
@@ -161,6 +162,7 @@ typedef enum isc__netievent_type {
                                */
        netievent_udplisten,
        netievent_tcplisten,
+       netievent_resume,
 } isc__netievent_type;
 
 typedef union {
@@ -297,10 +299,9 @@ struct isc_nm {
        isc_mempool_t *evpool;
        isc_mutex_t evlock;
 
-       atomic_uint_fast32_t workers_running;
-       atomic_uint_fast32_t workers_paused;
+       uint_fast32_t workers_running;
+       uint_fast32_t workers_paused;
        atomic_uint_fast32_t maxudp;
-       atomic_bool paused;
 
        /*
         * Active connections are being closed and new connections are
index aa47309b11e3214dfd530eaee5a8b09054c7759c..d4e2bf29f528f97cd627fa559730455997e3cc41 100644 (file)
@@ -130,8 +130,21 @@ static isc_threadresult_t
 nm_thread(isc_threadarg_t worker0);
 static void
 async_cb(uv_async_t *handle);
-static void
+static bool
 process_queue(isc__networker_t *worker, isc_queue_t *queue);
+static bool
+process_priority_queue(isc__networker_t *worker);
+static bool
+process_normal_queue(isc__networker_t *worker);
+static void
+process_queues(isc__networker_t *worker);
+
+static void
+isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0);
+static void
+isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0);
+static void
+isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0);
 
 int
 isc_nm_tid(void) {
@@ -155,10 +168,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
        isc_mutex_init(&mgr->lock);
        isc_condition_init(&mgr->wkstatecond);
        isc_refcount_init(&mgr->references, 1);
-       atomic_init(&mgr->workers_running, 0);
-       atomic_init(&mgr->workers_paused, 0);
        atomic_init(&mgr->maxudp, 0);
-       atomic_init(&mgr->paused, false);
        atomic_init(&mgr->interlocked, false);
 
 #ifdef NETMGR_TRACE
@@ -218,8 +228,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
                 * race - we could exit isc_nm_start, launch nm_destroy,
                 * and nm_thread would still not be up.
                 */
-               atomic_fetch_add_explicit(&mgr->workers_running, 1,
-                                         memory_order_relaxed);
+               mgr->workers_running++;
                isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread);
 
                snprintf(name, sizeof(name), "isc-net-%04zu", i);
@@ -246,17 +255,14 @@ nm_destroy(isc_nm_t **mgr0) {
        mgr->magic = 0;
 
        for (size_t i = 0; i < mgr->nworkers; i++) {
-               isc__netievent_t *event = NULL;
-
-               LOCK(&mgr->workers[i].lock);
-               mgr->workers[i].finished = true;
-               UNLOCK(&mgr->workers[i].lock);
-               event = isc__nm_get_ievent(mgr, netievent_stop);
-               isc__nm_enqueue_ievent(&mgr->workers[i], event);
+               isc__networker_t *worker = &mgr->workers[i];
+               isc__netievent_t *event = isc__nm_get_ievent(mgr,
+                                                            netievent_stop);
+               isc__nm_enqueue_ievent(worker, event);
        }
 
        LOCK(&mgr->lock);
-       while (atomic_load(&mgr->workers_running) > 0) {
+       while (mgr->workers_running > 0) {
                WAIT(&mgr->wkstatecond, &mgr->lock);
        }
        UNLOCK(&mgr->lock);
@@ -315,28 +321,17 @@ isc_nm_pause(isc_nm_t *mgr) {
        REQUIRE(VALID_NM(mgr));
        REQUIRE(!isc__nm_in_netthread());
 
-       atomic_store(&mgr->paused, true);
        isc__nm_acquire_interlocked_force(mgr);
 
        for (size_t i = 0; i < mgr->nworkers; i++) {
-               isc__netievent_t *event = NULL;
-
-               LOCK(&mgr->workers[i].lock);
-               mgr->workers[i].paused = true;
-               UNLOCK(&mgr->workers[i].lock);
-
-               /*
-                * We have to issue a stop, otherwise the uv_run loop will
-                * run indefinitely!
-                */
-               event = isc__nm_get_ievent(mgr, netievent_stop);
-               isc__nm_enqueue_ievent(&mgr->workers[i], event);
+               isc__networker_t *worker = &mgr->workers[i];
+               isc__netievent_t *event = isc__nm_get_ievent(mgr,
+                                                            netievent_pause);
+               isc__nm_enqueue_ievent(worker, event);
        }
 
        LOCK(&mgr->lock);
-       while (atomic_load_relaxed(&mgr->workers_paused) !=
-              atomic_load_relaxed(&mgr->workers_running))
-       {
+       while (mgr->workers_paused != mgr->workers_running) {
                WAIT(&mgr->wkstatecond, &mgr->lock);
        }
        UNLOCK(&mgr->lock);
@@ -348,17 +343,19 @@ isc_nm_resume(isc_nm_t *mgr) {
        REQUIRE(!isc__nm_in_netthread());
 
        for (size_t i = 0; i < mgr->nworkers; i++) {
-               LOCK(&mgr->workers[i].lock);
-               mgr->workers[i].paused = false;
-               SIGNAL(&mgr->workers[i].cond);
-               UNLOCK(&mgr->workers[i].lock);
+               isc__networker_t *worker = &mgr->workers[i];
+               isc__netievent_t *event = isc__nm_get_ievent(mgr,
+                                                            netievent_resume);
+               isc__nm_enqueue_ievent(worker, event);
        }
-       isc__nm_drop_interlocked(mgr);
 
-       /*
-        * We're not waiting for all the workers to come back to life;
-        * they eventually will, we don't care.
-        */
+       LOCK(&mgr->lock);
+       while (mgr->workers_paused != 0) {
+               WAIT(&mgr->wkstatecond, &mgr->lock);
+       }
+       UNLOCK(&mgr->lock);
+
+       isc__nm_drop_interlocked(mgr);
 }
 
 void
@@ -402,6 +399,7 @@ void
 isc_nm_destroy(isc_nm_t **mgr0) {
        isc_nm_t *mgr = NULL;
        int counter = 0;
+       uint_fast32_t references;
 
        REQUIRE(mgr0 != NULL);
        REQUIRE(VALID_NM(*mgr0));
@@ -416,28 +414,17 @@ isc_nm_destroy(isc_nm_t **mgr0) {
        /*
         * Wait for the manager to be dereferenced elsewhere.
         */
-       while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) {
-               /*
-                * Sometimes libuv gets stuck, pausing and unpausing
-                * netmgr goes over all events in async queue for all
-                * the workers, and since it's done only on shutdown it
-                * doesn't cost us anything.
-                */
-               isc_nm_pause(mgr);
-               isc_nm_resume(mgr);
+       while ((references = isc_refcount_current(&mgr->references)) > 1 &&
+              counter++ < 1000)
+       {
 #ifdef WIN32
                _sleep(10);
 #else  /* ifdef WIN32 */
                usleep(10000);
 #endif /* ifdef WIN32 */
        }
-#ifdef NETMGR_TRACE
-       if (!ISC_LIST_EMPTY(mgr->active_sockets)) {
-               isc__nm_dump_active(mgr);
-               INSIST(ISC_LIST_EMPTY(mgr->active_sockets));
-       }
-#endif
-       INSIST(counter <= 1000);
+
+       INSIST(references == 1);
 
        /*
         * Detach final reference.
@@ -492,93 +479,59 @@ isc_nm_tcp_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
 static isc_threadresult_t
 nm_thread(isc_threadarg_t worker0) {
        isc__networker_t *worker = (isc__networker_t *)worker0;
+       isc_nm_t *mgr = worker->mgr;
 
        isc__nm_tid_v = worker->id;
        isc_thread_setaffinity(isc__nm_tid_v);
 
        while (true) {
                int r = uv_run(&worker->loop, UV_RUN_DEFAULT);
-               bool pausing = false;
+               /* There's always the async handle until we are done */
+               INSIST(r > 0 || worker->finished);
+
+               if (worker->paused) {
+                       LOCK(&worker->lock);
+                       /* We need to lock the worker first otherwise
+                        * isc_nm_resume() might slip in before WAIT() in the
+                        * while loop starts and the signal never gets delivered
+                        * and we are forever stuck in the paused loop.
+                        */
 
-               /*
-                * or there's nothing to do. In the first case - wait
-                * for condition. In the latter - timedwait
-                */
-               LOCK(&worker->lock);
-               while (worker->paused) {
-                       LOCK(&worker->mgr->lock);
-                       if (!pausing) {
-                               atomic_fetch_add_explicit(
-                                       &worker->mgr->workers_paused, 1,
-                                       memory_order_acquire);
-                               pausing = true;
-                       }
+                       LOCK(&mgr->lock);
+                       mgr->workers_paused++;
+                       SIGNAL(&mgr->wkstatecond);
+                       UNLOCK(&mgr->lock);
 
-                       SIGNAL(&worker->mgr->wkstatecond);
-                       UNLOCK(&worker->mgr->lock);
+                       while (worker->paused) {
+                               WAIT(&worker->cond, &worker->lock);
+                               (void)process_priority_queue(worker);
+                       }
 
-                       WAIT(&worker->cond, &worker->lock);
+                       LOCK(&mgr->lock);
+                       mgr->workers_paused--;
+                       SIGNAL(&mgr->wkstatecond);
+                       UNLOCK(&mgr->lock);
 
-                       /* Process priority events */
-                       process_queue(worker, worker->ievents_prio);
+                       UNLOCK(&worker->lock);
                }
-               if (pausing) {
-                       uint32_t wp = atomic_fetch_sub_explicit(
-                               &worker->mgr->workers_paused, 1,
-                               memory_order_release);
-                       if (wp == 1) {
-                               atomic_store(&worker->mgr->paused, false);
-                       }
-               }
-               bool finished = worker->finished;
-               UNLOCK(&worker->lock);
 
-               if (finished) {
-                       /*
-                        * We need to launch the loop one more time
-                        * in UV_RUN_NOWAIT mode to make sure that
-                        * worker->async is closed, so that we can
-                        * close the loop cleanly.  We don't care
-                        * about the callback, as in this case we can
-                        * be certain that uv_run() will eat the event.
-                        *
-                        * XXX: We may need to take steps here to ensure
-                        * that all netmgr handles are freed.
-                        */
-                       uv_close((uv_handle_t *)&worker->async, NULL);
-                       uv_run(&worker->loop, UV_RUN_NOWAIT);
+               if (r == 0) {
+                       INSIST(worker->finished);
                        break;
                }
 
-               if (r == 0) {
-                       /*
-                        * XXX: uv_run() in UV_RUN_DEFAULT mode returns
-                        * zero if there are still active uv_handles.
-                        * This shouldn't happen, but if it does, we just
-                        * keep checking until they're done. We nap for a
-                        * tenth of a second on each loop so as not to burn
-                        * CPU. (We do a conditional wait instead, but it
-                        * seems like overkill for this case.)
-                        */
-#ifdef WIN32
-                       _sleep(100);
-#else  /* ifdef WIN32 */
-                       usleep(100000);
-#endif /* ifdef WIN32 */
-               }
+               INSIST(!worker->finished);
 
                /*
                 * Empty the async queue.
                 */
-               process_queue(worker, worker->ievents_prio);
-               process_queue(worker, worker->ievents);
+               process_queues(worker);
        }
 
-       LOCK(&worker->mgr->lock);
-       atomic_fetch_sub_explicit(&worker->mgr->workers_running, 1,
-                                 memory_order_relaxed);
-       SIGNAL(&worker->mgr->wkstatecond);
-       UNLOCK(&worker->mgr->lock);
+       LOCK(&mgr->lock);
+       mgr->workers_running--;
+       SIGNAL(&mgr->wkstatecond);
+       UNLOCK(&mgr->lock);
 
        return ((isc_threadresult_t)0);
 }
@@ -592,21 +545,64 @@ nm_thread(isc_threadarg_t worker0) {
 static void
 async_cb(uv_async_t *handle) {
        isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
-       process_queue(worker, worker->ievents_prio);
-       process_queue(worker, worker->ievents);
+       process_queues(worker);
+}
+
+static void
+isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       UNUSED(ev0);
+       worker->finished = true;
+       /* Close the async handler */
+       uv_close((uv_handle_t *)&worker->async, NULL);
+       /* uv_stop(&worker->loop); */
 }
 
 static void
+isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       UNUSED(ev0);
+       REQUIRE(worker->paused == false);
+       worker->paused = true;
+       uv_stop(&worker->loop);
+}
+
+static void
+isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       UNUSED(ev0);
+       REQUIRE(worker->paused == true);
+       worker->paused = false;
+}
+
+static bool
+process_priority_queue(isc__networker_t *worker) {
+       return (process_queue(worker, worker->ievents_prio));
+}
+
+static bool
+process_normal_queue(isc__networker_t *worker) {
+       return (process_queue(worker, worker->ievents));
+}
+
+static void
+process_queues(isc__networker_t *worker) {
+       if (!process_priority_queue(worker)) {
+               return;
+       }
+       (void)process_normal_queue(worker);
+}
+
+static bool
 process_queue(isc__networker_t *worker, isc_queue_t *queue) {
        isc__netievent_t *ievent = NULL;
+       bool more = true;
 
        while ((ievent = (isc__netievent_t *)isc_queue_dequeue(queue)) != NULL)
        {
                switch (ievent->type) {
                case netievent_stop:
-                       uv_stop(&worker->loop);
-                       isc_mempool_put(worker->mgr->evpool, ievent);
-                       return;
+                       isc__nm_async_stopcb(worker, ievent);
+                       /* Don't process more ievents when we are stopping */
+                       more = false;
+                       break;
 
                case netievent_udplisten:
                        isc__nm_async_udplisten(worker, ievent);
@@ -659,13 +655,26 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
                case netievent_shutdown:
                        isc__nm_async_shutdown(worker, ievent);
                        break;
+
+               case netievent_resume:
+                       isc__nm_async_resumecb(worker, ievent);
+                       break;
+               case netievent_pause:
+                       isc__nm_async_pausecb(worker, ievent);
+                       /* Don't process more ievents when we are pausing */
+                       more = false;
+                       break;
                default:
                        INSIST(0);
                        ISC_UNREACHABLE();
                }
 
                isc__nm_put_ievent(worker->mgr, ievent);
+               if (!more) {
+                       break;
+               }
        }
+       return (more);
 }
 
 void *