]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Change the isc_async API to use cds_wfcqueue internally
authorOndřej Surý <ondrej@isc.org>
Mon, 8 May 2023 21:31:54 +0000 (23:31 +0200)
committerOndřej Surý <ondrej@isc.org>
Fri, 12 May 2023 12:16:25 +0000 (14:16 +0200)
The isc_async API was using lock-free stack (where enqueue operation was
not wait-free).  Change the isc_async to use cds_wfcqueue internally -
enqueue and splice (move the queue members from one list to another) is
nonblocking and wait-free.

lib/isc/async.c
lib/isc/async_p.h
lib/isc/include/isc/job.h
lib/isc/include/isc/loop.h
lib/isc/job.c
lib/isc/job_p.h
lib/isc/loop.c
lib/isc/loop_p.h
lib/isc/netmgr/netmgr.c

index 326d3a080309a4f0e747640f23d548efd45ed133..db4388964e4573c9311f6739ee8d66c345d77cda 100644 (file)
@@ -27,7 +27,6 @@
 #include <isc/refcount.h>
 #include <isc/result.h>
 #include <isc/signal.h>
-#include <isc/stack.h>
 #include <isc/strerr.h>
 #include <isc/thread.h>
 #include <isc/util.h>
@@ -45,42 +44,72 @@ isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
 
        isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
        *job = (isc_job_t){
-               .link = ISC_LINK_INITIALIZER,
                .cb = cb,
                .cbarg = cbarg,
        };
 
+       cds_wfcq_node_init(&job->wfcq_node);
+
        /*
-        * Now send the half-initialized job to the loop queue.
+        * cds_wfcq_enqueue() is non-blocking and enqueues the job to async
+        * queue.
+        *
+        * The function returns 'false' in case the queue was empty - in such
+        * case we need to trigger the async callback.
         */
-       ISC_ASTACK_PUSH(loop->async_jobs, job, link);
-
-       int r = uv_async_send(&loop->async_trigger);
-       UV_RUNTIME_CHECK(uv_async_send, r);
+       __tsan_release(job);
+       if (!cds_wfcq_enqueue(&loop->async_jobs.head, &loop->async_jobs.tail,
+                             &job->wfcq_node))
+       {
+               int r = uv_async_send(&loop->async_trigger);
+               UV_RUNTIME_CHECK(uv_async_send, r);
+       }
 }
 
 void
 isc__async_cb(uv_async_t *handle) {
        isc_loop_t *loop = uv_handle_get_data(handle);
+       isc_jobqueue_t jobs;
 
        REQUIRE(VALID_LOOP(loop));
 
-       ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs);
-       ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
-
-       isc_job_t *job = ISC_STACK_POP(drain, link);
-       isc_job_t *next = NULL;
-       while (job != NULL) {
-               ISC_LIST_PREPEND(jobs, job, link);
+       /* Initialize local wfcqueue */
+       __cds_wfcq_init(&jobs.head, &jobs.tail);
 
-               job = ISC_STACK_POP(drain, link);
+       /*
+        * Move all the elements from loop->async_jobs to a local jobs queue.
+        *
+        * __cds_wfcq_splice_blocking() assumes that synchronization is
+        * done externally - there's no internal locking, unlike
+        * cds_wfcq_splice_blocking(), and we do not need to check whether
+        * it needs to block, unlike __cds_wfcq_splice_nonblocking().
+        *
+        * The reason we can use __cds_wfcq_splice_blocking() is that the
+        * only other function we use is cds_wfcq_enqueue() which doesn't
+        * require any synchronization (see the table in urcu/wfcqueue.h
+        * for more details).
+        */
+       enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
+               &jobs.head, &jobs.tail, &loop->async_jobs.head,
+               &loop->async_jobs.tail);
+       INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
+       if (ret == CDS_WFCQ_RET_SRC_EMPTY) {
+               /*
+                * Nothing to do, the source queue was empty - most
+                * probably we were called from isc__async_close() below.
+                */
+               return;
        }
 
