]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add isc_helper API that adds 1:1 thread for each loop
authorOndřej Surý <ondrej@isc.org>
Tue, 10 Sep 2024 13:02:53 +0000 (15:02 +0200)
committerOndřej Surý <ondrej@isc.org>
Thu, 12 Sep 2024 12:09:45 +0000 (12:09 +0000)
Add an extra thread that can be used to offload operations that would
affect latency, but are not long-running tasks; those are handled by
isc_work API.

Each isc_loop now has matching isc_helper thread that also built on top
of uv_loop.  In fact, it matches most of the isc_loop functionality, but
only the `isc_helper_run()` asynchronous call is exposed.

lib/isc/Makefile.am
lib/isc/helper.c [new file with mode: 0644]
lib/isc/include/isc/helper.h [new file with mode: 0644]
lib/isc/loop.c
lib/isc/loop_p.h

index 8428ee771b779b7eb637d5e3e7ea8757825ad7c4..77eda51858f2a87dce7dbba6217b7558143446af 100644 (file)
@@ -31,6 +31,7 @@ libisc_la_HEADERS =                   \
        include/isc/hash.h              \
        include/isc/hashmap.h           \
        include/isc/heap.h              \
+       include/isc/helper.h            \
        include/isc/hex.h               \
        include/isc/histo.h             \
        include/isc/hmac.h              \
@@ -135,6 +136,7 @@ libisc_la_SOURCES =         \
        hash.c                  \
        hashmap.c               \
        heap.c                  \
+       helper.c                \
        hex.c                   \
        histo.c                 \
        hmac.c                  \
diff --git a/lib/isc/helper.c b/lib/isc/helper.c
new file mode 100644 (file)
index 0000000..f5a83cc
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+ *
+ * SPDX-License-Identifier: MPL-2.0
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, you can obtain one at https://mozilla.org/MPL/2.0/.
+ *
+ * See the COPYRIGHT file distributed with this work for additional
+ * information regarding copyright ownership.
+ */
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <isc/atomic.h>
+#include <isc/barrier.h>
+#include <isc/condition.h>
+#include <isc/helper.h>
+#include <isc/job.h>
+#include <isc/loop.h>
+#include <isc/magic.h>
+#include <isc/mem.h>
+#include <isc/mutex.h>
+#include <isc/refcount.h>
+#include <isc/result.h>
+#include <isc/signal.h>
+#include <isc/strerr.h>
+#include <isc/thread.h>
+#include <isc/util.h>
+#include <isc/uv.h>
+#include <isc/work.h>
+
+#include "async_p.h"
+#include "job_p.h"
+#include "loop_p.h"
+
+void
+isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
+       REQUIRE(VALID_LOOP(loop));
+       REQUIRE(cb != NULL);
+
+       isc_loop_t *helper = &loop->loopmgr->helpers[loop->tid];
+
+       isc_job_t *job = isc_mem_get(helper->mctx, sizeof(*job));
+       *job = (isc_job_t){
+               .cb = cb,
+               .cbarg = cbarg,
+       };
+
+       cds_wfcq_node_init(&job->wfcq_node);
+
+       /*
+        * cds_wfcq_enqueue() is non-blocking and enqueues the job to async
+        * queue.
+        *
+        * The function returns 'false' in case the queue was empty - in such
+        * case we need to trigger the async callback.
+        */
+       if (!cds_wfcq_enqueue(&helper->async_jobs.head,
+                             &helper->async_jobs.tail, &job->wfcq_node))
+       {
+               int r = uv_async_send(&helper->async_trigger);
+               UV_RUNTIME_CHECK(uv_async_send, r);
+       }
+}
diff --git a/lib/isc/include/isc/helper.h b/lib/isc/include/isc/helper.h
new file mode 100644 (file)
index 0000000..fa13f0b
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+ *
+ * SPDX-License-Identifier: MPL-2.0
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, you can obtain one at https://mozilla.org/MPL/2.0/.
+ *
+ * See the COPYRIGHT file distributed with this work for additional
+ * information regarding copyright ownership.
+ */
+
+/*! \file isc/helper.h */
+
+#pragma once
+
+#include <inttypes.h>
+
+#include <isc/job.h>
+#include <isc/lang.h>
+#include <isc/loop.h>
+#include <isc/mem.h>
+#include <isc/types.h>
+
+ISC_LANG_BEGINDECLS
+
+void
+isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
+/*%<
+ * Schedule the job callback 'cb' to be run on the 'loop' event loop.
+ *
+ * Requires:
+ *
+ *\li  'loop' is a valid isc event loop
+ *\li  'cb' is a callback function, must be non-NULL
+ *\li  'cbarg' is passed to the 'cb' as the only argument, may be NULL
+ */
+
+#define isc_helper_current(cb, cbarg) isc_async_run(isc_loop(), cb, cbarg)
+/*%<
+ * Helper macro to run the job on the current loop
+ */
+
+ISC_LANG_ENDDECLS
index 8f2fcdf8e831342be620e312f87cba9a35fc3c01..fb183569736df787e43fa3990a2c7e092c5d3cba 100644 (file)
@@ -186,7 +186,8 @@ shutdown_cb(uv_async_t *handle) {
 }
 
 static void
-loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
+loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid,
+         const char *kind) {
        *loop = (isc_loop_t){
                .tid = tid,
                .loopmgr = loopmgr,
@@ -225,7 +226,7 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
        uv_handle_set_data(&loop->quiescent, loop);
 
        char name[16];
-       snprintf(name, sizeof(name), "loop-%08" PRIx32, tid);
+       snprintf(name, sizeof(name), "%s-%08" PRIx32, kind, tid);
        isc_mem_create(&loop->mctx);
        isc_mem_setname(loop->mctx, name);
 
@@ -249,6 +250,16 @@ quiescent_cb(uv_prepare_t *handle) {
 #endif
 }
 
+static void
+helper_close(isc_loop_t *loop) {
+       int r = uv_loop_close(&loop->loop);
+       UV_RUNTIME_CHECK(uv_loop_close, r);
+
+       INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
+
+       isc_mem_detach(&loop->mctx);
+}
+
 static void
 loop_close(isc_loop_t *loop) {
        int r = uv_loop_close(&loop->loop);
@@ -262,9 +273,32 @@ loop_close(isc_loop_t *loop) {
        isc_mem_detach(&loop->mctx);
 }
 
+static void *
+helper_thread(void *arg) {
+       isc_loop_t *helper = (isc_loop_t *)arg;
+
+       int r = uv_prepare_start(&helper->quiescent, quiescent_cb);
+       UV_RUNTIME_CHECK(uv_prepare_start, r);
+
+       isc_barrier_wait(&helper->loopmgr->starting);
+
+       r = uv_run(&helper->loop, UV_RUN_DEFAULT);
+       UV_RUNTIME_CHECK(uv_run, r);
+
+       /* Invalidate the helper early */
+       helper->magic = 0;
+
+       isc_barrier_wait(&helper->loopmgr->stopping);
+
+       return (NULL);
+}
+
 static void *
 loop_thread(void *arg) {
        isc_loop_t *loop = (isc_loop_t *)arg;
+       isc_loopmgr_t *loopmgr = loop->loopmgr;
+       isc_loop_t *helper = &loopmgr->helpers[loop->tid];
+       char name[32];
        /* Initialize the thread_local variables*/
 
        REQUIRE(isc__loop_local == NULL || isc__loop_local == loop);
@@ -272,10 +306,15 @@ loop_thread(void *arg) {
 
        isc__tid_init(loop->tid);
 
+       /* Start the helper thread */
+       isc_thread_create(helper_thread, helper, &helper->thread);
+       snprintf(name, sizeof(name), "isc-helper-%04" PRIu32, loop->tid);
+       isc_thread_setname(helper->thread, name);
+
        int r = uv_prepare_start(&loop->quiescent, quiescent_cb);
        UV_RUNTIME_CHECK(uv_prepare_start, r);
 
-       isc_barrier_wait(&loop->loopmgr->starting);
+       isc_barrier_wait(&loopmgr->starting);
 
        enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
                &loop->async_jobs.head, &loop->async_jobs.tail,
@@ -293,7 +332,11 @@ loop_thread(void *arg) {
        /* Invalidate the loop early */
        loop->magic = 0;
 
-       isc_barrier_wait(&loop->loopmgr->stopping);
+       /* Shutdown the helper thread */
+       r = uv_async_send(&helper->shutdown_trigger);
+       UV_RUNTIME_CHECK(uv_async_send, r);
+
+       isc_barrier_wait(&loopmgr->stopping);
 
        return (NULL);
 }
@@ -342,16 +385,24 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) {
 
        isc_mem_attach(mctx, &loopmgr->mctx);
 
-       isc_barrier_init(&loopmgr->pausing, loopmgr->nloops);
-       isc_barrier_init(&loopmgr->resuming, loopmgr->nloops);
-       isc_barrier_init(&loopmgr->starting, loopmgr->nloops);
-       isc_barrier_init(&loopmgr->stopping, loopmgr->nloops);
+       /* We need to double the number for loops and helpers */
+       isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2);
+       isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2);
+       isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2);
+       isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2);
 
        loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
                                      sizeof(loopmgr->loops[0]));
        for (size_t i = 0; i < loopmgr->nloops; i++) {
                isc_loop_t *loop = &loopmgr->loops[i];
-               loop_init(loop, loopmgr, i);
+               loop_init(loop, loopmgr, i, "loop");
+       }
+
+       loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
+                                       sizeof(loopmgr->helpers[0]));
+       for (size_t i = 0; i < loopmgr->nloops; i++) {
+               isc_loop_t *loop = &loopmgr->helpers[i];
+               loop_init(loop, loopmgr, i, "helper");
        }
 
        loopmgr->sigint = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr,
