]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Refactor isc_job_run to not-make any allocations
authorOndřej Surý <ondrej@isc.org>
Mon, 27 Mar 2023 20:40:57 +0000 (22:40 +0200)
committerOndřej Surý <ondrej@isc.org>
Thu, 30 Mar 2023 14:00:52 +0000 (16:00 +0200)
Change the isc_job_run() to not-make any allocations.  The caller must
make sure that it allocates isc_job_t - usually as part of the argument
passed to the callback.

For simple jobs, using isc_async_run() is advised as it allocates its
own separate isc_job_t.

25 files changed:
bin/delv/delv.c
bin/dig/nslookup.c
bin/dnssec/dnssec-signzone.c
bin/nsupdate/nsupdate.c
lib/dns/include/dns/validator.h
lib/dns/resolver.c
lib/dns/validator.c
lib/isc/Makefile.am
lib/isc/async.c
lib/isc/async_p.h [new file with mode: 0644]
lib/isc/include/isc/async.h
lib/isc/include/isc/job.h
lib/isc/include/isc/loop.h
lib/isc/include/isc/types.h
lib/isc/job.c
lib/isc/job_p.h
lib/isc/loop.c
lib/isc/loop_p.h
lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
tests/isc/doh_test.c
tests/isc/job_test.c
tests/isc/loop_test.c
tests/isc/netmgr_common.c
tests/isc/udp_test.c

index 357bdfd1adde46bf9f0c2f55c6b1ad49e6c3f0a4..e42af92dd7eb8783ef368048ea8c1f6b40cbe13e 100644 (file)
@@ -25,6 +25,7 @@
 #include <sys/types.h>
 #include <unistd.h>
 
+#include <isc/async.h>
 #include <isc/attributes.h>
 #include <isc/base64.h>
 #include <isc/buffer.h>
@@ -2168,7 +2169,8 @@ run_server(void *arg) {
                                     ns_client_request, ifp, accept_cb, ifp, 10,
                                     NULL, NULL, &ifp->tcplistensocket));
        ifp->flags |= NS_INTERFACEFLAG_LISTENING;
-       isc_job_run(loopmgr, sendquery, ifp->tcplistensocket);
+       isc_async_run(isc_loop_current(loopmgr), sendquery,
+                     ifp->tcplistensocket);
 
        return;
 
index 2d5f0b890d465314ccdb1c2a23d168cab390c171..1012a99bd02b21a9d0901bd10615829428c4b811 100644 (file)
 #include <stdlib.h>
 #include <unistd.h>
 
+#include <isc/async.h>
 #include <isc/attributes.h>
 #include <isc/buffer.h>
 #include <isc/commandline.h>
 #include <isc/condition.h>
-#include <isc/job.h>
 #include <isc/loop.h>
 #include <isc/netaddr.h>
 #include <isc/parseint.h>