-       for (job = ISC_LIST_HEAD(jobs),
-           next = (job ? ISC_LIST_NEXT(job, link) : NULL);
-            job != NULL;
-            job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL))
-       {
+       /*
+        * Walk through the local queue which has now all the members copied
+        * locally, and call the callbacks and free all the isc_job_t(s).
+        */
+       struct cds_wfcq_node *node, *next;
+       __cds_wfcq_for_each_blocking_safe(&jobs.head, &jobs.tail, node, next) {
+               isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node);
+               __tsan_acquire(job);
+
                job->cb(job->cbarg);
 
                isc_mem_put(loop->mctx, job, sizeof(*job));
index 0d854cd0ae3e71c400ec7abbddcf9264d2965d61..1ce94b99e11e1cfdc1e9e5e587c5fb1647dee949 100644 (file)
 #include <isc/job.h>
 #include <isc/loop.h>
 #include <isc/mem.h>
-#include <isc/stack.h>
 #include <isc/uv.h>
 
-typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t;
-
 void
 isc__async_cb(uv_async_t *handle);
 
index 8eeb60d0f482d61a30104594697480d7f0d3b264..9a44974c251b38f14a67f599a68c67986a3e3b71 100644 (file)
@@ -28,6 +28,7 @@
 #include <isc/mem.h>
 #include <isc/refcount.h>
 #include <isc/types.h>
+#include <isc/urcu.h>
 
 typedef void (*isc_job_cb)(void *);
 typedef struct isc_job isc_job_t;
@@ -35,12 +36,15 @@ typedef struct isc_job isc_job_t;
 struct isc_job {
        isc_job_cb cb;
        void      *cbarg;
-       ISC_LINK(isc_job_t) link;
+       union {
+               struct cds_wfcq_node wfcq_node;
+               ISC_LINK(isc_job_t) link;
+       };
 };
 
-#define ISC_JOB_INITIALIZER                  \
-       {                                    \
-               .link = ISC_LINK_INITIALIZER \
+#define ISC_JOB_INITIALIZER                   \
+       {                                     \
+               .link = ISC_LINK_INITIALIZER, \
        }
 
 ISC_LANG_BEGINDECLS
index 05440375a00c753449dfb434f3b2c305b3687615..93330e60d5bf18669d7297a661e6894b14262895 100644 (file)
@@ -122,11 +122,6 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
  *     yet been started.
  */
 
-void
-isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job);
-void
-isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job);
-
 void
 isc_loopmgr_setup(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg);
 void
index 9f64df73e8a54a674a60ec2a112ed183774ffc6d..963248df15c8777c69e1c9b4531afbc45869daa9 100644 (file)
@@ -48,6 +48,7 @@ isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) {
 
        job->cb = cb;
        job->cbarg = cbarg;
+       ISC_LINK_INIT(job, link);
 
        ISC_LIST_APPEND(loop->run_jobs, job, link);
 }
index 89fa07807714209095e9c96bf576a15ac69b0afc..06c7bc319ec8ca8c12e357023b03b5e9445f9843 100644 (file)
 
 #pragma once
 
+#include <isc/align.h>
 #include <isc/job.h>
 #include <isc/loop.h>
+#include <isc/os.h>
 #include <isc/uv.h>
 
+/*%
+ * NOTE: We are using struct __cds_wfcq_head that doesn't have an internal
+ * mutex, because we are only using enqueue and splice, and those don't need
+ * any synchronization (see urcu/wfcqueue.h for detailed description).
+ */
+typedef struct isc_jobqueue {
+       alignas(ISC_OS_CACHELINE_SIZE) struct __cds_wfcq_head head;
+       alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail;
+} isc_jobqueue_t;
+
 typedef ISC_LIST(isc_job_t) isc_joblist_t;
 
 void