@@ -465,6 +516,7 @@ isc_loopmgr_run(isc_loopmgr_t *loopmgr) {
 void
 isc_loopmgr_pause(isc_loopmgr_t *loopmgr) {
        REQUIRE(VALID_LOOPMGR(loopmgr));
+       REQUIRE(isc_tid() != ISC_TID_UNKNOWN);
 
        if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) {
                isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER,
@@ -472,6 +524,13 @@ isc_loopmgr_pause(isc_loopmgr_t *loopmgr) {
                              "loop exclusive mode: starting");
        }
 
+       for (size_t i = 0; i < loopmgr->nloops; i++) {
+               isc_loop_t *helper = &loopmgr->helpers[i];
+
+               int r = uv_async_send(&helper->pause_trigger);
+               UV_RUNTIME_CHECK(uv_async_send, r);
+       }
+
        for (size_t i = 0; i < loopmgr->nloops; i++) {
                isc_loop_t *loop = &loopmgr->loops[i];
 
@@ -526,6 +585,12 @@ isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) {
        RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running,
                                                     &(bool){ true }, false));
 
+       /* Wait for all helpers to finish */
+       for (size_t i = 0; i < loopmgr->nloops; i++) {
+               isc_loop_t *helper = &loopmgr->helpers[i];
+               isc_thread_join(helper->thread, NULL);
+       }
+
        /* First wait for all loops to finish */
        for (size_t i = 1; i < loopmgr->nloops; i++) {
                isc_loop_t *loop = &loopmgr->loops[i];
@@ -534,6 +599,13 @@ isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) {
 
        loopmgr->magic = 0;
 
+       for (size_t i = 0; i < loopmgr->nloops; i++) {
+               isc_loop_t *helper = &loopmgr->helpers[i];
+               helper_close(helper);
+       }
+       isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops,
+                    sizeof(loopmgr->helpers[0]));
+
        for (size_t i = 0; i < loopmgr->nloops; i++) {
                isc_loop_t *loop = &loopmgr->loops[i];
                loop_close(loop);
index 185406d0d7ee173d747d9a591c6e36f334a9bbd4..a822004e8874ffa787e3556ddbd114fed381b640 100644 (file)
@@ -110,6 +110,7 @@ struct isc_loopmgr {
 
        /* per-thread objects */
        isc_loop_t *loops;
+       isc_loop_t *helpers;
 };
 
 /*