#include <isc/refcount.h>
#include <isc/result.h>
#include <isc/signal.h>
-#include <isc/stack.h>
#include <isc/strerr.h>
#include <isc/thread.h>
#include <isc/util.h>
isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
*job = (isc_job_t){
- .link = ISC_LINK_INITIALIZER,
.cb = cb,
.cbarg = cbarg,
};
+ cds_wfcq_node_init(&job->wfcq_node);
+
/*
- * Now send the half-initialized job to the loop queue.
+ * cds_wfcq_enqueue() is non-blocking and enqueues the job to async
+ * queue.
+ *
+ * The function returns 'false' in case the queue was empty - in such
+ * case we need to trigger the async callback.
*/
- ISC_ASTACK_PUSH(loop->async_jobs, job, link);
-
- int r = uv_async_send(&loop->async_trigger);
- UV_RUNTIME_CHECK(uv_async_send, r);
+ __tsan_release(job);
+ if (!cds_wfcq_enqueue(&loop->async_jobs.head, &loop->async_jobs.tail,
+ &job->wfcq_node))
+ {
+ int r = uv_async_send(&loop->async_trigger);
+ UV_RUNTIME_CHECK(uv_async_send, r);
+ }
}
void
isc__async_cb(uv_async_t *handle) {
isc_loop_t *loop = uv_handle_get_data(handle);
+ isc_jobqueue_t jobs;
REQUIRE(VALID_LOOP(loop));
- ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs);
- ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
-
- isc_job_t *job = ISC_STACK_POP(drain, link);
- isc_job_t *next = NULL;
- while (job != NULL) {
- ISC_LIST_PREPEND(jobs, job, link);
+ /* Initialize local wfcqueue */
+ __cds_wfcq_init(&jobs.head, &jobs.tail);
- job = ISC_STACK_POP(drain, link);
+ /*
+ * Move all the elements from loop->async_jobs to a local jobs queue.
+ *
+ * __cds_wfcq_splice_blocking() assumes that synchronization is
+ * done externally - there's no internal locking, unlike
+ * cds_wfcq_splice_blocking(), and we do not need to check whether
+ * it needs to block, unlike __cds_wfcq_splice_nonblocking().
+ *
+ * The reason we can use __cds_wfcq_splice_blocking() is that the
+ * only other function we use is cds_wfcq_enqueue() which doesn't
+ * require any synchronization (see the table in urcu/wfcqueue.h
+ * for more details).
+ */
+ enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
+ &jobs.head, &jobs.tail, &loop->async_jobs.head,
+ &loop->async_jobs.tail);
+ INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
+ if (ret == CDS_WFCQ_RET_SRC_EMPTY) {
+ /*
+ * Nothing to do, the source queue was empty - most
+ * probably we were called from isc__async_close() below.
+ */
+ return;
}
- for (job = ISC_LIST_HEAD(jobs),
- next = (job ? ISC_LIST_NEXT(job, link) : NULL);
- job != NULL;
- job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL))
- {
+ /*
+ * Walk through the local queue which has now all the members copied
+ * locally, and call the callbacks and free all the isc_job_t(s).
+ */
+ struct cds_wfcq_node *node, *next;
+ __cds_wfcq_for_each_blocking_safe(&jobs.head, &jobs.tail, node, next) {
+ isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node);
+ __tsan_acquire(job);
+
job->cb(job->cbarg);
isc_mem_put(loop->mctx, job, sizeof(*job));
#include <isc/job.h>
#include <isc/loop.h>
#include <isc/mem.h>
-#include <isc/stack.h>
#include <isc/uv.h>
-typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t;
-
void
isc__async_cb(uv_async_t *handle);
#include <isc/mem.h>
#include <isc/refcount.h>
#include <isc/types.h>
+#include <isc/urcu.h>
typedef void (*isc_job_cb)(void *);
typedef struct isc_job isc_job_t;
struct isc_job {
isc_job_cb cb;
void *cbarg;
- ISC_LINK(isc_job_t) link;
+ union {
+ struct cds_wfcq_node wfcq_node;
+ ISC_LINK(isc_job_t) link;
+ };
};
-#define ISC_JOB_INITIALIZER \
- { \
- .link = ISC_LINK_INITIALIZER \
+#define ISC_JOB_INITIALIZER \
+ { \
+ .link = ISC_LINK_INITIALIZER, \
}
ISC_LANG_BEGINDECLS
* yet been started.
*/
-void
-isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job);
-void
-isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job);
-
void
isc_loopmgr_setup(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg);
void
job->cb = cb;
job->cbarg = cbarg;
+ ISC_LINK_INIT(job, link);
ISC_LIST_APPEND(loop->run_jobs, job, link);
}
#pragma once
+#include <isc/align.h>
#include <isc/job.h>
#include <isc/loop.h>
+#include <isc/os.h>
#include <isc/uv.h>
+/*%
+ * NOTE: We are using struct __cds_wfcq_head that doesn't have an internal
+ * mutex, because we are only using enqueue and splice, and those don't need
+ * any synchronization (see urcu/wfcqueue.h for detailed description).
+ */
+typedef struct isc_jobqueue {
+ alignas(ISC_OS_CACHELINE_SIZE) struct __cds_wfcq_head head;
+ alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail;
+} isc_jobqueue_t;
+
typedef ISC_LIST(isc_job_t) isc_joblist_t;
void
static void
shutdown_cb(uv_async_t *handle) {
- isc_job_t *job = NULL;
isc_loop_t *loop = uv_handle_get_data(handle);
isc_loopmgr_t *loopmgr = loop->loopmgr;
isc_signal_destroy(&loopmgr->sigint);
}
- job = ISC_LIST_TAIL(loop->teardown_jobs);
- while (job != NULL) {
- isc_job_t *prev = ISC_LIST_PREV(job, link);
- ISC_LIST_UNLINK(loop->teardown_jobs, job, link);
-
- job->cb(job->cbarg);
-
- isc_mem_put(loop->mctx, job, sizeof(*job));
-
- job = prev;
- }
+ enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
+ &loop->async_jobs.head, &loop->async_jobs.tail,
+ &loop->teardown_jobs.head, &loop->teardown_jobs.tail);
+ INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
+ int r = uv_async_send(&loop->async_trigger);
+ UV_RUNTIME_CHECK(uv_async_send, r);
}
static void
*loop = (isc_loop_t){
.tid = tid,
.loopmgr = loopmgr,
- .async_jobs = ISC_ASTACK_INITIALIZER,
.run_jobs = ISC_LIST_INITIALIZER,
- .setup_jobs = ISC_LIST_INITIALIZER,
- .teardown_jobs = ISC_LIST_INITIALIZER,
};
+ __cds_wfcq_init(&loop->async_jobs.head, &loop->async_jobs.tail);
+ __cds_wfcq_init(&loop->setup_jobs.head, &loop->setup_jobs.tail);
+ __cds_wfcq_init(&loop->teardown_jobs.head, &loop->teardown_jobs.tail);
+
int r = uv_loop_init(&loop->loop);
UV_RUNTIME_CHECK(uv_loop_init, r);
loop->magic = LOOP_MAGIC;
}
-static void
-setup_jobs_cb(void *arg) {
- isc_loop_t *loop = arg;
- isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs);
-
- while (job != NULL) {
- isc_job_t *next = ISC_LIST_NEXT(job, link);
- ISC_LIST_UNLINK(loop->setup_jobs, job, link);
-
- job->cb(job->cbarg);
-
- isc_mem_put(loop->mctx, job, sizeof(*job));
-
- job = next;
- }
-}
-
static void
quiescent_cb(uv_prepare_t *handle) {
isc__qsbr_quiescent_cb(handle);
int r = uv_loop_close(&loop->loop);
UV_RUNTIME_CHECK(uv_loop_close, r);
- INSIST(ISC_ASTACK_EMPTY(loop->async_jobs));
+ INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
INSIST(ISC_LIST_EMPTY(loop->run_jobs));
loop->magic = 0;
isc_barrier_wait(&loop->loopmgr->starting);
- isc_async_run(loop, setup_jobs_cb, loop);
+ enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
+ &loop->async_jobs.head, &loop->async_jobs.tail,
+ &loop->setup_jobs.head, &loop->setup_jobs.tail);
+ INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
+
+ r = uv_async_send(&loop->async_trigger);
+ UV_RUNTIME_CHECK(uv_async_send, r);
r = uv_run(&loop->loop, UV_RUN_DEFAULT);
UV_RUNTIME_CHECK(uv_run, r);
return (NULL);
}
-void
-isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job) {
- ISC_LIST_DEQUEUE(loop->setup_jobs, job, link);
-}
-
-void
-isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job) {
- ISC_LIST_DEQUEUE(loop->teardown_jobs, job, link);
-}
-
/**
* Public
*/
*job = (isc_job_t){
.cb = cb,
.cbarg = cbarg,
- .link = ISC_LINK_INITIALIZER,
};
+ cds_wfcq_node_init(&job->wfcq_node);
+
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
atomic_load(&loopmgr->paused));
- ISC_LIST_APPEND(loop->setup_jobs, job, link);
+ cds_wfcq_enqueue(&loop->setup_jobs.head, &loop->setup_jobs.tail,
+ &job->wfcq_node);
return (job);
}
*job = (isc_job_t){
.cb = cb,
.cbarg = cbarg,
- .link = ISC_LINK_INITIALIZER,
};
+ cds_wfcq_node_init(&job->wfcq_node);
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
atomic_load(&loopmgr->paused));
- ISC_LIST_APPEND(loop->teardown_jobs, job, link);
+ cds_wfcq_enqueue(&loop->teardown_jobs.head, &loop->teardown_jobs.tail,
+ &job->wfcq_node);
return (job);
}
#include <isc/refcount.h>
#include <isc/result.h>
#include <isc/signal.h>
-#include <isc/stack.h>
#include <isc/thread.h>
#include <isc/types.h>
+#include <isc/urcu.h>
#include <isc/uv.h>
#include <isc/work.h>
/* Async queue */
uv_async_t async_trigger;
- isc_asyncstack_t async_jobs;
+ isc_jobqueue_t async_jobs;
/* Jobs queue */
uv_idle_t run_trigger;
/* Shutdown */
uv_async_t shutdown_trigger;
- isc_joblist_t setup_jobs;
- isc_joblist_t teardown_jobs;
+ isc_jobqueue_t setup_jobs;
+ isc_jobqueue_t teardown_jobs;
/* Destroy */
uv_async_t destroy_trigger;
.active_handles = ISC_LIST_INITIALIZER,
.active_link = ISC_LINK_INITIALIZER,
.active = true,
- .job = ISC_JOB_INITIALIZER,
- .quotacb = ISC_JOB_INITIALIZER,
};
if (iface != NULL) {
isc__networker_attach(worker, &sock->worker);
sock->uv_handle.handle.data = sock;
- ISC_LINK_INIT(&sock->quotacb, link);
-
switch (type) {
case isc_nm_udpsocket:
case isc_nm_udplistener:
.magic = NMHANDLE_MAGIC,
.active_link = ISC_LINK_INITIALIZER,
.inactive_link = ISC_LINK_INITIALIZER,
- .job = ISC_JOB_INITIALIZER,
};
isc_refcount_init(&handle->references, 1);
.connect_tries = 3,
.link = ISC_LINK_INITIALIZER,
.active_link = ISC_LINK_INITIALIZER,
- .job = ISC_JOB_INITIALIZER,
.magic = UVREQ_MAGIC,
};
uv_handle_set_data(&req->uv_req.handle, req);