start_next_command(void);
static void
-process_next_command(void *arg ISC_ATTR_UNUSED) {
+process_next_command(void *arg ISC_ATTR_UNUSED,
+ isc_result_t result ISC_ATTR_UNUSED) {
isc_loop_t *loop = isc_loop_main();
if (cmdline == NULL) {
in_use = false;
isc_loopmgr_pause();
if (interactive) {
- isc_work_enqueue(loop, readline_next_command,
+ isc_work_enqueue(loop, ISC_WORKLANE_FAST, readline_next_command,
process_next_command, loop);
} else {
- isc_work_enqueue(loop, fgets_next_command, process_next_command,
- loop);
+ isc_work_enqueue(loop, ISC_WORKLANE_FAST, fgets_next_command,
+ process_next_command, loop);
}
isc_loopmgr_resume();
}
#include <dns/catz.h>
#include <dns/dbiterator.h>
+#include <dns/name.h>
#include <dns/rdatasetiter.h>
#include <dns/view.h>
#include <dns/zone.h>
-#include "dns/name.h"
-
#define DNS_CATZ_ZONE_MAGIC ISC_MAGIC('c', 'a', 't', 'z')
#define DNS_CATZ_ZONES_MAGIC ISC_MAGIC('c', 'a', 't', 's')
#define DNS_CATZ_ENTRY_MAGIC ISC_MAGIC('c', 'a', 't', 'e')
static void
dns__catz_update_cb(void *data);
static void
-dns__catz_done_cb(void *data);
+dns__catz_done_cb(void *data, isc_result_t result);
static isc_result_t
catz_process_zones_entry(dns_catz_zone_t *catz, dns_rdataset_t *value,
"catz: %s: reload start", domain);
dns_catz_zone_ref(catz);
- isc_work_enqueue(catz->loop, dns__catz_update_cb, dns__catz_done_cb,
- catz);
+ isc_work_enqueue(catz->loop, ISC_WORKLANE_SLOW, dns__catz_update_cb,
+ dns__catz_done_cb, catz);
exit:
isc_timer_destroy(&catz->updatetimer);
}
static void
-dns__catz_done_cb(void *data) {
+dns__catz_done_cb(void *data, isc_result_t result ISC_ATTR_UNUSED) {
dns_catz_zone_t *catz = (dns_catz_zone_t *)data;
char dname[DNS_NAME_FORMATSIZE];
}
static void
-master_load_done(void *arg) {
+master_load_done(void *arg, isc_result_t result ISC_ATTR_UNUSED) {
dns_loadctx_t *lctx = arg;
isc_async_run(lctx->loop, master_load_callback, lctx);
dns_loadctx_ref(lctx);
isc_loop_attach(loop, &lctx->loop);
- isc_work_enqueue(isc_loop(), master_load, master_load_done, lctx);
+ isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, master_load,
+ master_load_done, lctx);
*lctxp = lctx;
}
static void
-master_dump_done(void *data) {
+master_dump_done(void *data, isc_result_t result ISC_ATTR_UNUSED) {
dns_dumpctx_t *dctx = data;
isc_async_run(dctx->loop, master_dump_callback, dctx);
dns_dumpctx_ref(dctx);
isc_loop_attach(loop, &dctx->loop);
- isc_work_enqueue(isc_loop(), master_dump, master_dump_done, dctx);
+ isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, master_dump,
+ master_dump_done, dctx);
*dctxp = dctx;
dns_dumpctx_ref(dctx);
isc_loop_attach(loop, &dctx->loop);
- isc_work_enqueue(isc_loop(), master_dump, master_dump_done, dctx);
+ isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, master_dump,
+ master_dump_done, dctx);
*dctxp = dctx;
#include <isc/buffer.h>
#include <isc/hash.h>
#include <isc/hashmap.h>
-#include <isc/helper.h>
#include <isc/log.h>
#include <isc/mem.h>
#include <isc/result.h>
#endif /* ifdef SKAN_MSG_DEBUG */
static void
-checksig_done(void *arg);
+checksig_done(void *arg, isc_result_t result);
static void
checksig_run(void *arg) {
checksig_ctx_t *chsigctx = arg;
chsigctx->result = dns_message_checksig(chsigctx->msg, chsigctx->view);
-
- isc_async_run(chsigctx->loop, checksig_done, chsigctx);
}
static void
-checksig_done(void *arg) {
+checksig_done(void *arg, isc_result_t result ISC_ATTR_UNUSED) {
checksig_ctx_t *chsigctx = arg;
dns_message_t *msg = chsigctx->msg;
- chsigctx->cb(chsigctx->cbarg, chsigctx->result);
+ chsigctx->cb(chsigctx->cbarg,
+ (result != ISC_R_SUCCESS) ? result : chsigctx->result);
dns_view_detach(&chsigctx->view);
isc_loop_detach(&chsigctx->loop);
dns_view_attach(view, &chsigctx->view);
dns_message_clonebuffer(msg);
- isc_helper_run(loop, checksig_run, chsigctx);
+ isc_work_enqueue(loop, ISC_WORKLANE_FAST, checksig_run, checksig_done,
+ chsigctx);
return DNS_R_WAIT;
}
}
static void
-update_rpz_done_cb(void *data) {
+update_rpz_done_cb(void *data, isc_result_t result ISC_ATTR_UNUSED) {
dns_rpz_zone_t *rpz = (dns_rpz_zone_t *)data;
char dname[DNS_NAME_FORMATSIZE];
"rpz: %s: reload start", domain);
dns_rpz_zones_ref(rpz->rpzs);
- isc_work_enqueue(rpz->loop, update_rpz_cb, update_rpz_done_cb, rpz);
+ isc_work_enqueue(rpz->loop, ISC_WORKLANE_SLOW, update_rpz_cb,
+ update_rpz_done_cb, rpz);
isc_timer_destroy(&rpz->updatetimer);
rpz->loop = NULL;
#include <isc/atomic.h>
#include <isc/base32.h>
#include <isc/counter.h>
-#include <isc/helper.h>
#include <isc/job.h>
#include <isc/log.h>
#include <isc/md.h>
static isc_result_t
validate_async_run(dns_validator_t *val, isc_job_cb cb);
static isc_result_t
-validate_helper_run(dns_validator_t *val, isc_job_cb cb);
+validate_work_enqueue(dns_validator_t *val, isc_job_cb cb);
static void
validate_dnskey(void *arg);
if (eresult == ISC_R_SUCCESS &&
rdataset->trust >= dns_trust_secure)
{
- result = validate_helper_run(
+ result = validate_work_enqueue(
val, resume_answer_with_key);
} else {
result = validate_async_run(val, resume_answer);
* Only extract the dst key if the keyset is secure.
*/
if (val->frdataset.trust >= dns_trust_secure) {
- result = validate_helper_run(val,
- resume_answer_with_key);
+ result = validate_work_enqueue(val,
+ resume_answer_with_key);
} else {
result = validate_async_run(val, resume_answer);
}
}
dns_rdataset_cleanup(&val->fsigrdataset);
- return validate_helper_run(val, resume_answer_with_key);
+ return validate_work_enqueue(val,
+ resume_answer_with_key);
}
break;
val->result = ISC_R_CANCELED;
} else if (val->key != NULL) {
/* Process with next key if we selected one */
- (void)validate_helper_run(val, validate_answer_signing_key);
+ (void)validate_work_enqueue(val, validate_answer_signing_key);
return;
}
goto next_key;
}
- (void)validate_helper_run(val, validate_answer_signing_key);
+ (void)validate_work_enqueue(val, validate_answer_signing_key);
return;
next_key:
return DNS_R_WAIT;
}
+static void
+null_done(void *arg ISC_ATTR_UNUSED, isc_result_t result ISC_ATTR_UNUSED) {
+ /* no-op for now */
+}
+
static isc_result_t
-validate_helper_run(dns_validator_t *val, isc_job_cb cb) {
+validate_work_enqueue(dns_validator_t *val, isc_job_cb cb) {
val->attributes |= VALATTR_OFFLOADED;
- isc_helper_run(val->loop, cb, val);
+ isc_work_enqueue(val->loop, ISC_WORKLANE_FAST, cb, null_done, val);
return DNS_R_WAIT;
}
break;
default:
/* Continue validation until we have success or no more data */
- (void)validate_helper_run(val, validate_dnskey_dsset_next);
+ (void)validate_work_enqueue(val, validate_dnskey_dsset_next);
return;
}
/* continue async run */
result = validate_dnskey_dsset(val);
if (result != ISC_R_SUCCESS) {
- (void)validate_helper_run(val,
- validate_dnskey_dsset_next);
+ (void)validate_work_enqueue(val,
+ validate_dnskey_dsset_next);
return;
}
}
}
static void
-axfr_apply_done(void *arg) {
+axfr_apply_done(void *arg, isc_result_t eresult) {
xfrin_work_t *work = arg;
dns_xfrin_t *xfr = work->xfr;
- isc_result_t result = work->result;
+ isc_result_t result = (eresult == ISC_R_SUCCESS) ? work->result
+ : eresult;
REQUIRE(VALID_XFRIN(xfr));
REQUIRE(VALID_XFRIN_WORK(work));
.xfr = dns_xfrin_ref(xfr),
};
xfr->diff_running = true;
- isc_work_enqueue(xfr->loop, axfr_apply, axfr_apply_done, work);
+ isc_work_enqueue(xfr->loop, ISC_WORKLANE_SLOW, axfr_apply,
+ axfr_apply_done, work);
}
static isc_result_t
}
static void
-ixfr_apply_done(void *arg) {
+ixfr_apply_done(void *arg, isc_result_t eresult) {
xfrin_work_t *work = arg;
REQUIRE(VALID_XFRIN_WORK(work));
dns_xfrin_t *xfr = work->xfr;
REQUIRE(VALID_XFRIN(xfr));
- isc_result_t result = work->result;
+ isc_result_t result = (eresult == ISC_R_SUCCESS) ? work->result
+ : eresult;
if (atomic_load(&xfr->shuttingdown)) {
result = ISC_R_SHUTTINGDOWN;
if (!xfr->retry_axfr &&
!cds_wfcq_empty(&xfr->diff_head, &xfr->diff_tail))
{
- isc_work_enqueue(xfr->loop, ixfr_apply, ixfr_apply_done, work);
+ isc_work_enqueue(xfr->loop, ISC_WORKLANE_SLOW, ixfr_apply,
+ ixfr_apply_done, work);
return;
}
.xfr = dns_xfrin_ref(xfr),
};
xfr->diff_running = true;
- isc_work_enqueue(xfr->loop, ixfr_apply, ixfr_apply_done, work);
+ isc_work_enqueue(xfr->loop, ISC_WORKLANE_SLOW, ixfr_apply,
+ ixfr_apply_done, work);
}
cleanup:
+++ /dev/null
-/*
- * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
- *
- * SPDX-License-Identifier: MPL-2.0
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, you can obtain one at https://mozilla.org/MPL/2.0/.
- *
- * See the COPYRIGHT file distributed with this work for additional
- * information regarding copyright ownership.
- */
-
-#include <stdlib.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include <isc/atomic.h>
-#include <isc/barrier.h>
-#include <isc/helper.h>
-#include <isc/job.h>
-#include <isc/loop.h>
-#include <isc/magic.h>
-#include <isc/mem.h>
-#include <isc/mutex.h>
-#include <isc/refcount.h>
-#include <isc/result.h>
-#include <isc/signal.h>
-#include <isc/strerr.h>
-#include <isc/thread.h>
-#include <isc/util.h>
-#include <isc/uv.h>
-#include <isc/work.h>
-
-#include "async_p.h"
-#include "job_p.h"
-#include "loop_p.h"
-
-void
-isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
- REQUIRE(VALID_LOOP(loop));
- REQUIRE(cb != NULL);
-
- isc_loop_t *helper = isc_loop_helper(loop);
- isc_job_t *job = isc_mem_get(helper->mctx, sizeof(*job));
- *job = (isc_job_t){
- .cb = cb,
- .cbarg = cbarg,
- };
-
- cds_wfcq_node_init(&job->wfcq_node);
-
- /*
- * cds_wfcq_enqueue() is non-blocking and enqueues the job to async
- * queue.
- *
- * The function returns 'false' in case the queue was empty - in such
- * case we need to trigger the async callback.
- */
- if (!cds_wfcq_enqueue(&helper->async_jobs.head,
- &helper->async_jobs.tail, &job->wfcq_node))
- {
- int r = uv_async_send(&helper->async_trigger);
- UV_RUNTIME_CHECK(uv_async_send, r);
- }
-}
}
static void
-prepare_response_done(void *arg) {
+prepare_response_done(void *arg, isc_result_t result) {
isc_region_t r;
isc_httpd_sendreq_t *req = arg;
isc_httpd_t *httpd = req->httpd;
*/
isc_buffer_usedregion(req->sendbuffer, &r);
+ if (result != ISC_R_SUCCESS) {
+ httpd_senddone(httpd->handle, result, req);
+ return;
+ }
+
isc_nm_send(httpd->handle, &r, httpd_senddone, req);
}
isc_httpd_sendreq_t *req = isc__httpd_sendreq_new(httpd);
isc_nmhandle_ref(handle);
- isc_work_enqueue(isc_loop(), prepare_response, prepare_response_done,
- req);
+ isc_work_enqueue(isc_loop(), ISC_WORKLANE_SLOW, prepare_response,
+ prepare_response_done, req);
return;
close_readhandle:
+++ /dev/null
-/*
- * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
- *
- * SPDX-License-Identifier: MPL-2.0
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, you can obtain one at https://mozilla.org/MPL/2.0/.
- *
- * See the COPYRIGHT file distributed with this work for additional
- * information regarding copyright ownership.
- */
-
-/*! \file isc/helper.h */
-
-#pragma once
-
-#include <inttypes.h>
-
-#include <isc/job.h>
-#include <isc/loop.h>
-#include <isc/mem.h>
-#include <isc/types.h>
-
-void
-isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
-/*%<
- * Schedule the job callback 'cb' to be run on the 'loop' event loop.
- *
- * Requires:
- *
- *\li 'loop' is a valid isc event loop
- *\li 'cb' is a callback function, must be non-NULL
- *\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL
- */
-
-#define isc_helper_current(cb, cbarg) isc_async_run(isc_loop(), cb, cbarg)
-/*%<
- * Helper macro to run the job on the current loop
- */
#include <inttypes.h>
-#include <urcu/compiler.h>
-#include <urcu/system.h>
-
#include <isc/job.h>
#include <isc/mem.h>
#include <isc/refcount.h>
#include <isc/types.h>
+#include <isc/urcu.h>
+#include <isc/work.h>
typedef void (*isc_job_cb)(void *);
*
* \li 'loop' is a valid loop and the loop tid matches the current tid.
*/
-
-isc_loop_t *
-isc_loop_helper(isc_loop_t *loop);
-/*%<
- * Returns the helper thread corresponding to the thread ID for 'loop'.
- *
- * Requires:
- *
- * \li 'loop' is a valid loop.
- */
sizeof(struct __cds_wfcq_head)];
struct cds_wfcq_tail tail;
uint8_t __padding_tail[ISC_OS_CACHELINE_SIZE -
- sizeof(struct __cds_wfcq_head)];
+ sizeof(struct cds_wfcq_tail)];
} isc_queue_t;
typedef struct cds_wfcq_node isc_queue_node_t;
#include <urcu-bp.h>
#endif
-#include <urcu-pointer.h>
-
+#if HAVE_URCU_ASSERT_H
+#include <urcu/assert.h>
+#endif
+#if HAVE_URCU_UATOMIC_H
+#include <urcu/uatomic.h>
+#endif
#include <urcu/compiler.h>
+#include <urcu/futex.h>
#include <urcu/list.h>
+#if HAVE_URCU_POINTER_H
+#include <urcu/pointer.h>
+#else
+#include <urcu-pointer.h>
+#endif
#include <urcu/rculfhash.h>
#include <urcu/rculist.h>
#include <urcu/ref.h>
#define _CMM_STORE_SHARED(x, v) CMM_STORE_SHARED(x, v)
#endif /* __SANITIZE_THREAD__ */
+
+#if !defined(uatomic_load) || !defined(uatomic_store)
+#define uatomic_load(ptr, mo) uatomic_read(ptr)
+#define uatomic_store(ptr, v, mo) uatomic_set(ptr, v)
+
+#define CMM_RELAXED __ATOMIC_RELAXED
+#define CMM_CONSUME __ATOMIC_CONSUME
+#define CMM_ACQUIRE __ATOMIC_ACQUIRE
+#define CMM_RELEASE __ATOMIC_RELEASE
+#define CMM_ACQ_REL __ATOMIC_ACQ_REL
+#define CMM_SEQ_CST __ATOMIC_SEQ_CST
+#endif
* information regarding copyright ownership.
*/
+/*! \file isc/work.h
+ * \brief Offload work from an event loop onto a dedicated worker thread.
+ *
+ * Each isc event loop has one worker thread per lane (see isc_worklane_t).
+ * isc_work_enqueue() runs a callback on the worker thread bound to the calling
+ * loop's lane and, when it finishes, runs a second callback back on that loop.
+ * The handle it returns can be used to cancel a task that has not started
+ * running yet.
+ */
+
#pragma once
-#include <stdlib.h>
+#include <isc/mem.h>
-#include <isc/loop.h>
+typedef enum isc_worklane {
+ ISC_WORKLANE_FAST = 0, /*%< short, bounded tasks (e.g. crypto) */
+ ISC_WORKLANE_SLOW, /*%< blocking/long tasks (e.g. zone dump) */
+ ISC_WORKLANE_COUNT,
+} isc_worklane_t;
+/*%<
+ * Selects which per-loop worker thread runs an enqueued task. Keeping long,
+ * blocking SLOW work (disk I/O, zone dump/load) on its own lane stops it from
+ * holding up short FAST tasks behind it.
+ */
typedef void (*isc_work_cb)(void *arg);
-typedef void (*isc_after_work_cb)(void *arg);
+typedef void (*isc_work_done_cb)(void *arg, isc_result_t result);
typedef struct isc_work isc_work_t;
-void
-isc_work_enqueue(isc_loop_t *loop, isc_work_cb work_cb,
- isc_after_work_cb after_work_cb, void *cbarg);
+isc_work_t *
+isc_work_enqueue(isc_loop_t *loop, isc_worklane_t lane, isc_work_cb cb,
+ isc_work_done_cb done_cb, void *cbarg);
/*%<
- * Schedules work to be handled by the libuv thread pool (see uv_work_t).
- * The function specified in `work_cb` will be run by a thread in the
- * thread pool; when complete, the `after_work_cb` function will run
- * in 'loop' to inform the caller that the work was completed.
+ * Schedule 'cb' to run on the worker thread bound to 'loop' and 'lane'. When
+ * 'cb' returns, 'done_cb' is scheduled back on 'loop' with the result of the
+ * work: ISC_R_SUCCESS normally, or ISC_R_CANCELED if the task was canceled
+ * before it started (see isc_work_cancel()).
+ *
+ * Returns a handle that may be passed to isc_work_cancel(). The handle is
+ * owned by 'loop' and stays valid until 'after_cb' has run; it must not be used
+ * afterwards.
*
* Requires:
- * \li 'loop' is a valid event loop.
- * \li 'work_cb' and 'after_work_cb' are not NULL.
+ *
+ *\li 'loop' is a valid isc event loop.
+ *\li 'cb' is non-NULL.
+ *\li 'after_cb' is non-NULL.
+ *\li 'cbarg' is passed to both callbacks, may be NULL.
+ */
+
+bool
+isc_work_cancel(isc_work_t *work);
+/*%<
+ * Try to cancel 'work' before its 'cb' starts running. If the task is still
+ * queued it is marked canceled and 'cb' will not run; if it is already running
+ * or has finished this has no effect. Either way the 'after_cb' passed to
+ * isc_work_enqueue() still runs on the origin loop, with ISC_R_CANCELED when
+ * the cancel succeeded. Nothing is freed here.
+ *
+ * Returns:
+ *
+ *\li true the task was still queued; 'cb' will not run.
+ *\li false the task is already running or done (uv_cancel() semantics).
+ *
+ * Requires:
+ *
+ *\li 'work' is a handle from isc_work_enqueue() whose 'after_cb' has not run.
+ */
+
+/* private */
+
+typedef struct isc__workthread isc__workthread_t;
+
+isc__workthread_t *
+isc__workthread_create(isc_mem_t *mctx, isc_worklane_t lane);
+/*%<
+ * Create one worker thread for 'lane' with its own dispatch queue (the loop
+ * manager creates one per loop). Used by the loop manager; not for general
+ * use.
+ */
+
+void
+isc__workthread_shutdown(isc__workthread_t *thread);
+/*%<
+ * Begin shutdown of 'thread': set its SHUTDOWN flag (after which new enqueues
+ * run inline on the caller instead of queuing), then, after an RCU grace period
+ * that fences any in-flight enqueue, wake the worker so it drains its queue and
+ * exits. Does not join the worker; see isc__workthread_destroy(). Idempotent.
+ */
+
+void
+isc__workthread_pause(isc__workthread_t *thread);
+/*%<
+ * Quiesce 'thread' for isc_loopmgr_pause(): mark it paused and block until it
+ * has parked. A no-op if the worker is already shutting down. Must be paired
+ * with isc__workthread_resume() and called from the worker's owning loop.
+ */
+
+void
+isc__workthread_resume(isc__workthread_t *thread);
+/*%<
+ * Release a worker previously parked by isc__workthread_pause().
+ */
+
+void
+isc__workthread_destroy(isc__workthread_t **threadp);
+/*%<
+ * Join the worker thread, then free '*threadp' and set it to NULL. Must be
+ * called after isc__workthread_shutdown().
*/
pause_loop(isc_loop_t *loop) {
isc_loopmgr_t *loopmgr = isc__loopmgr;
+ /* Quiesce this loop's own workers before going to the barrier. */
+ for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+ isc__workthread_pause(loop->workthreads[lane]);
+ }
+
rcu_thread_offline();
loop->paused = true;
loop->paused = false;
rcu_thread_online();
+
+ for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+ isc__workthread_resume(loop->workthreads[lane]);
+ }
}
static void
isc_signal_destroy(&loopmgr->sigint);
}
+ for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+ isc__workthread_shutdown(loop->workthreads[lane]);
+ }
+
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
&loop->async_jobs.head, &loop->async_jobs.tail,
&loop->teardown_jobs.head, &loop->teardown_jobs.tail);
#endif
}
-static void
-helper_close(isc_loop_t *loop) {
- int r = uv_loop_close(&loop->loop);
- UV_RUNTIME_CHECK(uv_loop_close, r);
-
- INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
-
- isc_mem_detach(&loop->mctx);
-}
-
static void
loop_close(isc_loop_t *loop) {
int r = uv_loop_close(&loop->loop);
isc_mem_detach(&loop->mctx);
}
-static void *
-helper_thread(void *arg) {
- isc_loop_t *helper = (isc_loop_t *)arg;
-
- int r = uv_prepare_start(&helper->quiescent, quiescent_cb);
- UV_RUNTIME_CHECK(uv_prepare_start, r);
-
- isc_barrier_wait(&isc__loopmgr->starting);
-
- r = uv_run(&helper->loop, UV_RUN_DEFAULT);
- UV_RUNTIME_CHECK(uv_run, r);
-
- /* Invalidate the helper early */
- helper->magic = 0;
-
- isc_barrier_wait(&isc__loopmgr->stopping);
-
- return NULL;
-}
-
static void *
loop_thread(void *arg) {
isc_loop_t *loop = (isc_loop_t *)arg;
isc_loopmgr_t *loopmgr = isc__loopmgr;
- isc_loop_t *helper = &loopmgr->helpers[loop->tid];
- char name[32];
/* Initialize the thread_local variables*/
REQUIRE(isc__loop_local == NULL || isc__loop_local == loop);
isc__tid_init(loop->tid);
- /* Start the helper thread */
- isc_thread_create(helper_thread, helper, &helper->thread);
- snprintf(name, sizeof(name), "isc-helper-%04" PRItid, loop->tid);
- isc_thread_setname(helper->thread, name);
-
int r = uv_prepare_start(&loop->quiescent, quiescent_cb);
UV_RUNTIME_CHECK(uv_prepare_start, r);
+ for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+ loop->workthreads[lane] =
+ isc__workthread_create(isc__loopmgr->mctx, lane);
+ }
+
isc_barrier_wait(&loopmgr->starting);
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
/* Invalidate the loop early */
loop->magic = 0;
- /* Shutdown the helper thread */
- r = uv_async_send(&helper->shutdown_trigger);
- UV_RUNTIME_CHECK(uv_async_send, r);
-
isc_barrier_wait(&loopmgr->stopping);
+ for (isc_worklane_t lane = 0; lane < ISC_WORKLANE_COUNT; lane++) {
+ isc__workthread_destroy(&loop->workthreads[lane]);
+ }
+
return NULL;
}
* Public
*/
-static void
-threadpool_initialize(uint32_t workers) {
- char buf[11];
- int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf,
- &(size_t){ sizeof(buf) });
- if (r == UV_ENOENT) {
- snprintf(buf, sizeof(buf), "%" PRIu32, workers);
- uv_os_setenv("UV_THREADPOOL_SIZE", buf);
- }
-}
-
static void
loop_destroy(isc_loop_t *loop) {
int r = uv_async_send(&loop->destroy_trigger);
isc_loopmgr_t *loopmgr = NULL;
- threadpool_initialize(nloops);
isc__tid_initcount(nloops);
loopmgr = isc_mem_get(mctx, sizeof(*loopmgr));
isc_mem_attach(mctx, &loopmgr->mctx);
- /* We need to double the number for loops and helpers */
- isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2);
- isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2);
- isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2);
- isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2);
+ /* We need to double the number for loops */
+ isc_barrier_init(&loopmgr->pausing, loopmgr->nloops);
+ isc_barrier_init(&loopmgr->resuming, loopmgr->nloops);
+ isc_barrier_init(&loopmgr->starting,
+ loopmgr->nloops * (1 + ISC_WORKLANE_COUNT));
+ isc_barrier_init(&loopmgr->stopping,
+ loopmgr->nloops * (1 + ISC_WORKLANE_COUNT));
loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
sizeof(loopmgr->loops[0]));
loop_init(loop, i, "loop");
}
- loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
- sizeof(loopmgr->helpers[0]));
- for (size_t i = 0; i < loopmgr->nloops; i++) {
- isc_loop_t *loop = &loopmgr->helpers[i];
- loop_init(loop, i, "helper");
- }
-
isc__loopmgr = loopmgr;
loopmgr->sigint = isc_signal_new(isc__loopmgr_signal, loopmgr, SIGINT);
"loop exclusive mode: starting");
}
- for (size_t i = 0; i < isc__loopmgr->nloops; i++) {
- isc_loop_t *helper = &isc__loopmgr->helpers[i];
-
- int r = uv_async_send(&helper->pause_trigger);
- UV_RUNTIME_CHECK(uv_async_send, r);
- }
-
for (size_t i = 0; i < isc__loopmgr->nloops; i++) {
isc_loop_t *loop = &isc__loopmgr->loops[i];
RUNTIME_CHECK(atomic_compare_exchange_strong(&isc__loopmgr->paused,
&(bool){ false }, true));
+
pause_loop(CURRENT_LOOP(isc__loopmgr));
if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) {
isc__loopmgr = NULL;
- /* Wait for all helpers to finish */
- for (size_t i = 0; i < loopmgr->nloops; i++) {
- isc_loop_t *helper = &loopmgr->helpers[i];
- isc_thread_join(helper->thread, NULL);
- }
-
/* First wait for all loops to finish */
for (size_t i = 1; i < loopmgr->nloops; i++) {
isc_loop_t *loop = &loopmgr->loops[i];
loopmgr->magic = 0;
- for (size_t i = 0; i < loopmgr->nloops; i++) {
- isc_loop_t *helper = &loopmgr->helpers[i];
- helper_close(helper);
- }
- isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops,
- sizeof(loopmgr->helpers[0]));
-
for (size_t i = 0; i < loopmgr->nloops; i++) {
isc_loop_t *loop = &loopmgr->loops[i];
loop_close(loop);
return loop->shuttingdown;
}
-isc_loop_t *
-isc_loop_helper(isc_loop_t *loop) {
+isc__workthread_t *
+isc__loopmgr_workthread(isc_loop_t *loop, isc_worklane_t lane) {
REQUIRE(VALID_LOOP(loop));
+ REQUIRE(lane < ISC_WORKLANE_COUNT);
+
+ return loop->workthreads[lane];
+}
- return &isc__loopmgr->helpers[loop->tid];
+void
+isc__loopmgr_starting(void) {
+ isc_loopmgr_t *loopmgr = isc__loopmgr;
+ isc_barrier_wait(&loopmgr->starting);
+}
+
+void
+isc__loopmgr_stopping(void) {
+ isc_loopmgr_t *loopmgr = isc__loopmgr;
+ isc_barrier_wait(&loopmgr->stopping);
}
/* safe memory reclamation */
uv_prepare_t quiescent;
+
+ /* thread pools */
+ isc__workthread_t *workthreads[ISC_WORKLANE_COUNT];
};
/*
/* per-thread objects */
isc_loop_t *loops;
- isc_loop_t *helpers;
} isc_loopmgr_t;
/*
#define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ')
#define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
-/*
- * Work to be offloaded to an external thread.
- */
-struct isc_work {
- uv_work_t work;
- isc_loop_t *loop;
- isc_work_cb work_cb;
- isc_after_work_cb after_work_cb;
- void *cbarg;
-};
-
#define DEFAULT_LOOP(loopmgr) (&(loopmgr)->loops[0])
#define CURRENT_LOOP(loopmgr) (&(loopmgr)->loops[isc_tid()])
#define LOOP(loopmgr, tid) (&(loopmgr)->loops[tid])
#define ON_LOOP(loop) ((loop) == CURRENT_LOOP((loop)->loopmgr))
+
+isc__workthread_t *
+isc__loopmgr_workthread(isc_loop_t *loop, isc_worklane_t lane);
+
+void
+isc__loopmgr_starting(void);
+void
+isc__loopmgr_stopping(void);
'hash.c',
'hashmap.c',
'heap.c',
- 'helper.c',
'hex.c',
'histo.c',
'ht.c',
* information regarding copyright ownership.
*/
-#include <stdlib.h>
+#include <limits.h>
+#include <stddef.h>
+#include <stdint.h>
-#include <isc/iterated_hash.h>
+#include <isc/async.h>
#include <isc/job.h>
#include <isc/loop.h>
+#include <isc/magic.h>
+#include <isc/queue.h>
+#include <isc/thread.h>
#include <isc/urcu.h>
+#include <isc/util.h>
#include <isc/uv.h>
#include <isc/work.h>
#include "loop_p.h"
+#define WORK_MAGIC ISC_MAGIC('W', 'o', 'r', 'k')
+#define VALID_WORK(t) ISC_MAGIC_VALID(t, WORK_MAGIC)
+#define WORKTHREAD_MAGIC ISC_MAGIC('W', 'k', 'T', 'h')
+#define VALID_WORKTHREAD(t) ISC_MAGIC_VALID(t, WORKTHREAD_MAGIC)
+
+enum waitstate {
+ /* The value a sleeping worker blocks on in FUTEX_WAIT. */
+ THREAD_WAITING = 0,
+ /* Any non-zero bit keeps FUTEX_WAIT from blocking. */
+ THREAD_WAKEUP = (1 << 0),
+ THREAD_RUNNING = (1 << 1),
+ THREAD_SHUTDOWN = (1 << 2),
+ THREAD_PAUSE = (1 << 3), /* request from the owning loop */
+ THREAD_PAUSED = (1 << 4), /* ack from the worker */
+};
+
+/* Sticky bits a paused worker must not drop. */
+#define THREAD_STICKY (THREAD_SHUTDOWN | THREAD_PAUSE | THREAD_PAUSED)
+
+enum workstate {
+ WORK_QUEUED = 0,
+ WORK_RUNNING,
+ WORK_CANCELED,
+};
+
+struct isc_work {
+ unsigned int magic;
+ uint32_t state; /* enum workstate */
+ isc_work_cb cb; /* runs on a worker thread */
+ isc_work_done_cb done_cb; /* runs on the origin loop */
+ void *cbarg;
+ isc_loop_t *loop; /* origin loop, referenced */
+ struct cds_wfcq_node node; /* dispatch queue linkage */
+};
+
+typedef struct isc__workthread {
+ union {
+ struct {
+ unsigned int magic;
+ isc_worklane_t lane;
+ isc_mem_t *mctx;
+ isc_thread_t thread;
+ struct __cds_wfcq_head qhead;
+ int32_t state; /* enum waitstate */
+ };
+ uint8_t __padding0[ISC_OS_CACHELINE_SIZE];
+ };
+ union {
+ struct cds_wfcq_tail qtail;
+ uint8_t __padding1[ISC_OS_CACHELINE_SIZE];
+ };
+} isc__workthread_t;
+
+STATIC_ASSERT(ISC_OS_CACHELINE_SIZE >= sizeof(struct cds_wfcq_tail),
+ "ISC_OS_CACHELINE_SIZE smaller than sizeof(struct "
+ "cds_wfcq_tail)");
+STATIC_ASSERT(offsetof(isc__workthread_t, qtail) == ISC_OS_CACHELINE_SIZE,
+ "isc__workthread_t.qtail not on second cacheline");
+STATIC_ASSERT(sizeof(isc__workthread_t) == 2 * ISC_OS_CACHELINE_SIZE,
+ "isc__workthread_t is not two cachelines");
+
static void
-isc__work_cb(uv_work_t *req) {
- isc_work_t *work = uv_req_get_data((uv_req_t *)req);
+workthread_wake(isc__workthread_t *thread) {
+ cmm_smp_mb();
+ if ((uatomic_load(&thread->state, CMM_RELAXED) & THREAD_RUNNING) != 0) {
+ /* Actively running; it will notice the queue on its own. */
+ return;
+ }
- isc__iterated_hash_initialize();
+ uatomic_or(&thread->state, THREAD_WAKEUP);
+ if (futex_noasync(&thread->state, FUTEX_WAKE, 1, NULL, NULL, 0) < 0) {
+ FATAL_ERROR("futex_noasync(FUTEX_WAKE): %s", strerror(errno));
+ }
+}
- rcu_register_thread();
+static void
+workthread_slumber(isc__workthread_t *thread) {
+ rcu_thread_offline();
+ while (futex_noasync(&thread->state, FUTEX_WAIT, THREAD_WAITING, NULL,
+ NULL, 0) != 0)
+ {
+ if (errno == EWOULDBLOCK) {
+ break;
+ } else if (errno != EINTR) {
+ FATAL_ERROR("futex_noasync(FUTEX_WAIT): %s",
+ strerror(errno));
+ }
+ /* Or retry if interrupted by signal. */
+ }
+ rcu_thread_online();
+}
- work->work_cb(work->cbarg);
+static void
+workthread_sleep(isc__workthread_t *thread) {
+ /*
+ * Drop to WAITING while keeping a pending SHUTDOWN/PAUSE sticky, so the
+ * FUTEX_WAIT below refuses to block once either is signalled.
+ */
+ uatomic_and(&thread->state, THREAD_STICKY);
+ cmm_smp_mb();
- rcu_unregister_thread();
+ /*
+ * The queue is the one wake condition that can't live in 'state', so
+ * recheck it under the fence; SHUTDOWN and WAKEUP are handled by
+ * FUTEX_WAIT's own value check.
+ */
+ if (cds_wfcq_empty(&thread->qhead, &thread->qtail)) {
+ workthread_slumber(thread);
+ }
- isc__iterated_hash_shutdown();
+ /* Tell the waker we are running (keeping any sticky SHUTDOWN/PAUSE). */
+ uatomic_or(&thread->state, THREAD_RUNNING);
}
+/*
+ * Acknowledge a pause request: publish PAUSED (dropping RUNNING/WAKEUP) and
+ * wake the waiting pauser. A new pause clears PAUSED, so the worker re-acks
+ * and the pauser only ever observes an ack set for its own request, never a
+ * stale one from the previous pause generation.
+ */
static void
-isc__after_work_cb(uv_work_t *req, int status) {
- isc_work_t *work = uv_req_get_data((uv_req_t *)req);
+workthread_ack_pause(isc__workthread_t *thread) {
+ int32_t old, next;
+ do {
+ old = uatomic_load(&thread->state, CMM_RELAXED);
+ next = (old & THREAD_STICKY) | THREAD_PAUSED;
+ } while (uatomic_cmpxchg(&thread->state, old, next) != old);
+
+ (void)futex_noasync(&thread->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+}
+
+/*
+ * Honour a pause: pause until the owning loop clears PAUSE (resume). A fresh
+ * pause clears PAUSED (see isc__workthread_pause), so (re-)ack whenever PAUSED
+ * is gone — the pauser only proceeds on an ack set for *its* request, never a
+ * stale one from the previous generation. Stays RCU-offline while paused so
+ * it can't hold up an exclusive-mode grace period.
+ */
+static void
+workthread_pause(isc__workthread_t *thread) {
+ rcu_thread_offline();
+
+ while (true) {
+ int32_t old = uatomic_load(&thread->state, CMM_ACQUIRE);
+ if ((old & (THREAD_PAUSE | THREAD_SHUTDOWN)) != THREAD_PAUSE) {
+ break;
+ }
+ if ((old & THREAD_PAUSED) == 0) {
+ workthread_ack_pause(thread);
+ continue;
+ }
+ (void)futex_noasync(&thread->state, FUTEX_WAIT, old, NULL, NULL,
+ 0);
+ }
+
+ uatomic_and(&thread->state, ~THREAD_PAUSED);
+ rcu_thread_online();
+}
+
+static void
+work_done(void *arg) {
+ isc_work_t *work = arg;
isc_loop_t *loop = work->loop;
+ isc_result_t result = (uatomic_load(&work->state, CMM_ACQUIRE) !=
+ WORK_CANCELED)
+ ? ISC_R_SUCCESS
+ : ISC_R_CANCELED;
- UV_RUNTIME_CHECK(uv_after_work_cb, status);
+ work->done_cb(work->cbarg, result);
- work->after_work_cb(work->cbarg);
+ work->magic = 0;
+ isc_mem_put(work->loop->mctx, work, sizeof(*work));
+ isc_loop_unref(loop);
+}
- isc_mem_put(loop->mctx, work, sizeof(*work));
+static void
+work_run(isc_work_t *work) {
+ /*
+ * The CAS *is* the tombstone check: whoever moves the item out
+ * of WORK_QUEUED first — this worker or isc_work_cancel() —
+ * decides whether the callback runs. uatomic_cmpxchg returns the
+ * prior state, so WORK_QUEUED means we won the race.
+ */
+ uint32_t prev = uatomic_cmpxchg(&work->state, WORK_QUEUED,
+ WORK_RUNNING);
+ switch (prev) {
+ case WORK_QUEUED:
+ work->cb(work->cbarg);
+ break;
+ case WORK_CANCELED:
+ break;
+ default:
+ UNREACHABLE();
+ }
- isc_loop_detach(&loop);
+ /* Completion always routes back to the origin loop. */
+ isc_async_run(work->loop, work_done, work);
}
-void
-isc_work_enqueue(isc_loop_t *loop, isc_work_cb work_cb,
- isc_after_work_cb after_work_cb, void *cbarg) {
- isc_work_t *work = NULL;
- int r;
+static void *
+workthread_thread(void *arg) {
+ isc__workthread_t *thread = arg;
+
+ isc__loopmgr_starting();
+
+ while (true) {
+ /*
+ * Honour a pause before touching the queue (gated on !SHUTDOWN
+ * so a shutting-down worker exits instead of pausing).
+ */
+ int32_t state = uatomic_load(&thread->state, CMM_ACQUIRE);
+ if ((state & (THREAD_PAUSE | THREAD_SHUTDOWN)) == THREAD_PAUSE)
+ {
+ workthread_pause(thread);
+ continue;
+ }
+
+ struct cds_wfcq_node *node;
+ node = __cds_wfcq_dequeue_blocking(&thread->qhead,
+ &thread->qtail);
+
+ if (node == NULL) {
+ /*
+ * Only exit the loop if there's nothing to do.
+ */
+ if ((uatomic_load(&thread->state, CMM_ACQUIRE) &
+ THREAD_SHUTDOWN) != 0)
+ {
+ synchronize_rcu();
+ if (!cds_wfcq_empty(&thread->qhead,
+ &thread->qtail))
+ {
+ continue;
+ }
+ break;
+ }
+
+ workthread_sleep(thread);
+
+ continue;
+ }
+
+ isc_work_t *work = caa_container_of(node, isc_work_t, node);
+ work_run(work);
+ }
+
+ isc__loopmgr_stopping();
- REQUIRE(VALID_LOOP(loop));
- REQUIRE(isc_loop() == loop);
- REQUIRE(work_cb != NULL);
- REQUIRE(after_work_cb != NULL);
+ return NULL;
+}
+
+isc_work_t *
+isc_work_enqueue(isc_loop_t *loop, isc_worklane_t lane, isc_work_cb cb,
+ isc_work_done_cb done_cb, void *cbarg) {
+ REQUIRE(loop == isc_loop());
+
+ isc__workthread_t *thread = isc__loopmgr_workthread(loop, lane);
- work = isc_mem_get(loop->mctx, sizeof(*work));
+ isc_work_t *work = isc_mem_get(loop->mctx, sizeof(*work));
*work = (isc_work_t){
- .work_cb = work_cb,
- .after_work_cb = after_work_cb,
+ .magic = WORK_MAGIC,
+ .cb = cb,
+ .done_cb = done_cb,
.cbarg = cbarg,
+ .loop = isc_loop_ref(loop),
+ .state = WORK_QUEUED,
};
- isc_loop_attach(loop, &work->loop);
+ rcu_read_lock();
+ if ((uatomic_load(&thread->state, CMM_ACQUIRE) & THREAD_SHUTDOWN) != 0)
+ {
+ rcu_read_unlock();
+
+ /*
+ * We are shutting down, so immedaitely run task instead of
+ * adding more in the queue. (The worker is running the
+ * remaining enqueue tasks and shutdown after, see
+ * workthread_thread().)
+ */
+ work_run(work);
+ } else {
+ (void)cds_wfcq_enqueue(&thread->qhead, &thread->qtail,
+ &work->node);
+ rcu_read_unlock();
+
+ if ((uatomic_load(&thread->state, CMM_ACQUIRE) &
+ THREAD_RUNNING) == 0)
+ {
+ workthread_wake(thread);
+ }
+ }
+
+ return work;
+}
+
+bool
+isc_work_cancel(isc_work_t *work) {
+ REQUIRE(VALID_WORK(work));
+
+ /*
+ * Tombstone: QUEUED -> CANCELED. The node stays in the queue
+ * (no interior unlink in a singly-linked lock-free queue) and
+ * is discarded by whichever worker dequeues it; after_cb still
+ * fires with ISC_R_CANCELED. Nothing is freed here. False
+ * means the callback is running or done — uv_cancel semantics.
+ */
+ return uatomic_cmpxchg(&work->state, WORK_QUEUED, WORK_CANCELED) ==
+ WORK_QUEUED;
+}
+
+isc__workthread_t *
+isc__workthread_create(isc_mem_t *mctx, isc_worklane_t lane) {
+ isc__workthread_t *thread = isc_mem_get(mctx, sizeof(*thread));
+
+ *thread = (isc__workthread_t){
+ .lane = lane,
+ .magic = WORKTHREAD_MAGIC,
+ .state = THREAD_WAITING,
+ .mctx = isc_mem_ref(mctx),
+ };
+
+ __cds_wfcq_init(&thread->qhead, &thread->qtail);
+
+ isc_thread_create(workthread_thread, thread, &thread->thread);
+
+ return thread;
+}
+
+void
+isc__workthread_shutdown(isc__workthread_t *thread) {
+ REQUIRE(VALID_WORKTHREAD(thread));
- uv_req_set_data((uv_req_t *)&work->work, work);
+ /*
+ * Not called while the worker is paused by isc__workthread_pause():
+ * shutdown callbacks run from uv loops, and loopmgr pause keeps every
+ * loop out of uv_run() until resume, so PAUSE and SHUTDOWN never
+ * coexist on a worker (the SHUTDOWN checks in the pause path are only
+ * a belt-and-braces exit if that ever changed).
+ */
+
+ /* Set the sticky SHUTDOWN bit once; bail if already shutting down. */
+ int32_t old;
+ do {
+ old = uatomic_load(&thread->state, CMM_RELAXED);
+ if ((old & THREAD_SHUTDOWN) != 0) {
+ return;
+ }
+ } while (uatomic_cmpxchg(&thread->state, old, old | THREAD_SHUTDOWN) !=
+ old);
+
+ /* Fence in-flight enqueues (which touch the queue) before draining. */
+ synchronize_rcu();
+
+ workthread_wake(thread);
+}
+
+void
+isc__workthread_destroy(isc__workthread_t **threadp) {
+ REQUIRE(threadp != NULL && VALID_WORKTHREAD(*threadp));
+ isc__workthread_t *thread = MOVE_OWNERSHIP(*threadp);
+
+ isc_thread_join(thread->thread, NULL);
+
+ INSIST(cds_wfcq_empty(&thread->qhead, &thread->qtail));
+
+ thread->magic = 0;
+ isc_mem_putanddetach(&thread->mctx, thread, sizeof(*thread));
+}
+
+void
+isc__workthread_pause(isc__workthread_t *thread) {
+ REQUIRE(VALID_WORKTHREAD(thread));
+
+ /*
+ * Request a pause, but only if not already shutting down — a
+ * shutting-down worker heads for the stopping barrier and must never
+ * be waited on here (that'd be a deadlock). Clearing PAUSED as we set
+ * PAUSE invalidates any ack left over from the previous generation, so
+ * the wait below can only succeed on an ack for this request.
+ */
+ int32_t old;
+ do {
+ old = uatomic_load(&thread->state, CMM_RELAXED);
+ if ((old & THREAD_SHUTDOWN) != 0) {
+ return;
+ }
+ } while (uatomic_cmpxchg(&thread->state, old,
+ (old | THREAD_PAUSE) & ~THREAD_PAUSED) != old);
+
+ workthread_wake(thread);
+
+ /*
+ * Wait for the worker to acknowledge (PAUSED, form workthread_thread()
+ * calling workthread_pause()) or for shutdown.
+ */
+ while (true) {
+ old = uatomic_load(&thread->state, CMM_ACQUIRE);
+ if ((old & (THREAD_PAUSED | THREAD_SHUTDOWN)) != 0) {
+ return;
+ }
+ (void)futex_noasync(&thread->state, FUTEX_WAIT, old, NULL, NULL,
+ 0);
+ }
+}
+
+void
+isc__workthread_resume(isc__workthread_t *thread) {
+ REQUIRE(VALID_WORKTHREAD(thread));
- r = uv_queue_work(&loop->loop, &work->work, isc__work_cb,
- isc__after_work_cb);
- UV_RUNTIME_CHECK(uv_queue_work, r);
+ /* Clear the request and wake the paused worker. */
+ uatomic_and(&thread->state, ~THREAD_PAUSE);
+ (void)futex_noasync(&thread->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
urcu_dep += cc.find_library('liburcu-common')
endif
+foreach h : [
+ 'urcu/uatomic.h',
+ 'urcu/assert.h',
+ 'urcu/pointer.h',
+]
+ if cc.has_header(h, dependencies: urcu_dep)
+ config.set('HAVE_@0@'.format(h.underscorify().to_upper()), 1)
+ endif
+endforeach
+
config.set_quoted('RCU_FLAVOR', f'liburcu-@rcu_flavor@')
config.set_quoted('RCU_VERSION', urcu_dep[0].version())
}
static void
-after_work_cb(void *arg ISC_ATTR_UNUSED) {
+after_work_cb(void *arg ISC_ATTR_UNUSED, isc_result_t result ISC_ATTR_UNUSED) {
assert_int_equal(atomic_load(&scheduled), 1);
isc_loopmgr_shutdown();
}
static void
work_enqueue_cb(void *arg ISC_ATTR_UNUSED) {
- isc_work_enqueue(isc_loop(), work_cb, after_work_cb, NULL);
+ isc_work_enqueue(isc_loop(), ISC_WORKLANE_FAST, work_cb, after_work_cb,
+ NULL);
}
ISC_RUN_TEST_IMPL(isc_work_enqueue) {