@@ -889,12 +889,13 @@ start_next_command(void);
 
 static void
 process_next_command(void *arg __attribute__((__unused__))) {
+       isc_loop_t *loop = isc_loop_main(loopmgr);
        if (cmdline == NULL) {
                in_use = false;
        } else {
                do_next_command(cmdline);
                if (ISC_LIST_HEAD(lookup_list) != NULL) {
-                       isc_job_run(loopmgr, run_loop, NULL);
+                       isc_async_run(loop, run_loop, NULL);
                        return;
                }
        }
index 7c9ab5c870730de557b3a23edd6907dd17e9bb72..a8bb5785320a08d824316834a77410c057d51845 100644 (file)
@@ -33,6 +33,7 @@
 #include <time.h>
 #include <unistd.h>
 
+#include <isc/async.h>
 #include <isc/atomic.h>
 #include <isc/attributes.h>
 #include <isc/base32.h>
@@ -41,7 +42,6 @@
 #include <isc/file.h>
 #include <isc/hash.h>
 #include <isc/hex.h>
-#include <isc/job.h>
 #include <isc/loop.h>
 #include <isc/managers.h>
 #include <isc/md.h>
@@ -1667,7 +1667,7 @@ assignwork(void *arg) {
        lock_and_dumpnode(dns_fixedname_name(&fname), node);
        dns_db_detachnode(gdb, &node);
 
-       isc_job_run(loopmgr, assignwork, NULL);
+       isc_async_run(isc_loop_current(loopmgr), assignwork, NULL);
 }
 
 /*%
index e93b8abc2bbac003c1cb19afbd339d015c1a8166..46eb03d423fd776ed68dbbd27a81ec6d76f27b11 100644 (file)
@@ -21,6 +21,7 @@
 #include <stdlib.h>
 #include <unistd.h>
 
+#include <isc/async.h>
 #include <isc/attributes.h>
 #include <isc/base64.h>
 #include <isc/buffer.h>
@@ -28,7 +29,6 @@
 #include <isc/file.h>
 #include <isc/getaddresses.h>
 #include <isc/hash.h>
-#include <isc/job.h>
 #include <isc/lex.h>
 #include <isc/log.h>
 #include <isc/loop.h>
@@ -2424,7 +2424,7 @@ static void
 done_update(void) {
        ddebug("done_update()");
 
-       isc_job_run(loopmgr, getinput, NULL);
+       isc_async_run(isc_loop_main(loopmgr), getinput, NULL);
 }
 
 static void
index b1c743e6f5bcc7e020d8dee29ef7e4d2c6389ba2..19eae8bb5a85aa83ee445e75a2d1838aef8e2720 100644 (file)
@@ -74,7 +74,7 @@
 struct dns_validator {
        unsigned int   magic;
        dns_view_t    *view;
-       isc_loopmgr_t *loopmgr;
+       isc_loop_t    *loop;
        uint32_t       tid;
        isc_refcount_t references;
 
@@ -160,7 +160,7 @@ isc_result_t
 dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
                     dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset,
                     dns_message_t *message, unsigned int options,
-                    isc_loopmgr_t *loop, isc_job_cb cb, void *arg,
+                    isc_loop_t *loop, isc_job_cb cb, void *arg,
                     dns_validator_t **validatorp);
 /*%<
  * Start a DNSSEC validation.
index 10ba6e807d75fb1009e358597c55b62ace07caeb..e40c2b33c5a358921f01b104d5d82aa0505b43c3 100644 (file)
@@ -977,7 +977,7 @@ valcreate(fetchctx_t *fctx, dns_message_t *message, dns_adbaddrinfo_t *addrinfo,
 
        result = dns_validator_create(
                fctx->res->view, name, type, rdataset, sigrdataset, message,
-               valoptions, fctx->res->loopmgr, validated, valarg, &validator);
+               valoptions, fctx->loop, validated, valarg, &validator);
        RUNTIME_CHECK(result == ISC_R_SUCCESS);
        inc_stats(fctx->res, dns_resstatscounter_val);
        if ((valoptions & DNS_VALIDATOR_DEFER) == 0) {
@@ -10420,7 +10420,7 @@ dns_resolver_createfetch(dns_resolver_t *res, const dns_name_t *name,
 
        if (new_fctx) {
                fetchctx_ref(fctx);
-               isc_job_run(res->loopmgr, (isc_job_cb)fctx_start, fctx);
+               isc_async_run(fctx->loop, (isc_job_cb)fctx_start, fctx);
        }
 
 unlock:
index 6e2efe14743eed2682baa224e49125897260d70f..08a5395ef2fd3c5155b1619f66c3139b7d596443 100644 (file)
@@ -227,7 +227,7 @@ validator_done(dns_validator_t *val, isc_result_t result) {
        val->result = result;
 
        dns_validator_ref(val);
-       isc_job_run(val->loopmgr, validator_done_cb, val);
+       isc_async_run(val->loop, validator_done_cb, val);
 }
 
 /*%
@@ -950,8 +950,8 @@ create_fetch(dns_validator_t *val, dns_name_t *name, dns_rdatatype_t type,
        dns_validator_ref(val);
        return (dns_resolver_createfetch(
                val->view->resolver, name, type, NULL, NULL, NULL, NULL, 0,
-               fopts, 0, NULL, isc_loop_current(val->loopmgr), callback, val,
-               &val->frdataset, &val->fsigrdataset, &val->fetch));
+               fopts, 0, NULL, val->loop, callback, val, &val->frdataset,
+               &val->fsigrdataset, &val->fetch));
 }
 
 /*%
@@ -981,7 +981,7 @@ create_validator(dns_validator_t *val, dns_name_t *name, dns_rdatatype_t type,
 
        validator_logcreate(val, name, type, caller, "validator");
        result = dns_validator_create(val->view, name, type, rdataset, sig,
-                                     NULL, vopts, val->loopmgr, cb, val,
+                                     NULL, vopts, val->loop, cb, val,
                                      &val->subvalidator);
        if (result == ISC_R_SUCCESS) {
                dns_validator_attach(val, &val->subvalidator->parent);
@@ -2978,7 +2978,7 @@ isc_result_t
 dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
                     dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset,
                     dns_message_t *message, unsigned int options,
-                    isc_loopmgr_t *loopmgr, isc_job_cb cb, void *arg,
+                    isc_loop_t *loop, isc_job_cb cb, void *arg,
                     dns_validator_t **validatorp) {
        isc_result_t result = ISC_R_FAILURE;
        dns_validator_t *val = NULL;
@@ -2997,7 +2997,7 @@ dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
                                  .type = type,
                                  .options = options,
                                  .link = ISC_LINK_INITIALIZER,
-                                 .loopmgr = loopmgr,
+                                 .loop = loop,
                                  .cb = cb,
                                  .arg = arg };
 
@@ -3024,7 +3024,7 @@ dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
 
        if ((options & DNS_VALIDATOR_DEFER) == 0) {
                dns_validator_ref(val);
-               isc_job_run(val->loopmgr, validator_start, val);
+               isc_async_run(val->loop, validator_start, val);
        }
 
        *validatorp = val;
@@ -3050,7 +3050,7 @@ dns_validator_send(dns_validator_t *validator) {
        validator->options &= ~DNS_VALIDATOR_DEFER;
 
        dns_validator_ref(validator);
-       isc_job_run(validator->loopmgr, validator_start, validator);
+       isc_async_run(validator->loop, validator_start, validator);
 }
 
 void
index e822b7345279fe4245cc61c7d79ccdba372918f3..9963c8cc603627db9ff5645669b2c150fa1616b3 100644 (file)
@@ -115,6 +115,7 @@ libisc_la_SOURCES =         \
        ascii.c                 \
        assertions.c            \
        async.c                 \
+       async_p.h               \
        backtrace.c             \
        base32.c                \
        base64.c                \
index 8eab9dba720176df131fa6a28e6b865b0a08a66d..326d3a080309a4f0e747640f23d548efd45ed133 100644 (file)
 #include <isc/uv.h>
 #include <isc/work.h>
 
+#include "async_p.h"
 #include "job_p.h"
 #include "loop_p.h"
 
 void
 isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
-       int r;
-       isc_job_t *job = NULL;
-
        REQUIRE(VALID_LOOP(loop));
        REQUIRE(cb != NULL);
 
-       job = isc__job_new(loop, cb, cbarg);
+       isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
+       *job = (isc_job_t){
+               .link = ISC_LINK_INITIALIZER,
+               .cb = cb,
+               .cbarg = cbarg,
+       };
 
        /*
         * Now send the half-initialized job to the loop queue.
-        *
-        * The ISC_ASTACK_PUSH is counterintuitive here, but uv_idle
-        * drains its queue backwards, so if there's more than one event
-        * to be processed then they need to be in reverse order.
         */
-       ISC_ASTACK_PUSH(loop->queue_jobs, job, link);
+       ISC_ASTACK_PUSH(loop->async_jobs, job, link);
 
-       r = uv_async_send(&loop->queue_trigger);
+       int r = uv_async_send(&loop->async_trigger);
        UV_RUNTIME_CHECK(uv_async_send, r);
 }
