From: Ondřej Surý Date: Tue, 9 Jun 2026 04:12:58 +0000 (+0200) Subject: Replace the shared work pool with per-loop, per-lane worker threads X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a5f13b3410d0031a155d64377782cf3f235cde79;p=thirdparty%2Fbind9.git Replace the shared work pool with per-loop, per-lane worker threads Offloaded work used two different mechanisms: a per-loop isc_helper thread for CPU-bound crypto (DNSSEC validation, message signature checks) and the process-global libuv thread pool for blocking I/O (zone load and dump, inbound transfer apply). Neither could cancel a queued task, and the two disagreed about exclusive mode — the helper paused with its loop under isc_loopmgr_pause() but the libuv pool did not, so blocking offloaded work kept running while a loop held the exclusive lock. Unify both behind isc_work: each loop gets its own worker thread per lane — FAST for short, bounded tasks and SLOW for long, blocking ones — fed by a private queue. Separate lanes keep a short crypto task off the path of a multi-second zone dump once both run on per-loop workers; every lane parks with isc_loopmgr_pause() so exclusive mode now quiesces offloaded work too; and a still-queued task can be canceled before it starts (isc_work_cancel). isc_helper is removed and its callers select a lane. 
--- diff --git a/bin/dig/nslookup.c b/bin/dig/nslookup.c index 177cdeba6d4..ac7a6c8285a 100644 --- a/bin/dig/nslookup.c +++ b/bin/dig/nslookup.c @@ -841,7 +841,8 @@ static void start_next_command(void); static void -process_next_command(void *arg ISC_ATTR_UNUSED) { +process_next_command(void *arg ISC_ATTR_UNUSED, + isc_result_t result ISC_ATTR_UNUSED) { isc_loop_t *loop = isc_loop_main(); if (cmdline == NULL) { in_use = false; @@ -868,11 +869,11 @@ start_next_command(void) { isc_loopmgr_pause(); if (interactive) { - isc_work_enqueue(loop, readline_next_command, + isc_work_enqueue(loop, ISC_WORKLANE_FAST, readline_next_command, process_next_command, loop); } else { - isc_work_enqueue(loop, fgets_next_command, process_next_command, - loop); + isc_work_enqueue(loop, ISC_WORKLANE_FAST, fgets_next_command, + process_next_command, loop); } isc_loopmgr_resume(); } diff --git a/lib/dns/catz.c b/lib/dns/catz.c index b731432678b..b63a4251267 100644 --- a/lib/dns/catz.c +++ b/lib/dns/catz.c @@ -31,12 +31,11 @@ #include #include +#include #include #include #include -#include "dns/name.h" - #define DNS_CATZ_ZONE_MAGIC ISC_MAGIC('c', 'a', 't', 'z') #define DNS_CATZ_ZONES_MAGIC ISC_MAGIC('c', 'a', 't', 's') #define DNS_CATZ_ENTRY_MAGIC ISC_MAGIC('c', 'a', 't', 'e') @@ -118,7 +117,7 @@ dns__catz_timer_stop(void *arg); static void dns__catz_update_cb(void *data); static void -dns__catz_done_cb(void *data); +dns__catz_done_cb(void *data, isc_result_t result); static isc_result_t catz_process_zones_entry(dns_catz_zone_t *catz, dns_rdataset_t *value, @@ -2056,8 +2055,8 @@ dns__catz_timer_cb(void *arg) { "catz: %s: reload start", domain); dns_catz_zone_ref(catz); - isc_work_enqueue(catz->loop, dns__catz_update_cb, dns__catz_done_cb, - catz); + isc_work_enqueue(catz->loop, ISC_WORKLANE_SLOW, dns__catz_update_cb, + dns__catz_done_cb, catz); exit: isc_timer_destroy(&catz->updatetimer); @@ -2444,7 +2443,7 @@ exit: } static void -dns__catz_done_cb(void *data) { +dns__catz_done_cb(void 
*data, isc_result_t result ISC_ATTR_UNUSED) { dns_catz_zone_t *catz = (dns_catz_zone_t *)data; char dname[DNS_NAME_FORMATSIZE]; diff --git a/lib/dns/master.c b/lib/dns/master.c index 741b18f981d..f380a69b976 100644 --- a/lib/dns/master.c +++ b/lib/dns/master.c @@ -2596,7 +2596,7 @@ master_load_callback(void *arg) { } static void -master_load_done(void *arg) { +master_load_done(void *arg, isc_result_t result ISC_ATTR_UNUSED) { dns_loadctx_t *lctx = arg; isc_async_run(lctx->loop, master_load_callback, lctx); @@ -2632,7 +2632,8 @@ dns_master_loadfileasync(const char *master_file, dns_name_t *top, dns_loadctx_ref(lctx); isc_loop_attach(loop, &lctx->loop); - isc_work_enqueue(isc_loop(), master_load, master_load_done, lctx); + isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, master_load, + master_load_done, lctx); *lctxp = lctx; diff --git a/lib/dns/masterdump.c b/lib/dns/masterdump.c index 4249e6d6f86..671f33cf55e 100644 --- a/lib/dns/masterdump.c +++ b/lib/dns/masterdump.c @@ -1499,7 +1499,7 @@ master_dump_callback(void *data) { } static void -master_dump_done(void *data) { +master_dump_done(void *data, isc_result_t result ISC_ATTR_UNUSED) { dns_dumpctx_t *dctx = data; isc_async_run(dctx->loop, master_dump_callback, dctx); @@ -1742,7 +1742,8 @@ dns_master_dumptostreamasync(isc_mem_t *mctx, dns_db_t *db, dns_dumpctx_ref(dctx); isc_loop_attach(loop, &dctx->loop); - isc_work_enqueue(isc_loop(), master_dump, master_dump_done, dctx); + isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, master_dump, + master_dump_done, dctx); *dctxp = dctx; @@ -1834,7 +1835,8 @@ dns_master_dumpasync(isc_mem_t *mctx, dns_db_t *db, dns_dbversion_t *version, dns_dumpctx_ref(dctx); isc_loop_attach(loop, &dctx->loop); - isc_work_enqueue(isc_loop(), master_dump, master_dump_done, dctx); + isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, master_dump, + master_dump_done, dctx); *dctxp = dctx; diff --git a/lib/dns/message.c b/lib/dns/message.c index 9b5360ea3fc..424a11ff2d5 100644 --- a/lib/dns/message.c 
+++ b/lib/dns/message.c @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -3001,23 +3000,22 @@ dns_message_dumpsig(dns_message_t *msg, char *txt1) { #endif /* ifdef SKAN_MSG_DEBUG */ static void -checksig_done(void *arg); +checksig_done(void *arg, isc_result_t result); static void checksig_run(void *arg) { checksig_ctx_t *chsigctx = arg; chsigctx->result = dns_message_checksig(chsigctx->msg, chsigctx->view); - - isc_async_run(chsigctx->loop, checksig_done, chsigctx); } static void -checksig_done(void *arg) { +checksig_done(void *arg, isc_result_t result ISC_ATTR_UNUSED) { checksig_ctx_t *chsigctx = arg; dns_message_t *msg = chsigctx->msg; - chsigctx->cb(chsigctx->cbarg, chsigctx->result); + chsigctx->cb(chsigctx->cbarg, + (result != ISC_R_SUCCESS) ? result : chsigctx->result); dns_view_detach(&chsigctx->view); isc_loop_detach(&chsigctx->loop); @@ -3044,7 +3042,8 @@ dns_message_checksig_async(dns_message_t *msg, dns_view_t *view, dns_view_attach(view, &chsigctx->view); dns_message_clonebuffer(msg); - isc_helper_run(loop, checksig_run, chsigctx); + isc_work_enqueue(loop, ISC_WORKLANE_FAST, checksig_run, checksig_done, + chsigctx); return DNS_R_WAIT; } diff --git a/lib/dns/rpz.c b/lib/dns/rpz.c index 079d34e3501..a4b13460430 100644 --- a/lib/dns/rpz.c +++ b/lib/dns/rpz.c @@ -1661,7 +1661,7 @@ dns__rpz_timer_stop(void *arg) { } static void -update_rpz_done_cb(void *data) { +update_rpz_done_cb(void *data, isc_result_t result ISC_ATTR_UNUSED) { dns_rpz_zone_t *rpz = (dns_rpz_zone_t *)data; char dname[DNS_NAME_FORMATSIZE]; @@ -1958,7 +1958,8 @@ dns__rpz_timer_cb(void *arg) { "rpz: %s: reload start", domain); dns_rpz_zones_ref(rpz->rpzs); - isc_work_enqueue(rpz->loop, update_rpz_cb, update_rpz_done_cb, rpz); + isc_work_enqueue(rpz->loop, ISC_WORKLANE_SLOW, update_rpz_cb, + update_rpz_done_cb, rpz); isc_timer_destroy(&rpz->updatetimer); rpz->loop = NULL; diff --git a/lib/dns/validator.c b/lib/dns/validator.c index 94998ce9a03..38ccb5611ca 
100644 --- a/lib/dns/validator.c +++ b/lib/dns/validator.c @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -135,7 +134,7 @@ validate_async_done(dns_validator_t *val, isc_result_t result); static isc_result_t validate_async_run(dns_validator_t *val, isc_job_cb cb); static isc_result_t -validate_helper_run(dns_validator_t *val, isc_job_cb cb); +validate_work_enqueue(dns_validator_t *val, isc_job_cb cb); static void validate_dnskey(void *arg); @@ -497,7 +496,7 @@ fetch_callback_dnskey(void *arg) { if (eresult == ISC_R_SUCCESS && rdataset->trust >= dns_trust_secure) { - result = validate_helper_run( + result = validate_work_enqueue( val, resume_answer_with_key); } else { result = validate_async_run(val, resume_answer); @@ -687,8 +686,8 @@ validator_callback_dnskey(void *arg) { * Only extract the dst key if the keyset is secure. */ if (val->frdataset.trust >= dns_trust_secure) { - result = validate_helper_run(val, - resume_answer_with_key); + result = validate_work_enqueue(val, + resume_answer_with_key); } else { result = validate_async_run(val, resume_answer); } @@ -1315,7 +1314,8 @@ seek_dnskey(dns_validator_t *val) { } dns_rdataset_cleanup(&val->fsigrdataset); - return validate_helper_run(val, resume_answer_with_key); + return validate_work_enqueue(val, + resume_answer_with_key); } break; @@ -1773,7 +1773,7 @@ validate_answer_signing_key_done(void *arg) { val->result = ISC_R_CANCELED; } else if (val->key != NULL) { /* Process with next key if we selected one */ - (void)validate_helper_run(val, validate_answer_signing_key); + (void)validate_work_enqueue(val, validate_answer_signing_key); return; } @@ -1841,7 +1841,7 @@ validate_answer_process(void *arg) { goto next_key; } - (void)validate_helper_run(val, validate_answer_signing_key); + (void)validate_work_enqueue(val, validate_answer_signing_key); return; next_key: @@ -1964,10 +1964,15 @@ validate_async_run(dns_validator_t *val, isc_job_cb cb) { return DNS_R_WAIT; } +static void 
+null_done(void *arg ISC_ATTR_UNUSED, isc_result_t result ISC_ATTR_UNUSED) { + /* no-op for now */ +} + static isc_result_t -validate_helper_run(dns_validator_t *val, isc_job_cb cb) { +validate_work_enqueue(dns_validator_t *val, isc_job_cb cb) { val->attributes |= VALATTR_OFFLOADED; - isc_helper_run(val->loop, cb, val); + isc_work_enqueue(val->loop, ISC_WORKLANE_FAST, cb, null_done, val); return DNS_R_WAIT; } @@ -2306,7 +2311,7 @@ validate_dnskey_dsset_next_done(void *arg) { break; default: /* Continue validation until we have success or no more data */ - (void)validate_helper_run(val, validate_dnskey_dsset_next); + (void)validate_work_enqueue(val, validate_dnskey_dsset_next); return; } @@ -2333,8 +2338,8 @@ validate_dnskey_dsset_first(dns_validator_t *val) { /* continue async run */ result = validate_dnskey_dsset(val); if (result != ISC_R_SUCCESS) { - (void)validate_helper_run(val, - validate_dnskey_dsset_next); + (void)validate_work_enqueue(val, + validate_dnskey_dsset_next); return; } } diff --git a/lib/dns/xfrin.c b/lib/dns/xfrin.c index 652d2da596b..408dc707e9c 100644 --- a/lib/dns/xfrin.c +++ b/lib/dns/xfrin.c @@ -374,10 +374,11 @@ cleanup: } static void -axfr_apply_done(void *arg) { +axfr_apply_done(void *arg, isc_result_t eresult) { xfrin_work_t *work = arg; dns_xfrin_t *xfr = work->xfr; - isc_result_t result = work->result; + isc_result_t result = (eresult == ISC_R_SUCCESS) ? 
work->result + : eresult; REQUIRE(VALID_XFRIN(xfr)); REQUIRE(VALID_XFRIN_WORK(work)); @@ -421,7 +422,8 @@ axfr_commit(dns_xfrin_t *xfr) { .xfr = dns_xfrin_ref(xfr), }; xfr->diff_running = true; - isc_work_enqueue(xfr->loop, axfr_apply, axfr_apply_done, work); + isc_work_enqueue(xfr->loop, ISC_WORKLANE_SLOW, axfr_apply, + axfr_apply_done, work); } static isc_result_t @@ -613,14 +615,15 @@ ixfr_apply(void *arg) { } static void -ixfr_apply_done(void *arg) { +ixfr_apply_done(void *arg, isc_result_t eresult) { xfrin_work_t *work = arg; REQUIRE(VALID_XFRIN_WORK(work)); dns_xfrin_t *xfr = work->xfr; REQUIRE(VALID_XFRIN(xfr)); - isc_result_t result = work->result; + isc_result_t result = (eresult == ISC_R_SUCCESS) ? work->result + : eresult; if (atomic_load(&xfr->shuttingdown)) { result = ISC_R_SHUTTINGDOWN; @@ -632,7 +635,8 @@ ixfr_apply_done(void *arg) { if (!xfr->retry_axfr && !cds_wfcq_empty(&xfr->diff_head, &xfr->diff_tail)) { - isc_work_enqueue(xfr->loop, ixfr_apply, ixfr_apply_done, work); + isc_work_enqueue(xfr->loop, ISC_WORKLANE_SLOW, ixfr_apply, + ixfr_apply_done, work); return; } @@ -712,7 +716,8 @@ ixfr_commit(dns_xfrin_t *xfr) { .xfr = dns_xfrin_ref(xfr), }; xfr->diff_running = true; - isc_work_enqueue(xfr->loop, ixfr_apply, ixfr_apply_done, work); + isc_work_enqueue(xfr->loop, ISC_WORKLANE_SLOW, ixfr_apply, + ixfr_apply_done, work); } cleanup: diff --git a/lib/isc/helper.c b/lib/isc/helper.c deleted file mode 100644 index 2ef13d8a442..00000000000 --- a/lib/isc/helper.c +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (C) Internet Systems Consortium, Inc. ("ISC") - * - * SPDX-License-Identifier: MPL-2.0 - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, you can obtain one at https://mozilla.org/MPL/2.0/. - * - * See the COPYRIGHT file distributed with this work for additional - * information regarding copyright ownership. 
- */ - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "async_p.h" -#include "job_p.h" -#include "loop_p.h" - -void -isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { - REQUIRE(VALID_LOOP(loop)); - REQUIRE(cb != NULL); - - isc_loop_t *helper = isc_loop_helper(loop); - isc_job_t *job = isc_mem_get(helper->mctx, sizeof(*job)); - *job = (isc_job_t){ - .cb = cb, - .cbarg = cbarg, - }; - - cds_wfcq_node_init(&job->wfcq_node); - - /* - * cds_wfcq_enqueue() is non-blocking and enqueues the job to async - * queue. - * - * The function returns 'false' in case the queue was empty - in such - * case we need to trigger the async callback. - */ - if (!cds_wfcq_enqueue(&helper->async_jobs.head, - &helper->async_jobs.tail, &job->wfcq_node)) - { - int r = uv_async_send(&helper->async_trigger); - UV_RUNTIME_CHECK(uv_async_send, r); - } -} diff --git a/lib/isc/httpd.c b/lib/isc/httpd.c index 910e15d00fe..465f29da9e4 100644 --- a/lib/isc/httpd.c +++ b/lib/isc/httpd.c @@ -869,7 +869,7 @@ prepare_response(void *arg) { } static void -prepare_response_done(void *arg) { +prepare_response_done(void *arg, isc_result_t result) { isc_region_t r; isc_httpd_sendreq_t *req = arg; isc_httpd_t *httpd = req->httpd; @@ -879,6 +879,11 @@ prepare_response_done(void *arg) { */ isc_buffer_usedregion(req->sendbuffer, &r); + if (result != ISC_R_SUCCESS) { + httpd_senddone(httpd->handle, result, req); + return; + } + isc_nm_send(httpd->handle, &r, httpd_senddone, req); } @@ -939,8 +944,8 @@ httpd_request(isc_nmhandle_t *handle, isc_result_t eresult, isc_httpd_sendreq_t *req = isc__httpd_sendreq_new(httpd); isc_nmhandle_ref(handle); - isc_work_enqueue(isc_loop(), prepare_response, prepare_response_done, - req); + isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, prepare_response, + prepare_response_done, req); return; close_readhandle: diff 
--git a/lib/isc/include/isc/helper.h b/lib/isc/include/isc/helper.h deleted file mode 100644 index e13ec9b5c02..00000000000 --- a/lib/isc/include/isc/helper.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (C) Internet Systems Consortium, Inc. ("ISC") - * - * SPDX-License-Identifier: MPL-2.0 - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, you can obtain one at https://mozilla.org/MPL/2.0/. - * - * See the COPYRIGHT file distributed with this work for additional - * information regarding copyright ownership. - */ - -/*! \file isc/helper.h */ - -#pragma once - -#include - -#include -#include -#include -#include - -void -isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg); -/*%< - * Schedule the job callback 'cb' to be run on the 'loop' event loop. - * - * Requires: - * - *\li 'loop' is a valid isc event loop - *\li 'cb' is a callback function, must be non-NULL - *\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL - */ - -#define isc_helper_current(cb, cbarg) isc_async_run(isc_loop(), cb, cbarg) -/*%< - * Helper macro to run the job on the current loop - */ diff --git a/lib/isc/include/isc/loop.h b/lib/isc/include/isc/loop.h index f076bf4423d..e5dbd8ccd31 100644 --- a/lib/isc/include/isc/loop.h +++ b/lib/isc/include/isc/loop.h @@ -15,13 +15,12 @@ #include -#include -#include - #include #include #include #include +#include +#include typedef void (*isc_job_cb)(void *); @@ -205,13 +204,3 @@ isc_loop_shuttingdown(isc_loop_t *loop); * * \li 'loop' is a valid loop and the loop tid matches the current tid. */ - -isc_loop_t * -isc_loop_helper(isc_loop_t *loop); -/*%< - * Returns the helper thread corresponding to the thread ID for 'loop'. - * - * Requires: - * - * \li 'loop' is a valid loop. 
- */ diff --git a/lib/isc/include/isc/queue.h b/lib/isc/include/isc/queue.h index 02f94d866de..1996ba195cf 100644 --- a/lib/isc/include/isc/queue.h +++ b/lib/isc/include/isc/queue.h @@ -24,7 +24,7 @@ typedef struct isc_queue { sizeof(struct __cds_wfcq_head)]; struct cds_wfcq_tail tail; uint8_t __padding_tail[ISC_OS_CACHELINE_SIZE - - sizeof(struct __cds_wfcq_head)]; + sizeof(struct cds_wfcq_tail)]; } isc_queue_t; typedef struct cds_wfcq_node isc_queue_node_t; diff --git a/lib/isc/include/isc/urcu.h b/lib/isc/include/isc/urcu.h index 20c2902d43e..1962078745f 100644 --- a/lib/isc/include/isc/urcu.h +++ b/lib/isc/include/isc/urcu.h @@ -27,10 +27,20 @@ #include #endif -#include - +#if HAVE_URCU_ASSERT_H +#include +#endif +#if HAVE_URCU_UATOMIC_H +#include +#endif #include +#include #include +#if HAVE_URCU_POINTER_H +#include +#else +#include +#endif #include #include #include @@ -194,3 +204,15 @@ #define _CMM_STORE_SHARED(x, v) CMM_STORE_SHARED(x, v) #endif /* __SANITIZE_THREAD__ */ + +#if !defined(uatomic_load) || !defined(uatomic_store) +#define uatomic_load(ptr, mo) uatomic_read(ptr) +#define uatomic_store(ptr, v, mo) uatomic_set(ptr, v) + +#define CMM_RELAXED __ATOMIC_RELAXED +#define CMM_CONSUME __ATOMIC_CONSUME +#define CMM_ACQUIRE __ATOMIC_ACQUIRE +#define CMM_RELEASE __ATOMIC_RELEASE +#define CMM_ACQ_REL __ATOMIC_ACQ_REL +#define CMM_SEQ_CST __ATOMIC_SEQ_CST +#endif diff --git a/lib/isc/include/isc/work.h b/lib/isc/include/isc/work.h index d986b2e7093..a4567f13f0d 100644 --- a/lib/isc/include/isc/work.h +++ b/lib/isc/include/isc/work.h @@ -11,26 +11,113 @@ * information regarding copyright ownership. */ +/*! \file isc/work.h + * \brief Offload work from an event loop onto a dedicated worker thread. + * + * Each isc event loop has one worker thread per lane (see isc_worklane_t). + * isc_work_enqueue() runs a callback on the worker thread bound to the calling + * loop's lane and, when it finishes, runs a second callback back on that loop. 
+ * The handle it returns can be used to cancel a task that has not started + * running yet. + */ + #pragma once -#include +#include -#include +typedef enum isc_worklane { + ISC_WORKLANE_FAST = 0, /*%< short, bounded tasks (e.g. crypto) */ + ISC_WORKLANE_SLOW, /*%< blocking/long tasks (e.g. zone dump) */ + ISC_WORKLANE_COUNT, +} isc_worklane_t; +/*%< + * Selects which per-loop worker thread runs an enqueued task. Keeping long, + * blocking SLOW work (disk I/O, zone dump/load) on its own lane stops it from + * holding up short FAST tasks behind it. + */ typedef void (*isc_work_cb)(void *arg); -typedef void (*isc_after_work_cb)(void *arg); +typedef void (*isc_work_done_cb)(void *arg, isc_result_t result); typedef struct isc_work isc_work_t; -void -isc_work_enqueue(isc_loop_t *loop, isc_work_cb work_cb, - isc_after_work_cb after_work_cb, void *cbarg); +isc_work_t * +isc_work_enqueue(isc_loop_t *loop, isc_worklane_t lane, isc_work_cb cb, + isc_work_done_cb done_cb, void *cbarg); /*%< - * Schedules work to be handled by the libuv thread pool (see uv_work_t). - * The function specified in `work_cb` will be run by a thread in the - * thread pool; when complete, the `after_work_cb` function will run - * in 'loop' to inform the caller that the work was completed. + * Schedule 'cb' to run on the worker thread bound to 'loop' and 'lane'. When + * 'cb' returns, 'done_cb' is scheduled back on 'loop' with the result of the + * work: ISC_R_SUCCESS normally, or ISC_R_CANCELED if the task was canceled + * before it started (see isc_work_cancel()). + * + * Returns a handle that may be passed to isc_work_cancel(). The handle is + * owned by 'loop' and stays valid until 'done_cb' has run; it must not be used + * afterwards. * * Requires: - * \li 'loop' is a valid event loop. - * \li 'work_cb' and 'after_work_cb' are not NULL. + * + *\li 'loop' is a valid isc event loop. + *\li 'cb' is non-NULL. + *\li 'done_cb' is non-NULL. + *\li 'cbarg' is passed to both callbacks, may be NULL.
+ */ + +bool +isc_work_cancel(isc_work_t *work); +/*%< + * Try to cancel 'work' before its 'cb' starts running. If the task is still + * queued it is marked canceled and 'cb' will not run; if it is already running + * or has finished this has no effect. Either way the 'done_cb' passed to + * isc_work_enqueue() still runs on the origin loop, with ISC_R_CANCELED when + * the cancel succeeded. Nothing is freed here. + * + * Returns: + * + *\li true the task was still queued; 'cb' will not run. + *\li false the task is already running or done (uv_cancel() semantics). + * + * Requires: + * + *\li 'work' is a handle from isc_work_enqueue() whose 'done_cb' has not run. + */ + +/* private */ + +typedef struct isc__workthread isc__workthread_t; + +isc__workthread_t * +isc__workthread_create(isc_mem_t *mctx, isc_worklane_t lane); +/*%< + * Create one worker thread for 'lane' with its own dispatch queue (the loop + * manager creates one per loop). Used by the loop manager; not for general + * use. + */ + +void +isc__workthread_shutdown(isc__workthread_t *thread); +/*%< + * Begin shutdown of 'thread': set its SHUTDOWN flag (after which new enqueues + * run inline on the caller instead of queuing), then, after an RCU grace period + * that fences any in-flight enqueue, wake the worker so it drains its queue and + * exits. Does not join the worker; see isc__workthread_destroy(). Idempotent. + */ + +void +isc__workthread_pause(isc__workthread_t *thread); +/*%< + * Quiesce 'thread' for isc_loopmgr_pause(): mark it paused and block until it + * has parked. A no-op if the worker is already shutting down. Must be paired + * with isc__workthread_resume() and called from the worker's owning loop. + */ + +void +isc__workthread_resume(isc__workthread_t *thread); +/*%< + * Release a worker previously parked by isc__workthread_pause(). + */ + +void +isc__workthread_destroy(isc__workthread_t **threadp); +/*%< + * Join the worker thread, then free '*threadp' and set it to NULL.
Must be + * called after isc__workthread_shutdown(). */ diff --git a/lib/isc/loop.c b/lib/isc/loop.c index 5a7c7763601..062bcfa599e 100644 --- a/lib/isc/loop.c +++ b/lib/isc/loop.c @@ -94,6 +94,11 @@ static void pause_loop(isc_loop_t *loop) { isc_loopmgr_t *loopmgr = isc__loopmgr; + /* Quiesce this loop's own workers before going to the barrier. */ + for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) { + isc__workthread_pause(loop->workthreads[lane]); + } + rcu_thread_offline(); loop->paused = true; @@ -108,6 +113,10 @@ resume_loop(isc_loop_t *loop) { loop->paused = false; rcu_thread_online(); + + for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) { + isc__workthread_resume(loop->workthreads[lane]); + } } static void @@ -180,6 +189,10 @@ shutdown_cb(uv_async_t *handle) { isc_signal_destroy(&loopmgr->sigint); } + for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) { + isc__workthread_shutdown(loop->workthreads[lane]); + } + enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( &loop->async_jobs.head, &loop->async_jobs.tail, &loop->teardown_jobs.head, &loop->teardown_jobs.tail); @@ -248,16 +261,6 @@ quiescent_cb(uv_prepare_t *handle) { #endif } -static void -helper_close(isc_loop_t *loop) { - int r = uv_loop_close(&loop->loop); - UV_RUNTIME_CHECK(uv_loop_close, r); - - INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail)); - - isc_mem_detach(&loop->mctx); -} - static void loop_close(isc_loop_t *loop) { int r = uv_loop_close(&loop->loop); @@ -271,32 +274,10 @@ loop_close(isc_loop_t *loop) { isc_mem_detach(&loop->mctx); } -static void * -helper_thread(void *arg) { - isc_loop_t *helper = (isc_loop_t *)arg; - - int r = uv_prepare_start(&helper->quiescent, quiescent_cb); - UV_RUNTIME_CHECK(uv_prepare_start, r); - - isc_barrier_wait(&isc__loopmgr->starting); - - r = uv_run(&helper->loop, UV_RUN_DEFAULT); - UV_RUNTIME_CHECK(uv_run, r); - - /* Invalidate the helper early */ - helper->magic = 0; - - 
isc_barrier_wait(&isc__loopmgr->stopping); - - return NULL; -} - static void * loop_thread(void *arg) { isc_loop_t *loop = (isc_loop_t *)arg; isc_loopmgr_t *loopmgr = isc__loopmgr; - isc_loop_t *helper = &loopmgr->helpers[loop->tid]; - char name[32]; /* Initialize the thread_local variables*/ REQUIRE(isc__loop_local == NULL || isc__loop_local == loop); @@ -304,14 +285,14 @@ loop_thread(void *arg) { isc__tid_init(loop->tid); - /* Start the helper thread */ - isc_thread_create(helper_thread, helper, &helper->thread); - snprintf(name, sizeof(name), "isc-helper-%04" PRItid, loop->tid); - isc_thread_setname(helper->thread, name); - int r = uv_prepare_start(&loop->quiescent, quiescent_cb); UV_RUNTIME_CHECK(uv_prepare_start, r); + for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) { + loop->workthreads[lane] = + isc__workthread_create(isc__loopmgr->mctx, lane); + } + isc_barrier_wait(&loopmgr->starting); enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( @@ -330,12 +311,12 @@ loop_thread(void *arg) { /* Invalidate the loop early */ loop->magic = 0; - /* Shutdown the helper thread */ - r = uv_async_send(&helper->shutdown_trigger); - UV_RUNTIME_CHECK(uv_async_send, r); - isc_barrier_wait(&loopmgr->stopping); + for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) { + isc__workthread_destroy(&loop->workthreads[lane]); + } + return NULL; } @@ -343,17 +324,6 @@ loop_thread(void *arg) { * Public */ -static void -threadpool_initialize(uint32_t workers) { - char buf[11]; - int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf, - &(size_t){ sizeof(buf) }); - if (r == UV_ENOENT) { - snprintf(buf, sizeof(buf), "%" PRIu32, workers); - uv_os_setenv("UV_THREADPOOL_SIZE", buf); - } -} - static void loop_destroy(isc_loop_t *loop) { int r = uv_async_send(&loop->destroy_trigger); @@ -373,7 +343,6 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops) { isc_loopmgr_t *loopmgr = NULL; - threadpool_initialize(nloops); isc__tid_initcount(nloops); loopmgr = isc_mem_get(mctx, 
sizeof(*loopmgr)); @@ -384,11 +353,13 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops) { isc_mem_attach(mctx, &loopmgr->mctx); - /* We need to double the number for loops and helpers */ - isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2); - isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2); - isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2); - isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2); + /* pausing/resuming count only the loops; starting/stopping also count one worker thread per lane for each loop */ + isc_barrier_init(&loopmgr->pausing, loopmgr->nloops); + isc_barrier_init(&loopmgr->resuming, loopmgr->nloops); + isc_barrier_init(&loopmgr->starting, + loopmgr->nloops * (1 + ISC_WORKLANE_COUNT)); + isc_barrier_init(&loopmgr->stopping, + loopmgr->nloops * (1 + ISC_WORKLANE_COUNT)); loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, sizeof(loopmgr->loops[0])); @@ -397,13 +368,6 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops) { loop_init(loop, i, "loop"); } - loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, - sizeof(loopmgr->helpers[0])); - for (size_t i = 0; i < loopmgr->nloops; i++) { - isc_loop_t *loop = &loopmgr->helpers[i]; - loop_init(loop, i, "helper"); - } - isc__loopmgr = loopmgr; loopmgr->sigint = isc_signal_new(isc__loopmgr_signal, loopmgr, SIGINT); @@ -522,13 +486,6 @@ isc_loopmgr_pause(void) { "loop exclusive mode: starting"); } - for (size_t i = 0; i < isc__loopmgr->nloops; i++) { - isc_loop_t *helper = &isc__loopmgr->helpers[i]; - - int r = uv_async_send(&helper->pause_trigger); - UV_RUNTIME_CHECK(uv_async_send, r); - } - for (size_t i = 0; i < isc__loopmgr->nloops; i++) { isc_loop_t *loop = &isc__loopmgr->loops[i]; @@ -543,6 +500,7 @@ isc_loopmgr_pause(void) { RUNTIME_CHECK(atomic_compare_exchange_strong(&isc__loopmgr->paused, &(bool){ false }, true)); + pause_loop(CURRENT_LOOP(isc__loopmgr)); if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) { @@ -586,12 +544,6 @@ isc_loopmgr_destroy(void) { isc__loopmgr = NULL; - /* Wait for all helpers to finish */
- for (size_t i = 0; i < loopmgr->nloops; i++) { - isc_loop_t *helper = &loopmgr->helpers[i]; - isc_thread_join(helper->thread, NULL); - } - /* First wait for all loops to finish */ for (size_t i = 1; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; @@ -600,13 +552,6 @@ isc_loopmgr_destroy(void) { loopmgr->magic = 0; - for (size_t i = 0; i < loopmgr->nloops; i++) { - isc_loop_t *helper = &loopmgr->helpers[i]; - helper_close(helper); - } - isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops, - sizeof(loopmgr->helpers[0])); - for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; loop_close(loop); @@ -690,9 +635,22 @@ isc_loop_shuttingdown(isc_loop_t *loop) { return loop->shuttingdown; } -isc_loop_t * -isc_loop_helper(isc_loop_t *loop) { +isc__workthread_t * +isc__loopmgr_workthread(isc_loop_t *loop, isc_worklane_t lane) { REQUIRE(VALID_LOOP(loop)); + REQUIRE(lane < ISC_WORKLANE_COUNT); + + return loop->workthreads[lane]; +} - return &isc__loopmgr->helpers[loop->tid]; +void +isc__loopmgr_starting(void) { + isc_loopmgr_t *loopmgr = isc__loopmgr; + isc_barrier_wait(&loopmgr->starting); +} + +void +isc__loopmgr_stopping(void) { + isc_loopmgr_t *loopmgr = isc__loopmgr; + isc_barrier_wait(&loopmgr->stopping); } diff --git a/lib/isc/loop_p.h b/lib/isc/loop_p.h index e449afeb7d8..071e677c53b 100644 --- a/lib/isc/loop_p.h +++ b/lib/isc/loop_p.h @@ -73,6 +73,9 @@ struct isc_loop { /* safe memory reclamation */ uv_prepare_t quiescent; + + /* thread pools */ + isc__workthread_t *workthreads[ISC_WORKLANE_COUNT]; }; /* @@ -107,7 +110,6 @@ typedef struct isc_loopmgr { /* per-thread objects */ isc_loop_t *loops; - isc_loop_t *helpers; } isc_loopmgr_t; /* @@ -131,18 +133,15 @@ struct isc_signal { #define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ') #define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC) -/* - * Work to be offloaded to an external thread. 
- */ -struct isc_work { - uv_work_t work; - isc_loop_t *loop; - isc_work_cb work_cb; - isc_after_work_cb after_work_cb; - void *cbarg; -}; - #define DEFAULT_LOOP(loopmgr) (&(loopmgr)->loops[0]) #define CURRENT_LOOP(loopmgr) (&(loopmgr)->loops[isc_tid()]) #define LOOP(loopmgr, tid) (&(loopmgr)->loops[tid]) #define ON_LOOP(loop) ((loop) == CURRENT_LOOP((loop)->loopmgr)) + +isc__workthread_t * +isc__loopmgr_workthread(isc_loop_t *loop, isc_worklane_t lane); + +void +isc__loopmgr_starting(void); +void +isc__loopmgr_stopping(void); diff --git a/lib/isc/meson.build b/lib/isc/meson.build index a99c3450f94..288f8c306e4 100644 --- a/lib/isc/meson.build +++ b/lib/isc/meson.build @@ -86,7 +86,6 @@ isc_srcset.add( 'hash.c', 'hashmap.c', 'heap.c', - 'helper.c', 'hex.c', 'histo.c', 'ht.c', diff --git a/lib/isc/work.c b/lib/isc/work.c index 0b7cdf5743d..c2af8cfe16a 100644 --- a/lib/isc/work.c +++ b/lib/isc/work.c @@ -11,69 +11,433 @@ * information regarding copyright ownership. */ -#include +#include +#include +#include -#include +#include #include #include +#include +#include +#include #include +#include #include #include #include "loop_p.h" +#define WORK_MAGIC ISC_MAGIC('W', 'o', 'r', 'k') +#define VALID_WORK(t) ISC_MAGIC_VALID(t, WORK_MAGIC) +#define WORKTHREAD_MAGIC ISC_MAGIC('W', 'k', 'T', 'h') +#define VALID_WORKTHREAD(t) ISC_MAGIC_VALID(t, WORKTHREAD_MAGIC) + +enum waitstate { + /* The value a sleeping worker blocks on in FUTEX_WAIT. */ + THREAD_WAITING = 0, + /* Any non-zero bit keeps FUTEX_WAIT from blocking. */ + THREAD_WAKEUP = (1 << 0), + THREAD_RUNNING = (1 << 1), + THREAD_SHUTDOWN = (1 << 2), + THREAD_PAUSE = (1 << 3), /* request from the owning loop */ + THREAD_PAUSED = (1 << 4), /* ack from the worker */ +}; + +/* Sticky bits a paused worker must not drop. 
*/ +#define THREAD_STICKY (THREAD_SHUTDOWN | THREAD_PAUSE | THREAD_PAUSED) + +enum workstate { + WORK_QUEUED = 0, + WORK_RUNNING, + WORK_CANCELED, +}; + +struct isc_work { + unsigned int magic; + uint32_t state; /* enum workstate */ + isc_work_cb cb; /* runs on a worker thread */ + isc_work_done_cb done_cb; /* runs on the origin loop */ + void *cbarg; + isc_loop_t *loop; /* origin loop, referenced */ + struct cds_wfcq_node node; /* dispatch queue linkage */ +}; + +typedef struct isc__workthread { + union { + struct { + unsigned int magic; + isc_worklane_t lane; + isc_mem_t *mctx; + isc_thread_t thread; + struct __cds_wfcq_head qhead; + int32_t state; /* enum waitstate */ + }; + uint8_t __padding0[ISC_OS_CACHELINE_SIZE]; + }; + union { + struct cds_wfcq_tail qtail; + uint8_t __padding1[ISC_OS_CACHELINE_SIZE]; + }; +} isc__workthread_t; + +STATIC_ASSERT(ISC_OS_CACHELINE_SIZE >= sizeof(struct cds_wfcq_tail), + "ISC_OS_CACHELINE_SIZE smaller than sizeof(struct " + "cds_wfcq_tail)"); +STATIC_ASSERT(offsetof(isc__workthread_t, qtail) == ISC_OS_CACHELINE_SIZE, + "isc__workthread_t.qtail not on second cacheline"); +STATIC_ASSERT(sizeof(isc__workthread_t) == 2 * ISC_OS_CACHELINE_SIZE, + "isc__workthread_t is not two cachelines"); + static void -isc__work_cb(uv_work_t *req) { - isc_work_t *work = uv_req_get_data((uv_req_t *)req); +workthread_wake(isc__workthread_t *thread) { + cmm_smp_mb(); + if ((uatomic_load(&thread->state, CMM_RELAXED) & THREAD_RUNNING) != 0) { + /* Actively running; it will notice the queue on its own. 
*/ + return; + } - isc__iterated_hash_initialize(); + uatomic_or(&thread->state, THREAD_WAKEUP); + if (futex_noasync(&thread->state, FUTEX_WAKE, 1, NULL, NULL, 0) < 0) { + FATAL_ERROR("futex_noasync(FUTEX_WAKE): %s", strerror(errno)); + } +} - rcu_register_thread(); +static void +workthread_slumber(isc__workthread_t *thread) { + rcu_thread_offline(); + while (futex_noasync(&thread->state, FUTEX_WAIT, THREAD_WAITING, NULL, + NULL, 0) != 0) + { + if (errno == EWOULDBLOCK) { + break; + } else if (errno != EINTR) { + FATAL_ERROR("futex_noasync(FUTEX_WAIT): %s", + strerror(errno)); + } + /* Or retry if interrupted by signal. */ + } + rcu_thread_online(); +} - work->work_cb(work->cbarg); +static void +workthread_sleep(isc__workthread_t *thread) { + /* + * Drop to WAITING while keeping a pending SHUTDOWN/PAUSE sticky, so the + * FUTEX_WAIT below refuses to block once either is signalled. + */ + uatomic_and(&thread->state, THREAD_STICKY); + cmm_smp_mb(); - rcu_unregister_thread(); + /* + * The queue is the one wake condition that can't live in 'state', so + * recheck it under the fence; SHUTDOWN and WAKEUP are handled by + * FUTEX_WAIT's own value check. + */ + if (cds_wfcq_empty(&thread->qhead, &thread->qtail)) { + workthread_slumber(thread); + } - isc__iterated_hash_shutdown(); + /* Tell the waker we are running (keeping any sticky SHUTDOWN/PAUSE). */ + uatomic_or(&thread->state, THREAD_RUNNING); } +/* + * Acknowledge a pause request: publish PAUSED (dropping RUNNING/WAKEUP) and + * wake the waiting pauser. A new pause clears PAUSED, so the worker re-acks + * and the pauser only ever observes an ack set for its own request, never a + * stale one from the previous pause generation. 
+ */ static void -isc__after_work_cb(uv_work_t *req, int status) { - isc_work_t *work = uv_req_get_data((uv_req_t *)req); +workthread_ack_pause(isc__workthread_t *thread) { + int32_t old, next; + do { + old = uatomic_load(&thread->state, CMM_RELAXED); + next = (old & THREAD_STICKY) | THREAD_PAUSED; + } while (uatomic_cmpxchg(&thread->state, old, next) != old); + + (void)futex_noasync(&thread->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); +} + +/* + * Honour a pause: pause until the owning loop clears PAUSE (resume). A fresh + * pause clears PAUSED (see isc__workthread_pause), so (re-)ack whenever PAUSED + * is gone — the pauser only proceeds on an ack set for *its* request, never a + * stale one from the previous generation. Stays RCU-offline while paused so + * it can't hold up an exclusive-mode grace period. + */ +static void +workthread_pause(isc__workthread_t *thread) { + rcu_thread_offline(); + + while (true) { + int32_t old = uatomic_load(&thread->state, CMM_ACQUIRE); + if ((old & (THREAD_PAUSE | THREAD_SHUTDOWN)) != THREAD_PAUSE) { + break; + } + if ((old & THREAD_PAUSED) == 0) { + workthread_ack_pause(thread); + continue; + } + (void)futex_noasync(&thread->state, FUTEX_WAIT, old, NULL, NULL, + 0); + } + + uatomic_and(&thread->state, ~THREAD_PAUSED); + rcu_thread_online(); +} + +static void +work_done(void *arg) { + isc_work_t *work = arg; isc_loop_t *loop = work->loop; + isc_result_t result = (uatomic_load(&work->state, CMM_ACQUIRE) != + WORK_CANCELED) + ? 
ISC_R_SUCCESS + : ISC_R_CANCELED; - UV_RUNTIME_CHECK(uv_after_work_cb, status); + work->done_cb(work->cbarg, result); - work->after_work_cb(work->cbarg); + work->magic = 0; + isc_mem_put(work->loop->mctx, work, sizeof(*work)); + isc_loop_unref(loop); +} - isc_mem_put(loop->mctx, work, sizeof(*work)); +static void +work_run(isc_work_t *work) { + /* + * The CAS *is* the tombstone check: whoever moves the item out + * of WORK_QUEUED first — this worker or isc_work_cancel() — + * decides whether the callback runs. uatomic_cmpxchg returns the + * prior state, so WORK_QUEUED means we won the race. + */ + uint32_t prev = uatomic_cmpxchg(&work->state, WORK_QUEUED, + WORK_RUNNING); + switch (prev) { + case WORK_QUEUED: + work->cb(work->cbarg); + break; + case WORK_CANCELED: + break; + default: + UNREACHABLE(); + } - isc_loop_detach(&loop); + /* Completion always routes back to the origin loop. */ + isc_async_run(work->loop, work_done, work); } -void -isc_work_enqueue(isc_loop_t *loop, isc_work_cb work_cb, - isc_after_work_cb after_work_cb, void *cbarg) { - isc_work_t *work = NULL; - int r; +static void * +workthread_thread(void *arg) { + isc__workthread_t *thread = arg; + + isc__loopmgr_starting(); + + while (true) { + /* + * Honour a pause before touching the queue (gated on !SHUTDOWN + * so a shutting-down worker exits instead of pausing). + */ + int32_t state = uatomic_load(&thread->state, CMM_ACQUIRE); + if ((state & (THREAD_PAUSE | THREAD_SHUTDOWN)) == THREAD_PAUSE) + { + workthread_pause(thread); + continue; + } + + struct cds_wfcq_node *node; + node = __cds_wfcq_dequeue_blocking(&thread->qhead, + &thread->qtail); + + if (node == NULL) { + /* + * Only exit the loop if there's nothing to do. 
+ */ + if ((uatomic_load(&thread->state, CMM_ACQUIRE) & + THREAD_SHUTDOWN) != 0) + { + synchronize_rcu(); + if (!cds_wfcq_empty(&thread->qhead, + &thread->qtail)) + { + continue; + } + break; + } + + workthread_sleep(thread); + + continue; + } + + isc_work_t *work = caa_container_of(node, isc_work_t, node); + work_run(work); + } + + isc__loopmgr_stopping(); - REQUIRE(VALID_LOOP(loop)); - REQUIRE(isc_loop() == loop); - REQUIRE(work_cb != NULL); - REQUIRE(after_work_cb != NULL); + return NULL; +} + +isc_work_t * +isc_work_enqueue(isc_loop_t *loop, isc_worklane_t lane, isc_work_cb cb, + isc_work_done_cb done_cb, void *cbarg) { + REQUIRE(loop == isc_loop()); + + isc__workthread_t *thread = isc__loopmgr_workthread(loop, lane); - work = isc_mem_get(loop->mctx, sizeof(*work)); + isc_work_t *work = isc_mem_get(loop->mctx, sizeof(*work)); *work = (isc_work_t){ - .work_cb = work_cb, - .after_work_cb = after_work_cb, + .magic = WORK_MAGIC, + .cb = cb, + .done_cb = done_cb, .cbarg = cbarg, + .loop = isc_loop_ref(loop), + .state = WORK_QUEUED, }; - isc_loop_attach(loop, &work->loop); + rcu_read_lock(); + if ((uatomic_load(&thread->state, CMM_ACQUIRE) & THREAD_SHUTDOWN) != 0) + { + rcu_read_unlock(); + + /* + * We are shutting down, so immediately run the task instead of + * adding more to the queue. (The worker is running the + * remaining enqueued tasks and shuts down afterwards, see + * workthread_thread().) + */ + work_run(work); + } else { + (void)cds_wfcq_enqueue(&thread->qhead, &thread->qtail, + &work->node); + rcu_read_unlock(); + + if ((uatomic_load(&thread->state, CMM_ACQUIRE) & + THREAD_RUNNING) == 0) + { + workthread_wake(thread); + } + } + + return work; +} + +bool +isc_work_cancel(isc_work_t *work) { + REQUIRE(VALID_WORK(work)); + + /* + * Tombstone: QUEUED -> CANCELED. The node stays in the queue + * (no interior unlink in a singly-linked lock-free queue) and + * is discarded by whichever worker dequeues it; done_cb still + * fires with ISC_R_CANCELED. Nothing is freed here.
False + * means the callback is running or done — uv_cancel semantics. + */ + return uatomic_cmpxchg(&work->state, WORK_QUEUED, WORK_CANCELED) == + WORK_QUEUED; +} + +isc__workthread_t * +isc__workthread_create(isc_mem_t *mctx, isc_worklane_t lane) { + isc__workthread_t *thread = isc_mem_get(mctx, sizeof(*thread)); + + *thread = (isc__workthread_t){ + .lane = lane, + .magic = WORKTHREAD_MAGIC, + .state = THREAD_WAITING, + .mctx = isc_mem_ref(mctx), + }; + + __cds_wfcq_init(&thread->qhead, &thread->qtail); + + isc_thread_create(workthread_thread, thread, &thread->thread); + + return thread; +} + +void +isc__workthread_shutdown(isc__workthread_t *thread) { + REQUIRE(VALID_WORKTHREAD(thread)); - uv_req_set_data((uv_req_t *)&work->work, work); + /* + * Not called while the worker is paused by isc__workthread_pause(): + * shutdown callbacks run from uv loops, and loopmgr pause keeps every + * loop out of uv_run() until resume, so PAUSE and SHUTDOWN never + * coexist on a worker (the SHUTDOWN checks in the pause path are only + * a belt-and-braces exit if that ever changed). + */ + + /* Set the sticky SHUTDOWN bit once; bail if already shutting down. */ + int32_t old; + do { + old = uatomic_load(&thread->state, CMM_RELAXED); + if ((old & THREAD_SHUTDOWN) != 0) { + return; + } + } while (uatomic_cmpxchg(&thread->state, old, old | THREAD_SHUTDOWN) != + old); + + /* Fence in-flight enqueues (which touch the queue) before draining. 
*/ + synchronize_rcu(); + + workthread_wake(thread); +} + +void +isc__workthread_destroy(isc__workthread_t **threadp) { + REQUIRE(threadp != NULL && VALID_WORKTHREAD(*threadp)); + isc__workthread_t *thread = MOVE_OWNERSHIP(*threadp); + + isc_thread_join(thread->thread, NULL); + + INSIST(cds_wfcq_empty(&thread->qhead, &thread->qtail)); + + thread->magic = 0; + isc_mem_putanddetach(&thread->mctx, thread, sizeof(*thread)); +} + +void +isc__workthread_pause(isc__workthread_t *thread) { + REQUIRE(VALID_WORKTHREAD(thread)); + + /* + * Request a pause, but only if not already shutting down — a + * shutting-down worker heads for the stopping barrier and must never + * be waited on here (that'd be a deadlock). Clearing PAUSED as we set + * PAUSE invalidates any ack left over from the previous generation, so + * the wait below can only succeed on an ack for this request. + */ + int32_t old; + do { + old = uatomic_load(&thread->state, CMM_RELAXED); + if ((old & THREAD_SHUTDOWN) != 0) { + return; + } + } while (uatomic_cmpxchg(&thread->state, old, + (old | THREAD_PAUSE) & ~THREAD_PAUSED) != old); + + workthread_wake(thread); + + /* + * Wait for the worker to acknowledge (PAUSED, from workthread_thread() + * calling workthread_pause()) or for shutdown. + */ + while (true) { + old = uatomic_load(&thread->state, CMM_ACQUIRE); + if ((old & (THREAD_PAUSED | THREAD_SHUTDOWN)) != 0) { + return; + } + (void)futex_noasync(&thread->state, FUTEX_WAIT, old, NULL, NULL, + 0); + } +} + +void +isc__workthread_resume(isc__workthread_t *thread) { + REQUIRE(VALID_WORKTHREAD(thread)); - uv_req_set_data((uv_req_t *)&work->work, work); + /* Clear the request and wake the paused worker.
*/ + uatomic_and(&thread->state, ~THREAD_PAUSE); + (void)futex_noasync(&thread->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); } diff --git a/meson.build b/meson.build index a3d05d9d256..3a326a4704d 100644 --- a/meson.build +++ b/meson.build @@ -729,6 +729,16 @@ if urcu_dep[1].version().version_compare('<0.13.0') urcu_dep += cc.find_library('liburcu-common') endif +foreach h : [ + 'urcu/uatomic.h', + 'urcu/assert.h', + 'urcu/pointer.h', +] + if cc.has_header(h, dependencies: urcu_dep) + config.set('HAVE_@0@'.format(h.underscorify().to_upper()), 1) + endif +endforeach + config.set_quoted('RCU_FLAVOR', f'liburcu-@rcu_flavor@') config.set_quoted('RCU_VERSION', urcu_dep[0].version()) diff --git a/tests/isc/work_test.c b/tests/isc/work_test.c index 60eb7f6c25c..9433ee6a6a2 100644 --- a/tests/isc/work_test.c +++ b/tests/isc/work_test.c @@ -47,14 +47,15 @@ work_cb(void *arg ISC_ATTR_UNUSED) { } static void -after_work_cb(void *arg ISC_ATTR_UNUSED) { +after_work_cb(void *arg ISC_ATTR_UNUSED, isc_result_t result ISC_ATTR_UNUSED) { assert_int_equal(atomic_load(&scheduled), 1); isc_loopmgr_shutdown(); } static void work_enqueue_cb(void *arg ISC_ATTR_UNUSED) { - isc_work_enqueue(isc_loop(), work_cb, after_work_cb, NULL); + isc_work_enqueue(isc_loop(), ISC_WORKLANE_FAST, work_cb, after_work_cb, + NULL); } ISC_RUN_TEST_IMPL(isc_work_enqueue) {