]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Replace the shared work pool with per-loop, per-lane worker threads 12226/head
authorOndřej Surý <ondrej@isc.org>
Tue, 9 Jun 2026 04:12:58 +0000 (06:12 +0200)
committerOndřej Surý <ondrej@isc.org>
Wed, 17 Jun 2026 17:07:08 +0000 (19:07 +0200)
Offloaded work used two different mechanisms: a per-loop isc_helper
thread for CPU-bound crypto (DNSSEC validation, message signature
checks) and the process-global libuv thread pool for blocking I/O (zone
load and dump, inbound transfer apply). Neither could cancel a queued
task, and the two disagreed about exclusive mode — the helper paused
with its loop under isc_loopmgr_pause() but the libuv pool did not, so
blocking offloaded work kept running while a loop held the exclusive
lock.

Unify both behind isc_work: each loop gets its own worker thread per
lane — FAST for short, bounded tasks and SLOW for long, blocking ones —
fed by a private queue. Separate lanes keep a short crypto task off the
path of a multi-second zone dump once both run on per-loop workers;
every lane parks with isc_loopmgr_pause() so exclusive mode now quiesces
offloaded work too; and a still-queued task can be canceled before it
starts (isc_work_cancel). isc_helper is removed and its callers select a
lane.

21 files changed:
bin/dig/nslookup.c
lib/dns/catz.c
lib/dns/master.c
lib/dns/masterdump.c
lib/dns/message.c
lib/dns/rpz.c
lib/dns/validator.c
lib/dns/xfrin.c
lib/isc/helper.c [deleted file]
lib/isc/httpd.c
lib/isc/include/isc/helper.h [deleted file]
lib/isc/include/isc/loop.h
lib/isc/include/isc/queue.h
lib/isc/include/isc/urcu.h
lib/isc/include/isc/work.h
lib/isc/loop.c
lib/isc/loop_p.h
lib/isc/meson.build
lib/isc/work.c
meson.build
tests/isc/work_test.c

index 177cdeba6d4dbdc1ebe395b6598285dd2c7eff27..ac7a6c8285ac7e9c7cd17704edf95aa8da841b04 100644 (file)
@@ -841,7 +841,8 @@ static void
 start_next_command(void);
 
 static void