+
+void
+isc__async_cb(uv_async_t *handle) {
+       isc_loop_t *loop = uv_handle_get_data(handle);
+
+       REQUIRE(VALID_LOOP(loop));
+
+       ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs);
+       ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
+
+       isc_job_t *job = ISC_STACK_POP(drain, link);
+       isc_job_t *next = NULL;
+       while (job != NULL) {
+               ISC_LIST_PREPEND(jobs, job, link);
+
+               job = ISC_STACK_POP(drain, link);
+       }
+
+       for (job = ISC_LIST_HEAD(jobs),
+           next = (job ? ISC_LIST_NEXT(job, link) : NULL);
+            job != NULL;
+            job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL))
+       {
+               job->cb(job->cbarg);
+
+               isc_mem_put(loop->mctx, job, sizeof(*job));
+       }
+}
+
+void
+isc__async_close(uv_handle_t *handle) {
+       isc_loop_t *loop = uv_handle_get_data(handle);
+
+       isc__async_cb(&loop->async_trigger);
+}
diff --git a/lib/isc/async_p.h b/lib/isc/async_p.h
new file mode 100644 (file)
index 0000000..0d854cd
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+ *
+ * SPDX-License-Identifier: MPL-2.0
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, you can obtain one at https://mozilla.org/MPL/2.0/.
+ *
+ * See the COPYRIGHT file distributed with this work for additional
+ * information regarding copyright ownership.
+ */
+
+#pragma once
+
+#include <isc/async.h>
+#include <isc/job.h>
+#include <isc/loop.h>
+#include <isc/mem.h>
+#include <isc/stack.h>
+#include <isc/uv.h>
+
+typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t;
+
+void
+isc__async_cb(uv_async_t *handle);
+
+void
+isc__async_close(uv_handle_t *handle);
index a413f2792acf282ac8339d169d004a205bce6dba..ea36a150dd7cf9a418a9be4908bf12214971f526 100644 (file)
@@ -15,7 +15,7 @@
  * \brief The isc_async unit provides a way to schedule jobs on any isc
  * event loop (isc_loop unit)
  *
- * The unit is built around the uv_async_t primitive and locked list with
+ * The unit is built around the uv_async_t primitive and lock-free stack with
  * isc_job_cb.  Jobs are first scheduled onto the locked list, then the
  * uv_async_send() is called and the uv_async_t callback processes the enqueued
  * jobs are scheduled to be run on the isc event loop.
index 19ab75d724d3ffeda26650cd26354eb9c5c50133..8eeb60d0f482d61a30104594697480d7f0d3b264 100644 (file)
 #include <isc/types.h>
 
 typedef void (*isc_job_cb)(void *);
+typedef struct isc_job isc_job_t;
+
+struct isc_job {
+       isc_job_cb cb;
+       void      *cbarg;
+       ISC_LINK(isc_job_t) link;
+};
+
+#define ISC_JOB_INITIALIZER                  \
+       {                                    \
+               .link = ISC_LINK_INITIALIZER \
+       }
 
 ISC_LANG_BEGINDECLS
 
 void
-isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg);
+isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg);
 /*%<
  * Schedule the job callback 'cb' to be run on the currently
  * running event loop.
  *
- * Note: Because of the design of uv_idle_start(), if more than one
- * job is posted at once, the jobs will be pushed onto a stack and
- * executed in last-in-first-out order. To post events that are
- * executed in order posted, use isc_async_run() instead.
- *
  * Requires:
  *
- *\li  'loopmgr' is the active loop manager.
+ *\li  'loop' is the current loop
+ *\li  'job' is initialized
  *\li  'cb' is a callback function, must be non-NULL
  *\li  'cbarg' is passed to the 'cb' as the only argument, may be NULL
  */
index 2a85702c2ad6d44d3b8e0ac5b43725a5035558a3..05440375a00c753449dfb434f3b2c305b3687615 100644 (file)
@@ -15,6 +15,7 @@
 
 #include <inttypes.h>
 
+#include <isc/job.h>
 #include <isc/lang.h>
 #include <isc/mem.h>
 #include <isc/refcount.h>
index e8c08043bff48bfbd007d9701be7039a5c80e9a4..0c0b609bc148008f6b58eac7713678fda15222eb 100644 (file)
@@ -48,19 +48,18 @@ typedef struct isc_httpdurl isc_httpdurl_t;        /*%< HTTP URL */
 typedef void(isc_httpdondestroy_t)(void *); /*%< Callback on destroying httpd */
 typedef struct isc_interface    isc_interface_t;     /*%< Interface */
 typedef struct isc_interfaceiter isc_interfaceiter_t; /*%< Interface Iterator */
-typedef struct isc_job          isc_job_t;
-typedef struct isc_lex          isc_lex_t;         /*%< Lex */
-typedef struct isc_log          isc_log_t;         /*%< Log */
-typedef struct isc_logcategory  isc_logcategory_t; /*%< Log Category */
-typedef struct isc_logconfig    isc_logconfig_t;   /*%< Log Configuration */
-typedef struct isc_logmodule    isc_logmodule_t;   /*%< Log Module */
-typedef struct isc_loop                 isc_loop_t;        /*%< Event loop */
-typedef struct isc_loopmgr      isc_loopmgr_t;     /*%< Event loop manager */
-typedef struct isc_mem          isc_mem_t;         /*%< Memory */
-typedef struct isc_mempool      isc_mempool_t;     /*%< Memory Pool */
-typedef struct isc_netaddr      isc_netaddr_t;     /*%< Net Address */
-typedef struct isc_netprefix    isc_netprefix_t;   /*%< Net Prefix */
-typedef struct isc_nm           isc_nm_t;          /*%< Network manager */
+typedef struct isc_lex          isc_lex_t;           /*%< Lex */
+typedef struct isc_log          isc_log_t;           /*%< Log */
+typedef struct isc_logcategory  isc_logcategory_t;   /*%< Log Category */
+typedef struct isc_logconfig    isc_logconfig_t;     /*%< Log Configuration */
+typedef struct isc_logmodule    isc_logmodule_t;     /*%< Log Module */
+typedef struct isc_loop                 isc_loop_t;          /*%< Event loop */
+typedef struct isc_loopmgr      isc_loopmgr_t;       /*%< Event loop manager */
+typedef struct isc_mem          isc_mem_t;           /*%< Memory */
+typedef struct isc_mempool      isc_mempool_t;       /*%< Memory Pool */
+typedef struct isc_netaddr      isc_netaddr_t;       /*%< Net Address */
+typedef struct isc_netprefix    isc_netprefix_t;     /*%< Net Prefix */
+typedef struct isc_nm           isc_nm_t;            /*%< Network manager */
 typedef struct isc_nmsocket     isc_nmsocket_t; /*%< Network manager socket */
 typedef struct isc_nmhandle     isc_nmhandle_t; /*%< Network manager handle */
 typedef struct isc_portset      isc_portset_t;  /*%< Port Set */
