nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);
-static void
+static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue);
+static bool
+process_priority_queue(isc__networker_t *worker);
+static bool
+process_normal_queue(isc__networker_t *worker);
+static void
+process_queues(isc__networker_t *worker);
+
+static void
+isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0);
+static void
+isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0);
+static void
+isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0);
int
isc_nm_tid(void) {
isc_mutex_init(&mgr->lock);
isc_condition_init(&mgr->wkstatecond);
isc_refcount_init(&mgr->references, 1);
- atomic_init(&mgr->workers_running, 0);
- atomic_init(&mgr->workers_paused, 0);
atomic_init(&mgr->maxudp, 0);
- atomic_init(&mgr->paused, false);
atomic_init(&mgr->interlocked, false);
#ifdef NETMGR_TRACE
* race - we could exit isc_nm_start, launch nm_destroy,
* and nm_thread would still not be up.
*/
- atomic_fetch_add_explicit(&mgr->workers_running, 1,
- memory_order_relaxed);
+ mgr->workers_running++;
isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread);
snprintf(name, sizeof(name), "isc-net-%04zu", i);
mgr->magic = 0;
for (size_t i = 0; i < mgr->nworkers; i++) {
- isc__netievent_t *event = NULL;
-
- LOCK(&mgr->workers[i].lock);
- mgr->workers[i].finished = true;
- UNLOCK(&mgr->workers[i].lock);
- event = isc__nm_get_ievent(mgr, netievent_stop);
- isc__nm_enqueue_ievent(&mgr->workers[i], event);
+ isc__networker_t *worker = &mgr->workers[i];
+ isc__netievent_t *event = isc__nm_get_ievent(mgr,
+ netievent_stop);
+ isc__nm_enqueue_ievent(worker, event);
}
LOCK(&mgr->lock);
- while (atomic_load(&mgr->workers_running) > 0) {
+ while (mgr->workers_running > 0) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
REQUIRE(VALID_NM(mgr));
REQUIRE(!isc__nm_in_netthread());
- atomic_store(&mgr->paused, true);
isc__nm_acquire_interlocked_force(mgr);
for (size_t i = 0; i < mgr->nworkers; i++) {
- isc__netievent_t *event = NULL;
-
- LOCK(&mgr->workers[i].lock);
- mgr->workers[i].paused = true;
- UNLOCK(&mgr->workers[i].lock);
-
- /*
- * We have to issue a stop, otherwise the uv_run loop will
- * run indefinitely!
- */
- event = isc__nm_get_ievent(mgr, netievent_stop);
- isc__nm_enqueue_ievent(&mgr->workers[i], event);
+ isc__networker_t *worker = &mgr->workers[i];
+ isc__netievent_t *event = isc__nm_get_ievent(mgr,
+ netievent_pause);
+ isc__nm_enqueue_ievent(worker, event);
}
LOCK(&mgr->lock);
- while (atomic_load_relaxed(&mgr->workers_paused) !=
- atomic_load_relaxed(&mgr->workers_running))
- {
+ while (mgr->workers_paused != mgr->workers_running) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
REQUIRE(!isc__nm_in_netthread());
for (size_t i = 0; i < mgr->nworkers; i++) {
- LOCK(&mgr->workers[i].lock);
- mgr->workers[i].paused = false;
- SIGNAL(&mgr->workers[i].cond);
- UNLOCK(&mgr->workers[i].lock);
+ isc__networker_t *worker = &mgr->workers[i];
+ isc__netievent_t *event = isc__nm_get_ievent(mgr,
+ netievent_resume);
+ isc__nm_enqueue_ievent(worker, event);
}
- isc__nm_drop_interlocked(mgr);
- /*
- * We're not waiting for all the workers to come back to life;
- * they eventually will, we don't care.
- */
+ LOCK(&mgr->lock);
+ while (mgr->workers_paused != 0) {
+ WAIT(&mgr->wkstatecond, &mgr->lock);
+ }
+ UNLOCK(&mgr->lock);
+
+ isc__nm_drop_interlocked(mgr);
}
void
isc_nm_destroy(isc_nm_t **mgr0) {
isc_nm_t *mgr = NULL;
int counter = 0;
+ uint_fast32_t references;
REQUIRE(mgr0 != NULL);
REQUIRE(VALID_NM(*mgr0));
/*
* Wait for the manager to be dereferenced elsewhere.
*/
- while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) {
- /*
- * Sometimes libuv gets stuck, pausing and unpausing
- * netmgr goes over all events in async queue for all
- * the workers, and since it's done only on shutdown it
- * doesn't cost us anything.
- */
- isc_nm_pause(mgr);
- isc_nm_resume(mgr);
+ while ((references = isc_refcount_current(&mgr->references)) > 1 &&
+ counter++ < 1000)
+ {
#ifdef WIN32
_sleep(10);
#else /* ifdef WIN32 */
usleep(10000);
#endif /* ifdef WIN32 */
}
-#ifdef NETMGR_TRACE
- if (!ISC_LIST_EMPTY(mgr->active_sockets)) {
- isc__nm_dump_active(mgr);
- INSIST(ISC_LIST_EMPTY(mgr->active_sockets));
- }
-#endif
- INSIST(counter <= 1000);
+
+ INSIST(references == 1);
/*
* Detach final reference.
static isc_threadresult_t
nm_thread(isc_threadarg_t worker0) {
isc__networker_t *worker = (isc__networker_t *)worker0;
+ isc_nm_t *mgr = worker->mgr;
isc__nm_tid_v = worker->id;
isc_thread_setaffinity(isc__nm_tid_v);
while (true) {
int r = uv_run(&worker->loop, UV_RUN_DEFAULT);
- bool pausing = false;
+ /* There's always the async handle until we are done */
+ INSIST(r > 0 || worker->finished);
+
+ if (worker->paused) {
+ LOCK(&worker->lock);
+ /* We need to lock the worker first otherwise
+ * isc_nm_resume() might slip in before WAIT() in the
+ * while loop starts and the signal never gets delivered
+ * and we are forever stuck in the paused loop.
+ */
- /*
- * or there's nothing to do. In the first case - wait
- * for condition. In the latter - timedwait
- */
- LOCK(&worker->lock);
- while (worker->paused) {
- LOCK(&worker->mgr->lock);
- if (!pausing) {
- atomic_fetch_add_explicit(
- &worker->mgr->workers_paused, 1,
- memory_order_acquire);
- pausing = true;
- }
+ LOCK(&mgr->lock);
+ mgr->workers_paused++;
+ SIGNAL(&mgr->wkstatecond);
+ UNLOCK(&mgr->lock);
- SIGNAL(&worker->mgr->wkstatecond);
- UNLOCK(&worker->mgr->lock);
+ while (worker->paused) {
+ WAIT(&worker->cond, &worker->lock);
+ (void)process_priority_queue(worker);
+ }
- WAIT(&worker->cond, &worker->lock);
+ LOCK(&mgr->lock);
+ mgr->workers_paused--;
+ SIGNAL(&mgr->wkstatecond);
+ UNLOCK(&mgr->lock);
- /* Process priority events */
- process_queue(worker, worker->ievents_prio);
+ UNLOCK(&worker->lock);
}
- if (pausing) {
- uint32_t wp = atomic_fetch_sub_explicit(
- &worker->mgr->workers_paused, 1,
- memory_order_release);
- if (wp == 1) {
- atomic_store(&worker->mgr->paused, false);
- }
- }
- bool finished = worker->finished;
- UNLOCK(&worker->lock);
- if (finished) {
- /*
- * We need to launch the loop one more time
- * in UV_RUN_NOWAIT mode to make sure that
- * worker->async is closed, so that we can
- * close the loop cleanly. We don't care
- * about the callback, as in this case we can
- * be certain that uv_run() will eat the event.
- *
- * XXX: We may need to take steps here to ensure
- * that all netmgr handles are freed.
- */
- uv_close((uv_handle_t *)&worker->async, NULL);
- uv_run(&worker->loop, UV_RUN_NOWAIT);
+ if (r == 0) {
+ INSIST(worker->finished);
break;
}
- if (r == 0) {
- /*
- * XXX: uv_run() in UV_RUN_DEFAULT mode returns
- * zero if there are still active uv_handles.
- * This shouldn't happen, but if it does, we just
- * keep checking until they're done. We nap for a
- * tenth of a second on each loop so as not to burn
- * CPU. (We do a conditional wait instead, but it
- * seems like overkill for this case.)
- */
-#ifdef WIN32
- _sleep(100);
-#else /* ifdef WIN32 */
- usleep(100000);
-#endif /* ifdef WIN32 */
- }
+ INSIST(!worker->finished);
/*
* Empty the async queue.
*/
- process_queue(worker, worker->ievents_prio);
- process_queue(worker, worker->ievents);
+ process_queues(worker);
}
- LOCK(&worker->mgr->lock);
- atomic_fetch_sub_explicit(&worker->mgr->workers_running, 1,
- memory_order_relaxed);
- SIGNAL(&worker->mgr->wkstatecond);
- UNLOCK(&worker->mgr->lock);
+ LOCK(&mgr->lock);
+ mgr->workers_running--;
+ SIGNAL(&mgr->wkstatecond);
+ UNLOCK(&mgr->lock);
return ((isc_threadresult_t)0);
}
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
- process_queue(worker, worker->ievents_prio);
- process_queue(worker, worker->ievents);
+ process_queues(worker);
+}
+
+static void
+isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+ UNUSED(ev0);
+ worker->finished = true;
+ /* Close the async handler */
+ uv_close((uv_handle_t *)&worker->async, NULL);
+ /* uv_stop(&worker->loop); */
}
static void
+isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0) {
+ UNUSED(ev0);
+ REQUIRE(worker->paused == false);
+ worker->paused = true;
+ uv_stop(&worker->loop);
+}
+
+static void
+isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0) {
+ UNUSED(ev0);
+ REQUIRE(worker->paused == true);
+ worker->paused = false;
+}
+
+static bool
+process_priority_queue(isc__networker_t *worker) {
+ return (process_queue(worker, worker->ievents_prio));
+}
+
+static bool
+process_normal_queue(isc__networker_t *worker) {
+ return (process_queue(worker, worker->ievents));
+}
+
+static void
+process_queues(isc__networker_t *worker) {
+ if (!process_priority_queue(worker)) {
+ return;
+ }
+ (void)process_normal_queue(worker);
+}
+
+static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue) {
isc__netievent_t *ievent = NULL;
+ bool more = true;
while ((ievent = (isc__netievent_t *)isc_queue_dequeue(queue)) != NULL)
{
switch (ievent->type) {
case netievent_stop:
- uv_stop(&worker->loop);
- isc_mempool_put(worker->mgr->evpool, ievent);
- return;
+ isc__nm_async_stopcb(worker, ievent);
+ /* Don't process more ievents when we are stopping */
+ more = false;
+ break;
case netievent_udplisten:
isc__nm_async_udplisten(worker, ievent);
case netievent_shutdown:
isc__nm_async_shutdown(worker, ievent);
break;
+
+ case netievent_resume:
+ isc__nm_async_resumecb(worker, ievent);
+ break;
+ case netievent_pause:
+ isc__nm_async_pausecb(worker, ievent);
+ /* Don't process more ievents when we are pausing */
+ more = false;
+ break;
default:
INSIST(0);
ISC_UNREACHABLE();
}
isc__nm_put_ievent(worker->mgr, ievent);
+ if (!more) {
+ break;
+ }
}
+ return (more);
}
void *