index 77f3dcc52e0a9418c363650ec127798ddeb1ae98..1cef7d35ad098af411b6200bcb9a1c25556c5b8f 100644 (file)
@@ -159,7 +159,6 @@ destroy_cb(uv_async_t *handle) {
 
 static void
 shutdown_cb(uv_async_t *handle) {
-       isc_job_t *job = NULL;
        isc_loop_t *loop = uv_handle_get_data(handle);
        isc_loopmgr_t *loopmgr = loop->loopmgr;
 
@@ -178,17 +177,12 @@ shutdown_cb(uv_async_t *handle) {
                isc_signal_destroy(&loopmgr->sigint);
        }
 
-       job = ISC_LIST_TAIL(loop->teardown_jobs);
-       while (job != NULL) {
-               isc_job_t *prev = ISC_LIST_PREV(job, link);
-               ISC_LIST_UNLINK(loop->teardown_jobs, job, link);
-
-               job->cb(job->cbarg);
-
-               isc_mem_put(loop->mctx, job, sizeof(*job));
-
-               job = prev;
-       }
+       enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
+               &loop->async_jobs.head, &loop->async_jobs.tail,
+               &loop->teardown_jobs.head, &loop->teardown_jobs.tail);
+       INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
+       int r = uv_async_send(&loop->async_trigger);
+       UV_RUNTIME_CHECK(uv_async_send, r);
 }
 
 static void
@@ -202,12 +196,13 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
        *loop = (isc_loop_t){
                .tid = tid,
                .loopmgr = loopmgr,
-               .async_jobs = ISC_ASTACK_INITIALIZER,
                .run_jobs = ISC_LIST_INITIALIZER,
-               .setup_jobs = ISC_LIST_INITIALIZER,
-               .teardown_jobs = ISC_LIST_INITIALIZER,
        };
 
+       __cds_wfcq_init(&loop->async_jobs.head, &loop->async_jobs.tail);
+       __cds_wfcq_init(&loop->setup_jobs.head, &loop->setup_jobs.tail);
+       __cds_wfcq_init(&loop->teardown_jobs.head, &loop->teardown_jobs.tail);
+
        int r = uv_loop_init(&loop->loop);
        UV_RUNTIME_CHECK(uv_loop_init, r);
 
@@ -248,23 +243,6 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
        loop->magic = LOOP_MAGIC;
 }
 
-static void
-setup_jobs_cb(void *arg) {
-       isc_loop_t *loop = arg;
-       isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs);
-
-       while (job != NULL) {
-               isc_job_t *next = ISC_LIST_NEXT(job, link);
-               ISC_LIST_UNLINK(loop->setup_jobs, job, link);
-
-               job->cb(job->cbarg);
-
-               isc_mem_put(loop->mctx, job, sizeof(*job));
-
-               job = next;
-       }
-}
-
 static void
 quiescent_cb(uv_prepare_t *handle) {
        isc__qsbr_quiescent_cb(handle);
@@ -285,7 +263,7 @@ loop_close(isc_loop_t *loop) {
        int r = uv_loop_close(&loop->loop);
        UV_RUNTIME_CHECK(uv_loop_close, r);
 
-       INSIST(ISC_ASTACK_EMPTY(loop->async_jobs));
+       INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
        INSIST(ISC_LIST_EMPTY(loop->run_jobs));
 
        loop->magic = 0;
@@ -306,7 +284,13 @@ loop_thread(void *arg) {
 
        isc_barrier_wait(&loop->loopmgr->starting);
 
-       isc_async_run(loop, setup_jobs_cb, loop);
+       enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
+               &loop->async_jobs.head, &loop->async_jobs.tail,
+               &loop->setup_jobs.head, &loop->setup_jobs.tail);
+       INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
+
+       r = uv_async_send(&loop->async_trigger);
+       UV_RUNTIME_CHECK(uv_async_send, r);
 
        r = uv_run(&loop->loop, UV_RUN_DEFAULT);
        UV_RUNTIME_CHECK(uv_run, r);
@@ -316,16 +300,6 @@ loop_thread(void *arg) {
        return (NULL);
 }
 
-void
-isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job) {
-       ISC_LIST_DEQUEUE(loop->setup_jobs, job, link);
-}
-
-void
-isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job) {
-       ISC_LIST_DEQUEUE(loop->teardown_jobs, job, link);
-}
-
 /**
  * Public
  */
