--- /dev/null
+/*
+ * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+ *
+ * SPDX-License-Identifier: MPL-2.0
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, you can obtain one at https://mozilla.org/MPL/2.0/.
+ *
+ * See the COPYRIGHT file distributed with this work for additional
+ * information regarding copyright ownership.
+ */
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <isc/atomic.h>
+#include <isc/barrier.h>
+#include <isc/condition.h>
+#include <isc/helper.h>
+#include <isc/job.h>
+#include <isc/loop.h>
+#include <isc/magic.h>
+#include <isc/mem.h>
+#include <isc/mutex.h>
+#include <isc/refcount.h>
+#include <isc/result.h>
+#include <isc/signal.h>
+#include <isc/strerr.h>
+#include <isc/thread.h>
+#include <isc/util.h>
+#include <isc/uv.h>
+#include <isc/work.h>
+
+#include "async_p.h"
+#include "job_p.h"
+#include "loop_p.h"
+
+void
+isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
+ REQUIRE(VALID_LOOP(loop));
+ REQUIRE(cb != NULL);
+
+ isc_loop_t *helper = &loop->loopmgr->helpers[loop->tid];
+
+ isc_job_t *job = isc_mem_get(helper->mctx, sizeof(*job));
+ *job = (isc_job_t){
+ .cb = cb,
+ .cbarg = cbarg,
+ };
+
+ cds_wfcq_node_init(&job->wfcq_node);
+
+ /*
+ * cds_wfcq_enqueue() is non-blocking and enqueues the job to async
+ * queue.
+ *
+ * The function returns 'false' in case the queue was empty - in such
+ * case we need to trigger the async callback.
+ */
+ if (!cds_wfcq_enqueue(&helper->async_jobs.head,
+ &helper->async_jobs.tail, &job->wfcq_node))
+ {
+ int r = uv_async_send(&helper->async_trigger);
+ UV_RUNTIME_CHECK(uv_async_send, r);
+ }
+}
}
static void
-loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
+loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid,
+ const char *kind) {
*loop = (isc_loop_t){
.tid = tid,
.loopmgr = loopmgr,
uv_handle_set_data(&loop->quiescent, loop);
char name[16];
- snprintf(name, sizeof(name), "loop-%08" PRIx32, tid);
+ snprintf(name, sizeof(name), "%s-%08" PRIx32, kind, tid);
isc_mem_create(&loop->mctx);
isc_mem_setname(loop->mctx, name);
#endif
}
+static void
+helper_close(isc_loop_t *loop) {
+ int r = uv_loop_close(&loop->loop);
+ UV_RUNTIME_CHECK(uv_loop_close, r);
+
+ INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
+
+ isc_mem_detach(&loop->mctx);
+}
+
static void
loop_close(isc_loop_t *loop) {
int r = uv_loop_close(&loop->loop);
isc_mem_detach(&loop->mctx);
}
+static void *
+helper_thread(void *arg) {
+ isc_loop_t *helper = (isc_loop_t *)arg;
+
+ int r = uv_prepare_start(&helper->quiescent, quiescent_cb);
+ UV_RUNTIME_CHECK(uv_prepare_start, r);
+
+ isc_barrier_wait(&helper->loopmgr->starting);
+
+ r = uv_run(&helper->loop, UV_RUN_DEFAULT);
+ UV_RUNTIME_CHECK(uv_run, r);
+
+ /* Invalidate the helper early */
+ helper->magic = 0;
+
+ isc_barrier_wait(&helper->loopmgr->stopping);
+
+ return (NULL);
+}
+
static void *
loop_thread(void *arg) {
isc_loop_t *loop = (isc_loop_t *)arg;
+ isc_loopmgr_t *loopmgr = loop->loopmgr;
+ isc_loop_t *helper = &loopmgr->helpers[loop->tid];
+ char name[32];
/* Initialize the thread_local variables*/
REQUIRE(isc__loop_local == NULL || isc__loop_local == loop);
isc__tid_init(loop->tid);
+ /* Start the helper thread */
+ isc_thread_create(helper_thread, helper, &helper->thread);
+ snprintf(name, sizeof(name), "isc-helper-%04" PRIu32, loop->tid);
+ isc_thread_setname(helper->thread, name);
+
int r = uv_prepare_start(&loop->quiescent, quiescent_cb);
UV_RUNTIME_CHECK(uv_prepare_start, r);
- isc_barrier_wait(&loop->loopmgr->starting);
+ isc_barrier_wait(&loopmgr->starting);
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
&loop->async_jobs.head, &loop->async_jobs.tail,
/* Invalidate the loop early */
loop->magic = 0;
- isc_barrier_wait(&loop->loopmgr->stopping);
+ /* Shutdown the helper thread */
+ r = uv_async_send(&helper->shutdown_trigger);
+ UV_RUNTIME_CHECK(uv_async_send, r);
+
+ isc_barrier_wait(&loopmgr->stopping);
return (NULL);
}
isc_mem_attach(mctx, &loopmgr->mctx);
- isc_barrier_init(&loopmgr->pausing, loopmgr->nloops);
- isc_barrier_init(&loopmgr->resuming, loopmgr->nloops);
- isc_barrier_init(&loopmgr->starting, loopmgr->nloops);
- isc_barrier_init(&loopmgr->stopping, loopmgr->nloops);
+ /* We need to double the number for loops and helpers */
+ isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2);
+ isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2);
+ isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2);
+ isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2);
loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
sizeof(loopmgr->loops[0]));
for (size_t i = 0; i < loopmgr->nloops; i++) {
isc_loop_t *loop = &loopmgr->loops[i];
- loop_init(loop, loopmgr, i);
+ loop_init(loop, loopmgr, i, "loop");
+ }
+
+ loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
+ sizeof(loopmgr->helpers[0]));
+ for (size_t i = 0; i < loopmgr->nloops; i++) {
+ isc_loop_t *loop = &loopmgr->helpers[i];
+ loop_init(loop, loopmgr, i, "helper");
}
loopmgr->sigint = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr,
void
isc_loopmgr_pause(isc_loopmgr_t *loopmgr) {
REQUIRE(VALID_LOOPMGR(loopmgr));
+ REQUIRE(isc_tid() != ISC_TID_UNKNOWN);
if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) {
isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER,
"loop exclusive mode: starting");
}
+ for (size_t i = 0; i < loopmgr->nloops; i++) {
+ isc_loop_t *helper = &loopmgr->helpers[i];
+
+ int r = uv_async_send(&helper->pause_trigger);
+ UV_RUNTIME_CHECK(uv_async_send, r);
+ }
+
for (size_t i = 0; i < loopmgr->nloops; i++) {
isc_loop_t *loop = &loopmgr->loops[i];
RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running,
&(bool){ true }, false));
+ /* Wait for all helpers to finish */
+ for (size_t i = 0; i < loopmgr->nloops; i++) {
+ isc_loop_t *helper = &loopmgr->helpers[i];
+ isc_thread_join(helper->thread, NULL);
+ }
+
/* First wait for all loops to finish */
for (size_t i = 1; i < loopmgr->nloops; i++) {
isc_loop_t *loop = &loopmgr->loops[i];
loopmgr->magic = 0;
+ for (size_t i = 0; i < loopmgr->nloops; i++) {
+ isc_loop_t *helper = &loopmgr->helpers[i];
+ helper_close(helper);
+ }
+ isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops,
+ sizeof(loopmgr->helpers[0]));
+
for (size_t i = 0; i < loopmgr->nloops; i++) {
isc_loop_t *loop = &loopmgr->loops[i];
loop_close(loop);