index 6bd588a63f1c43a2370f2f56f92af8f47b5f1056..9f64df73e8a54a674a60ec2a112ed183774ffc6d 100644 (file)
 #include "job_p.h"
 #include "loop_p.h"
 
-#define JOB_MAGIC    ISC_MAGIC('J', 'O', 'B', ' ')
-#define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
-
-/*
- * Private: static
- */
-
-static void
-isc__job_close_cb(uv_handle_t *handle) {
-       isc_job_t *job = uv_handle_get_data(handle);
-       isc_loop_t *loop = job->loop;
-
-       REQUIRE(loop == isc_loop_current(job->loop->loopmgr));
-
-       isc_mem_put(loop->mctx, job, sizeof(*job));
-
-       isc_loop_detach(&loop);
-}
-
-static void
-isc__job_destroy(isc_job_t *job) {
-       REQUIRE(VALID_JOB(job));
-       REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
-
-       job->magic = 0;
-
-       uv_close(&job->idle, isc__job_close_cb);
-}
-
-static void
-isc__job_cb(uv_idle_t *idle) {
-       isc_job_t *job = uv_handle_get_data(idle);
-       int r;
-
-       REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
-
-       job->cb(job->cbarg);
-
-       r = uv_idle_stop(idle);
-       UV_RUNTIME_CHECK(uv_idle_stop, r);
-
-       isc__job_destroy(job);
-}
-
 /*
  * Public: #include <isc/job.h>
  */
 
 void
-isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg) {
-       isc_loop_t *loop = isc_loop_current(loopmgr);
-       isc_job_t *job = isc__job_new(loop, cb, cbarg);
-       isc__job_init(loop, job);
-       isc__job_run(job);
+isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) {
+       if (ISC_LIST_EMPTY(loop->run_jobs)) {
+               uv_idle_start(&loop->run_trigger, isc__job_cb);
+       }
+
+       job->cb = cb;
+       job->cbarg = cbarg;
+
+       ISC_LIST_APPEND(loop->run_jobs, job, link);
 }
 
 /*
  * Protected: #include <job_p.h>
  */
 
-isc_job_t *
-isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
-       isc_job_t *job = NULL;
-
-       REQUIRE(VALID_LOOP(loop));
-       REQUIRE(cb != NULL);
-
-       job = isc_mem_get(loop->mctx, sizeof(*job));
-       *job = (isc_job_t){
-               .magic = JOB_MAGIC,
-               .cb = cb,
-               .cbarg = cbarg,
-               .link = ISC_LINK_INITIALIZER,
-       };
-
-       isc_loop_attach(loop, &job->loop);
-
-       return (job);
-}
-
 void