-process_next_command(void *arg ISC_ATTR_UNUSED) {
+process_next_command(void *arg ISC_ATTR_UNUSED,
+                    isc_result_t result ISC_ATTR_UNUSED) {
        isc_loop_t *loop = isc_loop_main();
        if (cmdline == NULL) {
                in_use = false;
@@ -868,11 +869,11 @@ start_next_command(void) {
 
        isc_loopmgr_pause();
        if (interactive) {
-               isc_work_enqueue(loop, readline_next_command,
+               isc_work_enqueue(loop, ISC_WORKLANE_FAST, readline_next_command,
                                 process_next_command, loop);
        } else {
-               isc_work_enqueue(loop, fgets_next_command, process_next_command,
-                                loop);
+               isc_work_enqueue(loop, ISC_WORKLANE_FAST, fgets_next_command,
+                                process_next_command, loop);
        }
        isc_loopmgr_resume();
 }
index b731432678b79dd2391975720534bcfa4f868705..b63a42512677a59dbc39c66d2bf303d0eb8a4732 100644 (file)
 
 #include <dns/catz.h>
 #include <dns/dbiterator.h>
+#include <dns/name.h>
 #include <dns/rdatasetiter.h>
 #include <dns/view.h>
 #include <dns/zone.h>
 
-#include "dns/name.h"
-
 #define DNS_CATZ_ZONE_MAGIC  ISC_MAGIC('c', 'a', 't', 'z')
 #define DNS_CATZ_ZONES_MAGIC ISC_MAGIC('c', 'a', 't', 's')
 #define DNS_CATZ_ENTRY_MAGIC ISC_MAGIC('c', 'a', 't', 'e')
@@ -118,7 +117,7 @@ dns__catz_timer_stop(void *arg);
 static void
 dns__catz_update_cb(void *data);
 static void
-dns__catz_done_cb(void *data);
+dns__catz_done_cb(void *data, isc_result_t result);
 
 static isc_result_t
 catz_process_zones_entry(dns_catz_zone_t *catz, dns_rdataset_t *value,
@@ -2056,8 +2055,8 @@ dns__catz_timer_cb(void *arg) {
                      "catz: %s: reload start", domain);
 
        dns_catz_zone_ref(catz);
-       isc_work_enqueue(catz->loop, dns__catz_update_cb, dns__catz_done_cb,
-                        catz);
+       isc_work_enqueue(catz->loop, ISC_WORKLANE_SLOW, dns__catz_update_cb,
+                        dns__catz_done_cb, catz);
 
 exit:
        isc_timer_destroy(&catz->updatetimer);
@@ -2444,7 +2443,7 @@ exit:
 }
 
 static void
-dns__catz_done_cb(void *data) {
+dns__catz_done_cb(void *data, isc_result_t result ISC_ATTR_UNUSED) {
        dns_catz_zone_t *catz = (dns_catz_zone_t *)data;
        char dname[DNS_NAME_FORMATSIZE];
 
index 741b18f981d460701d84aab6a43b76b3cfde41de..f380a69b97699fff363afe10edd71479058010e2 100644 (file)
@@ -2596,7 +2596,7 @@ master_load_callback(void *arg) {
 }
 
 static void
-master_load_done(void *arg) {
+master_load_done(void *arg, isc_result_t result ISC_ATTR_UNUSED) {
        dns_loadctx_t *lctx = arg;
 
        isc_async_run(lctx->loop, master_load_callback, lctx);
@@ -2632,7 +2632,8 @@ dns_master_loadfileasync(const char *master_file, dns_name_t *top,
 
        dns_loadctx_ref(lctx);
        isc_loop_attach(loop, &lctx->loop);
-       isc_work_enqueue(isc_loop(), master_load, master_load_done, lctx);
+       isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, master_load,
+                        master_load_done, lctx);
 
        *lctxp = lctx;
 
index 4249e6d6f8633c694252c49ae8d88b7024a37fa9..671f33cf55eb0c4481c7431adb2f27c7b7aed892 100644 (file)
@@ -1499,7 +1499,7 @@ master_dump_callback(void *data) {
 }
 
 static void
-master_dump_done(void *data) {
+master_dump_done(void *data, isc_result_t result ISC_ATTR_UNUSED) {
        dns_dumpctx_t *dctx = data;
 
        isc_async_run(dctx->loop, master_dump_callback, dctx);
@@ -1742,7 +1742,8 @@ dns_master_dumptostreamasync(isc_mem_t *mctx, dns_db_t *db,
 
        dns_dumpctx_ref(dctx);
        isc_loop_attach(loop, &dctx->loop);
-       isc_work_enqueue(isc_loop(), master_dump, master_dump_done, dctx);
+       isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, master_dump,
+                        master_dump_done, dctx);
 
        *dctxp = dctx;
 
@@ -1834,7 +1835,8 @@ dns_master_dumpasync(isc_mem_t *mctx, dns_db_t *db, dns_dbversion_t *version,
 
        dns_dumpctx_ref(dctx);
        isc_loop_attach(loop, &dctx->loop);
-       isc_work_enqueue(isc_loop(), master_dump, master_dump_done, dctx);
+       isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, master_dump,
+                        master_dump_done, dctx);
 
        *dctxp = dctx;
 
index 9b5360ea3fcb55b573de5f744f9d8933878dee7c..424a11ff2d52393305415c1a1df7924742aefab8 100644 (file)
@@ -25,7 +25,6 @@
 #include <isc/buffer.h>
 #include <isc/hash.h>
 #include <isc/hashmap.h>
-#include <isc/helper.h>
 #include <isc/log.h>
 #include <isc/mem.h>
 #include <isc/result.h>
@@ -3001,23 +3000,22 @@ dns_message_dumpsig(dns_message_t *msg, char *txt1) {
 #endif /* ifdef SKAN_MSG_DEBUG */
 
 static void
-checksig_done(void *arg);
+checksig_done(void *arg, isc_result_t result);
 
 static void
 checksig_run(void *arg) {
        checksig_ctx_t *chsigctx = arg;
 
        chsigctx->result = dns_message_checksig(chsigctx->msg, chsigctx->view);
-
-       isc_async_run(chsigctx->loop, checksig_done, chsigctx);
 }
 
 static void
-checksig_done(void *arg) {
+checksig_done(void *arg, isc_result_t result ISC_ATTR_UNUSED) {
        checksig_ctx_t *chsigctx = arg;
        dns_message_t *msg = chsigctx->msg;
 
-       chsigctx->cb(chsigctx->cbarg, chsigctx->result);
+       chsigctx->cb(chsigctx->cbarg,
+                    (result != ISC_R_SUCCESS) ? result : chsigctx->result);
 
        dns_view_detach(&chsigctx->view);
        isc_loop_detach(&chsigctx->loop);
@@ -3044,7 +3042,8 @@ dns_message_checksig_async(dns_message_t *msg, dns_view_t *view,
        dns_view_attach(view, &chsigctx->view);
 
        dns_message_clonebuffer(msg);
-       isc_helper_run(loop, checksig_run, chsigctx);
+       isc_work_enqueue(loop, ISC_WORKLANE_FAST, checksig_run, checksig_done,
+                        chsigctx);
 
        return DNS_R_WAIT;
 }
index 079d34e3501d2df3fb4accb4e4eeccdd0ba2651b..a4b13460430cb753d13de91d2e05b52966676394 100644 (file)
@@ -1661,7 +1661,7 @@ dns__rpz_timer_stop(void *arg) {
 }
 
 static void
-update_rpz_done_cb(void *data) {
+update_rpz_done_cb(void *data, isc_result_t result ISC_ATTR_UNUSED) {
        dns_rpz_zone_t *rpz = (dns_rpz_zone_t *)data;
        char dname[DNS_NAME_FORMATSIZE];
 
@@ -1958,7 +1958,8 @@ dns__rpz_timer_cb(void *arg) {
                      "rpz: %s: reload start", domain);
 
        dns_rpz_zones_ref(rpz->rpzs);
-       isc_work_enqueue(rpz->loop, update_rpz_cb, update_rpz_done_cb, rpz);
+       isc_work_enqueue(rpz->loop, ISC_WORKLANE_SLOW, update_rpz_cb,
+                        update_rpz_done_cb, rpz);
 
        isc_timer_destroy(&rpz->updatetimer);
        rpz->loop = NULL;
index 94998ce9a038b9aeb650695ec38a309660454ada..38ccb5611ca2831f9a4e0de4e7f1fd124079ea78 100644 (file)
@@ -18,7 +18,6 @@
 #include <isc/atomic.h>
 #include <isc/base32.h>
 #include <isc/counter.h>
-#include <isc/helper.h>
 #include <isc/job.h>
 #include <isc/log.h>
 #include <isc/md.h>
@@ -135,7 +134,7 @@ validate_async_done(dns_validator_t *val, isc_result_t result);
 static isc_result_t
 validate_async_run(dns_validator_t *val, isc_job_cb cb);
 static isc_result_t
-validate_helper_run(dns_validator_t *val, isc_job_cb cb);
+validate_work_enqueue(dns_validator_t *val, isc_job_cb cb);
 
 static void
 validate_dnskey(void *arg);
@@ -497,7 +496,7 @@ fetch_callback_dnskey(void *arg) {
                        if (eresult == ISC_R_SUCCESS &&
                            rdataset->trust >= dns_trust_secure)
                        {
-                               result = validate_helper_run(
+                               result = validate_work_enqueue(
                                        val, resume_answer_with_key);
                        } else {
                                result = validate_async_run(val, resume_answer);
@@ -687,8 +686,8 @@ validator_callback_dnskey(void *arg) {
                 * Only extract the dst key if the keyset is secure.
                 */
                if (val->frdataset.trust >= dns_trust_secure) {
-                       result = validate_helper_run(val,
-                                                    resume_answer_with_key);
+                       result = validate_work_enqueue(val,
+                                                      resume_answer_with_key);
                } else {
                        result = validate_async_run(val, resume_answer);
                }
@@ -1315,7 +1314,8 @@ seek_dnskey(dns_validator_t *val) {
                        }
                        dns_rdataset_cleanup(&val->fsigrdataset);
 
-                       return validate_helper_run(val, resume_answer_with_key);
+                       return validate_work_enqueue(val,
+                                                    resume_answer_with_key);
                }
                break;
 
@@ -1773,7 +1773,7 @@ validate_answer_signing_key_done(void *arg) {
                val->result = ISC_R_CANCELED;
        } else if (val->key != NULL) {
                /* Process with next key if we selected one */
-               (void)validate_helper_run(val, validate_answer_signing_key);
+               (void)validate_work_enqueue(val, validate_answer_signing_key);
                return;
        }
 
@@ -1841,7 +1841,7 @@ validate_answer_process(void *arg) {
                goto next_key;
        }
 
-       (void)validate_helper_run(val, validate_answer_signing_key);
+       (void)validate_work_enqueue(val, validate_answer_signing_key);
        return;
 
 next_key:
@@ -1964,10 +1964,15 @@ validate_async_run(dns_validator_t *val, isc_job_cb cb) {
        return DNS_R_WAIT;
 }
 
+static void
+null_done(void *arg ISC_ATTR_UNUSED, isc_result_t result ISC_ATTR_UNUSED) {
+       /* no-op for now */
+}
+
 static isc_result_t
-validate_helper_run(dns_validator_t *val, isc_job_cb cb) {
+validate_work_enqueue(dns_validator_t *val, isc_job_cb cb) {
        val->attributes |= VALATTR_OFFLOADED;
-       isc_helper_run(val->loop, cb, val);
+       isc_work_enqueue(val->loop, ISC_WORKLANE_FAST, cb, null_done, val);
        return DNS_R_WAIT;
 }
 
@@ -2306,7 +2311,7 @@ validate_dnskey_dsset_next_done(void *arg) {
                break;
        default:
                /* Continue validation until we have success or no more data */
-               (void)validate_helper_run(val, validate_dnskey_dsset_next);
+               (void)validate_work_enqueue(val, validate_dnskey_dsset_next);
                return;
        }
 
@@ -2333,8 +2338,8 @@ validate_dnskey_dsset_first(dns_validator_t *val) {
                /* continue async run */
                result = validate_dnskey_dsset(val);
                if (result != ISC_R_SUCCESS) {
-                       (void)validate_helper_run(val,
-                                                 validate_dnskey_dsset_next);
+                       (void)validate_work_enqueue(val,
+                                                   validate_dnskey_dsset_next);
                        return;
                }
        }
index 652d2da596bcb7779bd907560bda48b1ef505e22..408dc707e9c62fd8de7bcd6d648a977a0e1f9aef 100644 (file)
@@ -374,10 +374,11 @@ cleanup:
 }
 
 static void
-axfr_apply_done(void *arg) {
+axfr_apply_done(void *arg, isc_result_t eresult) {
        xfrin_work_t *work = arg;
        dns_xfrin_t *xfr = work->xfr;
-       isc_result_t result = work->result;
+       isc_result_t result = (eresult == ISC_R_SUCCESS) ? work->result
+                                                        : eresult;
 
        REQUIRE(VALID_XFRIN(xfr));
        REQUIRE(VALID_XFRIN_WORK(work));
@@ -421,7 +422,8 @@ axfr_commit(dns_xfrin_t *xfr) {
                .xfr = dns_xfrin_ref(xfr),
        };
        xfr->diff_running = true;
-       isc_work_enqueue(xfr->loop, axfr_apply, axfr_apply_done, work);
+       isc_work_enqueue(xfr->loop, ISC_WORKLANE_SLOW, axfr_apply,
+                        axfr_apply_done, work);
 }
 
 static isc_result_t
@@ -613,14 +615,15 @@ ixfr_apply(void *arg) {
 }
 
 static void
-ixfr_apply_done(void *arg) {
+ixfr_apply_done(void *arg, isc_result_t eresult) {
        xfrin_work_t *work = arg;
        REQUIRE(VALID_XFRIN_WORK(work));
 
        dns_xfrin_t *xfr = work->xfr;
        REQUIRE(VALID_XFRIN(xfr));
 
-       isc_result_t result = work->result;
+       isc_result_t result = (eresult == ISC_R_SUCCESS) ? work->result
+                                                        : eresult;
 
        if (atomic_load(&xfr->shuttingdown)) {
                result = ISC_R_SHUTTINGDOWN;
@@ -632,7 +635,8 @@ ixfr_apply_done(void *arg) {
        if (!xfr->retry_axfr &&
            !cds_wfcq_empty(&xfr->diff_head, &xfr->diff_tail))
        {
-               isc_work_enqueue(xfr->loop, ixfr_apply, ixfr_apply_done, work);
+               isc_work_enqueue(xfr->loop, ISC_WORKLANE_SLOW, ixfr_apply,
+                                ixfr_apply_done, work);
                return;
        }
 
@@ -712,7 +716,8 @@ ixfr_commit(dns_xfrin_t *xfr) {
                        .xfr = dns_xfrin_ref(xfr),
                };
                xfr->diff_running = true;
-               isc_work_enqueue(xfr->loop, ixfr_apply, ixfr_apply_done, work);
+               isc_work_enqueue(xfr->loop, ISC_WORKLANE_SLOW, ixfr_apply,
+                                ixfr_apply_done, work);
        }
 
 cleanup:
diff --git a/lib/isc/helper.c b/lib/isc/helper.c
deleted file mode 100644 (file)
index 2ef13d8..0000000
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
- *
- * SPDX-License-Identifier: MPL-2.0
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, you can obtain one at https://mozilla.org/MPL/2.0/.
- *
- * See the COPYRIGHT file distributed with this work for additional
- * information regarding copyright ownership.
- */
-
-#include <stdlib.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include <isc/atomic.h>
-#include <isc/barrier.h>
-#include <isc/helper.h>
-#include <isc/job.h>
-#include <isc/loop.h>
-#include <isc/magic.h>
-#include <isc/mem.h>
-#include <isc/mutex.h>
-#include <isc/refcount.h>
-#include <isc/result.h>
-#include <isc/signal.h>
-#include <isc/strerr.h>
-#include <isc/thread.h>
-#include <isc/util.h>
-#include <isc/uv.h>
-#include <isc/work.h>
-
-#include "async_p.h"
-#include "job_p.h"
-#include "loop_p.h"
-
-void
-isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
-       REQUIRE(VALID_LOOP(loop));
-       REQUIRE(cb != NULL);
-
-       isc_loop_t *helper = isc_loop_helper(loop);
-       isc_job_t *job = isc_mem_get(helper->mctx, sizeof(*job));
-       *job = (isc_job_t){
-               .cb = cb,
-               .cbarg = cbarg,
-       };
-
-       cds_wfcq_node_init(&job->wfcq_node);
-
-       /*
-        * cds_wfcq_enqueue() is non-blocking and enqueues the job to async
-        * queue.
-        *
-        * The function returns 'false' in case the queue was empty - in such
-        * case we need to trigger the async callback.
-        */
-       if (!cds_wfcq_enqueue(&helper->async_jobs.head,
-                             &helper->async_jobs.tail, &job->wfcq_node))
-       {
-               int r = uv_async_send(&helper->async_trigger);
-               UV_RUNTIME_CHECK(uv_async_send, r);
-       }
-}
index 910e15d00feb51f8747acd0e5b57536dcbf2e728..465f29da9e497d9a69864bb67b2eeb7cddd72bf2 100644 (file)
@@ -869,7 +869,7 @@ prepare_response(void *arg) {
 }
 
 static void
-prepare_response_done(void *arg) {
+prepare_response_done(void *arg, isc_result_t result) {
        isc_region_t r;
        isc_httpd_sendreq_t *req = arg;
        isc_httpd_t *httpd = req->httpd;
@@ -879,6 +879,11 @@ prepare_response_done(void *arg) {
         */
        isc_buffer_usedregion(req->sendbuffer, &r);
 
+       if (result != ISC_R_SUCCESS) {
+               httpd_senddone(httpd->handle, result, req);
+               return;
+       }
+
        isc_nm_send(httpd->handle, &r, httpd_senddone, req);
 }
 
@@ -939,8 +944,8 @@ httpd_request(isc_nmhandle_t *handle, isc_result_t eresult,
 
        isc_httpd_sendreq_t *req = isc__httpd_sendreq_new(httpd);
        isc_nmhandle_ref(handle);
-       isc_work_enqueue(isc_loop(), prepare_response, prepare_response_done,
-                        req);
+       isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, prepare_response,
+                        prepare_response_done, req);
        return;
 
 close_readhandle:
diff --git a/lib/isc/include/isc/helper.h b/lib/isc/include/isc/helper.h
deleted file mode 100644 (file)
index e13ec9b..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
- *
- * SPDX-License-Identifier: MPL-2.0
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, you can obtain one at https://mozilla.org/MPL/2.0/.
- *
- * See the COPYRIGHT file distributed with this work for additional
- * information regarding copyright ownership.
- */
-
-/*! \file isc/helper.h */
-
-#pragma once
-
-#include <inttypes.h>
-
-#include <isc/job.h>
-#include <isc/loop.h>
-#include <isc/mem.h>
-#include <isc/types.h>
-
-void
-isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
-/*%<
- * Schedule the job callback 'cb' to be run on the 'loop' event loop.
- *
- * Requires:
- *
- *\li  'loop' is a valid isc event loop
- *\li  'cb' is a callback function, must be non-NULL
- *\li  'cbarg' is passed to the 'cb' as the only argument, may be NULL
- */
-
-#define isc_helper_current(cb, cbarg) isc_async_run(isc_loop(), cb, cbarg)
-/*%<
- * Helper macro to run the job on the current loop
- */
index f076bf4423dcfe0e693cc3bb05ccc456f111e23a..e5dbd8ccd31cfadf80189d09e43dd071d99b95af 100644 (file)
 
 #include <inttypes.h>
 
-#include <urcu/compiler.h>
-#include <urcu/system.h>
-
 #include <isc/job.h>
 #include <isc/mem.h>
 #include <isc/refcount.h>
 #include <isc/types.h>
+#include <isc/urcu.h>
+#include <isc/work.h>
 
 typedef void (*isc_job_cb)(void *);
 
@@ -205,13 +204,3 @@ isc_loop_shuttingdown(isc_loop_t *loop);
  *
  * \li 'loop' is a valid loop and the loop tid matches the current tid.
  */
-
-isc_loop_t *
-isc_loop_helper(isc_loop_t *loop);
-/*%<
- * Returns the helper thread corresponding to the thread ID for 'loop'.
- *
- * Requires:
- *
- * \li 'loop' is a valid loop.
- */
index 02f94d866de18b6ba8c6ddad4c2b7d61324c27ad..1996ba195cf9e267718664a698101128e0f16b3d 100644 (file)
@@ -24,7 +24,7 @@ typedef struct isc_queue {
                                              sizeof(struct __cds_wfcq_head)];
        struct cds_wfcq_tail   tail;
        uint8_t                __padding_tail[ISC_OS_CACHELINE_SIZE -
-                                             sizeof(struct __cds_wfcq_head)];
+                                             sizeof(struct cds_wfcq_tail)];
 } isc_queue_t;
 
 typedef struct cds_wfcq_node isc_queue_node_t;
index 20c2902d43ec8afd8b3bf816737651e2446a3948..1962078745fcb07c4131ce7ee8333658e658bdcf 100644 (file)
 #include <urcu-bp.h>
 #endif
 
-#include <urcu-pointer.h>
-
+#if HAVE_URCU_ASSERT_H
+#include <urcu/assert.h>
+#endif
+#if HAVE_URCU_UATOMIC_H
+#include <urcu/uatomic.h>
+#endif
 #include <urcu/compiler.h>
+#include <urcu/futex.h>
 #include <urcu/list.h>
+#if HAVE_URCU_POINTER_H
+#include <urcu/pointer.h>
+#else
+#include <urcu-pointer.h>
+#endif
 #include <urcu/rculfhash.h>
 #include <urcu/rculist.h>
 #include <urcu/ref.h>
 #define _CMM_STORE_SHARED(x, v) CMM_STORE_SHARED(x, v)
 
 #endif /* __SANITIZE_THREAD__ */
+
+#if !defined(uatomic_load) || !defined(uatomic_store)
+#define uatomic_load(ptr, mo)    uatomic_read(ptr)
+#define uatomic_store(ptr, v, mo) uatomic_set(ptr, v)
+
+#define CMM_RELAXED __ATOMIC_RELAXED
+#define CMM_CONSUME __ATOMIC_CONSUME
+#define CMM_ACQUIRE __ATOMIC_ACQUIRE
+#define CMM_RELEASE __ATOMIC_RELEASE
+#define CMM_ACQ_REL __ATOMIC_ACQ_REL
+#define CMM_SEQ_CST __ATOMIC_SEQ_CST
+#endif
index d986b2e7093443360af0fa9a416059ed2c9abee8..a4567f13f0ddbf0835ea2d238624f6087572199f 100644 (file)
  * information regarding copyright ownership.
  */
 
+/*! \file isc/work.h
+ * \brief Offload work from an event loop onto a dedicated worker thread.
+ *
+ * Each isc event loop has one worker thread per lane (see isc_worklane_t).
+ * isc_work_enqueue() runs a callback on the worker thread bound to the calling
+ * loop's lane and, when it finishes, runs a second callback back on that loop.
+ * The handle it returns can be used to cancel a task that has not started
+ * running yet.
+ */
+
 #pragma once
 
-#include <stdlib.h>
+#include <isc/mem.h>
 
-#include <isc/loop.h>
+typedef enum isc_worklane {
+       ISC_WORKLANE_FAST = 0, /*%< short, bounded tasks (e.g. crypto) */
+       ISC_WORKLANE_SLOW,     /*%< blocking/long tasks (e.g. zone dump) */
+       ISC_WORKLANE_COUNT,
+} isc_worklane_t;
+/*%<
+ * Selects which per-loop worker thread runs an enqueued task.  Keeping long,
+ * blocking SLOW work (disk I/O, zone dump/load) on its own lane stops it from
+ * holding up short FAST tasks behind it.
+ */
 
 typedef void (*isc_work_cb)(void *arg);
-typedef void (*isc_after_work_cb)(void *arg);
+typedef void (*isc_work_done_cb)(void *arg, isc_result_t result);
 typedef struct isc_work isc_work_t;
 
-void
-isc_work_enqueue(isc_loop_t *loop, isc_work_cb work_cb,
-                isc_after_work_cb after_work_cb, void *cbarg);
+isc_work_t *
+isc_work_enqueue(isc_loop_t *loop, isc_worklane_t lane, isc_work_cb cb,
+                isc_work_done_cb done_cb, void *cbarg);
 /*%<
- * Schedules work to be handled by the libuv thread pool (see uv_work_t).
- * The function specified in `work_cb` will be run by a thread in the
- * thread pool; when complete, the `after_work_cb` function will run
- * in 'loop' to inform the caller that the work was completed.
+ * Schedule 'cb' to run on the worker thread bound to 'loop' and 'lane'.  When
+ * 'cb' returns, 'done_cb' is scheduled back on 'loop' with the result of the
+ * work: ISC_R_SUCCESS normally, or ISC_R_CANCELED if the task was canceled
+ * before it started (see isc_work_cancel()).
+ *
+ * Returns a handle that may be passed to isc_work_cancel().  The handle is
+ * owned by 'loop' and stays valid until 'after_cb' has run; it must not be used
+ * afterwards.
  *
  * Requires:
- * \li 'loop' is a valid event loop.
- * \li 'work_cb' and 'after_work_cb' are not NULL.
+ *
+ *\li  'loop' is a valid isc event loop.
+ *\li  'cb' is non-NULL.
+ *\li  'after_cb' is non-NULL.
+ *\li  'cbarg' is passed to both callbacks, may be NULL.
+ */
+
+bool
+isc_work_cancel(isc_work_t *work);
+/*%<
+ * Try to cancel 'work' before its 'cb' starts running.  If the task is still
+ * queued it is marked canceled and 'cb' will not run; if it is already running
+ * or has finished this has no effect.  Either way the 'after_cb' passed to
+ * isc_work_enqueue() still runs on the origin loop, with ISC_R_CANCELED when
+ * the cancel succeeded.  Nothing is freed here.
+ *
+ * Returns:
+ *
+ *\li  true    the task was still queued; 'cb' will not run.
+ *\li  false   the task is already running or done (uv_cancel() semantics).
+ *
+ * Requires:
+ *
+ *\li  'work' is a handle from isc_work_enqueue() whose 'after_cb' has not run.
+ */
+
+/* private */
+
+typedef struct isc__workthread isc__workthread_t;
+
+isc__workthread_t *
+isc__workthread_create(isc_mem_t *mctx, isc_worklane_t lane);
+/*%<
+ * Create one worker thread for 'lane' with its own dispatch queue (the loop
+ * manager creates one per loop).  Used by the loop manager; not for general
+ * use.
+ */
+
+void
+isc__workthread_shutdown(isc__workthread_t *thread);
+/*%<
+ * Begin shutdown of 'thread': set its SHUTDOWN flag (after which new enqueues
+ * run inline on the caller instead of queuing), then, after an RCU grace period
+ * that fences any in-flight enqueue, wake the worker so it drains its queue and
+ * exits.  Does not join the worker; see isc__workthread_destroy().  Idempotent.
+ */
+
+void
+isc__workthread_pause(isc__workthread_t *thread);
+/*%<
+ * Quiesce 'thread' for isc_loopmgr_pause(): mark it paused and block until it
+ * has parked.  A no-op if the worker is already shutting down.  Must be paired
+ * with isc__workthread_resume() and called from the worker's owning loop.
+ */
+
+void
+isc__workthread_resume(isc__workthread_t *thread);
+/*%<
+ * Release a worker previously parked by isc__workthread_pause().
+ */
+
+void
+isc__workthread_destroy(isc__workthread_t **threadp);
+/*%<
+ * Join the worker thread, then free '*threadp' and set it to NULL.  Must be
+ * called after isc__workthread_shutdown().
  */
index 5a7c7763601146b5bf64241d5c15336bcf7309d8..062bcfa599ec1f9f55290ebe8925b8ce3fd87f84 100644 (file)
@@ -94,6 +94,11 @@ static void
 pause_loop(isc_loop_t *loop) {
        isc_loopmgr_t *loopmgr = isc__loopmgr;
 
+       /* Quiesce this loop's own workers before going to the barrier. */
+       for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+               isc__workthread_pause(loop->workthreads[lane]);
+       }
+
        rcu_thread_offline();
 
        loop->paused = true;
@@ -108,6 +113,10 @@ resume_loop(isc_loop_t *loop) {
        loop->paused = false;
 
        rcu_thread_online();
+
+       for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+               isc__workthread_resume(loop->workthreads[lane]);
+       }
 }
 
 static void
@@ -180,6 +189,10 @@ shutdown_cb(uv_async_t *handle) {
                isc_signal_destroy(&loopmgr->sigint);
        }
 
+       for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+               isc__workthread_shutdown(loop->workthreads[lane]);
+       }
+
        enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
                &loop->async_jobs.head, &loop->async_jobs.tail,
                &loop->teardown_jobs.head, &loop->teardown_jobs.tail);
@@ -248,16 +261,6 @@ quiescent_cb(uv_prepare_t *handle) {
 #endif
 }
 
-static void
-helper_close(isc_loop_t *loop) {
-       int r = uv_loop_close(&loop->loop);
-       UV_RUNTIME_CHECK(uv_loop_close, r);
-
-       INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
-
-       isc_mem_detach(&loop->mctx);
-}
-
 static void
 loop_close(isc_loop_t *loop) {
        int r = uv_loop_close(&loop->loop);
@@ -271,32 +274,10 @@ loop_close(isc_loop_t *loop) {
        isc_mem_detach(&loop->mctx);
 }
 
-static void *
-helper_thread(void *arg) {
-       isc_loop_t *helper = (isc_loop_t *)arg;
-
-       int r = uv_prepare_start(&helper->quiescent, quiescent_cb);
-       UV_RUNTIME_CHECK(uv_prepare_start, r);
-
-       isc_barrier_wait(&isc__loopmgr->starting);
-
-       r = uv_run(&helper->loop, UV_RUN_DEFAULT);
-       UV_RUNTIME_CHECK(uv_run, r);
-
-       /* Invalidate the helper early */
-       helper->magic = 0;
-
-       isc_barrier_wait(&isc__loopmgr->stopping);
-
-       return NULL;
-}
-
 static void *
 loop_thread(void *arg) {
        isc_loop_t *loop = (isc_loop_t *)arg;
        isc_loopmgr_t *loopmgr = isc__loopmgr;
-       isc_loop_t *helper = &loopmgr->helpers[loop->tid];
-       char name[32];
        /* Initialize the thread_local variables*/
 
        REQUIRE(isc__loop_local == NULL || isc__loop_local == loop);
@@ -304,14 +285,14 @@ loop_thread(void *arg) {
 
        isc__tid_init(loop->tid);
 
-       /* Start the helper thread */
-       isc_thread_create(helper_thread, helper, &helper->thread);
-       snprintf(name, sizeof(name), "isc-helper-%04" PRItid, loop->tid);
-       isc_thread_setname(helper->thread, name);
-
        int r = uv_prepare_start(&loop->quiescent, quiescent_cb);
        UV_RUNTIME_CHECK(uv_prepare_start, r);
 
+       for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+               loop->workthreads[lane] =
+                       isc__workthread_create(isc__loopmgr->mctx, lane);
+       }
+
        isc_barrier_wait(&loopmgr->starting);
 
        enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
@@ -330,12 +311,12 @@ loop_thread(void *arg) {
        /* Invalidate the loop early */
        loop->magic = 0;
 
-       /* Shutdown the helper thread */
-       r = uv_async_send(&helper->shutdown_trigger);
-       UV_RUNTIME_CHECK(uv_async_send, r);
-
        isc_barrier_wait(&loopmgr->stopping);
 
+       for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+               isc__workthread_destroy(&loop->workthreads[lane]);
+       }
+
        return NULL;
 }
 
@@ -343,17 +324,6 @@ loop_thread(void *arg) {
  * Public
  */
 
-static void
-threadpool_initialize(uint32_t workers) {
-       char buf[11];
-       int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf,
-                            &(size_t){ sizeof(buf) });
-       if (r == UV_ENOENT) {
-               snprintf(buf, sizeof(buf), "%" PRIu32, workers);
-               uv_os_setenv("UV_THREADPOOL_SIZE", buf);
-       }
-}
-
 static void
 loop_destroy(isc_loop_t *loop) {
        int r = uv_async_send(&loop->destroy_trigger);
@@ -373,7 +343,6 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops) {
 
        isc_loopmgr_t *loopmgr = NULL;
 
-       threadpool_initialize(nloops);
        isc__tid_initcount(nloops);
 
        loopmgr = isc_mem_get(mctx, sizeof(*loopmgr));
@@ -384,11 +353,13 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops) {
 
        isc_mem_attach(mctx, &loopmgr->mctx);
 
-       /* We need to double the number for loops and helpers */
-       isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2);
-       isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2);
-       isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2);
-       isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2);
+       /* We need to double the number for loops */
+       isc_barrier_init(&loopmgr->pausing, loopmgr->nloops);
+       isc_barrier_init(&loopmgr->resuming, loopmgr->nloops);
+       isc_barrier_init(&loopmgr->starting,
+                        loopmgr->nloops * (1 + ISC_WORKLANE_COUNT));
+       isc_barrier_init(&loopmgr->stopping,
+                        loopmgr->nloops * (1 + ISC_WORKLANE_COUNT));
 
        loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
                                      sizeof(loopmgr->loops[0]));
@@ -397,13 +368,6 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops) {
                loop_init(loop, i, "loop");
        }
 
-       loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
-                                       sizeof(loopmgr->helpers[0]));
-       for (size_t i = 0; i < loopmgr->nloops; i++) {
-               isc_loop_t *loop = &loopmgr->helpers[i];
-               loop_init(loop, i, "helper");
-       }
-
        isc__loopmgr = loopmgr;
 
        loopmgr->sigint = isc_signal_new(isc__loopmgr_signal, loopmgr, SIGINT);
@@ -522,13 +486,6 @@ isc_loopmgr_pause(void) {
                              "loop exclusive mode: starting");
        }
 
-       for (size_t i = 0; i < isc__loopmgr->nloops; i++) {
-               isc_loop_t *helper = &isc__loopmgr->helpers[i];
-
-               int r = uv_async_send(&helper->pause_trigger);
-               UV_RUNTIME_CHECK(uv_async_send, r);
-       }
-
        for (size_t i = 0; i < isc__loopmgr->nloops; i++) {
                isc_loop_t *loop = &isc__loopmgr->loops[i];
 
@@ -543,6 +500,7 @@ isc_loopmgr_pause(void) {
 
        RUNTIME_CHECK(atomic_compare_exchange_strong(&isc__loopmgr->paused,
                                                     &(bool){ false }, true));
+
        pause_loop(CURRENT_LOOP(isc__loopmgr));
 
        if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) {
@@ -586,12 +544,6 @@ isc_loopmgr_destroy(void) {
 
        isc__loopmgr = NULL;
 
-       /* Wait for all helpers to finish */
-       for (size_t i = 0; i < loopmgr->nloops; i++) {
-               isc_loop_t *helper = &loopmgr->helpers[i];
-               isc_thread_join(helper->thread, NULL);
-       }
-
        /* First wait for all loops to finish */
        for (size_t i = 1; i < loopmgr->nloops; i++) {
                isc_loop_t *loop = &loopmgr->loops[i];
@@ -600,13 +552,6 @@ isc_loopmgr_destroy(void) {
 
        loopmgr->magic = 0;
 
-       for (size_t i = 0; i < loopmgr->nloops; i++) {
-               isc_loop_t *helper = &loopmgr->helpers[i];
-               helper_close(helper);
-       }
-       isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops,
-                    sizeof(loopmgr->helpers[0]));
-
        for (size_t i = 0; i < loopmgr->nloops; i++) {
                isc_loop_t *loop = &loopmgr->loops[i];
                loop_close(loop);
@@ -690,9 +635,22 @@ isc_loop_shuttingdown(isc_loop_t *loop) {
        return loop->shuttingdown;
 }
 
-isc_loop_t *
-isc_loop_helper(isc_loop_t *loop) {
+isc__workthread_t *
+isc__loopmgr_workthread(isc_loop_t *loop, isc_worklane_t lane) {
        REQUIRE(VALID_LOOP(loop));
+       REQUIRE(lane < ISC_WORKLANE_COUNT);
+
+       return loop->workthreads[lane];
+}
 
-       return &isc__loopmgr->helpers[loop->tid];
+void
+isc__loopmgr_starting(void) {
+       isc_loopmgr_t *loopmgr = isc__loopmgr;
+       isc_barrier_wait(&loopmgr->starting);
+}
+
+void
+isc__loopmgr_stopping(void) {
+       isc_loopmgr_t *loopmgr = isc__loopmgr;
+       isc_barrier_wait(&loopmgr->stopping);
 }
index e449afeb7d86f3a6b56f24a4e3bc7af5cac7dd1e..071e677c53b3d8aa7f358d9e2cbb6866c40cc125 100644 (file)
@@ -73,6 +73,9 @@ struct isc_loop {
 
        /* safe memory reclamation */
        uv_prepare_t quiescent;
+
+       /* thread pools */
+       isc__workthread_t *workthreads[ISC_WORKLANE_COUNT];
 };
 
 /*
@@ -107,7 +110,6 @@ typedef struct isc_loopmgr {
 
        /* per-thread objects */
        isc_loop_t *loops;
-       isc_loop_t *helpers;
 } isc_loopmgr_t;
 
 /*
@@ -131,18 +133,15 @@ struct isc_signal {
 #define JOB_MAGIC    ISC_MAGIC('J', 'O', 'B', ' ')
 #define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
 
-/*
- * Work to be offloaded to an external thread.
- */
-struct isc_work {
-       uv_work_t work;
-       isc_loop_t *loop;
-       isc_work_cb work_cb;
-       isc_after_work_cb after_work_cb;
-       void *cbarg;
-};
-
 #define DEFAULT_LOOP(loopmgr) (&(loopmgr)->loops[0])
 #define CURRENT_LOOP(loopmgr) (&(loopmgr)->loops[isc_tid()])
 #define LOOP(loopmgr, tid)    (&(loopmgr)->loops[tid])
 #define ON_LOOP(loop)        ((loop) == CURRENT_LOOP((loop)->loopmgr))
+
+isc__workthread_t *
+isc__loopmgr_workthread(isc_loop_t *loop, isc_worklane_t lane);
+
+void
+isc__loopmgr_starting(void);
+void
+isc__loopmgr_stopping(void);
index a99c3450f949e40b2167280e49fd41599cdd3778..288f8c306e4252186791924ec01fd9c647bc500e 100644 (file)
@@ -86,7 +86,6 @@ isc_srcset.add(
         'hash.c',
         'hashmap.c',
         'heap.c',
-        'helper.c',
         'hex.c',
         'histo.c',
         'ht.c',
index 0b7cdf5743dfc950ea80d20a9e990fa38f4c96e7..c2af8cfe16a6ccfbf916167458566423beda5c89 100644 (file)
  * information regarding copyright ownership.
  */
 
-#include <stdlib.h>
+#include <limits.h>
+#include <stddef.h>
+#include <stdint.h>
 
-#include <isc/iterated_hash.h>
+#include <isc/async.h>
 #include <isc/job.h>
 #include <isc/loop.h>
+#include <isc/magic.h>
+#include <isc/queue.h>
+#include <isc/thread.h>
 #include <isc/urcu.h>
+#include <isc/util.h>
 #include <isc/uv.h>
 #include <isc/work.h>
 
 #include "loop_p.h"
 
+#define WORK_MAGIC         ISC_MAGIC('W', 'o', 'r', 'k')
+#define VALID_WORK(t)      ISC_MAGIC_VALID(t, WORK_MAGIC)
+#define WORKTHREAD_MAGIC    ISC_MAGIC('W', 'k', 'T', 'h')
+#define VALID_WORKTHREAD(t) ISC_MAGIC_VALID(t, WORKTHREAD_MAGIC)
+
+enum waitstate {
+       /* The value a sleeping worker blocks on in FUTEX_WAIT. */
+       THREAD_WAITING = 0,
+       /* Any non-zero bit keeps FUTEX_WAIT from blocking. */
+       THREAD_WAKEUP = (1 << 0),
+       THREAD_RUNNING = (1 << 1),
+       THREAD_SHUTDOWN = (1 << 2),
+       THREAD_PAUSE = (1 << 3),  /* request from the owning loop */
+       THREAD_PAUSED = (1 << 4), /* ack from the worker */
+};
+
+/* Sticky bits a paused worker must not drop. */
+#define THREAD_STICKY (THREAD_SHUTDOWN | THREAD_PAUSE | THREAD_PAUSED)
+
+enum workstate {
+       WORK_QUEUED = 0,
+       WORK_RUNNING,
+       WORK_CANCELED,
+};
+
+struct isc_work {
+       unsigned int magic;
+       uint32_t state;           /* enum workstate */
+       isc_work_cb cb;           /* runs on a worker thread */
+       isc_work_done_cb done_cb; /* runs on the origin loop */
+       void *cbarg;
+       isc_loop_t *loop;          /* origin loop, referenced */
+       struct cds_wfcq_node node; /* dispatch queue linkage */
+};
+
+typedef struct isc__workthread {
+       union {
+               struct {
+                       unsigned int magic;
+                       isc_worklane_t lane;
+                       isc_mem_t *mctx;
+                       isc_thread_t thread;
+                       struct __cds_wfcq_head qhead;
+                       int32_t state; /* enum waitstate */
+               };
+               uint8_t __padding0[ISC_OS_CACHELINE_SIZE];
+       };
+       union {
+               struct cds_wfcq_tail qtail;
+               uint8_t __padding1[ISC_OS_CACHELINE_SIZE];
+       };
+} isc__workthread_t;
+
+STATIC_ASSERT(ISC_OS_CACHELINE_SIZE >= sizeof(struct cds_wfcq_tail),
+             "ISC_OS_CACHELINE_SIZE smaller than sizeof(struct "
+             "cds_wfcq_tail)");
+STATIC_ASSERT(offsetof(isc__workthread_t, qtail) == ISC_OS_CACHELINE_SIZE,
+             "isc__workthread_t.qtail not on second cacheline");
+STATIC_ASSERT(sizeof(isc__workthread_t) == 2 * ISC_OS_CACHELINE_SIZE,
+             "isc__workthread_t is not two cachelines");
+
 static void
-isc__work_cb(uv_work_t *req) {
-       isc_work_t *work = uv_req_get_data((uv_req_t *)req);
+workthread_wake(isc__workthread_t *thread) {
+       cmm_smp_mb();
+       if ((uatomic_load(&thread->state, CMM_RELAXED) & THREAD_RUNNING) != 0) {
+               /* Actively running; it will notice the queue on its own. */
+               return;
+       }
 
-       isc__iterated_hash_initialize();
+       uatomic_or(&thread->state, THREAD_WAKEUP);
+       if (futex_noasync(&thread->state, FUTEX_WAKE, 1, NULL, NULL, 0) < 0) {
+               FATAL_ERROR("futex_noasync(FUTEX_WAKE): %s", strerror(errno));
+       }
+}
 
-       rcu_register_thread();
+static void
+workthread_slumber(isc__workthread_t *thread) {
+       rcu_thread_offline();
+       while (futex_noasync(&thread->state, FUTEX_WAIT, THREAD_WAITING, NULL,
+                            NULL, 0) != 0)
+       {
+               if (errno == EWOULDBLOCK) {
+                       break;
+               } else if (errno != EINTR) {
+                       FATAL_ERROR("futex_noasync(FUTEX_WAIT): %s",
+                                   strerror(errno));
+               }
+               /* Or retry if interrupted by signal. */
+       }
+       rcu_thread_online();
+}
 
-       work->work_cb(work->cbarg);
+static void
+workthread_sleep(isc__workthread_t *thread) {
+       /*
+        * Drop to WAITING while keeping a pending SHUTDOWN/PAUSE sticky, so the
+        * FUTEX_WAIT below refuses to block once either is signalled.
+        */
+       uatomic_and(&thread->state, THREAD_STICKY);
+       cmm_smp_mb();
 
-       rcu_unregister_thread();
+       /*
+        * The queue is the one wake condition that can't live in 'state', so
+        * recheck it under the fence; SHUTDOWN and WAKEUP are handled by
+        * FUTEX_WAIT's own value check.
+        */
+       if (cds_wfcq_empty(&thread->qhead, &thread->qtail)) {
+               workthread_slumber(thread);
+       }
 
-       isc__iterated_hash_shutdown();
+       /* Tell the waker we are running (keeping any sticky SHUTDOWN/PAUSE). */
+       uatomic_or(&thread->state, THREAD_RUNNING);
 }
 
+/*
+ * Acknowledge a pause request: publish PAUSED (dropping RUNNING/WAKEUP) and
+ * wake the waiting pauser.  A new pause clears PAUSED, so the worker re-acks
+ * and the pauser only ever observes an ack set for its own request, never a
+ * stale one from the previous pause generation.
+ */
 static void
-isc__after_work_cb(uv_work_t *req, int status) {
-       isc_work_t *work = uv_req_get_data((uv_req_t *)req);
+workthread_ack_pause(isc__workthread_t *thread) {
+       int32_t old, next;
+       do {
+               old = uatomic_load(&thread->state, CMM_RELAXED);
+               next = (old & THREAD_STICKY) | THREAD_PAUSED;
+       } while (uatomic_cmpxchg(&thread->state, old, next) != old);
+
+       (void)futex_noasync(&thread->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+}
+
+/*
+ * Honour a pause: pause until the owning loop clears PAUSE (resume).  A fresh
+ * pause clears PAUSED (see isc__workthread_pause), so (re-)ack whenever PAUSED
+ * is gone — the pauser only proceeds on an ack set for *its* request, never a
+ * stale one from the previous generation.  Stays RCU-offline while paused so
+ * it can't hold up an exclusive-mode grace period.
+ */
+static void
+workthread_pause(isc__workthread_t *thread) {
+       rcu_thread_offline();
+
+       while (true) {
+               int32_t old = uatomic_load(&thread->state, CMM_ACQUIRE);
+               if ((old & (THREAD_PAUSE | THREAD_SHUTDOWN)) != THREAD_PAUSE) {
+                       break;
+               }
+               if ((old & THREAD_PAUSED) == 0) {
+                       workthread_ack_pause(thread);
+                       continue;
+               }
+               (void)futex_noasync(&thread->state, FUTEX_WAIT, old, NULL, NULL,
+                                   0);
+       }
+
+       uatomic_and(&thread->state, ~THREAD_PAUSED);
+       rcu_thread_online();
+}
+
+static void
+work_done(void *arg) {
+       isc_work_t *work = arg;
        isc_loop_t *loop = work->loop;
+       isc_result_t result = (uatomic_load(&work->state, CMM_ACQUIRE) !=
+                              WORK_CANCELED)
+                                     ? ISC_R_SUCCESS
+                                     : ISC_R_CANCELED;
 
-       UV_RUNTIME_CHECK(uv_after_work_cb, status);
+       work->done_cb(work->cbarg, result);
 
-       work->after_work_cb(work->cbarg);
+       work->magic = 0;
+       isc_mem_put(work->loop->mctx, work, sizeof(*work));
+       isc_loop_unref(loop);
+}
 
-       isc_mem_put(loop->mctx, work, sizeof(*work));
+static void
+work_run(isc_work_t *work) {
+       /*
+        * The CAS *is* the tombstone check: whoever moves the item out
+        * of WORK_QUEUED first — this worker or isc_work_cancel() —
+        * decides whether the callback runs.  uatomic_cmpxchg returns the
+        * prior state, so WORK_QUEUED means we won the race.
+        */
+       uint32_t prev = uatomic_cmpxchg(&work->state, WORK_QUEUED,
+                                       WORK_RUNNING);
+       switch (prev) {
+       case WORK_QUEUED:
+               work->cb(work->cbarg);
+               break;
+       case WORK_CANCELED:
+               break;
+       default:
+               UNREACHABLE();
+       }
 
-       isc_loop_detach(&loop);
+       /* Completion always routes back to the origin loop. */
+       isc_async_run(work->loop, work_done, work);
 }
 
-void
-isc_work_enqueue(isc_loop_t *loop, isc_work_cb work_cb,
-                isc_after_work_cb after_work_cb, void *cbarg) {
-       isc_work_t *work = NULL;
-       int r;
+static void *
+workthread_thread(void *arg) {
+       isc__workthread_t *thread = arg;
+
+       isc__loopmgr_starting();
+
+       while (true) {
+               /*
+                * Honour a pause before touching the queue (gated on !SHUTDOWN
+                * so a shutting-down worker exits instead of pausing).
+                */
+               int32_t state = uatomic_load(&thread->state, CMM_ACQUIRE);
+               if ((state & (THREAD_PAUSE | THREAD_SHUTDOWN)) == THREAD_PAUSE)
+               {
+                       workthread_pause(thread);
+                       continue;
+               }
+
+               struct cds_wfcq_node *node;
+               node = __cds_wfcq_dequeue_blocking(&thread->qhead,
+                                                  &thread->qtail);
+
+               if (node == NULL) {
+                       /*
+                        * Only exit the loop if there's nothing to do.
+                        */
+                       if ((uatomic_load(&thread->state, CMM_ACQUIRE) &
+                            THREAD_SHUTDOWN) != 0)
+                       {
+                               synchronize_rcu();
+                               if (!cds_wfcq_empty(&thread->qhead,
+                                                   &thread->qtail))
+                               {
+                                       continue;
+                               }
+                               break;
+                       }
+
+                       workthread_sleep(thread);
+
+                       continue;
+               }
+
+               isc_work_t *work = caa_container_of(node, isc_work_t, node);
+               work_run(work);
+       }
+
+       isc__loopmgr_stopping();
 
-       REQUIRE(VALID_LOOP(loop));
-       REQUIRE(isc_loop() == loop);
-       REQUIRE(work_cb != NULL);
-       REQUIRE(after_work_cb != NULL);
+       return NULL;
+}
+
+isc_work_t *
+isc_work_enqueue(isc_loop_t *loop, isc_worklane_t lane, isc_work_cb cb,
+                isc_work_done_cb done_cb, void *cbarg) {
+       REQUIRE(loop == isc_loop());
+
+       isc__workthread_t *thread = isc__loopmgr_workthread(loop, lane);
 
-       work = isc_mem_get(loop->mctx, sizeof(*work));
+       isc_work_t *work = isc_mem_get(loop->mctx, sizeof(*work));
        *work = (isc_work_t){
-               .work_cb = work_cb,
-               .after_work_cb = after_work_cb,
+               .magic = WORK_MAGIC,
+               .cb = cb,
+               .done_cb = done_cb,
                .cbarg = cbarg,
+               .loop = isc_loop_ref(loop),
+               .state = WORK_QUEUED,
        };
 
-       isc_loop_attach(loop, &work->loop);
+       rcu_read_lock();
+       if ((uatomic_load(&thread->state, CMM_ACQUIRE) & THREAD_SHUTDOWN) != 0)
+       {
+               rcu_read_unlock();
+
+               /*
+                * We are shutting down, so immedaitely run task instead of
+                * adding more in the queue. (The worker is running the
+                * remaining enqueue tasks and shutdown after, see
+                * workthread_thread().)
+                */
+               work_run(work);
+       } else {
+               (void)cds_wfcq_enqueue(&thread->qhead, &thread->qtail,
+                                      &work->node);
+               rcu_read_unlock();
+
+               if ((uatomic_load(&thread->state, CMM_ACQUIRE) &
+                    THREAD_RUNNING) == 0)
+               {
+                       workthread_wake(thread);
+               }
+       }
+
+       return work;
+}
+
+bool
+isc_work_cancel(isc_work_t *work) {
+       REQUIRE(VALID_WORK(work));
+
+       /*
+        * Tombstone: QUEUED -> CANCELED.  The node stays in the queue
+        * (no interior unlink in a singly-linked lock-free queue) and
+        * is discarded by whichever worker dequeues it; after_cb still
+        * fires with ISC_R_CANCELED.  Nothing is freed here.  False
+        * means the callback is running or done — uv_cancel semantics.
+        */
+       return uatomic_cmpxchg(&work->state, WORK_QUEUED, WORK_CANCELED) ==
+              WORK_QUEUED;
+}
+
+isc__workthread_t *
+isc__workthread_create(isc_mem_t *mctx, isc_worklane_t lane) {
+       isc__workthread_t *thread = isc_mem_get(mctx, sizeof(*thread));
+
+       *thread = (isc__workthread_t){
+               .lane = lane,
+               .magic = WORKTHREAD_MAGIC,
+               .state = THREAD_WAITING,
+               .mctx = isc_mem_ref(mctx),
+       };
+
+       __cds_wfcq_init(&thread->qhead, &thread->qtail);
+
+       isc_thread_create(workthread_thread, thread, &thread->thread);
+
+       return thread;
+}
+
+void
+isc__workthread_shutdown(isc__workthread_t *thread) {
+       REQUIRE(VALID_WORKTHREAD(thread));
 
-       uv_req_set_data((uv_req_t *)&work->work, work);
+       /*
+        * Not called while the worker is paused by isc__workthread_pause():
+        * shutdown callbacks run from uv loops, and loopmgr pause keeps every
+        * loop out of uv_run() until resume, so PAUSE and SHUTDOWN never
+        * coexist on a worker (the SHUTDOWN checks in the pause path are only
+        * a belt-and-braces exit if that ever changed).
+        */
+
+       /* Set the sticky SHUTDOWN bit once; bail if already shutting down. */
+       int32_t old;
+       do {
+               old = uatomic_load(&thread->state, CMM_RELAXED);
+               if ((old & THREAD_SHUTDOWN) != 0) {
+                       return;
+               }
+       } while (uatomic_cmpxchg(&thread->state, old, old | THREAD_SHUTDOWN) !=
+                old);
+
+       /* Fence in-flight enqueues (which touch the queue) before draining. */
+       synchronize_rcu();
+
+       workthread_wake(thread);
+}
+
+void
+isc__workthread_destroy(isc__workthread_t **threadp) {
+       REQUIRE(threadp != NULL && VALID_WORKTHREAD(*threadp));
+       isc__workthread_t *thread = MOVE_OWNERSHIP(*threadp);
+
+       isc_thread_join(thread->thread, NULL);
+
+       INSIST(cds_wfcq_empty(&thread->qhead, &thread->qtail));
+
+       thread->magic = 0;
+       isc_mem_putanddetach(&thread->mctx, thread, sizeof(*thread));
+}
+
+void
+isc__workthread_pause(isc__workthread_t *thread) {
+       REQUIRE(VALID_WORKTHREAD(thread));
+
+       /*
+        * Request a pause, but only if not already shutting down — a
+        * shutting-down worker heads for the stopping barrier and must never
+        * be waited on here (that'd be a deadlock).  Clearing PAUSED as we set
+        * PAUSE invalidates any ack left over from the previous generation, so
+        * the wait below can only succeed on an ack for this request.
+        */
+       int32_t old;
+       do {
+               old = uatomic_load(&thread->state, CMM_RELAXED);
+               if ((old & THREAD_SHUTDOWN) != 0) {
+                       return;
+               }
+       } while (uatomic_cmpxchg(&thread->state, old,
+                                (old | THREAD_PAUSE) & ~THREAD_PAUSED) != old);
+
+       workthread_wake(thread);
+
+       /*
+        * Wait for the worker to acknowledge (PAUSED, form workthread_thread()
+        * calling workthread_pause()) or for shutdown.
+        */
+       while (true) {
+               old = uatomic_load(&thread->state, CMM_ACQUIRE);
+               if ((old & (THREAD_PAUSED | THREAD_SHUTDOWN)) != 0) {
+                       return;
+               }
+               (void)futex_noasync(&thread->state, FUTEX_WAIT, old, NULL, NULL,
+                                   0);
+       }
+}
+
+void
+isc__workthread_resume(isc__workthread_t *thread) {
+       REQUIRE(VALID_WORKTHREAD(thread));
 
-       r = uv_queue_work(&loop->loop, &work->work, isc__work_cb,
-                         isc__after_work_cb);
-       UV_RUNTIME_CHECK(uv_queue_work, r);
+       /* Clear the request and wake the paused worker. */
+       uatomic_and(&thread->state, ~THREAD_PAUSE);
+       (void)futex_noasync(&thread->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
 }
index a3d05d9d2568c41df9e8bf5a18b0b3dccb84b392..3a326a4704dbc9928fbf82944e1798a3c1cd352a 100644 (file)
@@ -729,6 +729,16 @@ if urcu_dep[1].version().version_compare('<0.13.0')
     urcu_dep += cc.find_library('liburcu-common')
 endif
 
+foreach h : [
+    'urcu/uatomic.h',
+    'urcu/assert.h',
+    'urcu/pointer.h',
+]
+    if cc.has_header(h, dependencies: urcu_dep)
+        config.set('HAVE_@0@'.format(h.underscorify().to_upper()), 1)
+    endif
+endforeach
+
 config.set_quoted('RCU_FLAVOR', f'liburcu-@rcu_flavor@')
 config.set_quoted('RCU_VERSION', urcu_dep[0].version())
 
index 60eb7f6c25cfe2617e5621a064f53bc7de99ee0f..9433ee6a6a22e2124eeaa7c7c6b3697965f47a7f 100644 (file)
@@ -47,14 +47,15 @@ work_cb(void *arg ISC_ATTR_UNUSED) {
 }
 
 static void
-after_work_cb(void *arg ISC_ATTR_UNUSED) {
+after_work_cb(void *arg ISC_ATTR_UNUSED, isc_result_t result ISC_ATTR_UNUSED) {
        assert_int_equal(atomic_load(&scheduled), 1);
        isc_loopmgr_shutdown();
 }
 
 static void
 work_enqueue_cb(void *arg ISC_ATTR_UNUSED) {
-       isc_work_enqueue(isc_loop(), work_cb, after_work_cb, NULL);
+       isc_work_enqueue(isc_loop(), ISC_WORKLANE_FAST, work_cb, after_work_cb,
+                        NULL);
 }
 
 ISC_RUN_TEST_IMPL(isc_work_enqueue) {