@@ -406,13 +380,15 @@ isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
        *job = (isc_job_t){
                .cb = cb,
                .cbarg = cbarg,
-               .link = ISC_LINK_INITIALIZER,
        };
 
+       cds_wfcq_node_init(&job->wfcq_node);
+
        REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
                atomic_load(&loopmgr->paused));
 
-       ISC_LIST_APPEND(loop->setup_jobs, job, link);
+       cds_wfcq_enqueue(&loop->setup_jobs.head, &loop->setup_jobs.tail,
+                        &job->wfcq_node);
 
        return (job);
 }
@@ -426,13 +402,14 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
        *job = (isc_job_t){
                .cb = cb,
                .cbarg = cbarg,
-               .link = ISC_LINK_INITIALIZER,
        };
+       cds_wfcq_node_init(&job->wfcq_node);
 
        REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
                atomic_load(&loopmgr->paused));
 
-       ISC_LIST_APPEND(loop->teardown_jobs, job, link);
+       cds_wfcq_enqueue(&loop->teardown_jobs.head, &loop->teardown_jobs.tail,
+                        &job->wfcq_node);
 
        return (job);
 }
index 9594a0f306a377e95351fa9b935c9d43327bfcff..b9bef13b1937de84909c51dcd6fd68394240fe8c 100644 (file)
@@ -25,9 +25,9 @@
 #include <isc/refcount.h>
 #include <isc/result.h>
 #include <isc/signal.h>
-#include <isc/stack.h>
 #include <isc/thread.h>
 #include <isc/types.h>
+#include <isc/urcu.h>
 #include <isc/uv.h>
 #include <isc/work.h>
 
@@ -58,7 +58,7 @@ struct isc_loop {
 
        /* Async queue */
        uv_async_t async_trigger;
-       isc_asyncstack_t async_jobs;
+       isc_jobqueue_t async_jobs;
 
        /* Jobs queue */
        uv_idle_t run_trigger;
@@ -69,8 +69,8 @@ struct isc_loop {
 
        /* Shutdown */
        uv_async_t shutdown_trigger;
-       isc_joblist_t setup_jobs;
-       isc_joblist_t teardown_jobs;
+       isc_jobqueue_t setup_jobs;
+       isc_jobqueue_t teardown_jobs;
 
        /* Destroy */
        uv_async_t destroy_trigger;
index 583febaa8a5c855dfe2156653726ff7702f5073b..c861751555603ef120be3781918d481c4d1be107 100644 (file)
@@ -692,8 +692,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
                .active_handles = ISC_LIST_INITIALIZER,
                .active_link = ISC_LINK_INITIALIZER,
                .active = true,
-               .job = ISC_JOB_INITIALIZER,
-               .quotacb = ISC_JOB_INITIALIZER,
        };
 
        if (iface != NULL) {
@@ -716,8 +714,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
        isc__networker_attach(worker, &sock->worker);
        sock->uv_handle.handle.data = sock;
 
-       ISC_LINK_INIT(&sock->quotacb, link);
-
        switch (type) {
        case isc_nm_udpsocket:
        case isc_nm_udplistener:
@@ -805,7 +801,6 @@ alloc_handle(isc_nmsocket_t *sock) {
                .magic = NMHANDLE_MAGIC,
                .active_link = ISC_LINK_INITIALIZER,
                .inactive_link = ISC_LINK_INITIALIZER,
-               .job = ISC_JOB_INITIALIZER,
        };
        isc_refcount_init(&handle->references, 1);
 
@@ -1522,7 +1517,6 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) {
                .connect_tries = 3,
                .link = ISC_LINK_INITIALIZER,
                .active_link = ISC_LINK_INITIALIZER,
-               .job = ISC_JOB_INITIALIZER,
                .magic = UVREQ_MAGIC,
        };
        uv_handle_set_data(&req->uv_req.handle, req);