-isc__job_init(isc_loop_t *loop, isc_job_t *job) {
-       int r = uv_idle_init(&loop->loop, &job->idle);
-       UV_RUNTIME_CHECK(uv_idle_init, r);
-       uv_handle_set_data(&job->idle, job);
+isc__job_cb(uv_idle_t *handle) {
+       isc_loop_t *loop = uv_handle_get_data(handle);
+       ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
+
+       ISC_LIST_MOVE(jobs, loop->run_jobs);
+
+       isc_job_t *job, *next;
+       for (job = ISC_LIST_HEAD(jobs),
+           next = (job != NULL) ? ISC_LIST_NEXT(job, link) : NULL;
+            job != NULL;
+            job = next, next = job ? ISC_LIST_NEXT(job, link) : NULL)
+       {
+               ISC_LIST_UNLINK(jobs, job, link);
+               job->cb(job->cbarg);
+       }
+
+       if (ISC_LIST_EMPTY(loop->run_jobs)) {
+               uv_idle_stop(&loop->run_trigger);
+       }
 }
 
 void
-isc__job_run(isc_job_t *job) {
-       int r;
-
-       REQUIRE(VALID_JOB(job));
-       REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
+isc__job_close(uv_handle_t *handle) {
+       isc_loop_t *loop = uv_handle_get_data(handle);
 
-       r = uv_idle_start(&job->idle, isc__job_cb);
-       UV_RUNTIME_CHECK(uv_idle_start, r);
+       isc__job_cb(&loop->run_trigger);
 }
index b021975cb5c30359502fe862beb313c42ba1a6fc..89fa07807714209095e9c96bf576a15ac69b0afc 100644 (file)
 
 #include <isc/job.h>
 #include <isc/loop.h>
+#include <isc/uv.h>
 
-isc_job_t *
-isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
+typedef ISC_LIST(isc_job_t) isc_joblist_t;
 
 void
-isc__job_init(isc_loop_t *loop, isc_job_t *job);
+isc__job_cb(uv_idle_t *handle);
 
 void
-isc__job_run(isc_job_t *job);
+isc__job_close(uv_handle_t *handle);
index 61d28c3ba16acc5fa6603a53ef10b819ef5a3ab0..65250c14af42c2085eff48206d7e764b3e91ca30 100644 (file)
@@ -38,6 +38,7 @@
 #include <isc/uv.h>
 #include <isc/work.h>
 
+#include "async_p.h"
 #include "job_p.h"
 #include "loop_p.h"
 
@@ -140,8 +141,10 @@ static void
 destroy_cb(uv_async_t *handle) {
        isc_loop_t *loop = uv_handle_get_data(handle);
 
+       /* Again, the first close callback here is called last */
+       uv_close(&loop->async_trigger, isc__async_close);
+       uv_close(&loop->run_trigger, isc__job_close);
        uv_close(&loop->destroy_trigger, NULL);
-       uv_close(&loop->queue_trigger, NULL);
        uv_close(&loop->pause_trigger, NULL);
        uv_close(&loop->wakeup_trigger, NULL);
        uv_close(&loop->quiescent, NULL);
@@ -175,26 +178,11 @@ shutdown_cb(uv_async_t *handle) {
                isc_job_t *prev = ISC_LIST_PREV(job, link);
                ISC_LIST_UNLINK(loop->teardown_jobs, job, link);
 
-               isc__job_run(job);
+               job->cb(job->cbarg);
 
-               job = prev;
-       }
-}
-
-static void
-queue_cb(uv_async_t *handle) {
-       isc_loop_t *loop = uv_handle_get_data(handle);
-
-       REQUIRE(VALID_LOOP(loop));
-
-       ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->queue_jobs);
-       isc_job_t *job = ISC_STACK_POP(drain, link);
+               isc_mem_put(loop->mctx, job, sizeof(*job));
 
-       while (job != NULL) {
-               isc__job_init(loop, job);
-               isc__job_run(job);
-
-               job = ISC_STACK_POP(drain, link);
+               job = prev;
        }
 }
 
@@ -209,7 +197,8 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
        *loop = (isc_loop_t){
                .tid = tid,
                .loopmgr = loopmgr,
-               .queue_jobs = ISC_ASTACK_INITIALIZER,
+               .async_jobs = ISC_ASTACK_INITIALIZER,
+               .run_jobs = ISC_LIST_INITIALIZER,
                .setup_jobs = ISC_LIST_INITIALIZER,
                .teardown_jobs = ISC_LIST_INITIALIZER,
        };
@@ -225,9 +214,13 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
        UV_RUNTIME_CHECK(uv_async_init, r);
        uv_handle_set_data(&loop->shutdown_trigger, loop);
 
-       r = uv_async_init(&loop->loop, &loop->queue_trigger, queue_cb);
+       r = uv_async_init(&loop->loop, &loop->async_trigger, isc__async_cb);
        UV_RUNTIME_CHECK(uv_async_init, r);
-       uv_handle_set_data(&loop->queue_trigger, loop);
+       uv_handle_set_data(&loop->async_trigger, loop);
+
+       r = uv_idle_init(&loop->loop, &loop->run_trigger);
+       UV_RUNTIME_CHECK(uv_idle_init, r);
+       uv_handle_set_data(&loop->run_trigger, loop);
 
        r = uv_async_init(&loop->loop, &loop->destroy_trigger, destroy_cb);
        UV_RUNTIME_CHECK(uv_async_init, r);
@@ -251,25 +244,31 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
 }
 
 static void
-loop_run(isc_loop_t *loop) {
-       int r;
-       isc_job_t *job;
+setup_jobs_cb(void *arg) {
+       isc_loop_t *loop = arg;
+       isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs);
 
-       job = ISC_LIST_HEAD(loop->setup_jobs);
        while (job != NULL) {
                isc_job_t *next = ISC_LIST_NEXT(job, link);
                ISC_LIST_UNLINK(loop->setup_jobs, job, link);
 
-               isc__job_run(job);
+               job->cb(job->cbarg);
+
+               isc_mem_put(loop->mctx, job, sizeof(*job));
 
                job = next;
        }
+}
 
-       r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb);
+static void
+loop_run(isc_loop_t *loop) {
+       int r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb);
        UV_RUNTIME_CHECK(uv_prepare_start, r);
 
        isc_barrier_wait(&loop->loopmgr->starting);
 
+       isc_async_run(loop, setup_jobs_cb, loop);
+
        r = uv_run(&loop->loop, UV_RUN_DEFAULT);
        UV_RUNTIME_CHECK(uv_run, r);
 
@@ -281,7 +280,8 @@ loop_close(isc_loop_t *loop) {
        int r = uv_loop_close(&loop->loop);
        UV_RUNTIME_CHECK(uv_loop_close, r);
 
-       INSIST(ISC_ASTACK_EMPTY(loop->queue_jobs));
+       INSIST(ISC_ASTACK_EMPTY(loop->async_jobs));
+       INSIST(ISC_LIST_EMPTY(loop->run_jobs));
 
        loop->magic = 0;
 
@@ -382,53 +382,41 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) {
 
 isc_job_t *
 isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
-       isc_loopmgr_t *loopmgr = NULL;
-       isc_job_t *job = NULL;
-
        REQUIRE(VALID_LOOP(loop));
        REQUIRE(cb != NULL);
 
-       loopmgr = loop->loopmgr;
+       isc_loopmgr_t *loopmgr = loop->loopmgr;
+       isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
+       *job = (isc_job_t){
+               .cb = cb,
+               .cbarg = cbarg,
+               .link = ISC_LINK_INITIALIZER,
+       };
 
        REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
                atomic_load(&loopmgr->paused));
 
-       job = isc__job_new(loop, cb, cbarg);
-       isc__job_init(loop, job);
-
-       /*
-        * The ISC_LIST_PREPEND is counterintuitive here, but actually, the
-        * uv_idle_start() puts the item on the HEAD of the internal list, so we
-        * want to store items here in reverse order, so on the uv_loop, they
-        * are scheduled in the correct order
-        */
-       ISC_LIST_PREPEND(loop->setup_jobs, job, link);
+       ISC_LIST_APPEND(loop->setup_jobs, job, link);
 
        return (job);
 }
 
 isc_job_t *
 isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
