#include <sys/types.h>
#include <unistd.h>
+#include <isc/async.h>
#include <isc/attributes.h>
#include <isc/base64.h>
#include <isc/buffer.h>
ns_client_request, ifp, accept_cb, ifp, 10,
NULL, NULL, &ifp->tcplistensocket));
ifp->flags |= NS_INTERFACEFLAG_LISTENING;
- isc_job_run(loopmgr, sendquery, ifp->tcplistensocket);
+ isc_async_run(isc_loop_current(loopmgr), sendquery,
+ ifp->tcplistensocket);
return;
#include <stdlib.h>
#include <unistd.h>
+#include <isc/async.h>
#include <isc/attributes.h>
#include <isc/buffer.h>
#include <isc/commandline.h>
#include <isc/condition.h>
-#include <isc/job.h>
#include <isc/loop.h>
#include <isc/netaddr.h>
#include <isc/parseint.h>
static void
process_next_command(void *arg __attribute__((__unused__))) {
+ isc_loop_t *loop = isc_loop_main(loopmgr);
if (cmdline == NULL) {
in_use = false;
} else {
do_next_command(cmdline);
if (ISC_LIST_HEAD(lookup_list) != NULL) {
- isc_job_run(loopmgr, run_loop, NULL);
+ isc_async_run(loop, run_loop, NULL);
return;
}
}
#include <time.h>
#include <unistd.h>
+#include <isc/async.h>
#include <isc/atomic.h>
#include <isc/attributes.h>
#include <isc/base32.h>
#include <isc/file.h>
#include <isc/hash.h>
#include <isc/hex.h>
-#include <isc/job.h>
#include <isc/loop.h>
#include <isc/managers.h>
#include <isc/md.h>
lock_and_dumpnode(dns_fixedname_name(&fname), node);
dns_db_detachnode(gdb, &node);
- isc_job_run(loopmgr, assignwork, NULL);
+ isc_async_run(isc_loop_current(loopmgr), assignwork, NULL);
}
/*%
#include <stdlib.h>
#include <unistd.h>
+#include <isc/async.h>
#include <isc/attributes.h>
#include <isc/base64.h>
#include <isc/buffer.h>
#include <isc/file.h>
#include <isc/getaddresses.h>
#include <isc/hash.h>
-#include <isc/job.h>
#include <isc/lex.h>
#include <isc/log.h>
#include <isc/loop.h>
done_update(void) {
ddebug("done_update()");
- isc_job_run(loopmgr, getinput, NULL);
+ isc_async_run(isc_loop_main(loopmgr), getinput, NULL);
}
static void
struct dns_validator {
unsigned int magic;
dns_view_t *view;
- isc_loopmgr_t *loopmgr;
+ isc_loop_t *loop;
uint32_t tid;
isc_refcount_t references;
dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset,
dns_message_t *message, unsigned int options,
- isc_loopmgr_t *loop, isc_job_cb cb, void *arg,
+ isc_loop_t *loop, isc_job_cb cb, void *arg,
dns_validator_t **validatorp);
/*%<
* Start a DNSSEC validation.
result = dns_validator_create(
fctx->res->view, name, type, rdataset, sigrdataset, message,
- valoptions, fctx->res->loopmgr, validated, valarg, &validator);
+ valoptions, fctx->loop, validated, valarg, &validator);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
inc_stats(fctx->res, dns_resstatscounter_val);
if ((valoptions & DNS_VALIDATOR_DEFER) == 0) {
if (new_fctx) {
fetchctx_ref(fctx);
- isc_job_run(res->loopmgr, (isc_job_cb)fctx_start, fctx);
+ isc_async_run(fctx->loop, (isc_job_cb)fctx_start, fctx);
}
unlock:
val->result = result;
dns_validator_ref(val);
- isc_job_run(val->loopmgr, validator_done_cb, val);
+ isc_async_run(val->loop, validator_done_cb, val);
}
/*%
dns_validator_ref(val);
return (dns_resolver_createfetch(
val->view->resolver, name, type, NULL, NULL, NULL, NULL, 0,
- fopts, 0, NULL, isc_loop_current(val->loopmgr), callback, val,
- &val->frdataset, &val->fsigrdataset, &val->fetch));
+ fopts, 0, NULL, val->loop, callback, val, &val->frdataset,
+ &val->fsigrdataset, &val->fetch));
}
/*%
validator_logcreate(val, name, type, caller, "validator");
result = dns_validator_create(val->view, name, type, rdataset, sig,
- NULL, vopts, val->loopmgr, cb, val,
+ NULL, vopts, val->loop, cb, val,
&val->subvalidator);
if (result == ISC_R_SUCCESS) {
dns_validator_attach(val, &val->subvalidator->parent);
dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset,
dns_message_t *message, unsigned int options,
- isc_loopmgr_t *loopmgr, isc_job_cb cb, void *arg,
+ isc_loop_t *loop, isc_job_cb cb, void *arg,
dns_validator_t **validatorp) {
isc_result_t result = ISC_R_FAILURE;
dns_validator_t *val = NULL;
.type = type,
.options = options,
.link = ISC_LINK_INITIALIZER,
- .loopmgr = loopmgr,
+ .loop = loop,
.cb = cb,
.arg = arg };
if ((options & DNS_VALIDATOR_DEFER) == 0) {
dns_validator_ref(val);
- isc_job_run(val->loopmgr, validator_start, val);
+ isc_async_run(val->loop, validator_start, val);
}
*validatorp = val;
validator->options &= ~DNS_VALIDATOR_DEFER;
dns_validator_ref(validator);
- isc_job_run(validator->loopmgr, validator_start, validator);
+ isc_async_run(validator->loop, validator_start, validator);
}
void
ascii.c \
assertions.c \
async.c \
+ async_p.h \
backtrace.c \
base32.c \
base64.c \
#include <isc/uv.h>
#include <isc/work.h>
+#include "async_p.h"
#include "job_p.h"
#include "loop_p.h"
void
isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
- int r;
- isc_job_t *job = NULL;
-
REQUIRE(VALID_LOOP(loop));
REQUIRE(cb != NULL);
- job = isc__job_new(loop, cb, cbarg);
+ isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
+ *job = (isc_job_t){
+ .link = ISC_LINK_INITIALIZER,
+ .cb = cb,
+ .cbarg = cbarg,
+ };
/*
* Now send the half-initialized job to the loop queue.
- *
- * The ISC_ASTACK_PUSH is counterintuitive here, but uv_idle
- * drains its queue backwards, so if there's more than one event
- * to be processed then they need to be in reverse order.
*/
- ISC_ASTACK_PUSH(loop->queue_jobs, job, link);
+ ISC_ASTACK_PUSH(loop->async_jobs, job, link);
- r = uv_async_send(&loop->queue_trigger);
+ int r = uv_async_send(&loop->async_trigger);
UV_RUNTIME_CHECK(uv_async_send, r);
}
+
+void
+isc__async_cb(uv_async_t *handle) {
+ isc_loop_t *loop = uv_handle_get_data(handle);
+
+ REQUIRE(VALID_LOOP(loop));
+
+ ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs);
+ ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
+
+ isc_job_t *job = ISC_STACK_POP(drain, link);
+ isc_job_t *next = NULL;
+ while (job != NULL) {
+ ISC_LIST_PREPEND(jobs, job, link);
+
+ job = ISC_STACK_POP(drain, link);
+ }
+
+ for (job = ISC_LIST_HEAD(jobs),
+ next = (job ? ISC_LIST_NEXT(job, link) : NULL);
+ job != NULL;
+ job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL))
+ {
+ job->cb(job->cbarg);
+
+ isc_mem_put(loop->mctx, job, sizeof(*job));
+ }
+}
+
+void
+isc__async_close(uv_handle_t *handle) {
+ isc_loop_t *loop = uv_handle_get_data(handle);
+
+ isc__async_cb(&loop->async_trigger);
+}
--- /dev/null
+/*
+ * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+ *
+ * SPDX-License-Identifier: MPL-2.0
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, you can obtain one at https://mozilla.org/MPL/2.0/.
+ *
+ * See the COPYRIGHT file distributed with this work for additional
+ * information regarding copyright ownership.
+ */
+
+#pragma once
+
+#include <isc/async.h>
+#include <isc/job.h>
+#include <isc/loop.h>
+#include <isc/mem.h>
+#include <isc/stack.h>
+#include <isc/uv.h>
+
+typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t;
+
+void
+isc__async_cb(uv_async_t *handle);
+
+void
+isc__async_close(uv_handle_t *handle);
* \brief The isc_async unit provides a way to schedule jobs on any isc
* event loop (isc_loop unit)
*
- * The unit is built around the uv_async_t primitive and locked list with
+ * The unit is built around the uv_async_t primitive and lock-free stack with
* isc_job_cb. Jobs are first scheduled onto the locked list, then the
* uv_async_send() is called and the uv_async_t callback processes the enqueued
* jobs are scheduled to be run on the isc event loop.
#include <isc/types.h>
typedef void (*isc_job_cb)(void *);
+typedef struct isc_job isc_job_t;
+
+struct isc_job {
+ isc_job_cb cb;
+ void *cbarg;
+ ISC_LINK(isc_job_t) link;
+};
+
+#define ISC_JOB_INITIALIZER \
+ { \
+ .link = ISC_LINK_INITIALIZER \
+ }
ISC_LANG_BEGINDECLS
void
-isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg);
+isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg);
/*%<
* Schedule the job callback 'cb' to be run on the currently
* running event loop.
*
- * Note: Because of the design of uv_idle_start(), if more than one
- * job is posted at once, the jobs will be pushed onto a stack and
- * executed in last-in-first-out order. To post events that are
- * executed in order posted, use isc_async_run() instead.
- *
* Requires:
*
- *\li 'loopmgr' is the active loop manager.
+ *\li 'loop' is the current loop
+ *\li 'job' is initialized
*\li 'cb' is a callback function, must be non-NULL
*\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL
*/
#include <inttypes.h>
+#include <isc/job.h>
#include <isc/lang.h>
#include <isc/mem.h>
#include <isc/refcount.h>
typedef void(isc_httpdondestroy_t)(void *); /*%< Callback on destroying httpd */
typedef struct isc_interface isc_interface_t; /*%< Interface */
typedef struct isc_interfaceiter isc_interfaceiter_t; /*%< Interface Iterator */
-typedef struct isc_job isc_job_t;
-typedef struct isc_lex isc_lex_t; /*%< Lex */
-typedef struct isc_log isc_log_t; /*%< Log */
-typedef struct isc_logcategory isc_logcategory_t; /*%< Log Category */
-typedef struct isc_logconfig isc_logconfig_t; /*%< Log Configuration */
-typedef struct isc_logmodule isc_logmodule_t; /*%< Log Module */
-typedef struct isc_loop isc_loop_t; /*%< Event loop */
-typedef struct isc_loopmgr isc_loopmgr_t; /*%< Event loop manager */
-typedef struct isc_mem isc_mem_t; /*%< Memory */
-typedef struct isc_mempool isc_mempool_t; /*%< Memory Pool */
-typedef struct isc_netaddr isc_netaddr_t; /*%< Net Address */
-typedef struct isc_netprefix isc_netprefix_t; /*%< Net Prefix */
-typedef struct isc_nm isc_nm_t; /*%< Network manager */
+typedef struct isc_lex isc_lex_t; /*%< Lex */
+typedef struct isc_log isc_log_t; /*%< Log */
+typedef struct isc_logcategory isc_logcategory_t; /*%< Log Category */
+typedef struct isc_logconfig isc_logconfig_t; /*%< Log Configuration */
+typedef struct isc_logmodule isc_logmodule_t; /*%< Log Module */
+typedef struct isc_loop isc_loop_t; /*%< Event loop */
+typedef struct isc_loopmgr isc_loopmgr_t; /*%< Event loop manager */
+typedef struct isc_mem isc_mem_t; /*%< Memory */
+typedef struct isc_mempool isc_mempool_t; /*%< Memory Pool */
+typedef struct isc_netaddr isc_netaddr_t; /*%< Net Address */
+typedef struct isc_netprefix isc_netprefix_t; /*%< Net Prefix */
+typedef struct isc_nm isc_nm_t; /*%< Network manager */
typedef struct isc_nmsocket isc_nmsocket_t; /*%< Network manager socket */
typedef struct isc_nmhandle isc_nmhandle_t; /*%< Network manager handle */
typedef struct isc_portset isc_portset_t; /*%< Port Set */
#include "job_p.h"
#include "loop_p.h"
-#define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ')
-#define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
-
-/*
- * Private: static
- */
-
-static void
-isc__job_close_cb(uv_handle_t *handle) {
- isc_job_t *job = uv_handle_get_data(handle);
- isc_loop_t *loop = job->loop;
-
- REQUIRE(loop == isc_loop_current(job->loop->loopmgr));
-
- isc_mem_put(loop->mctx, job, sizeof(*job));
-
- isc_loop_detach(&loop);
-}
-
-static void
-isc__job_destroy(isc_job_t *job) {
- REQUIRE(VALID_JOB(job));
- REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
-
- job->magic = 0;
-
- uv_close(&job->idle, isc__job_close_cb);
-}
-
-static void
-isc__job_cb(uv_idle_t *idle) {
- isc_job_t *job = uv_handle_get_data(idle);
- int r;
-
- REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
-
- job->cb(job->cbarg);
-
- r = uv_idle_stop(idle);
- UV_RUNTIME_CHECK(uv_idle_stop, r);
-
- isc__job_destroy(job);
-}
-
/*
* Public: #include <isc/job.h>
*/
void
-isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg) {
- isc_loop_t *loop = isc_loop_current(loopmgr);
- isc_job_t *job = isc__job_new(loop, cb, cbarg);
- isc__job_init(loop, job);
- isc__job_run(job);
+isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) {
+ if (ISC_LIST_EMPTY(loop->run_jobs)) {
+ uv_idle_start(&loop->run_trigger, isc__job_cb);
+ }
+
+ job->cb = cb;
+ job->cbarg = cbarg;
+
+ ISC_LIST_APPEND(loop->run_jobs, job, link);
}
/*
* Protected: #include <job_p.h>
*/
-isc_job_t *
-isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
- isc_job_t *job = NULL;
-
- REQUIRE(VALID_LOOP(loop));
- REQUIRE(cb != NULL);
-
- job = isc_mem_get(loop->mctx, sizeof(*job));
- *job = (isc_job_t){
- .magic = JOB_MAGIC,
- .cb = cb,
- .cbarg = cbarg,
- .link = ISC_LINK_INITIALIZER,
- };
-
- isc_loop_attach(loop, &job->loop);
-
- return (job);
-}
-
void
-isc__job_init(isc_loop_t *loop, isc_job_t *job) {
- int r = uv_idle_init(&loop->loop, &job->idle);
- UV_RUNTIME_CHECK(uv_idle_init, r);
- uv_handle_set_data(&job->idle, job);
+isc__job_cb(uv_idle_t *handle) {
+ isc_loop_t *loop = uv_handle_get_data(handle);
+ ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
+
+ ISC_LIST_MOVE(jobs, loop->run_jobs);
+
+ isc_job_t *job, *next;
+ for (job = ISC_LIST_HEAD(jobs),
+ next = (job != NULL) ? ISC_LIST_NEXT(job, link) : NULL;
+ job != NULL;
+ job = next, next = job ? ISC_LIST_NEXT(job, link) : NULL)
+ {
+ ISC_LIST_UNLINK(jobs, job, link);
+ job->cb(job->cbarg);
+ }
+
+ if (ISC_LIST_EMPTY(loop->run_jobs)) {
+ uv_idle_stop(&loop->run_trigger);
+ }
}
void
-isc__job_run(isc_job_t *job) {
- int r;
-
- REQUIRE(VALID_JOB(job));
- REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
+isc__job_close(uv_handle_t *handle) {
+ isc_loop_t *loop = uv_handle_get_data(handle);
- r = uv_idle_start(&job->idle, isc__job_cb);
- UV_RUNTIME_CHECK(uv_idle_start, r);
+ isc__job_cb(&loop->run_trigger);
}
#include <isc/job.h>
#include <isc/loop.h>
+#include <isc/uv.h>
-isc_job_t *
-isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
+typedef ISC_LIST(isc_job_t) isc_joblist_t;
void
-isc__job_init(isc_loop_t *loop, isc_job_t *job);
+isc__job_cb(uv_idle_t *handle);
void
-isc__job_run(isc_job_t *job);
+isc__job_close(uv_handle_t *handle);
#include <isc/uv.h>
#include <isc/work.h>
+#include "async_p.h"
#include "job_p.h"
#include "loop_p.h"
destroy_cb(uv_async_t *handle) {
isc_loop_t *loop = uv_handle_get_data(handle);
+ /* Again, the first close callback here is called last */
+ uv_close(&loop->async_trigger, isc__async_close);
+ uv_close(&loop->run_trigger, isc__job_close);
uv_close(&loop->destroy_trigger, NULL);
- uv_close(&loop->queue_trigger, NULL);
uv_close(&loop->pause_trigger, NULL);
uv_close(&loop->wakeup_trigger, NULL);
uv_close(&loop->quiescent, NULL);
isc_job_t *prev = ISC_LIST_PREV(job, link);
ISC_LIST_UNLINK(loop->teardown_jobs, job, link);
- isc__job_run(job);
+ job->cb(job->cbarg);
- job = prev;
- }
-}
-
-static void
-queue_cb(uv_async_t *handle) {
- isc_loop_t *loop = uv_handle_get_data(handle);
-
- REQUIRE(VALID_LOOP(loop));
-
- ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->queue_jobs);
- isc_job_t *job = ISC_STACK_POP(drain, link);
+ isc_mem_put(loop->mctx, job, sizeof(*job));
- while (job != NULL) {
- isc__job_init(loop, job);
- isc__job_run(job);
-
- job = ISC_STACK_POP(drain, link);
+ job = prev;
}
}
*loop = (isc_loop_t){
.tid = tid,
.loopmgr = loopmgr,
- .queue_jobs = ISC_ASTACK_INITIALIZER,
+ .async_jobs = ISC_ASTACK_INITIALIZER,
+ .run_jobs = ISC_LIST_INITIALIZER,
.setup_jobs = ISC_LIST_INITIALIZER,
.teardown_jobs = ISC_LIST_INITIALIZER,
};
UV_RUNTIME_CHECK(uv_async_init, r);
uv_handle_set_data(&loop->shutdown_trigger, loop);
- r = uv_async_init(&loop->loop, &loop->queue_trigger, queue_cb);
+ r = uv_async_init(&loop->loop, &loop->async_trigger, isc__async_cb);
UV_RUNTIME_CHECK(uv_async_init, r);
- uv_handle_set_data(&loop->queue_trigger, loop);
+ uv_handle_set_data(&loop->async_trigger, loop);
+
+ r = uv_idle_init(&loop->loop, &loop->run_trigger);
+ UV_RUNTIME_CHECK(uv_idle_init, r);
+ uv_handle_set_data(&loop->run_trigger, loop);
r = uv_async_init(&loop->loop, &loop->destroy_trigger, destroy_cb);
UV_RUNTIME_CHECK(uv_async_init, r);
}
static void
-loop_run(isc_loop_t *loop) {
- int r;
- isc_job_t *job;
+setup_jobs_cb(void *arg) {
+ isc_loop_t *loop = arg;
+ isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs);
- job = ISC_LIST_HEAD(loop->setup_jobs);
while (job != NULL) {
isc_job_t *next = ISC_LIST_NEXT(job, link);
ISC_LIST_UNLINK(loop->setup_jobs, job, link);
- isc__job_run(job);
+ job->cb(job->cbarg);
+
+ isc_mem_put(loop->mctx, job, sizeof(*job));
job = next;
}
+}
- r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb);
+static void
+loop_run(isc_loop_t *loop) {
+ int r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb);
UV_RUNTIME_CHECK(uv_prepare_start, r);
isc_barrier_wait(&loop->loopmgr->starting);
+ isc_async_run(loop, setup_jobs_cb, loop);
+
r = uv_run(&loop->loop, UV_RUN_DEFAULT);
UV_RUNTIME_CHECK(uv_run, r);
int r = uv_loop_close(&loop->loop);
UV_RUNTIME_CHECK(uv_loop_close, r);
- INSIST(ISC_ASTACK_EMPTY(loop->queue_jobs));
+ INSIST(ISC_ASTACK_EMPTY(loop->async_jobs));
+ INSIST(ISC_LIST_EMPTY(loop->run_jobs));
loop->magic = 0;
isc_job_t *
isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
- isc_loopmgr_t *loopmgr = NULL;
- isc_job_t *job = NULL;
-
REQUIRE(VALID_LOOP(loop));
REQUIRE(cb != NULL);
- loopmgr = loop->loopmgr;
+ isc_loopmgr_t *loopmgr = loop->loopmgr;
+ isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
+ *job = (isc_job_t){
+ .cb = cb,
+ .cbarg = cbarg,
+ .link = ISC_LINK_INITIALIZER,
+ };
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
atomic_load(&loopmgr->paused));
- job = isc__job_new(loop, cb, cbarg);
- isc__job_init(loop, job);
-
- /*
- * The ISC_LIST_PREPEND is counterintuitive here, but actually, the
- * uv_idle_start() puts the item on the HEAD of the internal list, so we
- * want to store items here in reverse order, so on the uv_loop, they
- * are scheduled in the correct order
- */
- ISC_LIST_PREPEND(loop->setup_jobs, job, link);
+ ISC_LIST_APPEND(loop->setup_jobs, job, link);
return (job);
}
isc_job_t *
isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
- isc_loopmgr_t *loopmgr = NULL;
- isc_job_t *job = NULL;
-
REQUIRE(VALID_LOOP(loop));
- loopmgr = loop->loopmgr;
+ isc_loopmgr_t *loopmgr = loop->loopmgr;
+ isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
+ *job = (isc_job_t){
+ .cb = cb,
+ .cbarg = cbarg,
+ .link = ISC_LINK_INITIALIZER,
+ };
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
atomic_load(&loopmgr->paused));
- job = isc__job_new(loop, cb, cbarg);
- isc__job_init(loop, job);
-
- /*
- * The ISC_LIST_PREPEND is counterintuitive here, but actually, the
- * uv_idle_start() puts the item on the HEAD of the internal list, so we
- * want to store items here in reverse order, so on the uv_loop, they
- * are scheduled in the correct order
- */
- ISC_LIST_PREPEND(loop->teardown_jobs, job, link);
+ ISC_LIST_APPEND(loop->teardown_jobs, job, link);
return (job);
}
#include <inttypes.h>
#include <isc/barrier.h>
+#include <isc/job.h>
#include <isc/lang.h>
#include <isc/loop.h>
#include <isc/magic.h>
#include <isc/uv.h>
#include <isc/work.h>
+#include "async_p.h"
+#include "job_p.h"
+
/*
* Per-thread loop
*/
#define LOOP_MAGIC ISC_MAGIC('L', 'O', 'O', 'P')
#define VALID_LOOP(t) ISC_MAGIC_VALID(t, LOOP_MAGIC)
-typedef ISC_LIST(isc_job_t) isc_joblist_t;
-typedef ISC_ASTACK(isc_job_t) isc_jobstack_t;
-
struct isc_loop {
int magic;
isc_refcount_t references;
bool shuttingdown;
/* Async queue */
- uv_async_t queue_trigger;
- isc_jobstack_t queue_jobs;
+ uv_async_t async_trigger;
+ isc_asyncstack_t async_jobs;
+
+ /* Jobs queue */
+ uv_idle_t run_trigger;
+ isc_joblist_t run_jobs;
/* Pause */
uv_async_t pause_trigger;
#define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ')
#define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
-struct isc_job {
- int magic;
- uv_idle_t idle;
- isc_loop_t *loop;
- isc_job_cb cb;
- void *cbarg;
- ISC_LINK(isc_job_t) link;
-};
-
/*
* Work to be offloaded to an external thread.
*/
isc_nm_recv_cb_t recv;
isc_nm_cb_t send;
isc_nm_cb_t connect;
- isc_nm_accept_cb_t accept;
} isc__nm_cb_t;
/*
isc_nm_timer_t *timer; /* TCP write timer */
int connect_tries; /* connect retries */
isc_result_t result;
- uv_idle_t idle;
union {
uv_handle_t handle;
} uv_req;
ISC_LINK(isc__nm_uvreq_t) link;
ISC_LINK(isc__nm_uvreq_t) active_link;
+
+ isc_job_t job;
};
/*
static void
nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle);
-static void
-uvreq_free(uv_handle_t *handle);
-
/*%<
* Issue a 'handle closed' callback on the socket.
*/
* call it now asynchronously.
*/
if (sock->closehandle_cb != NULL) {
- isc_job_run(sock->worker->netmgr->loopmgr,
- isc__nm_closehandle_job, sock);
+ isc_async_run(sock->worker->loop, isc__nm_closehandle_job,
+ sock);
} else {
isc___nmsocket_detach(&sock FLARG_PASS);
}
.connect_tries = 3,
.link = ISC_LINK_INITIALIZER,
.active_link = ISC_LINK_INITIALIZER,
+ .job = ISC_JOB_INITIALIZER,
.magic = UVREQ_MAGIC,
};
uv_handle_set_data(&req->uv_req.handle, req);
- int r = uv_idle_init(&worker->loop->loop, &req->idle);
- UV_RUNTIME_CHECK(uv_idle_init, r);
- uv_handle_set_data(&req->idle, req);
-
isc___nmsocket_attach(sock, &req->sock FLARG_PASS);
ISC_LIST_APPEND(sock->active_uvreqs, req, active_link);
return (req);
}
-static void
-uvreq_free(uv_handle_t *handle) {
- isc__nm_uvreq_t *req = uv_handle_get_data(handle);
- isc_nmsocket_t *sock = req->sock;
-
- isc_mempool_put(sock->worker->uvreq_pool, req);
-
- isc___nmsocket_detach(&sock FLARG_PASS);
-}
-
void
isc___nm_uvreq_put(isc__nm_uvreq_t **reqp FLARG) {
REQUIRE(reqp != NULL && VALID_UVREQ(*reqp));
isc__nmhandle_detach(&handle FLARG_PASS);
}
- uv_close(&req->idle, uvreq_free);
+ isc_mempool_put(sock->worker->uvreq_pool, req);
+
+ isc___nmsocket_detach(&sock FLARG_PASS);
}
void
}
static void
-isc___nm_connectcb(uv_idle_t *handle) {
- isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle);
+isc___nm_connectcb(void *arg) {
+ isc__nm_uvreq_t *uvreq = arg;
uvreq->cb.connect(uvreq->handle, uvreq->result, uvreq->cbarg);
-
- uv_idle_stop(handle);
isc__nm_uvreq_put(&uvreq);
}
REQUIRE(VALID_NMHANDLE(uvreq->handle));
REQUIRE(uvreq->cb.connect != NULL);
+ uvreq->result = eresult;
+
if (!async) {
- uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg);
- isc__nm_uvreq_put(&uvreq);
+ isc___nm_connectcb(uvreq);
return;
}
- uvreq->result = eresult;
-
- uv_idle_start(&uvreq->idle, isc___nm_connectcb);
+ isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_connectcb, uvreq);
}
static void
-isc___nm_readcb(uv_idle_t *handle) {
- isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle);
+isc___nm_readcb(void *arg) {
+ isc__nm_uvreq_t *uvreq = arg;
isc_region_t region;
region.base = (unsigned char *)uvreq->uvbuf.base;
region.length = uvreq->uvbuf.len;
-
uvreq->cb.recv(uvreq->handle, uvreq->result, ®ion, uvreq->cbarg);
- uv_idle_stop(handle);
isc__nm_uvreq_put(&uvreq);
}
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
- if (!async) {
- isc_region_t region;
-
- region.base = (unsigned char *)uvreq->uvbuf.base;
- region.length = uvreq->uvbuf.len;
+ uvreq->result = eresult;
- uvreq->cb.recv(uvreq->handle, eresult, ®ion, uvreq->cbarg);
- isc__nm_uvreq_put(&uvreq);
+ if (!async) {
+ isc___nm_readcb(uvreq);
return;
}
- uvreq->result = eresult;
-
- uv_idle_start(&uvreq->idle, isc___nm_readcb);
+ isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_readcb, uvreq);
}
static void
-isc___nm_sendcb(uv_idle_t *handle) {
- isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle);
+isc___nm_sendcb(void *arg) {
+ isc__nm_uvreq_t *uvreq = arg;
uvreq->cb.send(uvreq->handle, uvreq->result, uvreq->cbarg);
-
- uv_idle_stop(handle);
isc__nm_uvreq_put(&uvreq);
}
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
uvreq->result = eresult;
if (!async) {
- uvreq->cb.send(uvreq->handle, uvreq->result, uvreq->cbarg);
- isc__nm_uvreq_put(&uvreq);
+ isc___nm_sendcb(uvreq);
return;
}
- uv_idle_start(&uvreq->idle, isc___nm_sendcb);
+ isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_sendcb, uvreq);
}
static void
assert_true(eresult == ISC_R_SUCCESS);
}
- isc_job_run(loopmgr, doh_connect_thread, connect_nm);
+ isc_async_run(isc_loop_current(loopmgr), doh_connect_thread,
+ connect_nm);
}
if (sends <= 0) {
isc_loopmgr_shutdown(loopmgr);
#define MAX_EXECUTED 1000000
+struct test_arg {
+ isc_job_t job;
+ union {
+ int n;
+ void *ptr;
+ } arg;
+};
+
static void
shutdown_cb(void *arg) {
- UNUSED(arg);
+ struct test_arg *ta = arg;
+
+ isc_mem_put(mctx, ta, sizeof(*ta));
isc_loopmgr_shutdown(loopmgr);
}
static void
-job_cb(void *arg __attribute__((__unused__))) {
+job_cb(void *arg) {
+ struct test_arg *ta = arg;
unsigned int n = atomic_fetch_add(&executed, 1);
if (n <= MAX_EXECUTED) {
atomic_fetch_add(&scheduled, 1);
- isc_job_run(loopmgr, job_cb, loopmgr);
+ isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta);
} else {
- isc_job_run(loopmgr, shutdown_cb, loopmgr);
+ isc_job_run(isc_loop_current(loopmgr), &ta->job, shutdown_cb,
+ ta);
}
}
static void
-job_run_cb(void *arg __attribute__((__unused__))) {
+job_run_cb(void *arg) {
+ struct test_arg *ta = arg;
atomic_fetch_add(&scheduled, 1);
- isc_job_run(loopmgr, job_cb, loopmgr);
+ if (arg == NULL) {
+ ta = isc_mem_get(mctx, sizeof(*ta));
+ *ta = (struct test_arg){ .job = ISC_JOB_INITIALIZER };
+ }
+
+ isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta);
}
ISC_RUN_TEST_IMPL(isc_job_run) {
atomic_init(&scheduled, 0);
atomic_init(&executed, 0);
- isc_loopmgr_setup(loopmgr, job_run_cb, loopmgr);
+ isc_loopmgr_setup(loopmgr, job_run_cb, NULL);
isc_loopmgr_run(loopmgr);
}
static char string[32] = "";
-int n1 = 1, n2 = 2, n3 = 3, n4 = 4, n5 = 5;
+struct test_arg n1 = { .job = ISC_JOB_INITIALIZER, .arg.n = 1 };
+struct test_arg n2 = { .job = ISC_JOB_INITIALIZER, .arg.n = 2 };
+struct test_arg n3 = { .job = ISC_JOB_INITIALIZER, .arg.n = 3 };
+struct test_arg n4 = { .job = ISC_JOB_INITIALIZER, .arg.n = 4 };
+struct test_arg n5 = { .job = ISC_JOB_INITIALIZER, .arg.n = 5 };
static void
append(void *arg) {
+ struct test_arg *ta = arg;
+
char value[32];
- sprintf(value, "%d", *(int *)arg);
+ sprintf(value, "%d", ta->arg.n);
strlcat(string, value, 10);
}
job_multiple(void *arg) {
UNUSED(arg);
- /* These will be processed in reverse order */
- isc_job_run(loopmgr, append, &n1);
- isc_job_run(loopmgr, append, &n2);
- isc_job_run(loopmgr, append, &n3);
- isc_job_run(loopmgr, append, &n4);
- isc_job_run(loopmgr, append, &n5);
+ /* These will be processed in normal order */
+ isc_job_run(mainloop, &n1.job, append, &n1);
+ isc_job_run(mainloop, &n2.job, append, &n2);
+ isc_job_run(mainloop, &n3.job, append, &n3);
+ isc_job_run(mainloop, &n4.job, append, &n4);
+ isc_job_run(mainloop, &n5.job, append, &n5);
isc_loopmgr_shutdown(loopmgr);
}
string[0] = '\0';
isc_loop_setup(isc_loop_main(loopmgr), job_multiple, loopmgr);
isc_loopmgr_run(loopmgr);
- assert_string_equal(string, "54321");
+ assert_string_equal(string, "12345");
}
ISC_TEST_LIST_START
static void
runjob(void *arg __attribute__((__unused__))) {
+ isc_async_run(isc_loop_current(loopmgr), count, loopmgr);
+
if (isc_tid() == 0) {
- isc_job_run(loopmgr, shutdown_loopmgr, loopmgr);
+ isc_async_run(isc_loop_current(loopmgr), shutdown_loopmgr,
+ loopmgr);
}
-
- isc_job_run(loopmgr, count, loopmgr);
}
ISC_RUN_TEST_IMPL(isc_loopmgr_runjob) {
if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) {
do_cconnects_shutdown(loopmgr);
} else if (do_send) {
- isc_job_run(loopmgr, stream_recv_send_connect,
- (cbarg == NULL ? get_stream_connect_function()
- : (stream_connect_function)cbarg));
+ isc_async_run(isc_loop_current(loopmgr),
+ stream_recv_send_connect,
+ (cbarg == NULL ? get_stream_connect_function()
+ : (stream_connect_function)cbarg));
}
isc_refcount_increment0(&active_creads);
ISC_LOOP_TEST_IMPL(udp_shutdown_connect) {
isc_loopmgr_shutdown(loopmgr);
isc_refcount_increment0(&active_cconnects);
- isc_job_run(loopmgr, udp_connect_udpconnect, netmgr);
+ isc_async_run(isc_loop_current(loopmgr), udp_connect_udpconnect,
+ netmgr);
}
static void
{
do_cconnects_shutdown(loopmgr);
} else if (do_send) {
- isc_job_run(loopmgr, udp__connect, cbarg);
+ isc_async_run(isc_loop_current(loopmgr), udp__connect,
+ cbarg);
}
isc_refcount_increment0(&active_creads);