-       isc_loopmgr_t *loopmgr = NULL;
-       isc_job_t *job = NULL;
-
        REQUIRE(VALID_LOOP(loop));
 
-       loopmgr = loop->loopmgr;
+       isc_loopmgr_t *loopmgr = loop->loopmgr;
+       isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
+       *job = (isc_job_t){
+               .cb = cb,
+               .cbarg = cbarg,
+               .link = ISC_LINK_INITIALIZER,
+       };
 
        REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
                atomic_load(&loopmgr->paused));
 
-       job = isc__job_new(loop, cb, cbarg);
-       isc__job_init(loop, job);
-
-       /*
-        * The ISC_LIST_PREPEND is counterintuitive here, but actually, the
-        * uv_idle_start() puts the item on the HEAD of the internal list, so we
-        * want to store items here in reverse order, so on the uv_loop, they
-        * are scheduled in the correct order
-        */
-       ISC_LIST_PREPEND(loop->teardown_jobs, job, link);
+       ISC_LIST_APPEND(loop->teardown_jobs, job, link);
 
        return (job);
 }
index 667f8510974c7d9538627dcf7c7e5155b55987cb..8810a36619776729443d6779a4535bde5c61b5e1 100644 (file)
@@ -16,6 +16,7 @@
 #include <inttypes.h>
 
 #include <isc/barrier.h>
+#include <isc/job.h>
 #include <isc/lang.h>
 #include <isc/loop.h>
 #include <isc/magic.h>
 #include <isc/uv.h>
 #include <isc/work.h>
 
+#include "async_p.h"
+#include "job_p.h"
+
 /*
  * Per-thread loop
  */
 #define LOOP_MAGIC    ISC_MAGIC('L', 'O', 'O', 'P')
 #define VALID_LOOP(t) ISC_MAGIC_VALID(t, LOOP_MAGIC)
 
-typedef ISC_LIST(isc_job_t) isc_joblist_t;
-typedef ISC_ASTACK(isc_job_t) isc_jobstack_t;
-
 struct isc_loop {
        int magic;
        isc_refcount_t references;
@@ -56,8 +57,12 @@ struct isc_loop {
        bool shuttingdown;
 
        /* Async queue */
-       uv_async_t queue_trigger;
-       isc_jobstack_t queue_jobs;
+       uv_async_t async_trigger;
+       isc_asyncstack_t async_jobs;
+
+       /* Jobs queue */
+       uv_idle_t run_trigger;
+       isc_joblist_t run_jobs;
 
        /* Pause */
        uv_async_t pause_trigger;
@@ -130,15 +135,6 @@ struct isc_signal {
 #define JOB_MAGIC    ISC_MAGIC('J', 'O', 'B', ' ')
 #define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
 
-struct isc_job {
-       int magic;
-       uv_idle_t idle;
-       isc_loop_t *loop;
-       isc_job_cb cb;
-       void *cbarg;
-       ISC_LINK(isc_job_t) link;
-};
-
 /*
  * Work to be offloaded to an external thread.
  */
index 023fbec40bace2fedf4c782abf88114b94246cb6..1fdcec75284a06c343ede92220ddf6444e39da47 100644 (file)
@@ -252,7 +252,6 @@ typedef union {
        isc_nm_recv_cb_t recv;
        isc_nm_cb_t send;
        isc_nm_cb_t connect;
-       isc_nm_accept_cb_t accept;
 } isc__nm_cb_t;
 
 /*
@@ -278,7 +277,6 @@ struct isc__nm_uvreq {
        isc_nm_timer_t *timer; /* TCP write timer */
        int connect_tries;     /* connect retries */
        isc_result_t result;
-       uv_idle_t idle;
 
        union {
                uv_handle_t handle;
@@ -293,6 +291,8 @@ struct isc__nm_uvreq {
        } uv_req;
        ISC_LINK(isc__nm_uvreq_t) link;
        ISC_LINK(isc__nm_uvreq_t) active_link;
+
+       isc_job_t job;
 };
 
 /*
index aede4e92a8a50015a1da62780a352f58de9efc4b..9ba0874675007a72dd81ce3a0b407ead856d26d6 100644 (file)
@@ -120,9 +120,6 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG);
 static void
 nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle);
 
-static void
-uvreq_free(uv_handle_t *handle);
-
 /*%<
  * Issue a 'handle closed' callback on the socket.
  */
@@ -1021,8 +1018,8 @@ nmhandle_destroy(isc_nmhandle_t *handle) {
         * call it now asynchronously.
         */
        if (sock->closehandle_cb != NULL) {
-               isc_job_run(sock->worker->netmgr->loopmgr,
-                           isc__nm_closehandle_job, sock);
+               isc_async_run(sock->worker->loop, isc__nm_closehandle_job,
+                             sock);
        } else {
                isc___nmsocket_detach(&sock FLARG_PASS);
        }
@@ -1600,14 +1597,11 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) {
                .connect_tries = 3,
                .link = ISC_LINK_INITIALIZER,
                .active_link = ISC_LINK_INITIALIZER,
+               .job = ISC_JOB_INITIALIZER,
                .magic = UVREQ_MAGIC,
        };
        uv_handle_set_data(&req->uv_req.handle, req);
 
-       int r = uv_idle_init(&worker->loop->loop, &req->idle);
-       UV_RUNTIME_CHECK(uv_idle_init, r);
-       uv_handle_set_data(&req->idle, req);
-
        isc___nmsocket_attach(sock, &req->sock FLARG_PASS);
 
        ISC_LIST_APPEND(sock->active_uvreqs, req, active_link);
@@ -1615,16 +1609,6 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) {
        return (req);
 }
 
-static void
-uvreq_free(uv_handle_t *handle) {
-       isc__nm_uvreq_t *req = uv_handle_get_data(handle);
-       isc_nmsocket_t *sock = req->sock;
-
-       isc_mempool_put(sock->worker->uvreq_pool, req);
-
-       isc___nmsocket_detach(&sock FLARG_PASS);
-}
-
 void
 isc___nm_uvreq_put(isc__nm_uvreq_t **reqp FLARG) {
        REQUIRE(reqp != NULL && VALID_UVREQ(*reqp));
@@ -1644,7 +1628,9 @@ isc___nm_uvreq_put(isc__nm_uvreq_t **reqp FLARG) {
                isc__nmhandle_detach(&handle FLARG_PASS);
        }
 
-       uv_close(&req->idle, uvreq_free);
+       isc_mempool_put(sock->worker->uvreq_pool, req);
+
+       isc___nmsocket_detach(&sock FLARG_PASS);
 }
 
 void
@@ -1838,12 +1824,10 @@ isc__nmsocket_barrier_init(isc_nmsocket_t *listener) {
 }
 
 static void
-isc___nm_connectcb(uv_idle_t *handle) {
-       isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle);
+isc___nm_connectcb(void *arg) {
+       isc__nm_uvreq_t *uvreq = arg;
 
        uvreq->cb.connect(uvreq->handle, uvreq->result, uvreq->cbarg);
-
-       uv_idle_stop(handle);
        isc__nm_uvreq_put(&uvreq);
 }
 
@@ -1855,28 +1839,25 @@ isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
        REQUIRE(VALID_NMHANDLE(uvreq->handle));
        REQUIRE(uvreq->cb.connect != NULL);
 
+       uvreq->result = eresult;
+
        if (!async) {
-               uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg);
-               isc__nm_uvreq_put(&uvreq);
+               isc___nm_connectcb(uvreq);
                return;
        }
 
-       uvreq->result = eresult;
-
-       uv_idle_start(&uvreq->idle, isc___nm_connectcb);
+       isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_connectcb, uvreq);
 }
 
 static void
-isc___nm_readcb(uv_idle_t *handle) {
-       isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle);
+isc___nm_readcb(void *arg) {
+       isc__nm_uvreq_t *uvreq = arg;
        isc_region_t region;
 
        region.base = (unsigned char *)uvreq->uvbuf.base;
        region.length = uvreq->uvbuf.len;
-
        uvreq->cb.recv(uvreq->handle, uvreq->result, &region, uvreq->cbarg);
 
-       uv_idle_stop(handle);
        isc__nm_uvreq_put(&uvreq);
 }
 
@@ -1887,29 +1868,21 @@ isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
        REQUIRE(VALID_UVREQ(uvreq));
        REQUIRE(VALID_NMHANDLE(uvreq->handle));
 
-       if (!async) {
-               isc_region_t region;
-
-               region.base = (unsigned char *)uvreq->uvbuf.base;
-               region.length = uvreq->uvbuf.len;
+       uvreq->result = eresult;
 
-               uvreq->cb.recv(uvreq->handle, eresult, &region, uvreq->cbarg);
-               isc__nm_uvreq_put(&uvreq);
+       if (!async) {
+               isc___nm_readcb(uvreq);
                return;
        }
 
-       uvreq->result = eresult;
-
-       uv_idle_start(&uvreq->idle, isc___nm_readcb);
+       isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_readcb, uvreq);
 }
 
 static void
-isc___nm_sendcb(uv_idle_t *handle) {
-       isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle);
+isc___nm_sendcb(void *arg) {
+       isc__nm_uvreq_t *uvreq = arg;
 
        uvreq->cb.send(uvreq->handle, uvreq->result, uvreq->cbarg);
-
-       uv_idle_stop(handle);
        isc__nm_uvreq_put(&uvreq);
 }
 
@@ -1919,15 +1892,15 @@ isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(VALID_UVREQ(uvreq));
        REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
        uvreq->result = eresult;
 
        if (!async) {
-               uvreq->cb.send(uvreq->handle, uvreq->result, uvreq->cbarg);
-               isc__nm_uvreq_put(&uvreq);
+               isc___nm_sendcb(uvreq);
                return;
        }
 
-       uv_idle_start(&uvreq->idle, isc___nm_sendcb);
+       isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_sendcb, uvreq);
 }
 
 static void
index 9bdf18107e89d9ee085312059a09dd11151e592b..2c770ef0fce4bf6c80c5495592106fdedae805c2 100644 (file)
@@ -702,7 +702,8 @@ doh_receive_send_reply_cb(isc_nmhandle_t *handle, isc_result_t eresult,
                        assert_true(eresult == ISC_R_SUCCESS);
                }
 
-               isc_job_run(loopmgr, doh_connect_thread, connect_nm);
+               isc_async_run(isc_loop_current(loopmgr), doh_connect_thread,
+                             connect_nm);
        }
        if (sends <= 0) {
                isc_loopmgr_shutdown(loopmgr);
index 2a8c049bed795ecf7fbc3311de374f96ff2701f1..9c3a9fff4a220722cb76b3ae47dd757de97b0da0 100644 (file)
@@ -39,37 +39,55 @@ static atomic_uint executed;
 
 #define MAX_EXECUTED 1000000
 
+struct test_arg {
+       isc_job_t job;
+       union {
+               int n;
+               void *ptr;
+       } arg;
+};
+
 static void
 shutdown_cb(void *arg) {
-       UNUSED(arg);
+       struct test_arg *ta = arg;
+
+       isc_mem_put(mctx, ta, sizeof(*ta));
 
        isc_loopmgr_shutdown(loopmgr);
 }
 
 static void
-job_cb(void *arg __attribute__((__unused__))) {
+job_cb(void *arg) {
+       struct test_arg *ta = arg;
        unsigned int n = atomic_fetch_add(&executed, 1);
 
        if (n <= MAX_EXECUTED) {
                atomic_fetch_add(&scheduled, 1);
-               isc_job_run(loopmgr, job_cb, loopmgr);
+               isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta);
        } else {
-               isc_job_run(loopmgr, shutdown_cb, loopmgr);
+               isc_job_run(isc_loop_current(loopmgr), &ta->job, shutdown_cb,
+                           ta);
        }
 }
 
 static void
-job_run_cb(void *arg __attribute__((__unused__))) {
+job_run_cb(void *arg) {
+       struct test_arg *ta = arg;
        atomic_fetch_add(&scheduled, 1);
 
-       isc_job_run(loopmgr, job_cb, loopmgr);
+       if (arg == NULL) {
+               ta = isc_mem_get(mctx, sizeof(*ta));
+               *ta = (struct test_arg){ .job = ISC_JOB_INITIALIZER };
+       }
+
+       isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta);
 }
 
 ISC_RUN_TEST_IMPL(isc_job_run) {
        atomic_init(&scheduled, 0);
        atomic_init(&executed, 0);
 
-       isc_loopmgr_setup(loopmgr, job_run_cb, loopmgr);
+       isc_loopmgr_setup(loopmgr, job_run_cb, NULL);
 
        isc_loopmgr_run(loopmgr);
 
@@ -77,12 +95,18 @@ ISC_RUN_TEST_IMPL(isc_job_run) {
 }
 
 static char string[32] = "";
-int n1 = 1, n2 = 2, n3 = 3, n4 = 4, n5 = 5;
+struct test_arg n1 = { .job = ISC_JOB_INITIALIZER, .arg.n = 1 };
+struct test_arg n2 = { .job = ISC_JOB_INITIALIZER, .arg.n = 2 };
+struct test_arg n3 = { .job = ISC_JOB_INITIALIZER, .arg.n = 3 };
+struct test_arg n4 = { .job = ISC_JOB_INITIALIZER, .arg.n = 4 };
+struct test_arg n5 = { .job = ISC_JOB_INITIALIZER, .arg.n = 5 };
 
 static void
 append(void *arg) {
+       struct test_arg *ta = arg;
+
        char value[32];
-       sprintf(value, "%d", *(int *)arg);
+       sprintf(value, "%d", ta->arg.n);
        strlcat(string, value, 10);
 }
 
@@ -90,12 +114,12 @@ static void
 job_multiple(void *arg) {
        UNUSED(arg);
 
-       /* These will be processed in reverse order */
-       isc_job_run(loopmgr, append, &n1);
-       isc_job_run(loopmgr, append, &n2);
-       isc_job_run(loopmgr, append, &n3);
-       isc_job_run(loopmgr, append, &n4);
-       isc_job_run(loopmgr, append, &n5);
+       /* These will be processed in normal order */
+       isc_job_run(mainloop, &n1.job, append, &n1);
+       isc_job_run(mainloop, &n2.job, append, &n2);
+       isc_job_run(mainloop, &n3.job, append, &n3);
+       isc_job_run(mainloop, &n4.job, append, &n4);
+       isc_job_run(mainloop, &n5.job, append, &n5);
        isc_loopmgr_shutdown(loopmgr);
 }
 
@@ -103,7 +127,7 @@ ISC_RUN_TEST_IMPL(isc_job_multiple) {
        string[0] = '\0';
        isc_loop_setup(isc_loop_main(loopmgr), job_multiple, loopmgr);
        isc_loopmgr_run(loopmgr);
-       assert_string_equal(string, "54321");
+       assert_string_equal(string, "12345");
 }
 
 ISC_TEST_LIST_START
index 27f118eb5ceab700f380896c3ff32cac5849c2b9..d863309c51130679684194e2914b2f96c859d458 100644 (file)
@@ -66,11 +66,12 @@ ISC_RUN_TEST_IMPL(isc_loopmgr) {
 
 static void
 runjob(void *arg __attribute__((__unused__))) {
+       isc_async_run(isc_loop_current(loopmgr), count, loopmgr);
+
        if (isc_tid() == 0) {
-               isc_job_run(loopmgr, shutdown_loopmgr, loopmgr);
+               isc_async_run(isc_loop_current(loopmgr), shutdown_loopmgr,
+                             loopmgr);
        }
-
-       isc_job_run(loopmgr, count, loopmgr);
 }
 
 ISC_RUN_TEST_IMPL(isc_loopmgr_runjob) {
index ce8aff3b65c2b21612c2e4ae53125db659cd0f9e..ad44777918ae1a12b03663459ef6c7e9a0f9e0bb 100644 (file)
@@ -368,9 +368,10 @@ connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
        if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) {
                do_cconnects_shutdown(loopmgr);
        } else if (do_send) {
-               isc_job_run(loopmgr, stream_recv_send_connect,
-                           (cbarg == NULL ? get_stream_connect_function()
-                                          : (stream_connect_function)cbarg));
+               isc_async_run(isc_loop_current(loopmgr),
+                             stream_recv_send_connect,
+                             (cbarg == NULL ? get_stream_connect_function()
+                                            : (stream_connect_function)cbarg));
        }
 
        isc_refcount_increment0(&active_creads);
index c086bbd1268ebf4e5fe97ede50f4a932f9c323f6..54d4489a8a1e56520e8941f92b21d7888ae168d3 100644 (file)
@@ -540,7 +540,8 @@ ISC_TEARDOWN_TEST_IMPL(udp_shutdown_connect) {
 ISC_LOOP_TEST_IMPL(udp_shutdown_connect) {
        isc_loopmgr_shutdown(loopmgr);
        isc_refcount_increment0(&active_cconnects);
-       isc_job_run(loopmgr, udp_connect_udpconnect, netmgr);
+       isc_async_run(isc_loop_current(loopmgr), udp_connect_udpconnect,
+                     netmgr);
 }
 
 static void
@@ -864,7 +865,8 @@ udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
                {
                        do_cconnects_shutdown(loopmgr);
                } else if (do_send) {
-                       isc_job_run(loopmgr, udp__connect, cbarg);
+                       isc_async_run(isc_loop_current(loopmgr), udp__connect,
+                                     cbarg);
                }
 
                isc_refcount_increment0(&active_creads);