#include <isc/log.h>
#include <isc/md.h>
#include <isc/mem.h>
+#include <isc/netmgr.h>
#ifdef WIN32
#include <isc/ntpaths.h>
#endif /* ifdef WIN32 */
dns_namelist_t namelist;
unsigned int resopt;
isc_appctx_t *actx = NULL;
+ isc_nm_t *netmgr = NULL;
isc_taskmgr_t *taskmgr = NULL;
isc_socketmgr_t *socketmgr = NULL;
isc_timermgr_t *timermgr = NULL;
isc_mem_create(&mctx);
CHECK(isc_appctx_create(mctx, &actx));
- CHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr));
+ netmgr = isc_nm_start(mctx, 1);
+ CHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr));
CHECK(isc_socketmgr_create(mctx, &socketmgr));
CHECK(isc_timermgr_create(mctx, &timermgr));
if (taskmgr != NULL) {
isc_taskmgr_destroy(&taskmgr);
}
+ if (netmgr != NULL) {
+ isc_nm_destroy(&netmgr);
+ }
if (timermgr != NULL) {
isc_timermgr_destroy(&timermgr);
}
netmgr = isc_nm_start(mctx, 1);
- result = isc_taskmgr_create(mctx, 1, 0, netmgr, &taskmgr);
+ result = isc_taskmgr_create(mctx, 0, netmgr, &taskmgr);
check_result(result, "isc_taskmgr_create");
result = isc_task_create(taskmgr, 0, &global_task);
static unsigned int nverified = 0, nverifyfailed = 0;
static const char *directory = NULL, *dsdir = NULL;
static isc_mutex_t namelock, statslock;
+static isc_nm_t *netmgr = NULL;
static isc_taskmgr_t *taskmgr = NULL;
static dns_db_t *gdb; /* The database */
static dns_dbversion_t *gversion; /* The database version */
print_time(outfp);
print_version(outfp);
- result = isc_taskmgr_create(mctx, ntasks, 0, NULL, &taskmgr);
+ netmgr = isc_nm_start(mctx, ntasks);
+
+ result = isc_taskmgr_create(mctx, 0, netmgr, &taskmgr);
if (result != ISC_R_SUCCESS) {
fatal("failed to create task manager: %s",
isc_result_totext(result));
isc_task_detach(&tasks[i]);
}
isc_taskmgr_destroy(&taskmgr);
+ isc_nm_destroy(&netmgr);
isc_mem_put(mctx, tasks, ntasks * sizeof(isc_task_t *));
postsign();
TIME_NOW(&sign_finish);
return (ISC_R_UNEXPECTED);
}
- result = isc_taskmgr_create(named_g_mctx, named_g_cpus, 0, named_g_nm,
+ result = isc_taskmgr_create(named_g_mctx, 0, named_g_nm,
&named_g_taskmgr);
if (result != ISC_R_SUCCESS) {
UNEXPECTED_ERROR(__FILE__, __LINE__,
* isc_taskmgr_destroy() will block until all tasks have exited.
*/
isc_taskmgr_destroy(&named_g_taskmgr);
+ isc_nm_destroy(&named_g_nm);
isc_timermgr_destroy(&named_g_timermgr);
isc_socketmgr_destroy(&named_g_socketmgr);
-
- /*
- * At this point is safe to destroy the netmgr.
- */
- isc_nm_destroy(&named_g_nm);
}
static void
}
static isc_result_t
-load_zones(named_server_t *server, bool init, bool reconfig) {
+load_zones(named_server_t *server, bool reconfig) {
isc_result_t result;
dns_view_t *view;
ns_zoneload_t *zl;
if (isc_refcount_decrement(&zl->refs) == 1) {
isc_refcount_destroy(&zl->refs);
isc_mem_put(server->mctx, zl, sizeof(*zl));
- } else if (init) {
- /*
- * Place the task manager into privileged mode. This
- * ensures that after we leave task-exclusive mode, no
- * other tasks will be able to run except for the ones
- * that are loading zones. (This should only be done during
- * the initial server setup; it isn't necessary during
- * a reload.)
- */
- isc_taskmgr_setprivilegedmode(named_g_taskmgr);
}
isc_task_endexclusive(server->task);
CHECKFATAL(load_configuration(named_g_conffile, server, true),
"loading configuration");
- CHECKFATAL(load_zones(server, true, false), "loading zones");
+ CHECKFATAL(load_zones(server, false), "loading zones");
#ifdef ENABLE_AFL
named_g_run_done = true;
#endif /* ifdef ENABLE_AFL */
CHECK(loadconfig(server));
- result = load_zones(server, false, false);
+ result = load_zones(server, false);
if (result == ISC_R_SUCCESS) {
isc_log_write(named_g_lctx, NAMED_LOGCATEGORY_GENERAL,
NAMED_LOGMODULE_SERVER, ISC_LOG_INFO,
CHECK(loadconfig(server));
- result = load_zones(server, false, true);
+ result = load_zones(server, true);
if (result == ISC_R_SUCCESS) {
isc_log_write(named_g_lctx, NAMED_LOGCATEGORY_GENERAL,
NAMED_LOGMODULE_SERVER, ISC_LOG_INFO,
static bool use_win2k_gsstsig = false;
static bool tried_other_gsstsig = false;
static bool local_only = false;
+static isc_nm_t *netmgr = NULL;
static isc_taskmgr_t *taskmgr = NULL;
static isc_task_t *global_task = NULL;
static isc_event_t *global_event = NULL;
result = isc_timermgr_create(gmctx, &timermgr);
check_result(result, "dns_timermgr_create");
- result = isc_taskmgr_create(gmctx, 1, 0, NULL, &taskmgr);
+ netmgr = isc_nm_start(gmctx, 1);
+
+ result = isc_taskmgr_create(gmctx, 0, netmgr, &taskmgr);
check_result(result, "isc_taskmgr_create");
result = isc_task_create(taskmgr, 0, &global_task);
ddebug("Shutting down task manager");
isc_taskmgr_destroy(&taskmgr);
+ ddebug("Shutting down network manager");
+ isc_nm_destroy(&netmgr);
+
ddebug("Destroying event");
isc_event_free(&global_event);
isc_mem_create(&rndc_mctx);
netmgr = isc_nm_start(rndc_mctx, 1);
DO("create task manager",
- isc_taskmgr_create(rndc_mctx, 1, 0, netmgr, &taskmgr));
+ isc_taskmgr_create(rndc_mctx, 0, netmgr, &taskmgr));
DO("create task", isc_task_create(taskmgr, 0, &rndc_task));
isc_log_create(rndc_mctx, &log, &logconfig);
isc_log_setcontext(log);
parallel.mk
/*.log
/*.trs
+/resolve
/run.sh
/run.log
/start.sh
isc_result_t result;
isc_log_t *lctx;
isc_logconfig_t *lcfg;
- isc_taskmgr_t *taskmgr;
+ isc_nm_t *netmgr = NULL;
+ isc_taskmgr_t *taskmgr = NULL;
isc_task_t *task;
isc_timermgr_t *timermgr;
isc_socketmgr_t *socketmgr;
RUNCHECK(dst_lib_init(mctx, NULL));
- taskmgr = NULL;
- RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr));
+ netmgr = isc_nm_start(mctx, 1);
+
+ RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr));
task = NULL;
RUNCHECK(isc_task_create(taskmgr, 0, &task));
timermgr = NULL;
isc_task_shutdown(task);
isc_task_detach(&task);
isc_taskmgr_destroy(&taskmgr);
+ isc_nm_destroy(&netmgr);
dst_lib_destroy();
#include <irs/resconf.h>
+/*
+ * Global contexts
+ */
+
+isc_mem_t *ctxs_mctx = NULL;
+isc_appctx_t *ctxs_actx = NULL;
+isc_nm_t *ctxs_netmgr = NULL;
+isc_taskmgr_t *ctxs_taskmgr = NULL;
+isc_socketmgr_t *ctxs_socketmgr = NULL;
+isc_timermgr_t *ctxs_timermgr = NULL;
+
+static void
+ctxs_destroy(void) {
+ if (ctxs_netmgr != NULL) {
+ isc_nm_closedown(ctxs_netmgr);
+ }
+
+ if (ctxs_taskmgr != NULL) {
+ isc_taskmgr_destroy(&ctxs_taskmgr);
+ }
+
+ if (ctxs_netmgr != NULL) {
+ isc_nm_destroy(&ctxs_netmgr);
+ }
+
+ if (ctxs_timermgr != NULL) {
+ isc_timermgr_destroy(&ctxs_timermgr);
+ }
+
+ if (ctxs_socketmgr != NULL) {
+ isc_socketmgr_destroy(&ctxs_socketmgr);
+ }
+
+ if (ctxs_actx != NULL) {
+ isc_appctx_destroy(&ctxs_actx);
+ }
+
+ if (ctxs_mctx != NULL) {
+ isc_mem_destroy(&ctxs_mctx);
+ }
+}
+
+static isc_result_t
+ctxs_init(void) {
+ isc_result_t result;
+
+ isc_mem_create(&ctxs_mctx);
+
+ result = isc_appctx_create(ctxs_mctx, &ctxs_actx);
+ if (result != ISC_R_SUCCESS) {
+ goto fail;
+ }
+
+ ctxs_netmgr = isc_nm_start(ctxs_mctx, 1);
+
+ result = isc_taskmgr_create(ctxs_mctx, 0, ctxs_netmgr, &ctxs_taskmgr);
+ if (result != ISC_R_SUCCESS) {
+ goto fail;
+ }
+
+ result = isc_socketmgr_create(ctxs_mctx, &ctxs_socketmgr);
+ if (result != ISC_R_SUCCESS) {
+ goto fail;
+ }
+
+ result = isc_timermgr_create(ctxs_mctx, &ctxs_timermgr);
+ if (result != ISC_R_SUCCESS) {
+ goto fail;
+ }
+
+ return (ISC_R_SUCCESS);
+
+fail:
+ ctxs_destroy();
+
+ return (result);
+}
+
static char *algname;
static isc_result_t
}
static void
-set_key(dns_client_t *client, char *keynamestr, char *keystr, bool is_sep,
- isc_mem_t **mctxp) {
+set_key(dns_client_t *client, char *keynamestr, char *keystr, bool is_sep) {
isc_result_t result;
dns_fixedname_t fkeyname;
unsigned int namelen;
isc_region_t r;
dns_secalg_t alg;
- isc_mem_create(mctxp);
-
if (algname != NULL) {
tr.base = algname;
tr.length = strlen(algname);
isc_sockaddr_t sa;
isc_sockaddrlist_t servers;
isc_result_t result;
- unsigned int namelen;
isc_buffer_t b;
dns_fixedname_t fname;
dns_name_t *name = NULL;
ISC_LIST_APPEND(servers, &sa, link);
if (name_space != NULL) {
- namelen = strlen(name_space);
+ unsigned int namelen = strlen(name_space);
isc_buffer_constinit(&b, name_space, namelen);
isc_buffer_add(&b, namelen);
name = dns_fixedname_initname(&fname);
dns_rdatatype_t type = dns_rdatatype_a;
dns_rdataset_t *rdataset;
dns_namelist_t namelist;
- isc_mem_t *keymctx = NULL;
unsigned int clientopt, resopt = 0;
bool is_sep = false;
const char *port = "53";
- isc_mem_t *mctx = NULL;
- isc_appctx_t *actx = NULL;
- isc_taskmgr_t *taskmgr = NULL;
- isc_socketmgr_t *socketmgr = NULL;
- isc_timermgr_t *timermgr = NULL;
struct in_addr in4;
struct in6_addr in6;
isc_sockaddr_t a4, a6;
exit(1);
}
- isc_mem_create(&mctx);
-
- result = isc_appctx_create(mctx, &actx);
- if (result != ISC_R_SUCCESS) {
- goto cleanup;
- }
- result = isc_app_ctxstart(actx);
- if (result != ISC_R_SUCCESS) {
- goto cleanup;
- }
- result = isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr);
- if (result != ISC_R_SUCCESS) {
- goto cleanup;
- }
- result = isc_socketmgr_create(mctx, &socketmgr);
- if (result != ISC_R_SUCCESS) {
- goto cleanup;
- }
- result = isc_timermgr_create(mctx, &timermgr);
+ result = ctxs_init();
if (result != ISC_R_SUCCESS) {
goto cleanup;
}
clientopt = 0;
- result = dns_client_create(mctx, actx, taskmgr, socketmgr, timermgr,
- clientopt, &client, addr4, addr6);
+ result = dns_client_create(ctxs_mctx, ctxs_actx, ctxs_taskmgr,
+ ctxs_socketmgr, ctxs_timermgr, clientopt,
+ &client, addr4, addr6);
if (result != ISC_R_SUCCESS) {
fprintf(stderr, "dns_client_create failed: %u, %s\n", result,
isc_result_totext(result));
irs_resconf_t *resconf = NULL;
isc_sockaddrlist_t *nameservers;
- result = irs_resconf_load(mctx, "/etc/resolv.conf", &resconf);
+ result = irs_resconf_load(ctxs_mctx, "/etc/resolv.conf",
+ &resconf);
if (result != ISC_R_SUCCESS && result != ISC_R_FILENOTFOUND) {
fprintf(stderr, "irs_resconf_load failed: %u\n",
result);
"while key name is provided\n");
exit(1);
}
- set_key(client, keynamestr, keystr, is_sep, &keymctx);
+ set_key(client, keynamestr, keystr, is_sep);
}
/* Construct qname */
/* Cleanup */
cleanup:
- dns_client_destroy(&client);
-
- if (taskmgr != NULL) {
- isc_taskmgr_destroy(&taskmgr);
- }
- if (timermgr != NULL) {
- isc_timermgr_destroy(&timermgr);
- }
- if (socketmgr != NULL) {
- isc_socketmgr_destroy(&socketmgr);
+ if (client != NULL) {
+ dns_client_destroy(&client);
}
- if (actx != NULL) {
- isc_appctx_destroy(&actx);
- }
- isc_mem_detach(&mctx);
- if (keynamestr != NULL) {
- isc_mem_destroy(&keymctx);
- }
+ ctxs_destroy();
dns_lib_shutdown();
return (0);
int
main(int argc, char *argv[]) {
char *ourkeyname;
- isc_taskmgr_t *taskmgr;
+ isc_nm_t *netmgr = NULL;
+ isc_taskmgr_t *taskmgr = NULL;
isc_timermgr_t *timermgr;
isc_socketmgr_t *socketmgr;
isc_socket_t *sock;
RUNCHECK(dst_lib_init(mctx, NULL));
- taskmgr = NULL;
- RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr));
+ netmgr = isc_nm_start(mctx, 1);
+
+ RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr));
task = NULL;
RUNCHECK(isc_task_create(taskmgr, 0, &task));
timermgr = NULL;
isc_task_shutdown(task);
isc_task_detach(&task);
isc_taskmgr_destroy(&taskmgr);
+ isc_nm_destroy(&netmgr);
isc_socket_detach(&sock);
isc_socketmgr_destroy(&socketmgr);
isc_timermgr_destroy(&timermgr);
#include <isc/hash.h>
#include <isc/log.h>
#include <isc/mem.h>
+#include <isc/netmgr.h>
#include <isc/print.h>
#include <isc/random.h>
#include <isc/sockaddr.h>
int
main(int argc, char **argv) {
char *keyname;
- isc_taskmgr_t *taskmgr;
+ isc_nm_t *netmgr;
+ isc_taskmgr_t *taskmgr = NULL;
isc_timermgr_t *timermgr;
isc_socketmgr_t *socketmgr;
isc_socket_t *sock;
RUNCHECK(dst_lib_init(mctx, NULL));
- taskmgr = NULL;
- RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr));
+ netmgr = isc_nm_start(mctx, 1);
+
+ RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr));
task = NULL;
RUNCHECK(isc_task_create(taskmgr, 0, &task));
timermgr = NULL;
isc_task_shutdown(task);
isc_task_detach(&task);
isc_taskmgr_destroy(&taskmgr);
+ isc_nm_destroy(&netmgr);
isc_socket_detach(&sock);
isc_socketmgr_destroy(&socketmgr);
isc_timermgr_destroy(&timermgr);
isc_sockaddr_t bind_any;
isc_log_t *lctx;
isc_logconfig_t *lcfg;
- isc_taskmgr_t *taskmgr;
+ isc_nm_t *netmgr = NULL;
+ isc_taskmgr_t *taskmgr = NULL;
isc_task_t *task;
isc_timermgr_t *timermgr;
isc_socketmgr_t *socketmgr;
fatal("can't choose between IPv4 and IPv6");
}
- taskmgr = NULL;
- RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr));
+ netmgr = isc_nm_start(mctx, 1);
+
+ RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr));
task = NULL;
RUNCHECK(isc_task_create(taskmgr, 0, &task));
timermgr = NULL;
isc_task_shutdown(task);
isc_task_detach(&task);
isc_taskmgr_destroy(&taskmgr);
+ isc_nm_destroy(&netmgr);
dst_lib_destroy();
Private header files, describing interfaces that are for internal use
within a library but not for public use, are kept in the source tree at the
same level as their related C files, and often have `"_p"` in their names,
-e.g. `lib/isc/task_p.h`.
+e.g. `lib/isc/mem_p.h`.
Header files that define modules should have a structure like the
following. Note that `<isc/lang.h>` MUST be included by any public header
isc_mem_t *dt_mctx = NULL;
isc_log_t *lctx = NULL;
+isc_nm_t *netmgr = NULL;
isc_taskmgr_t *taskmgr = NULL;
isc_task_t *maintask = NULL;
isc_timermgr_t *timermgr = NULL;
if (taskmgr != NULL) {
isc_taskmgr_destroy(&taskmgr);
}
+ if (netmgr != NULL) {
+ isc_nm_destroy(&netmgr);
+ }
if (timermgr != NULL) {
isc_timermgr_destroy(&timermgr);
}
isc_result_t result;
ncpus = isc_os_ncpus();
- CHECK(isc_taskmgr_create(dt_mctx, ncpus, 0, NULL, &taskmgr));
+ netmgr = isc_nm_start(dt_mctx, ncpus);
+ CHECK(isc_taskmgr_create(dt_mctx, 0, netmgr, &taskmgr));
CHECK(isc_timermgr_create(dt_mctx, &timermgr));
CHECK(isc_socketmgr_create(dt_mctx, &socketmgr));
CHECK(isc_task_create(taskmgr, 0, &maintask));
/* Create or resize the zone task pools. */
if (zmgr->zonetasks == NULL) {
result = isc_taskpool_create(zmgr->taskmgr, zmgr->mctx, ntasks,
- 2, &pool);
+ 2, false, &pool);
} else {
- result = isc_taskpool_expand(&zmgr->zonetasks, ntasks, &pool);
+ result = isc_taskpool_expand(&zmgr->zonetasks, ntasks, false,
+ &pool);
}
if (result == ISC_R_SUCCESS) {
zmgr->zonetasks = pool;
}
+ /*
+ * We always set all tasks in the zone-load task pool to
+ * privileged. This prevents other tasks in the system from
+ * running while the server task manager is in privileged
+ * mode.
+ */
pool = NULL;
if (zmgr->loadtasks == NULL) {
result = isc_taskpool_create(zmgr->taskmgr, zmgr->mctx, ntasks,
- 2, &pool);
+ 2, true, &pool);
} else {
- result = isc_taskpool_expand(&zmgr->loadtasks, ntasks, &pool);
+ result = isc_taskpool_expand(&zmgr->loadtasks, ntasks, true,
+ &pool);
}
if (result == ISC_R_SUCCESS) {
zmgr->loadtasks = pool;
}
- /*
- * We always set all tasks in the zone-load task pool to
- * privileged. This prevents other tasks in the system from
- * running while the server task manager is in privileged
- * mode.
- *
- * NOTE: If we start using task privileges for any other
- * part of the system than zone tasks, then this will need to be
- * revisted. In that case we'd want to turn on privileges for
- * zone tasks only when we were loading, and turn them off the
- * rest of the time. For now, however, it's okay to just
- * set it and forget it.
- */
- isc_taskpool_setprivilege(zmgr->loadtasks, true);
-
/* Create or resize the zone memory context pool. */
if (zmgr->mctxpool == NULL) {
result = isc_pool_create(zmgr->mctx, nmctx, mctxfree, mctxinit,
fsaccess_common_p.h \
lib_p.h \
mem_p.h \
- task_p.h \
tls_p.h
libisc_la_CPPFLAGS = \
REQUIRE(ctxp != NULL && *ctxp == NULL);
ctx = isc_mem_get(mctx, sizeof(*ctx));
+ *ctx = (isc_appctx_t){ .magic = 0 };
- ctx->magic = APPCTX_MAGIC;
-
- ctx->mctx = NULL;
isc_mem_attach(mctx, &ctx->mctx);
+ ctx->magic = APPCTX_MAGIC;
*ctxp = ctx;
isc_result_t
isc_nm_http_endpoint(isc_nmsocket_t *sock, const char *uri, isc_nm_recv_cb_t cb,
void *cbarg, size_t extrahandlesize);
+
+void
+isc_nm_task_enqueue(isc_nm_t *mgr, isc_task_t *task, int threadid);
+/*%<
+ * Enqueue the 'task' onto the netmgr ievents queue.
+ *
+ * Requires:
+ * \li 'mgr' is a valid netmgr object
+ * \li 'task' is a valid task
+ * \li 'threadid' is either the preferred netmgr tid or -1, in which case
+ * tid will be picked randomly. The threadid is capped (by modulo) to
+ * maximum number of 'workers' as specifed in isc_nm_start()
+ */
isc_result_t
isc_task_create(isc_taskmgr_t *manager, unsigned int quantum,
isc_task_t **taskp);
-
isc_result_t
isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum,
isc_task_t **taskp, int threadid);
/*%<
- * Create a task.
+ * Create a task, optionally bound to a particular threadid.
*
* Notes:
*
*\li #ISC_R_SHUTTINGDOWN
*/
+isc_result_t
+isc_task_run(isc_task_t *task);
+/*%<
+ * Run all the queued events for the 'task', returning
+ * when the queue is empty or the number of events executed
+ * exceeds the 'quantum' specified when the task was created.
+ *
+ * Requires:
+ *
+ *\li 'task' is a valid task.
+ *
+ * Returns:
+ *
+ *\li #ISC_R_SUCCESS
+ *\li #ISC_R_QUOTA
+ */
+
void
isc_task_attach(isc_task_t *source, isc_task_t **targetp);
/*%<
*****/
isc_result_t
-isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
- unsigned int default_quantum, isc_nm_t *nm,
+isc_taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm,
isc_taskmgr_t **managerp);
/*%<
* Create a new task manager.
*
* Notes:
*
- *\li 'workers' in the number of worker threads to create. In general,
- * the value should be close to the number of processors in the system.
- * The 'workers' value is advisory only. An attempt will be made to
- * create 'workers' threads, but if at least one thread creation
- * succeeds, isc_taskmgr_create() may return ISC_R_SUCCESS.
- *
*\li If 'default_quantum' is non-zero, then it will be used as the default
* quantum value when tasks are created. If zero, then an implementation
* defined default quantum will be used.
*
*\li 'mctx' is a valid memory context.
*
- *\li workers > 0
- *
*\li managerp != NULL && *managerp == NULL
*
* Ensures:
*\li #ISC_R_NOMEMORY
*\li #ISC_R_NOTHREADS No threads could be created.
*\li #ISC_R_UNEXPECTED An unexpected error occurred.
- *\li #ISC_R_SHUTTINGDOWN The non-threaded, shared, task
+ *\li #ISC_R_SHUTTINGDOWN The non-threaded, shared, task
* manager shutting down.
*/
void
-isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager);
-
-isc_taskmgrmode_t
-isc_taskmgr_mode(isc_taskmgr_t *manager);
-/*%<
- * Set/get the current operating mode of the task manager. Valid modes are:
- *
- *\li isc_taskmgrmode_normal
- *\li isc_taskmgrmode_privileged
- *
- * In privileged execution mode, only tasks that have had the "privilege"
- * flag set via isc_task_setprivilege() can be executed. When all such
- * tasks are complete, the manager automatically returns to normal mode
- * and proceeds with running non-privileged ready tasks. This means it is
- * necessary to have at least one privileged task waiting on the ready
- * queue *before* setting the manager into privileged execution mode,
- * which in turn means the task which calls this function should be in
- * task-exclusive mode when it does so.
- *
- * Requires:
- *
- *\li 'manager' is a valid task manager.
- */
+isc_taskmgr_attach(isc_taskmgr_t *, isc_taskmgr_t **);
+void
+isc_taskmgr_detach(isc_taskmgr_t *);
void
isc_taskmgr_destroy(isc_taskmgr_t **managerp);
isc_result_t
isc_taskpool_create(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks,
- unsigned int quantum, isc_taskpool_t **poolp);
+ unsigned int quantum, bool priv, isc_taskpool_t **poolp);
/*%<
* Create a task pool of "ntasks" tasks, each with quantum
* "quantum".
*/
isc_result_t
-isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size,
+isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, bool priv,
isc_taskpool_t **targetp);
/*%<
* \li '*poolp' is a valid task pool.
*/
-void
-isc_taskpool_setprivilege(isc_taskpool_t *pool, bool priv);
-/*%<
- * Set the privilege flag on all tasks in 'pool' to 'priv'. If 'priv' is
- * true, then when the task manager is set into privileged mode, only
- * tasks wihin this pool will be able to execute. (Note: It is important
- * to turn the pool tasks' privilege back off before the last task finishes
- * executing.)
- *
- * Requires:
- * \li 'pool' is a valid task pool.
- */
-
ISC_LANG_ENDDECLS
#endif /* ISC_TASKPOOL_H */
#define ISC_NETMGR_TID_UNKNOWN -1
+/* Must be different from ISC_NETMGR_TID_UNKNOWN */
+#define ISC_NETMGR_NON_INTERLOCKED -2
+
#define ISC_NETMGR_TLSBUF_SIZE 65536
#if !defined(WIN32)
bool finished;
isc_thread_t thread;
isc_queue_t *ievents; /* incoming async events */
+ isc_queue_t *ievents_priv; /* privileged async tasks */
+ isc_queue_t *ievents_task; /* async tasks */
isc_queue_t *ievents_prio; /* priority async events
* used for listening etc.
* can be processed while
typedef enum isc__netievent_type {
netievent_udpconnect,
+ netievent_udpclose,
netievent_udpsend,
netievent_udpread,
netievent_udpstop,
netievent_udpcancel,
- netievent_udpclose,
netievent_tcpconnect,
+ netievent_tcpclose,
netievent_tcpsend,
netievent_tcpstartread,
netievent_tcppauseread,
netievent_tcpaccept,
netievent_tcpstop,
netievent_tcpcancel,
- netievent_tcpclose,
netievent_tcpdnsaccept,
netievent_tcpdnsconnect,
+ netievent_tcpdnsclose,
netievent_tcpdnssend,
netievent_tcpdnsread,
netievent_tcpdnscancel,
- netievent_tcpdnsclose,
netievent_tcpdnsstop,
netievent_tlsclose,
netievent_tlsdnsaccept,
netievent_tlsdnsconnect,
+ netievent_tlsdnsclose,
netievent_tlsdnssend,
netievent_tlsdnsread,
netievent_tlsdnscancel,
- netievent_tlsdnsclose,
netievent_tlsdnsstop,
netievent_tlsdnscycle,
netievent_tlsdnsshutdown,
+ netievent_httpclose,
netievent_httpstop,
netievent_httpsend,
- netievent_httpclose,
- netievent_close,
netievent_shutdown,
netievent_stop,
netievent_pause,
netievent_readcb,
netievent_sendcb,
+ netievent_task,
+ netievent_privilegedtask,
+
netievent_prio = 0xff, /* event type values higher than this
* will be treated as high-priority
* events, which can be processed
netievent_tlsdnslisten,
netievent_resume,
netievent_detach,
+ netievent_close,
+
} isc__netievent_type;
typedef union {
isc__nm_put_netievent(nm, ievent); \
}
+typedef struct isc__netievent__task {
+ isc__netievent_type type;
+ isc_task_t *task;
+} isc__netievent__task_t;
+
+#define NETIEVENT_TASK_TYPE(type) \
+ typedef isc__netievent__task_t isc__netievent_##type##_t;
+
+#define NETIEVENT_TASK_DECL(type) \
+ isc__netievent_##type##_t *isc__nm_get_netievent_##type( \
+ isc_nm_t *nm, isc_task_t *task); \
+ void isc__nm_put_netievent_##type(isc_nm_t *nm, \
+ isc__netievent_##type##_t *ievent);
+
+#define NETIEVENT_TASK_DEF(type) \
+ isc__netievent_##type##_t *isc__nm_get_netievent_##type( \
+ isc_nm_t *nm, isc_task_t *task) { \
+ isc__netievent_##type##_t *ievent = \
+ isc__nm_get_netievent(nm, netievent_##type); \
+ ievent->task = task; \
+ \
+ return (ievent); \
+ } \
+ \
+ void isc__nm_put_netievent_##type(isc_nm_t *nm, \
+ isc__netievent_##type##_t *ievent) { \
+ ievent->task = NULL; \
+ isc__nm_put_netievent(nm, ievent); \
+ }
+
typedef struct isc__netievent_udpsend {
NETIEVENT__SOCKET;
isc_sockaddr_t peer;
uint32_t nworkers;
isc_mutex_t lock;
isc_condition_t wkstatecond;
+ isc_condition_t wkpausecond;
isc__networker_t *workers;
isc_stats_t *stats;
uint_fast32_t workers_paused;
atomic_uint_fast32_t maxudp;
+ atomic_bool paused;
+
/*
* Active connections are being closed and new connections are
* no longer allowed.
* or pause, or we'll deadlock. We have to either re-enqueue our
* event or wait for the other one to finish if we want to pause.
*/
- atomic_bool interlocked;
+ atomic_int interlocked;
/*
* Timeout values for TCP connections, corresponding to
NETIEVENT_TYPE(shutdown);
NETIEVENT_TYPE(stop);
+NETIEVENT_TASK_TYPE(task);
+NETIEVENT_TASK_TYPE(privilegedtask);
+
/* Now declared the helper functions */
NETIEVENT_SOCKET_DECL(close);
NETIEVENT_DECL(shutdown);
NETIEVENT_DECL(stop);
+NETIEVENT_TASK_DECL(task);
+NETIEVENT_TASK_DECL(privilegedtask);
+
void
isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
void
#include <isc/sockaddr.h>
#include <isc/stats.h>
#include <isc/strerr.h>
+#include <isc/task.h>
#include <isc/thread.h>
#include <isc/tls.h>
#include <isc/util.h>
process_queue(isc__networker_t *worker, isc_queue_t *queue);
static bool
process_priority_queue(isc__networker_t *worker);
-static bool
-process_normal_queue(isc__networker_t *worker);
static void
-process_queues(isc__networker_t *worker);
+process_privilege_queue(isc__networker_t *worker);
+static void
+process_tasks_queue(isc__networker_t *worker);
+static void
+process_normal_queue(isc__networker_t *worker);
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
isc_nm_t *mgr = NULL;
char name[32];
+ REQUIRE(workers > 0);
+
#ifdef WIN32
isc__nm_winsock_initialize();
#endif /* WIN32 */
isc_mem_attach(mctx, &mgr->mctx);
isc_mutex_init(&mgr->lock);
isc_condition_init(&mgr->wkstatecond);
+ isc_condition_init(&mgr->wkpausecond);
isc_refcount_init(&mgr->references, 1);
atomic_init(&mgr->maxudp, 0);
- atomic_init(&mgr->interlocked, false);
+ atomic_init(&mgr->interlocked, ISC_NETMGR_NON_INTERLOCKED);
#ifdef NETMGR_TRACE
ISC_LIST_INIT(mgr->active_sockets);
isc_condition_init(&worker->cond);
worker->ievents = isc_queue_new(mgr->mctx, 128);
+ worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
+ worker->ievents_task = isc_queue_new(mgr->mctx, 128);
worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
isc_mempool_put(mgr->evpool, ievent);
}
+ INSIST(isc_queue_dequeue(worker->ievents_priv) ==
+ (uintptr_t)NULL);
+ INSIST(isc_queue_dequeue(worker->ievents_task) ==
+ (uintptr_t)NULL);
+
while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
worker->ievents_prio)) != NULL)
{
INSIST(r == 0);
isc_queue_destroy(worker->ievents);
+ isc_queue_destroy(worker->ievents_priv);
+ isc_queue_destroy(worker->ievents_task);
isc_queue_destroy(worker->ievents_prio);
isc_mutex_destroy(&worker->lock);
isc_condition_destroy(&worker->cond);
}
isc_condition_destroy(&mgr->wkstatecond);
+ isc_condition_destroy(&mgr->wkpausecond);
isc_mutex_destroy(&mgr->lock);
isc_mempool_destroy(&mgr->evpool);
void
isc_nm_pause(isc_nm_t *mgr) {
REQUIRE(VALID_NM(mgr));
- REQUIRE(!isc__nm_in_netthread());
+ uint_fast32_t pausing = 0;
+ REQUIRE(!atomic_load(&mgr->paused));
isc__nm_acquire_interlocked_force(mgr);
for (size_t i = 0; i < mgr->nworkers; i++) {
isc__networker_t *worker = &mgr->workers[i];
- isc__netievent_resume_t *event =
- isc__nm_get_netievent_pause(mgr);
- isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event);
+ if (i != (size_t)isc_nm_tid()) {
+ isc__netievent_resume_t *event =
+ isc__nm_get_netievent_pause(mgr);
+ pausing++;
+ isc__nm_enqueue_ievent(worker,
+ (isc__netievent_t *)event);
+ } else {
+ isc__nm_async_pause(worker, NULL);
+ }
}
LOCK(&mgr->lock);
- while (mgr->workers_paused != mgr->workers_running) {
+ while (mgr->workers_paused != pausing) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
+ REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ false },
+ true));
UNLOCK(&mgr->lock);
}
void
isc_nm_resume(isc_nm_t *mgr) {
REQUIRE(VALID_NM(mgr));
- REQUIRE(!isc__nm_in_netthread());
+ REQUIRE(atomic_load(&mgr->paused));
for (size_t i = 0; i < mgr->nworkers; i++) {
isc__networker_t *worker = &mgr->workers[i];
- isc__netievent_resume_t *event =
- isc__nm_get_netievent_resume(mgr);
- isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event);
+ if (i != (size_t)isc_nm_tid()) {
+ isc__netievent_resume_t *event =
+ isc__nm_get_netievent_resume(mgr);
+ isc__nm_enqueue_ievent(worker,
+ (isc__netievent_t *)event);
+ } else {
+ isc__nm_async_resume(worker, NULL);
+ }
}
LOCK(&mgr->lock);
while (mgr->workers_paused != 0) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
+ REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ true },
+ false));
+ BROADCAST(&mgr->wkpausecond);
UNLOCK(&mgr->lock);
-
isc__nm_drop_interlocked(mgr);
}
isc_nm_destroy(isc_nm_t **mgr0) {
isc_nm_t *mgr = NULL;
int counter = 0;
- uint_fast32_t references;
REQUIRE(mgr0 != NULL);
REQUIRE(VALID_NM(*mgr0));
/*
* Wait for the manager to be dereferenced elsewhere.
*/
- while ((references = isc_refcount_current(&mgr->references)) > 1 &&
- counter++ < 1000)
- {
+ while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) {
#ifdef WIN32
_sleep(10);
#else /* ifdef WIN32 */
}
#ifdef NETMGR_TRACE
- isc__nm_dump_active(mgr);
+ if (isc_refcount_current(&mgr->references) > 1) {
+ isc__nm_dump_active(mgr);
+ INSIST(0);
+ ISC_UNREACHABLE();
+ }
#endif
- INSIST(references == 1);
+ /*
+ * Now just patiently wait
+ */
+ while (isc_refcount_current(&mgr->references) > 1) {
+#ifdef WIN32
+ _sleep(10);
+#else /* ifdef WIN32 */
+ usleep(10000);
+#endif /* ifdef WIN32 */
+ }
/*
* Detach final reference.
/*
* nm_thread is a single worker thread, that runs uv_run event loop
* until asked to stop.
+ *
+ * There are four queues for asynchronous events:
+ *
+ * 1. priority queue - netievents on the priority queue are run even when
+ * the taskmgr enters exclusive mode and the netmgr is paused. This
+ * is needed to properly start listening on the interfaces, free
+ * resources on shutdown, or resume from a pause.
+ *
+ * 2. privileged task queue - only privileged tasks are queued here and
+ * this is the first queue that gets processed when network manager
+ * is unpaused using isc_nm_resume(). All netmgr workers need to
+ * clean the privileged task queue before they all proceed to normal
+ * operation. Both task queues are processed when the workers are
+ * shutting down.
+ *
+ * 3. task queue - only (traditional) tasks are scheduled here, and this
+ * queue and the privileged task queue are both processed when the
+ * netmgr workers are finishing. This is needed to process the task
+ * shutdown events.
+ *
+ * 4. normal queue - this is the queue with netmgr events, e.g. reading,
+ * sending, callbacks, etc.
*/
+
static isc_threadresult_t
nm_thread(isc_threadarg_t worker0) {
isc__networker_t *worker = (isc__networker_t *)worker0;
isc_thread_setaffinity(isc__nm_tid_v);
while (true) {
+ /*
+ * uv_run() runs async_cb() in a loop, which processes
+ * all four event queues until a "pause" or "stop" event
+ * is encountered. On pause, we process only priority and
+ * privileged events until resuming.
+ */
int r = uv_run(&worker->loop, UV_RUN_DEFAULT);
- /* There's always the async handle until we are done */
INSIST(r > 0 || worker->finished);
if (worker->paused) {
- LOCK(&worker->lock);
- /* We need to lock the worker first otherwise
- * isc_nm_resume() might slip in before WAIT() in the
- * while loop starts and the signal never gets delivered
- * and we are forever stuck in the paused loop.
- */
+ INSIST(atomic_load(&mgr->interlocked) != isc_nm_tid());
+ /*
+ * We need to lock the worker first; otherwise
+ * isc_nm_resume() might slip in before WAIT() in
+ * the while loop starts, then the signal never
+ * gets delivered and we are stuck forever in the
+ * paused loop.
+ */
+ LOCK(&worker->lock);
LOCK(&mgr->lock);
mgr->workers_paused++;
SIGNAL(&mgr->wkstatecond);
while (worker->paused) {
WAIT(&worker->cond, &worker->lock);
+ UNLOCK(&worker->lock);
(void)process_priority_queue(worker);
+ LOCK(&worker->lock);
}
LOCK(&mgr->lock);
mgr->workers_paused--;
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
-
UNLOCK(&worker->lock);
+
+ /*
+ * All workers must run the privileged event
+ * queue before we resume from pause.
+ */
+ process_privilege_queue(worker);
+
+ LOCK(&mgr->lock);
+ while (atomic_load(&mgr->paused)) {
+ WAIT(&mgr->wkpausecond, &mgr->lock);
+ }
+ UNLOCK(&mgr->lock);
}
if (r == 0) {
INSIST(!worker->finished);
/*
- * Empty the async queue.
+ * We've fully resumed from pause. Drain the normal
+ * asynchronous event queues before resuming the uv_run()
+ * loop. (This is not strictly necessary, it just ensures
+ * that all pending events are processed before another
+ * pause can slip in.)
*/
- process_queues(worker);
+ process_tasks_queue(worker);
+ process_normal_queue(worker);
}
+ /*
+ * We are shutting down. Process the task queues
+ * (they may include shutdown events) but do not process
+ * the netmgr event queue.
+ */
+ process_privilege_queue(worker);
+ process_tasks_queue(worker);
+
LOCK(&mgr->lock);
mgr->workers_running--;
SIGNAL(&mgr->wkstatecond);
}
/*
- * async_cb is a universal callback for 'async' events sent to event loop.
+ * async_cb() is a universal callback for 'async' events sent to event loop.
* It's the only way to safely pass data to the libuv event loop. We use a
- * single async event and a lockless queue of 'isc__netievent_t' structures
- * passed from other threads.
+ * single async event and a set of lockless queues of 'isc__netievent_t'
+ * structures passed from other threads.
*/
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
- process_queues(worker);
+
+ /*
+ * process_priority_queue() returns false when pausing or stopping,
+ * so we don't want to process the other queues in that case.
+ */
+ if (!process_priority_queue(worker)) {
+ return;
+ }
+
+ process_privilege_queue(worker);
+ process_tasks_queue(worker);
+ process_normal_queue(worker);
}
static void
worker->finished = true;
/* Close the async handler */
uv_close((uv_handle_t *)&worker->async, NULL);
- /* uv_stop(&worker->loop); */
}
static void
isc__nm_async_pause(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
REQUIRE(worker->paused == false);
+
worker->paused = true;
uv_stop(&worker->loop);
}
isc__nm_async_resume(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
REQUIRE(worker->paused == true);
+
worker->paused = false;
}
+void
+isc_nm_task_enqueue(isc_nm_t *nm, isc_task_t *task, int threadid) {
+ isc__netievent_t *event = NULL;
+ int tid;
+ isc__networker_t *worker = NULL;
+
+ if (threadid == -1) {
+ tid = (int)isc_random_uniform(nm->nworkers);
+ } else {
+ tid = threadid % nm->nworkers;
+ }
+
+ worker = &nm->workers[tid];
+
+ if (isc_task_privilege(task)) {
+ event = (isc__netievent_t *)
+ isc__nm_get_netievent_privilegedtask(nm, task);
+ } else {
+ event = (isc__netievent_t *)isc__nm_get_netievent_task(nm,
+ task);
+ }
+
+ isc__nm_enqueue_ievent(worker, event);
+}
+
+#define isc__nm_async_privilegedtask(worker, ev0) \
+ isc__nm_async_task(worker, ev0)
+
+static void
+isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
+ isc__netievent_task_t *ievent = (isc__netievent_task_t *)ev0;
+ isc_result_t result;
+
+ UNUSED(worker);
+
+ result = isc_task_run(ievent->task);
+
+ switch (result) {
+ case ISC_R_QUOTA:
+ isc_nm_task_enqueue(worker->mgr, (isc_task_t *)ievent->task,
+ isc_nm_tid());
+ return;
+ case ISC_R_SUCCESS:
+ return;
+ default:
+ INSIST(0);
+ ISC_UNREACHABLE();
+ }
+}
+
static bool
process_priority_queue(isc__networker_t *worker) {
return (process_queue(worker, worker->ievents_prio));
}
-static bool
-process_normal_queue(isc__networker_t *worker) {
- return (process_queue(worker, worker->ievents));
+static void
+process_privilege_queue(isc__networker_t *worker) {
+ (void)process_queue(worker, worker->ievents_priv);
}
static void
-process_queues(isc__networker_t *worker) {
- if (!process_priority_queue(worker)) {
- return;
- }
- (void)process_normal_queue(worker);
+process_tasks_queue(isc__networker_t *worker) {
+ (void)process_queue(worker, worker->ievents_task);
+}
+
+static void
+process_normal_queue(isc__networker_t *worker) {
+ (void)process_queue(worker, worker->ievents);
}
/*
/* Don't process more ievents when we are stopping */
NETIEVENT_CASE_NOMORE(stop);
+ NETIEVENT_CASE(privilegedtask);
+ NETIEVENT_CASE(task);
+
NETIEVENT_CASE(udpconnect);
NETIEVENT_CASE(udplisten);
NETIEVENT_CASE(udpstop);
NETIEVENT_CASE(shutdown);
NETIEVENT_CASE(resume);
NETIEVENT_CASE_NOMORE(pause);
-
default:
INSIST(0);
ISC_UNREACHABLE();
NETIEVENT_DEF(shutdown);
NETIEVENT_DEF(stop);
+NETIEVENT_TASK_DEF(task);
+NETIEVENT_TASK_DEF(privilegedtask);
+
void
isc__nm_maybe_enqueue_ievent(isc__networker_t *worker,
isc__netievent_t *event) {
isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
SIGNAL(&worker->cond);
UNLOCK(&worker->lock);
+ } else if (event->type == netievent_privilegedtask) {
+ isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
+ } else if (event->type == netievent_task) {
+ isc_queue_enqueue(worker->ievents_task, (uintptr_t)event);
} else {
isc_queue_enqueue(worker->ievents, (uintptr_t)event);
}
bool
isc__nm_acquire_interlocked(isc_nm_t *mgr) {
LOCK(&mgr->lock);
- bool success = atomic_compare_exchange_strong(&mgr->interlocked,
- &(bool){ false }, true);
+ bool success = atomic_compare_exchange_strong(
+ &mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED },
+ isc_nm_tid());
UNLOCK(&mgr->lock);
return (success);
}
void
isc__nm_drop_interlocked(isc_nm_t *mgr) {
LOCK(&mgr->lock);
- bool success = atomic_compare_exchange_strong(&mgr->interlocked,
- &(bool){ true }, false);
- INSIST(success);
+ int tid = atomic_exchange(&mgr->interlocked,
+ ISC_NETMGR_NON_INTERLOCKED);
+ INSIST(tid != ISC_NETMGR_NON_INTERLOCKED);
BROADCAST(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
}
void
isc__nm_acquire_interlocked_force(isc_nm_t *mgr) {
LOCK(&mgr->lock);
- while (!atomic_compare_exchange_strong(&mgr->interlocked,
- &(bool){ false }, true))
+ while (!atomic_compare_exchange_strong(
+ &mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED },
+ isc_nm_tid()))
{
WAIT(&mgr->wkstatecond, &mgr->lock);
}
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_tcplisten(mgr, csock);
- isc__nm_enqueue_ievent(&mgr->workers[i],
- (isc__netievent_t *)ievent);
+ isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
+ (isc__netievent_t *)ievent);
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
return;
}
- /*
- * If network manager is interlocked, re-enqueue the event for later.
- */
- if (!isc__nm_acquire_interlocked(sock->mgr)) {
- enqueue_stoplistening(sock);
- } else {
- stop_tcp_parent(sock);
- isc__nm_drop_interlocked(sock->mgr);
- }
+ stop_tcp_parent(sock);
}
static void
static void
stop_tcp_child(isc_nmsocket_t *sock) {
+ bool last_child = false;
+
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->tid == isc_nm_tid());
LOCK(&sock->parent->lock);
sock->parent->rchildren -= 1;
+ last_child = (sock->parent->rchildren == 0);
UNLOCK(&sock->parent->lock);
- BROADCAST(&sock->parent->cond);
+
+ if (last_child) {
+ atomic_store(&sock->parent->closed, true);
+ isc__nmsocket_prep_destroy(sock->parent);
+ }
}
static void
atomic_store(&csock->active, false);
- if (csock->tid == isc_nm_tid()) {
- stop_tcp_child(csock);
- continue;
- }
-
ievent = isc__nm_get_netievent_tcpstop(sock->mgr, csock);
isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid],
(isc__netievent_t *)ievent);
}
-
- LOCK(&sock->lock);
- while (sock->rchildren > 0) {
- WAIT(&sock->cond, &sock->lock);
- }
- atomic_store(&sock->closed, true);
- UNLOCK(&sock->lock);
-
- isc__nmsocket_prep_destroy(sock);
}
static void
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock);
- isc__nm_enqueue_ievent(&mgr->workers[i],
- (isc__netievent_t *)ievent);
+ isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
+ (isc__netievent_t *)ievent);
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
return;
}
- /*
- * If network manager is interlocked, re-enqueue the event for later.
- */
- if (!isc__nm_acquire_interlocked(sock->mgr)) {
- enqueue_stoplistening(sock);
- } else {
- stop_tcpdns_parent(sock);
- isc__nm_drop_interlocked(sock->mgr);
- }
+ stop_tcpdns_parent(sock);
}
void
static void
stop_tcpdns_child(isc_nmsocket_t *sock) {
+ bool last_child = false;
+
REQUIRE(sock->type == isc_nm_tcpdnssocket);
REQUIRE(sock->tid == isc_nm_tid());
LOCK(&sock->parent->lock);
sock->parent->rchildren -= 1;
+ last_child = (sock->parent->rchildren == 0);
UNLOCK(&sock->parent->lock);
- BROADCAST(&sock->parent->cond);
+
+ if (last_child) {
+ atomic_store(&sock->parent->closed, true);
+ isc__nmsocket_prep_destroy(sock->parent);
+ }
}
static void
atomic_store(&csock->active, false);
- if (csock->tid == isc_nm_tid()) {
- stop_tcpdns_child(csock);
- continue;
- }
-
ievent = isc__nm_get_netievent_tcpdnsstop(sock->mgr, csock);
isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid],
(isc__netievent_t *)ievent);
}
-
- LOCK(&sock->lock);
- while (sock->rchildren > 0) {
- WAIT(&sock->cond, &sock->lock);
- }
- atomic_store(&sock->closed, true);
- UNLOCK(&sock->lock);
-
- isc__nmsocket_prep_destroy(sock);
}
static void
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_tlsdnslisten(mgr, csock);
- isc__nm_enqueue_ievent(&mgr->workers[i],
- (isc__netievent_t *)ievent);
+ isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
+ (isc__netievent_t *)ievent);
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
return;
}
- /*
- * If network manager is interlocked, re-enqueue the event for
- * later.
- */
- if (!isc__nm_acquire_interlocked(sock->mgr)) {
- enqueue_stoplistening(sock);
- } else {
- stop_tlsdns_parent(sock);
- isc__nm_drop_interlocked(sock->mgr);
- }
+ stop_tlsdns_parent(sock);
}
void
static void
stop_tlsdns_child(isc_nmsocket_t *sock) {
+ bool last_child = false;
+
REQUIRE(sock->type == isc_nm_tlsdnssocket);
REQUIRE(sock->tid == isc_nm_tid());
LOCK(&sock->parent->lock);
sock->parent->rchildren -= 1;
+ last_child = (sock->parent->rchildren == 0);
UNLOCK(&sock->parent->lock);
- BROADCAST(&sock->parent->cond);
+
+ if (last_child) {
+ atomic_store(&sock->parent->closed, true);
+ isc__nmsocket_prep_destroy(sock->parent);
+ }
}
static void
atomic_store(&csock->active, false);
- if (csock->tid == isc_nm_tid()) {
- stop_tlsdns_child(csock);
- continue;
- }
-
ievent = isc__nm_get_netievent_tlsdnsstop(sock->mgr, csock);
isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid],
(isc__netievent_t *)ievent);
}
-
- LOCK(&sock->lock);
- while (sock->rchildren > 0) {
- WAIT(&sock->cond, &sock->lock);
- }
- atomic_store(&sock->closed, true);
- UNLOCK(&sock->lock);
-
- isc__nmsocket_prep_destroy(sock);
}
static void
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_udplisten(mgr, csock);
- isc__nm_enqueue_ievent(&mgr->workers[i],
- (isc__netievent_t *)ievent);
+ isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
+ (isc__netievent_t *)ievent);
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
/*
* If network manager is paused, re-enqueue the event for later.
*/
- if (!isc__nm_acquire_interlocked(sock->mgr)) {
- enqueue_stoplistening(sock);
- } else {
- stop_udp_parent(sock);
- isc__nm_drop_interlocked(sock->mgr);
- }
+ stop_udp_parent(sock);
}
/*
isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
isc_nm_cb_t cb, void *cbarg) {
isc_nmsocket_t *sock = handle->sock;
- isc_nmsocket_t *psock = NULL, *rsock = sock;
+ isc_nmsocket_t *rsock = NULL;
isc_sockaddr_t *peer = &handle->peer;
isc__nm_uvreq_t *uvreq = NULL;
uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
int ntid;
- uvreq = isc__nm_uvreq_get(sock->mgr, sock);
- uvreq->uvbuf.base = (char *)region->base;
- uvreq->uvbuf.len = region->length;
-
- isc_nmhandle_attach(handle, &uvreq->handle);
-
- uvreq->cb.send = cb;
- uvreq->cbarg = cbarg;
+ INSIST(sock->type == isc_nm_udpsocket);
/*
* We're simulating a firewall blocking UDP packets bigger than
* we need to do so here.
*/
if (maxudp != 0 && region->length > maxudp) {
- isc__nm_uvreq_put(&uvreq, sock);
- isc_nmhandle_detach(&handle); /* FIXME? */
+ isc_nmhandle_detach(&handle);
return;
}
- if (sock->type == isc_nm_udpsocket && !atomic_load(&sock->client)) {
+ if (atomic_load(&sock->client)) {
+ /*
+ * When we are sending from the client socket, we directly use
+ * the socket provided.
+ */
+ rsock = sock;
+ goto send;
+ } else {
+ /*
+ * When we are sending from the server socket, we either use the
+ * socket associated with the network thread we are in, or we
+ * use the thread from the socket associated with the handle.
+ */
INSIST(sock->parent != NULL);
- psock = sock->parent;
- } else if (sock->type == isc_nm_udplistener) {
- psock = sock;
- } else if (!atomic_load(&sock->client)) {
- INSIST(0);
- ISC_UNREACHABLE();
- }
- /*
- * If we're in the network thread, we can send directly. If the
- * handle is associated with a UDP socket, we can reuse its
- * thread (assuming CPU affinity). Otherwise, pick a thread at
- * random.
- */
- if (isc__nm_in_netthread()) {
- ntid = isc_nm_tid();
- } else if (sock->type == isc_nm_udpsocket &&
- !atomic_load(&sock->client)) {
- ntid = sock->tid;
- } else {
- ntid = (int)isc_random_uniform(sock->nchildren);
+ if (isc__nm_in_netthread()) {
+ ntid = isc_nm_tid();
+ } else {
+ ntid = sock->tid;
+ }
+ rsock = &sock->parent->children[ntid];
}
- if (psock != NULL) {
- rsock = &psock->children[ntid];
- }
+send:
+ uvreq = isc__nm_uvreq_get(rsock->mgr, rsock);
+ uvreq->uvbuf.base = (char *)region->base;
+ uvreq->uvbuf.len = region->length;
+
+ isc_nmhandle_attach(handle, &uvreq->handle);
+
+ uvreq->cb.send = cb;
+ uvreq->cbarg = cbarg;
if (isc_nm_tid() == rsock->tid) {
+ REQUIRE(rsock->tid == isc_nm_tid());
isc__netievent_udpsend_t ievent = { .sock = rsock,
.req = uvreq,
.peer = *peer };
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
+ REQUIRE(sock->tid == isc_nm_tid());
if (status < 0) {
result = isc__nm_uverr2result(status);
REQUIRE(sock->type == isc_nm_udpsocket);
REQUIRE(sock->tid == isc_nm_tid());
+ bool last_child = false;
+
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true)) {
return;
LOCK(&sock->parent->lock);
sock->parent->rchildren -= 1;
+ last_child = (sock->parent->rchildren == 0);
UNLOCK(&sock->parent->lock);
- BROADCAST(&sock->parent->cond);
+
+ if (last_child) {
+ atomic_store(&sock->parent->closed, true);
+ isc__nmsocket_prep_destroy(sock->parent);
+ }
}
static void
atomic_store(&csock->active, false);
- if (csock->tid == isc_nm_tid()) {
- stop_udp_child(csock);
- continue;
- }
-
ievent = isc__nm_get_netievent_udpstop(sock->mgr, csock);
isc__nm_enqueue_ievent(&sock->mgr->workers[i],
(isc__netievent_t *)ievent);
}
-
- LOCK(&sock->lock);
- while (sock->rchildren > 0) {
- WAIT(&sock->cond, &sock->lock);
- }
- atomic_store(&sock->closed, true);
- UNLOCK(&sock->lock);
-
- isc__nmsocket_prep_destroy(sock);
}
static void
*/
#ifdef ISC_TASK_TRACE
-#define XTRACE(m) \
- fprintf(stderr, "task %p thread %" PRIuPTR ": %s\n", task, \
- isc_thread_self(), (m))
-#define XTTRACE(t, m) \
- fprintf(stderr, "task %p thread %" PRIuPTR ": %s\n", (t), \
- isc_thread_self(), (m))
-#define XTHREADTRACE(m) \
- fprintf(stderr, "thread %" PRIuPTR ": %s\n", isc_thread_self(), (m))
+#define XTRACE(m) \
+ fprintf(stderr, "task %p thread %zu: %s\n", task, isc_tid_v, (m))
+#define XTTRACE(t, m) \
+ fprintf(stderr, "task %p thread %zu: %s\n", (t), isc_tid_v, (m))
+#define XTHREADTRACE(m) fprintf(stderr, "thread %zu: %s\n", isc_tid_v, (m))
#else /* ifdef ISC_TASK_TRACE */
#define XTRACE(m)
#define XTTRACE(t, m)
#define TASK_MAGIC ISC_MAGIC('T', 'A', 'S', 'K')
#define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC)
-typedef struct isc__taskqueue isc__taskqueue_t;
-
struct isc_task {
/* Not locked. */
unsigned int magic;
isc_taskmgr_t *manager;
isc_mutex_t lock;
+ int threadid;
/* Locked by task lock. */
task_state_t state;
int pause_cnt;
isc_time_t tnow;
char name[16];
void *tag;
- unsigned int threadid;
bool bound;
/* Protected by atomics */
atomic_uint_fast32_t flags;
/* Locked by task manager lock. */
LINK(isc_task_t) link;
- LINK(isc_task_t) ready_link;
- LINK(isc_task_t) ready_priority_link;
};
#define TASK_F_SHUTTINGDOWN 0x01
#define TASK_MANAGER_MAGIC ISC_MAGIC('T', 'S', 'K', 'M')
#define VALID_MANAGER(m) ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC)
-typedef ISC_LIST(isc_task_t) isc__tasklist_t;
-
-struct isc__taskqueue {
- /* Everything locked by lock */
- isc_mutex_t lock;
- isc__tasklist_t ready_tasks;
- isc__tasklist_t ready_priority_tasks;
- isc_condition_t work_available;
- isc_thread_t thread;
- unsigned int threadid;
- isc_taskmgr_t *manager;
-};
-
struct isc_taskmgr {
/* Not locked. */
unsigned int magic;
+ isc_refcount_t references;
isc_mem_t *mctx;
isc_mutex_t lock;
- isc_mutex_t halt_lock;
- isc_condition_t halt_cond;
- unsigned int workers;
atomic_uint_fast32_t tasks_running;
atomic_uint_fast32_t tasks_ready;
- atomic_uint_fast32_t curq;
atomic_uint_fast32_t tasks_count;
- isc__taskqueue_t *queues;
isc_nm_t *nm;
/* Locked by task manager lock. */
unsigned int default_quantum;
LIST(isc_task_t) tasks;
- atomic_uint_fast32_t mode;
- atomic_bool pause_req;
atomic_bool exclusive_req;
atomic_bool exiting;
isc_task_t *excl;
};
-void
-isc__taskmgr_pause(isc_taskmgr_t *manager0);
-void
-isc__taskmgr_resume(isc_taskmgr_t *manager0);
-
#define DEFAULT_DEFAULT_QUANTUM 25
#define FINISHED(m) \
(atomic_load_relaxed(&((m)->exiting)) && \
isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task);
isc_result_t
isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp);
-static inline bool
-empty_readyq(isc_taskmgr_t *manager, int c);
-
-static inline isc_task_t *
-pop_readyq(isc_taskmgr_t *manager, int c);
-
-static inline void
-push_readyq(isc_taskmgr_t *manager, isc_task_t *task, int c);
-
-static inline void
-wake_all_queues(isc_taskmgr_t *manager);
/***
*** Tasks.
***/
-static inline void
-wake_all_queues(isc_taskmgr_t *manager) {
- for (unsigned int i = 0; i < manager->workers; i++) {
- LOCK(&manager->queues[i].lock);
- BROADCAST(&manager->queues[i].work_available);
- UNLOCK(&manager->queues[i].lock);
- }
-}
-
static void
task_finished(isc_task_t *task) {
isc_taskmgr_t *manager = task->manager;
+ isc_mem_t *mctx = manager->mctx;
REQUIRE(EMPTY(task->events));
REQUIRE(task->nevents == 0);
REQUIRE(EMPTY(task->on_shutdown));
UNLINK(manager->tasks, task, link);
atomic_fetch_sub(&manager->tasks_count, 1);
UNLOCK(&manager->lock);
- if (FINISHED(manager)) {
- /*
- * All tasks have completed and the
- * task manager is exiting. Wake up
- * any idle worker threads so they
- * can exit.
- */
- wake_all_queues(manager);
- }
+
isc_mutex_destroy(&task->lock);
task->magic = 0;
- isc_mem_put(manager->mctx, task, sizeof(*task));
+ isc_mem_put(mctx, task, sizeof(*task));
+
+ isc_taskmgr_detach(manager);
}
isc_result_t
-isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum,
+isc_task_create(isc_taskmgr_t *manager, unsigned int quantum,
isc_task_t **taskp) {
- return (isc_task_create_bound(manager0, quantum, taskp, -1));
+ return (isc_task_create_bound(manager, quantum, taskp, -1));
}
isc_result_t
-isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
+isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum,
isc_task_t **taskp, int threadid) {
- isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0;
isc_task_t *task;
bool exiting;
REQUIRE(VALID_MANAGER(manager));
REQUIRE(taskp != NULL && *taskp == NULL);
- task = isc_mem_get(manager->mctx, sizeof(*task));
XTRACE("isc_task_create");
- task->manager = manager;
+
+ task = isc_mem_get(manager->mctx, sizeof(*task));
+ *task = (isc_task_t){ 0 };
+
+ isc_taskmgr_attach(manager, &task->manager);
if (threadid == -1) {
/*
* randomly or specified by isc_task_sendto.
*/
task->bound = false;
- task->threadid = 0;
+ task->threadid = -1;
} else {
/*
* Task is pinned to a queue, it'll always be run
* by a specific thread.
*/
task->bound = true;
- task->threadid = threadid % manager->workers;
+ task->threadid = threadid;
}
isc_mutex_init(&task->lock);
memset(task->name, 0, sizeof(task->name));
task->tag = NULL;
INIT_LINK(task, link);
- INIT_LINK(task, ready_link);
- INIT_LINK(task, ready_priority_link);
exiting = false;
LOCK(&manager->lock);
static inline void
task_ready(isc_task_t *task) {
isc_taskmgr_t *manager = task->manager;
- bool has_privilege = isc_task_privilege(task);
-
REQUIRE(VALID_MANAGER(manager));
XTRACE("task_ready");
- LOCK(&manager->queues[task->threadid].lock);
- push_readyq(manager, task, task->threadid);
- if (atomic_load(&manager->mode) == isc_taskmgrmode_normal ||
- has_privilege) {
- SIGNAL(&manager->queues[task->threadid].work_available);
- }
- UNLOCK(&manager->queues[task->threadid].lock);
+ isc_nm_task_enqueue(manager->nm, task, task->threadid);
}
static inline bool
XTRACE("task_send");
+ if (task->bound) {
+ c = task->threadid;
+ } else if (c < 0) {
+ c = -1;
+ }
+
if (task->state == task_state_idle) {
was_idle = true;
task->threadid = c;
* some processing is deferred until after the lock is released.
*/
LOCK(&task->lock);
- /* If task is bound ignore provided cpu. */
- if (task->bound) {
- c = task->threadid;
- } else if (c < 0) {
- c = atomic_fetch_add_explicit(&task->manager->curq, 1,
- memory_order_relaxed);
- }
- c %= task->manager->workers;
was_idle = task_send(task, eventp, c);
UNLOCK(&task->lock);
XTRACE("isc_task_sendanddetach");
LOCK(&task->lock);
- if (task->bound) {
- c = task->threadid;
- } else if (c < 0) {
- c = atomic_fetch_add_explicit(&task->manager->curq, 1,
- memory_order_relaxed);
- }
- c %= task->manager->workers;
idle1 = task_send(task, eventp, c);
idle2 = task_detach(task);
UNLOCK(&task->lock);
XTRACE("isc_task_unsendrange");
- return (dequeue_events((isc_task_t *)task, sender, first, last, tag,
- events, false));
+ return (dequeue_events(task, sender, first, last, tag, events, false));
}
unsigned int
XTRACE("isc_task_unsend");
- return (dequeue_events((isc_task_t *)task, sender, type, type, tag,
- events, false));
+ return (dequeue_events(task, sender, type, type, tag, events, false));
}
isc_result_t
*** Task Manager.
***/
-/*
- * Return true if the current ready list for the manager, which is
- * either ready_tasks or the ready_priority_tasks, depending on whether
- * the manager is currently in normal or privileged execution mode.
- *
- * Caller must hold the task manager lock.
- */
-static inline bool
-empty_readyq(isc_taskmgr_t *manager, int c) {
- isc__tasklist_t queue;
-
- if (atomic_load_relaxed(&manager->mode) == isc_taskmgrmode_normal) {
- queue = manager->queues[c].ready_tasks;
- } else {
- queue = manager->queues[c].ready_priority_tasks;
- }
- return (EMPTY(queue));
-}
-
-/*
- * Dequeue and return a pointer to the first task on the current ready
- * list for the manager.
- * If the task is privileged, dequeue it from the other ready list
- * as well.
- *
- * Caller must hold the task manager lock.
- */
-static inline isc_task_t *
-pop_readyq(isc_taskmgr_t *manager, int c) {
- isc_task_t *task;
-
- if (atomic_load_relaxed(&manager->mode) == isc_taskmgrmode_normal) {
- task = HEAD(manager->queues[c].ready_tasks);
- } else {
- task = HEAD(manager->queues[c].ready_priority_tasks);
- }
-
- if (task != NULL) {
- DEQUEUE(manager->queues[c].ready_tasks, task, ready_link);
- if (ISC_LINK_LINKED(task, ready_priority_link)) {
- DEQUEUE(manager->queues[c].ready_priority_tasks, task,
- ready_priority_link);
- }
- }
-
- return (task);
-}
-
-/*
- * Push 'task' onto the ready_tasks queue. If 'task' has the privilege
- * flag set, then also push it onto the ready_priority_tasks queue.
- *
- * Caller must hold the task queue lock.
- */
-static inline void
-push_readyq(isc_taskmgr_t *manager, isc_task_t *task, int c) {
- if (ISC_LINK_LINKED(task, ready_link)) {
- return;
- }
- ENQUEUE(manager->queues[c].ready_tasks, task, ready_link);
- if (TASK_PRIVILEGED(task)) {
- ENQUEUE(manager->queues[c].ready_priority_tasks, task,
- ready_priority_link);
- }
- atomic_fetch_add_explicit(&manager->tasks_ready, 1,
- memory_order_acquire);
-}
-
-static void
-dispatch(isc_taskmgr_t *manager, unsigned int threadid) {
- isc_task_t *task;
-
- REQUIRE(VALID_MANAGER(manager));
+static isc_result_t
+task_run(isc_task_t *task) {
+ unsigned int dispatch_count = 0;
+ bool finished = false;
+ isc_event_t *event = NULL;
+ isc_result_t result = ISC_R_SUCCESS;
- /* Wait for everything to initialize */
- LOCK(&manager->lock);
- UNLOCK(&manager->lock);
+ REQUIRE(VALID_TASK(task));
+ LOCK(&task->lock);
/*
- * Again we're trying to hold the lock for as short a time as possible
- * and to do as little locking and unlocking as possible.
- *
- * In both while loops, the appropriate lock must be held before the
- * while body starts. Code which acquired the lock at the top of
- * the loop would be more readable, but would result in a lot of
- * extra locking. Compare:
- *
- * Straightforward:
- *
- * LOCK();
- * ...
- * UNLOCK();
- * while (expression) {
- * LOCK();
- * ...
- * UNLOCK();
- *
- * Unlocked part here...
- *
- * LOCK();
- * ...
- * UNLOCK();
- * }
- *
- * Note how if the loop continues we unlock and then immediately lock.
- * For N iterations of the loop, this code does 2N+1 locks and 2N+1
- * unlocks. Also note that the lock is not held when the while
- * condition is tested, which may or may not be important, depending
- * on the expression.
- *
- * As written:
- *
- * LOCK();
- * while (expression) {
- * ...
- * UNLOCK();
- *
- * Unlocked part here...
- *
- * LOCK();
- * ...
- * }
- * UNLOCK();
- *
- * For N iterations of the loop, this code does N+1 locks and N+1
- * unlocks. The while expression is always protected by the lock.
+ * It is possible because that we have a paused task in the queue - it
+ * might have been paused in the meantime and we never hold both queue
+ * and task lock to avoid deadlocks, just bail then.
*/
- LOCK(&manager->queues[threadid].lock);
+ if (task->state != task_state_ready) {
+ UNLOCK(&task->lock);
+ return (ISC_R_SUCCESS);
+ }
- while (!FINISHED(manager)) {
- /*
- * For reasons similar to those given in the comment in
- * isc_task_send() above, it is safe for us to dequeue
- * the task while only holding the manager lock, and then
- * change the task to running state while only holding the
- * task lock.
- *
- * If a pause has been requested, don't do any work
- * until it's been released.
- */
- while ((empty_readyq(manager, threadid) &&
- !atomic_load_relaxed(&manager->pause_req) &&
- !atomic_load_relaxed(&manager->exclusive_req)) &&
- !FINISHED(manager))
- {
- XTHREADTRACE("wait");
- XTHREADTRACE(atomic_load_relaxed(&manager->pause_req)
- ? "paused"
- : "notpaused");
- XTHREADTRACE(
- atomic_load_relaxed(&manager->exclusive_req)
- ? "excreq"
- : "notexcreq");
- WAIT(&manager->queues[threadid].work_available,
- &manager->queues[threadid].lock);
- XTHREADTRACE("awake");
- }
- XTHREADTRACE("working");
+ INSIST(task->state == task_state_ready);
+ task->state = task_state_running;
+ XTRACE("running");
+ XTRACE(task->name);
+ TIME_NOW(&task->tnow);
+ task->now = isc_time_seconds(&task->tnow);
- if (atomic_load_relaxed(&manager->pause_req) ||
- atomic_load_relaxed(&manager->exclusive_req))
- {
- UNLOCK(&manager->queues[threadid].lock);
- XTHREADTRACE("halting");
+ while (true) {
+ if (!EMPTY(task->events)) {
+ event = HEAD(task->events);
+ DEQUEUE(task->events, event, ev_link);
+ task->nevents--;
/*
- * Switching to exclusive mode is done as a
- * 2-phase-lock, checking if we have to switch is
- * done without any locks on pause_req and
- * exclusive_req to save time - the worst
- * thing that can happen is that we'll launch one
- * task more and exclusive task will be postponed a
- * bit.
- *
- * Broadcasting on halt_cond seems suboptimal, but
- * exclusive tasks are rare enough that we don't
- * care.
+ * Execute the event action.
*/
- LOCK(&manager->halt_lock);
- manager->halted++;
- BROADCAST(&manager->halt_cond);
- while (atomic_load_relaxed(&manager->pause_req) ||
- atomic_load_relaxed(&manager->exclusive_req))
- {
- WAIT(&manager->halt_cond, &manager->halt_lock);
+ XTRACE("execute action");
+ XTRACE(task->name);
+ if (event->ev_action != NULL) {
+ UNLOCK(&task->lock);
+ (event->ev_action)(task, event);
+ LOCK(&task->lock);
}
- manager->halted--;
- SIGNAL(&manager->halt_cond);
- UNLOCK(&manager->halt_lock);
-
- LOCK(&manager->queues[threadid].lock);
- /* Restart the loop after */
- continue;
+ XTRACE("execution complete");
+ dispatch_count++;
}
- task = pop_readyq(manager, threadid);
- if (task != NULL) {
- unsigned int dispatch_count = 0;
- bool done = false;
- bool requeue = false;
- bool finished = false;
- isc_event_t *event;
-
- INSIST(VALID_TASK(task));
-
+ if (isc_refcount_current(&task->references) == 0 &&
+ EMPTY(task->events) && !TASK_SHUTTINGDOWN(task))
+ {
/*
- * Note we only unlock the queue lock if we actually
- * have a task to do. We must reacquire the queue
- * lock before exiting the 'if (task != NULL)' block.
+ * There are no references and no pending events for
+ * this task, which means it will not become runnable
+ * again via an external action (such as sending an
+ * event or detaching).
+ *
+ * We initiate shutdown to prevent it from becoming a
+ * zombie.
+ *
+ * We do this here instead of in the "if
+ * EMPTY(task->events)" block below because:
+ *
+ * If we post no shutdown events, we want the task
+ * to finish.
+ *
+ * If we did post shutdown events, will still want
+ * the task's quantum to be applied.
*/
- UNLOCK(&manager->queues[threadid].lock);
- RUNTIME_CHECK(atomic_fetch_sub_explicit(
- &manager->tasks_ready, 1,
- memory_order_release) > 0);
- atomic_fetch_add_explicit(&manager->tasks_running, 1,
- memory_order_acquire);
-
- LOCK(&task->lock);
+ INSIST(!task_shutdown(task));
+ }
+
+ if (EMPTY(task->events)) {
/*
- * It is possible because that we have a paused task
- * in the queue - it might have been paused in the
- * meantime and we never hold both queue and task lock
- * to avoid deadlocks, just bail then.
+ * Nothing else to do for this task right now.
*/
- if (task->state != task_state_ready) {
- UNLOCK(&task->lock);
- LOCK(&manager->queues[threadid].lock);
- continue;
- }
- INSIST(task->state == task_state_ready);
- task->state = task_state_running;
- XTRACE("running");
- XTRACE(task->name);
- TIME_NOW(&task->tnow);
- task->now = isc_time_seconds(&task->tnow);
- do {
- if (!EMPTY(task->events)) {
- event = HEAD(task->events);
- DEQUEUE(task->events, event, ev_link);
- task->nevents--;
-
- /*
- * Execute the event action.
- */
- XTRACE("execute action");
- XTRACE(task->name);
- if (event->ev_action != NULL) {
- UNLOCK(&task->lock);
- (event->ev_action)(
- (isc_task_t *)task,
- event);
- LOCK(&task->lock);
- }
- XTRACE("execution complete");
- dispatch_count++;
- }
-
- if (isc_refcount_current(&task->references) ==
- 0 &&
- EMPTY(task->events) &&
- !TASK_SHUTTINGDOWN(task))
- {
- bool was_idle;
-
- /*
- * There are no references and no
- * pending events for this task,
- * which means it will not become
- * runnable again via an external
- * action (such as sending an event
- * or detaching).
- *
- * We initiate shutdown to prevent
- * it from becoming a zombie.
- *
- * We do this here instead of in
- * the "if EMPTY(task->events)" block
- * below because:
- *
- * If we post no shutdown events,
- * we want the task to finish.
- *
- * If we did post shutdown events,
- * will still want the task's
- * quantum to be applied.
- */
- was_idle = task_shutdown(task);
- INSIST(!was_idle);
- }
-
- if (EMPTY(task->events)) {
- /*
- * Nothing else to do for this task
- * right now.
- */
- XTRACE("empty");
- if (isc_refcount_current(
- &task->references) == 0 &&
- TASK_SHUTTINGDOWN(task)) {
- /*
- * The task is done.
- */
- XTRACE("done");
- finished = true;
- task->state = task_state_done;
- } else {
- if (task->state ==
- task_state_running) {
- task->state =
- task_state_idle;
- } else if (task->state ==
- task_state_pausing) {
- task->state =
- task_state_paused;
- }
- }
- done = true;
+ XTRACE("empty");
+ if (isc_refcount_current(&task->references) == 0 &&
+ TASK_SHUTTINGDOWN(task)) {
+ /*
+ * The task is done.
+ */
+ XTRACE("done");
+ finished = true;
+ task->state = task_state_done;
+ } else {
+ if (task->state == task_state_running) {
+ XTRACE("idling");
+ task->state = task_state_idle;
} else if (task->state == task_state_pausing) {
- /*
- * We got a pause request on this task,
- * stop working on it and switch the
- * state to paused.
- */
XTRACE("pausing");
task->state = task_state_paused;
- done = true;
- } else if (dispatch_count >= task->quantum) {
- /*
- * Our quantum has expired, but
- * there is more work to be done.
- * We'll requeue it to the ready
- * queue later.
- *
- * We don't check quantum until
- * dispatching at least one event,
- * so the minimum quantum is one.
- */
- XTRACE("quantum");
- task->state = task_state_ready;
- requeue = true;
- done = true;
}
- } while (!done);
- UNLOCK(&task->lock);
-
- if (finished) {
- task_finished(task);
}
-
- RUNTIME_CHECK(atomic_fetch_sub_explicit(
- &manager->tasks_running, 1,
- memory_order_release) > 0);
- LOCK(&manager->queues[threadid].lock);
- if (requeue) {
- /*
- * We know we're awake, so we don't have
- * to wakeup any sleeping threads if the
- * ready queue is empty before we requeue.
- *
- * A possible optimization if the queue is
- * empty is to 'goto' the 'if (task != NULL)'
- * block, avoiding the ENQUEUE of the task
- * and the subsequent immediate DEQUEUE
- * (since it is the only executable task).
- * We don't do this because then we'd be
- * skipping the exit_requested check. The
- * cost of ENQUEUE is low anyway, especially
- * when you consider that we'd have to do
- * an extra EMPTY check to see if we could
- * do the optimization. If the ready queue
- * were usually nonempty, the 'optimization'
- * might even hurt rather than help.
- */
- push_readyq(manager, task, threadid);
- }
- }
-
- /*
- * If we are in privileged execution mode and there are no
- * tasks remaining on the current ready queue, then
- * we're stuck. Automatically drop privileges at that
- * point and continue with the regular ready queue.
- */
- if (atomic_load_relaxed(&manager->mode) !=
- isc_taskmgrmode_normal &&
- atomic_load_explicit(&manager->tasks_running,
- memory_order_acquire) == 0)
- {
- UNLOCK(&manager->queues[threadid].lock);
- LOCK(&manager->lock);
+ break;
+ } else if (task->state == task_state_pausing) {
+ /*
+ * We got a pause request on this task, stop working on
+ * it and switch the state to paused.
+ */
+ XTRACE("pausing");
+ task->state = task_state_paused;
+ break;
+ } else if (dispatch_count >= task->quantum) {
/*
- * Check once again, under lock. Mode can only
- * change from privileged to normal anyway, and
- * if we enter this loop twice at the same time
- * we'll end up in a deadlock over queue locks.
+ * Our quantum has expired, but there is more work to be
+ * done. We'll requeue it to the ready queue later.
*
+ * We don't check quantum until dispatching at least one
+ * event, so the minimum quantum is one.
*/
- if (atomic_load(&manager->mode) !=
- isc_taskmgrmode_normal &&
- atomic_load_explicit(&manager->tasks_running,
- memory_order_acquire) == 0)
- {
- bool empty = true;
- unsigned int i;
- for (i = 0; i < manager->workers && empty; i++)
- {
- LOCK(&manager->queues[i].lock);
- empty &= empty_readyq(manager, i);
- UNLOCK(&manager->queues[i].lock);
- }
- if (empty) {
- atomic_store(&manager->mode,
- isc_taskmgrmode_normal);
- wake_all_queues(manager);
- }
- }
- UNLOCK(&manager->lock);
- LOCK(&manager->queues[threadid].lock);
+ XTRACE("quantum");
+ task->state = task_state_ready;
+ result = ISC_R_QUOTA;
+ break;
}
}
- UNLOCK(&manager->queues[threadid].lock);
- /*
- * There might be other dispatchers waiting on empty tasks,
- * wake them up.
- */
- wake_all_queues(manager);
-}
-
-static isc_threadresult_t
-#ifdef _WIN32
- WINAPI
-#endif /* ifdef _WIN32 */
- run(void *queuep) {
- isc__taskqueue_t *tq = queuep;
- isc_taskmgr_t *manager = tq->manager;
- int threadid = tq->threadid;
- isc_thread_setaffinity(threadid);
-
- XTHREADTRACE("starting");
-
- dispatch(manager, threadid);
+ UNLOCK(&task->lock);
- XTHREADTRACE("exiting");
+ if (finished) {
+ task_finished(task);
+ }
-#ifdef OPENSSL_LEAKS
- ERR_remove_state(0);
-#endif /* ifdef OPENSSL_LEAKS */
+ return (result);
+}
- return ((isc_threadresult_t)0);
+isc_result_t
+isc_task_run(isc_task_t *task) {
+ return (task_run(task));
}
static void
manager_free(isc_taskmgr_t *manager) {
- for (unsigned int i = 0; i < manager->workers; i++) {
- isc_mutex_destroy(&manager->queues[i].lock);
- isc_condition_destroy(&manager->queues[i].work_available);
- }
+ isc_refcount_destroy(&manager->references);
+ isc_nm_detach(&manager->nm);
+
isc_mutex_destroy(&manager->lock);
isc_mutex_destroy(&manager->excl_lock);
- isc_mutex_destroy(&manager->halt_lock);
- isc_condition_destroy(&manager->halt_cond);
- isc_mem_put(manager->mctx, manager->queues,
- manager->workers * sizeof(isc__taskqueue_t));
manager->magic = 0;
isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
}
+void
+isc_taskmgr_attach(isc_taskmgr_t *source, isc_taskmgr_t **targetp) {
+ REQUIRE(VALID_MANAGER(source));
+ REQUIRE(targetp != NULL && *targetp == NULL);
+
+ isc_refcount_increment(&source->references);
+
+ *targetp = source;
+}
+
+void
+isc_taskmgr_detach(isc_taskmgr_t *manager) {
+ REQUIRE(VALID_MANAGER(manager));
+
+ if (isc_refcount_decrement(&manager->references) == 1) {
+ manager_free(manager);
+ }
+}
+
isc_result_t
-isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
- unsigned int default_quantum, isc_nm_t *nm,
+isc_taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm,
isc_taskmgr_t **managerp) {
- unsigned int i;
isc_taskmgr_t *manager;
/*
* Create a new task manager.
*/
- REQUIRE(workers > 0);
REQUIRE(managerp != NULL && *managerp == NULL);
+ REQUIRE(nm != NULL);
manager = isc_mem_get(mctx, sizeof(*manager));
*manager = (isc_taskmgr_t){ .magic = TASK_MANAGER_MAGIC };
- atomic_store(&manager->mode, isc_taskmgrmode_normal);
isc_mutex_init(&manager->lock);
isc_mutex_init(&manager->excl_lock);
- isc_mutex_init(&manager->halt_lock);
- isc_condition_init(&manager->halt_cond);
-
- manager->workers = workers;
-
if (default_quantum == 0) {
default_quantum = DEFAULT_DEFAULT_QUANTUM;
}
}
INIT_LIST(manager->tasks);
- atomic_store(&manager->tasks_count, 0);
- manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
- RUNTIME_CHECK(manager->queues != NULL);
-
+ atomic_init(&manager->tasks_count, 0);
atomic_init(&manager->tasks_running, 0);
atomic_init(&manager->tasks_ready, 0);
- atomic_init(&manager->curq, 0);
atomic_init(&manager->exiting, false);
atomic_store_relaxed(&manager->exclusive_req, false);
- atomic_store_relaxed(&manager->pause_req, false);
isc_mem_attach(mctx, &manager->mctx);
- LOCK(&manager->lock);
- /*
- * Start workers.
- */
- for (i = 0; i < workers; i++) {
- INIT_LIST(manager->queues[i].ready_tasks);
- INIT_LIST(manager->queues[i].ready_priority_tasks);
- isc_mutex_init(&manager->queues[i].lock);
- isc_condition_init(&manager->queues[i].work_available);
-
- manager->queues[i].manager = manager;
- manager->queues[i].threadid = i;
- isc_thread_create(run, &manager->queues[i],
- &manager->queues[i].thread);
- char name[21];
- snprintf(name, sizeof(name), "isc-worker%04u", i);
- isc_thread_setname(manager->queues[i].thread, name);
- }
- UNLOCK(&manager->lock);
+ isc_refcount_init(&manager->references, 1);
- isc_thread_setconcurrency(workers);
-
- *managerp = (isc_taskmgr_t *)manager;
+ *managerp = manager;
return (ISC_R_SUCCESS);
}
isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
isc_taskmgr_t *manager;
isc_task_t *task;
- unsigned int i;
- bool exiting;
/*
* Destroy '*managerp'.
*/
REQUIRE(managerp != NULL);
- manager = (isc_taskmgr_t *)*managerp;
+ manager = *managerp;
REQUIRE(VALID_MANAGER(manager));
XTHREADTRACE("isc_taskmgr_destroy");
/*
* Make sure we only get called once.
*/
- exiting = false;
-
- INSIST(!!atomic_compare_exchange_strong(&manager->exiting, &exiting,
- true));
-
- /*
- * If privileged mode was on, turn it off.
- */
- atomic_store(&manager->mode, isc_taskmgrmode_normal);
+ INSIST(atomic_compare_exchange_strong(&manager->exiting,
+ &(bool){ false }, true));
/*
* Post shutdown event(s) to every task (if they haven't already been
- * posted). To make things easier post idle tasks to worker 0.
+ * posted).
*/
- LOCK(&manager->queues[0].lock);
for (task = HEAD(manager->tasks); task != NULL; task = NEXT(task, link))
{
LOCK(&task->lock);
if (task_shutdown(task)) {
task->threadid = 0;
- push_readyq(manager, task, 0);
+ task_ready(task);
}
UNLOCK(&task->lock);
}
- UNLOCK(&manager->queues[0].lock);
- /*
- * Wake up any sleeping workers. This ensures we get work done if
- * there's work left to do, and if there are already no tasks left
- * it will cause the workers to see manager->exiting.
- */
- wake_all_queues(manager);
UNLOCK(&manager->lock);
- /*
- * Wait for all the worker threads to exit.
- */
- for (i = 0; i < manager->workers; i++) {
- isc_thread_join(manager->queues[i].thread, NULL);
- }
-
- /*
- * Detach from the network manager if it was set.
- */
- if (manager->nm != NULL) {
- isc_nm_detach(&manager->nm);
- }
-
- manager_free(manager);
+ isc_taskmgr_detach(manager);
*managerp = NULL;
}
-void
-isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager0) {
- isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0;
-
- atomic_store(&manager->mode, isc_taskmgrmode_privileged);
-}
-
-isc_taskmgrmode_t
-isc_taskmgr_mode(isc_taskmgr_t *manager0) {
- isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0;
- return (atomic_load(&manager->mode));
-}
-
-void
-isc__taskmgr_pause(isc_taskmgr_t *manager0) {
- isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0;
-
- LOCK(&manager->halt_lock);
- while (atomic_load_relaxed(&manager->exclusive_req) ||
- atomic_load_relaxed(&manager->pause_req))
- {
- UNLOCK(&manager->halt_lock);
- /* This is ugly but pause is used EXCLUSIVELY in tests */
- isc_thread_yield();
- LOCK(&manager->halt_lock);
- }
-
- atomic_store_relaxed(&manager->pause_req, true);
- while (manager->halted < manager->workers) {
- wake_all_queues(manager);
- WAIT(&manager->halt_cond, &manager->halt_lock);
- }
- UNLOCK(&manager->halt_lock);
-}
-
-void
-isc__taskmgr_resume(isc_taskmgr_t *manager0) {
- isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0;
- LOCK(&manager->halt_lock);
- if (atomic_load(&manager->pause_req)) {
- atomic_store(&manager->pause_req, false);
- while (manager->halted > 0) {
- BROADCAST(&manager->halt_cond);
- WAIT(&manager->halt_cond, &manager->halt_lock);
- }
- }
- UNLOCK(&manager->halt_lock);
-}
-
void
isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task) {
REQUIRE(VALID_MANAGER(mgr));
REQUIRE(VALID_TASK(task));
LOCK(&mgr->excl_lock);
if (mgr->excl != NULL) {
- isc_task_detach((isc_task_t **)&mgr->excl);
+ isc_task_detach(&mgr->excl);
}
- isc_task_attach(task, (isc_task_t **)&mgr->excl);
+ isc_task_attach(task, &mgr->excl);
UNLOCK(&mgr->excl_lock);
}
LOCK(&mgr->excl_lock);
if (mgr->excl != NULL) {
- isc_task_attach((isc_task_t *)mgr->excl, taskp);
+ isc_task_attach(mgr->excl, taskp);
} else {
result = ISC_R_NOTFOUND;
}
task->manager->excl == NULL));
UNLOCK(&manager->excl_lock);
- if (atomic_load_relaxed(&manager->exclusive_req) ||
- atomic_load_relaxed(&manager->pause_req))
+ if (!atomic_compare_exchange_strong(&manager->exclusive_req,
+ &(bool){ false }, true))
{
return (ISC_R_LOCKBUSY);
}
- LOCK(&manager->halt_lock);
- INSIST(!atomic_load_relaxed(&manager->exclusive_req) &&
- !atomic_load_relaxed(&manager->pause_req));
- atomic_store_relaxed(&manager->exclusive_req, true);
- while (manager->halted + 1 < manager->workers) {
- wake_all_queues(manager);
- WAIT(&manager->halt_cond, &manager->halt_lock);
- }
- UNLOCK(&manager->halt_lock);
- if (manager->nm != NULL) {
- isc_nm_pause(manager->nm);
- }
+ isc_nm_pause(manager->nm);
+
return (ISC_R_SUCCESS);
}
REQUIRE(task->state == task_state_running);
manager = task->manager;
- if (manager->nm != NULL) {
- isc_nm_resume(manager->nm);
- }
- LOCK(&manager->halt_lock);
- REQUIRE(atomic_load_relaxed(&manager->exclusive_req));
- atomic_store_relaxed(&manager->exclusive_req, false);
- while (manager->halted > 0) {
- BROADCAST(&manager->halt_cond);
- WAIT(&manager->halt_cond, &manager->halt_lock);
- }
- UNLOCK(&manager->halt_lock);
+ isc_nm_resume(manager->nm);
+ REQUIRE(atomic_compare_exchange_strong(&manager->exclusive_req,
+ &(bool){ true }, false));
}
void
task->pause_cnt++;
if (task->pause_cnt > 1) {
/*
- * Someone already paused this thread, just increase
+ * Someone already paused this task, just increase
* the number of pausing clients.
*/
UNLOCK(&task->lock);
void
isc_task_setprivilege(isc_task_t *task, bool priv) {
REQUIRE(VALID_TASK(task));
- isc_taskmgr_t *manager = task->manager;
uint_fast32_t oldflags, newflags;
oldflags = atomic_load_acquire(&task->flags);
}
} while (!atomic_compare_exchange_weak_acq_rel(&task->flags, &oldflags,
newflags));
-
- LOCK(&manager->queues[task->threadid].lock);
- if (priv && ISC_LINK_LINKED(task, ready_link)) {
- ENQUEUE(manager->queues[task->threadid].ready_priority_tasks,
- task, ready_priority_link);
- } else if (!priv && ISC_LINK_LINKED(task, ready_priority_link)) {
- DEQUEUE(manager->queues[task->threadid].ready_priority_tasks,
- task, ready_priority_link);
- }
- UNLOCK(&manager->queues[task->threadid].lock);
}
bool
}
bool
-isc_task_exiting(isc_task_t *t) {
- isc_task_t *task = (isc_task_t *)t;
+isc_task_exiting(isc_task_t *task) {
REQUIRE(VALID_TASK(task));
return (TASK_SHUTTINGDOWN(task));
TRY0(xmlTextWriterWriteString(writer, ISC_XMLCHAR "threaded"));
TRY0(xmlTextWriterEndElement(writer)); /* type */
- TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "worker-threads"));
- TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->workers));
- TRY0(xmlTextWriterEndElement(writer)); /* worker-threads */
-
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum"));
TRY0(xmlTextWriterWriteFormatString(writer, "%d",
mgr->default_quantum));
CHECKMEM(obj);
json_object_object_add(tasks, "thread-model", obj);
- obj = json_object_new_int(mgr->workers);
- CHECKMEM(obj);
- json_object_object_add(tasks, "worker-threads", obj);
-
obj = json_object_new_int(mgr->default_quantum);
CHECKMEM(obj);
json_object_object_add(tasks, "default-quantum", obj);
+++ /dev/null
-/*
- * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, you can obtain one at https://mozilla.org/MPL/2.0/.
- *
- * See the COPYRIGHT file distributed with this work for additional
- * information regarding copyright ownership.
- */
-
-#ifndef ISC_TASK_P_H
-#define ISC_TASK_P_H
-
-/*! \file */
-
-/*%
- * These functions allow unit tests to manipulate the processing
- * of the task queue. They are not intended as part of the public API.
- */
-void
-isc__taskmgr_pause(isc_taskmgr_t *taskmgr);
-void
-isc__taskmgr_resume(isc_taskmgr_t *taskmgr);
-
-#endif /* ISC_TASK_P_H */
isc_result_t
isc_taskpool_create(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks,
- unsigned int quantum, isc_taskpool_t **poolp) {
+ unsigned int quantum, bool priv, isc_taskpool_t **poolp) {
unsigned int i;
isc_taskpool_t *pool = NULL;
- isc_result_t result;
INSIST(ntasks > 0);
/* Create the tasks */
for (i = 0; i < ntasks; i++) {
- result = isc_task_create(tmgr, quantum, &pool->tasks[i]);
+ isc_result_t result = isc_task_create_bound(tmgr, quantum,
+ &pool->tasks[i], i);
if (result != ISC_R_SUCCESS) {
isc_taskpool_destroy(&pool);
return (result);
}
+ isc_task_setprivilege(pool->tasks[i], priv);
isc_task_setname(pool->tasks[i], "taskpool", NULL);
}
}
isc_result_t
-isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size,
+isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, bool priv,
isc_taskpool_t **targetp) {
- isc_result_t result;
isc_taskpool_t *pool;
REQUIRE(sourcep != NULL && *sourcep != NULL);
/* Create new tasks */
for (i = pool->ntasks; i < size; i++) {
- result = isc_task_create(pool->tmgr, pool->quantum,
- &newpool->tasks[i]);
+ isc_result_t result =
+ isc_task_create_bound(pool->tmgr, pool->quantum,
+ &newpool->tasks[i], i);
if (result != ISC_R_SUCCESS) {
*sourcep = pool;
isc_taskpool_destroy(&newpool);
return (result);
}
+ isc_task_setprivilege(newpool->tasks[i], priv);
isc_task_setname(newpool->tasks[i], "taskpool", NULL);
}
pool->ntasks * sizeof(isc_task_t *));
isc_mem_putanddetach(&pool->mctx, pool, sizeof(*pool));
}
-
-void
-isc_taskpool_setprivilege(isc_taskpool_t *pool, bool priv) {
- unsigned int i;
-
- REQUIRE(pool != NULL);
-
- for (i = 0; i < pool->ntasks; i++) {
- if (pool->tasks[i] != NULL) {
- isc_task_setprivilege(pool->tasks[i], priv);
- }
- }
-}
if (taskmgr != NULL) {
isc_taskmgr_destroy(&taskmgr);
}
+ if (netmgr != NULL) {
+ isc_nm_destroy(&netmgr);
+ }
if (timermgr != NULL) {
isc_timermgr_destroy(&timermgr);
}
- if (netmgr != NULL) {
- isc_nm_detach(&netmgr);
- }
}
static isc_result_t
isc_hp_init(6 * workers);
netmgr = isc_nm_start(test_mctx, workers);
- CHECK(isc_taskmgr_create(test_mctx, workers, 0, netmgr, &taskmgr));
+ CHECK(isc_taskmgr_create(test_mctx, 0, netmgr, &taskmgr));
CHECK(isc_task_create(taskmgr, 0, &maintask));
isc_taskmgr_setexcltask(taskmgr, maintask);
#include <isc/timer.h>
#include <isc/util.h>
-#include "../task_p.h"
#include "isctest.h"
/* Set to true (or use -v option) for verbose output */
atomic_store(value, atomic_fetch_add(&counter, 1));
}
+#include <isc/thread.h>
+
static void
set_and_drop(isc_task_t *task, isc_event_t *event) {
atomic_int_fast32_t *value = (atomic_int_fast32_t *)event->ev_arg;
isc_event_free(&event);
LOCK(&lock);
- atomic_store(value, (int)isc_taskmgr_mode(taskmgr));
- atomic_fetch_add(&counter, 1);
+ atomic_store(value, atomic_fetch_add(&counter, 1));
UNLOCK(&lock);
}
UNUSED(state);
atomic_init(&counter, 1);
- atomic_init(&a, 0);
- atomic_init(&b, 0);
- atomic_init(&c, 0);
- atomic_init(&d, 0);
- atomic_init(&e, 0);
+ atomic_init(&a, -1);
+ atomic_init(&b, -1);
+ atomic_init(&c, -1);
+ atomic_init(&d, -1);
+ atomic_init(&e, -1);
/*
- * Pause the task manager so we can fill up the work queue
- * without things happening while we do it.
+ * Pause the net/task manager so we can fill up the work
+ * queue without things happening while we do it.
*/
- isc__taskmgr_pause(taskmgr);
+ isc_nm_pause(netmgr);
result = isc_task_create(taskmgr, 0, &task1);
assert_int_equal(result, ISC_R_SUCCESS);
&a, sizeof(isc_event_t));
assert_non_null(event);
- assert_int_equal(atomic_load(&a), 0);
+ assert_int_equal(atomic_load(&a), -1);
isc_task_send(task1, &event);
/* Second event: not privileged */
&b, sizeof(isc_event_t));
assert_non_null(event);
- assert_int_equal(atomic_load(&b), 0);
+ assert_int_equal(atomic_load(&b), -1);
isc_task_send(task2, &event);
/* Third event: privileged */
&c, sizeof(isc_event_t));
assert_non_null(event);
- assert_int_equal(atomic_load(&c), 0);
+ assert_int_equal(atomic_load(&c), -1);
isc_task_send(task1, &event);
/* Fourth event: privileged */
&d, sizeof(isc_event_t));
assert_non_null(event);
- assert_int_equal(atomic_load(&d), 0);
+ assert_int_equal(atomic_load(&d), -1);
isc_task_send(task1, &event);
/* Fifth event: not privileged */
&e, sizeof(isc_event_t));
assert_non_null(event);
- assert_int_equal(atomic_load(&e), 0);
+ assert_int_equal(atomic_load(&e), -1);
isc_task_send(task2, &event);
- assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal);
- isc_taskmgr_setprivilegedmode(taskmgr);
- assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged);
-
- isc__taskmgr_resume(taskmgr);
+ isc_nm_resume(netmgr);
/* We're waiting for *all* variables to be set */
- while ((atomic_load(&a) == 0 || atomic_load(&b) == 0 ||
- atomic_load(&c) == 0 || atomic_load(&d) == 0 ||
- atomic_load(&e) == 0) &&
+ while ((atomic_load(&a) < 0 || atomic_load(&b) < 0 ||
+ atomic_load(&c) < 0 || atomic_load(&d) < 0 ||
+ atomic_load(&e) < 0) &&
i++ < 5000)
{
isc_test_nap(1000);
assert_true(atomic_load(&d) <= 3);
/* ...and the non-privileged tasks that set b and e, last */
- assert_true(atomic_load(&b) >= 4);
- assert_true(atomic_load(&e) >= 4);
+ assert_true(atomic_load(&b) > 3);
+ assert_true(atomic_load(&e) > 3);
assert_int_equal(atomic_load(&counter), 6);
isc_task_setprivilege(task1, false);
assert_false(isc_task_privilege(task1));
- assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal);
-
isc_task_destroy(&task1);
assert_null(task1);
isc_task_destroy(&task2);
atomic_init(&e, -1);
/*
- * Pause the task manager so we can fill up the work queue
+ * Pause the net/task manager so we can fill up the work queue
* without things happening while we do it.
*/
- isc__taskmgr_pause(taskmgr);
+ isc_nm_pause(netmgr);
result = isc_task_create(taskmgr, 0, &task1);
assert_int_equal(result, ISC_R_SUCCESS);
assert_int_equal(atomic_load(&e), -1);
isc_task_send(task2, &event);
- assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal);
- isc_taskmgr_setprivilegedmode(taskmgr);
- assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged);
-
- isc__taskmgr_resume(taskmgr);
+ isc_nm_resume(netmgr);
/* We're waiting for all variables to be set. */
while ((atomic_load(&a) == -1 || atomic_load(&b) == -1 ||
* We need to check that all privilege mode events were fired
* in privileged mode, and non privileged in non-privileged.
*/
- assert_true(atomic_load(&a) == isc_taskmgrmode_privileged ||
- atomic_load(&c) == isc_taskmgrmode_privileged ||
- atomic_load(&d) == isc_taskmgrmode_privileged);
+ assert_true(atomic_load(&a) <= 3);
+ assert_true(atomic_load(&c) <= 3);
+ assert_true(atomic_load(&d) <= 3);
/* ...and neither of the non-privileged tasks did... */
- assert_true(atomic_load(&b) == isc_taskmgrmode_normal ||
- atomic_load(&e) == isc_taskmgrmode_normal);
+ assert_true(atomic_load(&b) > 3);
+ assert_true(atomic_load(&e) > 3);
/* ...but all five of them did run. */
assert_int_equal(atomic_load(&counter), 6);
- assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal);
-
isc_task_destroy(&task1);
assert_null(task1);
isc_task_destroy(&task2);
if (atomic_load(&done)) {
isc_mem_put(event->ev_destroy_arg, event->ev_arg, sizeof(int));
isc_event_free(&event);
+ atomic_fetch_sub(&counter, 1);
} else {
isc_task_send(task, &event);
}
UNUSED(state);
+ atomic_init(&counter, 0);
+
for (i = 0; i < 10; i++) {
isc_event_t *event = NULL;
int *v;
assert_non_null(event);
isc_task_send(tasks[i], &event);
+ atomic_fetch_add(&counter, 1);
}
for (i = 0; i < 10; i++) {
isc_task_detach(&tasks[i]);
}
+
+ while (atomic_load(&counter) > 0) {
+ isc_test_nap(1000);
+ }
}
/*
isc_mem_debugging = ISC_MEM_DEBUGRECORD;
isc_mem_create(&mctx);
- result = isc_taskmgr_create(mctx, 4, 0, NULL, &taskmgr);
+ netmgr = isc_nm_start(mctx, 4);
+ result = isc_taskmgr_create(mctx, 0, netmgr, &taskmgr);
assert_int_equal(result, ISC_R_SUCCESS);
atomic_init(&done, false);
UNLOCK(&lock);
isc_taskmgr_destroy(&taskmgr);
+ isc_nm_destroy(&netmgr);
isc_mem_destroy(&mctx);
isc_condition_destroy(&cv);
isc_mutex_destroy(&lock);
#include "isctest.h"
+#define TASK_MAGIC ISC_MAGIC('T', 'A', 'S', 'K')
+#define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC)
+
static int
_setup(void **state) {
isc_result_t result;
UNUSED(state);
- result = isc_taskpool_create(taskmgr, test_mctx, 8, 2, &pool);
+ result = isc_taskpool_create(taskmgr, test_mctx, 8, 2, false, &pool);
assert_int_equal(result, ISC_R_SUCCESS);
assert_int_equal(isc_taskpool_size(pool), 8);
UNUSED(state);
- result = isc_taskpool_create(taskmgr, test_mctx, 10, 2, &pool1);
+ result = isc_taskpool_create(taskmgr, test_mctx, 10, 2, false, &pool1);
assert_int_equal(result, ISC_R_SUCCESS);
assert_int_equal(isc_taskpool_size(pool1), 10);
/* resizing to a smaller size should have no effect */
hold = pool1;
- result = isc_taskpool_expand(&pool1, 5, &pool2);
+ result = isc_taskpool_expand(&pool1, 5, false, &pool2);
assert_int_equal(result, ISC_R_SUCCESS);
assert_int_equal(isc_taskpool_size(pool2), 10);
assert_ptr_equal(pool2, hold);
/* resizing to the same size should have no effect */
hold = pool1;
- result = isc_taskpool_expand(&pool1, 10, &pool2);
+ result = isc_taskpool_expand(&pool1, 10, false, &pool2);
assert_int_equal(result, ISC_R_SUCCESS);
assert_int_equal(isc_taskpool_size(pool2), 10);
assert_ptr_equal(pool2, hold);
/* resizing to larger size should make a new pool */
hold = pool1;
- result = isc_taskpool_expand(&pool1, 20, &pool2);
+ result = isc_taskpool_expand(&pool1, 20, false, &pool2);
assert_int_equal(result, ISC_R_SUCCESS);
assert_int_equal(isc_taskpool_size(pool2), 20);
assert_ptr_not_equal(pool2, hold);
UNUSED(state);
- result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, &pool);
+ result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, false, &pool);
assert_int_equal(result, ISC_R_SUCCESS);
assert_int_equal(isc_taskpool_size(pool), 2);
/* two tasks in pool; make sure we can access them more than twice */
isc_taskpool_gettask(pool, &task1);
- assert_non_null(task1);
+ assert_true(VALID_TASK(task1));
isc_taskpool_gettask(pool, &task2);
- assert_non_null(task2);
+ assert_true(VALID_TASK(task2));
isc_taskpool_gettask(pool, &task3);
- assert_non_null(task3);
+ assert_true(VALID_TASK(task3));
isc_task_destroy(&task1);
isc_task_destroy(&task2);
UNUSED(state);
- result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, &pool);
+ result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, true, &pool);
assert_int_equal(result, ISC_R_SUCCESS);
assert_int_equal(isc_taskpool_size(pool), 2);
- isc_taskpool_setprivilege(pool, true);
-
isc_taskpool_gettask(pool, &task1);
isc_taskpool_gettask(pool, &task2);
isc_taskpool_gettask(pool, &task3);
- assert_non_null(task1);
- assert_non_null(task2);
- assert_non_null(task3);
+ assert_true(VALID_TASK(task1));
+ assert_true(VALID_TASK(task2));
+ assert_true(VALID_TASK(task3));
assert_true(isc_task_privilege(task1));
assert_true(isc_task_privilege(task2));
assert_true(isc_task_privilege(task3));
- isc_taskpool_setprivilege(pool, false);
-
- assert_false(isc_task_privilege(task1));
- assert_false(isc_task_privilege(task2));
- assert_false(isc_task_privilege(task3));
-
isc_task_destroy(&task1);
isc_task_destroy(&task2);
isc_task_destroy(&task3);
setup_test(isc_timertype_ticker, &expires, &interval, test_reset);
}
-static int startflag;
-static int shutdownflag;
+static atomic_bool startflag;
+static atomic_bool shutdownflag;
static isc_timer_t *tickertimer = NULL;
static isc_timer_t *oncetimer = NULL;
static isc_task_t *task1 = NULL;
* in its queue, until signaled by task2.
*/
-static void
-start_event(isc_task_t *task, isc_event_t *event) {
- UNUSED(task);
-
- if (verbose) {
- print_message("# start_event\n");
- }
-
- LOCK(&mx);
- while (!startflag) {
- (void)isc_condition_wait(&cv, &mx);
- }
- UNLOCK(&mx);
-
- isc_event_free(&event);
-}
-
static void
tick_event(isc_task_t *task, isc_event_t *event) {
isc_result_t result;
UNUSED(task);
+ if (!atomic_load(&startflag)) {
+ if (verbose) {
+ print_message("# tick_event %d\n", -1);
+ }
+ isc_event_free(&event);
+ return;
+ }
+
int tick = atomic_fetch_add(&eventcnt, 1);
if (verbose) {
print_message("# tick_event %d\n", tick);
static void
once_event(isc_task_t *task, isc_event_t *event) {
- isc_result_t result;
-
if (verbose) {
print_message("# once_event\n");
}
/*
* Allow task1 to start processing events.
*/
- LOCK(&mx);
- startflag = 1;
-
- result = isc_condition_broadcast(&cv);
- subthread_assert_result_equal(result, ISC_R_SUCCESS);
- UNLOCK(&mx);
+ atomic_store(&startflag, true);
isc_event_free(&event);
isc_task_shutdown(task);
static void
shutdown_purge(isc_task_t *task, isc_event_t *event) {
- isc_result_t result;
-
UNUSED(task);
UNUSED(event);
/*
* Signal shutdown processing complete.
*/
- LOCK(&mx);
- shutdownflag = 1;
-
- result = isc_condition_signal(&cv);
- subthread_assert_result_equal(result, ISC_R_SUCCESS);
- UNLOCK(&mx);
+ atomic_store(&shutdownflag, 1);
isc_event_free(&event);
}
static void
purge(void **state) {
isc_result_t result;
- isc_event_t *event = NULL;
isc_time_t expires;
isc_interval_t interval;
UNUSED(state);
- startflag = 0;
- shutdownflag = 0;
+ atomic_init(&startflag, 0);
+ atomic_init(&shutdownflag, 0);
atomic_init(&eventcnt, 0);
seconds = 1;
nanoseconds = 0;
- isc_mutex_init(&mx);
-
- isc_condition_init(&cv);
-
result = isc_task_create(taskmgr, 0, &task1);
assert_int_equal(result, ISC_R_SUCCESS);
result = isc_task_create(taskmgr, 0, &task2);
assert_int_equal(result, ISC_R_SUCCESS);
- LOCK(&mx);
-
- event = isc_event_allocate(test_mctx, (void *)1, (isc_eventtype_t)1,
- start_event, NULL, sizeof(*event));
- assert_non_null(event);
- isc_task_send(task1, &event);
-
isc_time_settoepoch(&expires);
isc_interval_set(&interval, seconds, 0);
/*
* Wait for shutdown processing to complete.
*/
- while (!shutdownflag) {
- result = isc_condition_wait(&cv, &mx);
- assert_int_equal(result, ISC_R_SUCCESS);
+ while (!atomic_load(&shutdownflag)) {
+ isc_test_nap(1000);
}
- UNLOCK(&mx);
-
assert_int_equal(atomic_load(&errcnt), ISC_R_SUCCESS);
assert_int_equal(atomic_load(&eventcnt), 1);
isc_timer_detach(&oncetimer);
isc_task_destroy(&task1);
isc_task_destroy(&task2);
- isc_mutex_destroy(&mx);
}
int
};
void
-isc_timermgr_poke(isc_timermgr_t *manager0);
+isc_timermgr_poke(isc_timermgr_t *manager);
static inline isc_result_t
schedule(isc_timer_t *timer, isc_time_t *now, bool signal_ok) {
isc_result_t result;
isc_timermgr_t *manager;
isc_time_t due;
- int cmp;
/*!
* Note: the caller must ensure locking.
/*
* Already scheduled.
*/
- cmp = isc_time_compare(&due, &timer->due);
+ int cmp = isc_time_compare(&due, &timer->due);
timer->due = due;
switch (cmp) {
case -1:
static inline void
deschedule(isc_timer_t *timer) {
- bool need_wakeup = false;
isc_timermgr_t *manager;
/*
manager = timer->manager;
if (timer->index > 0) {
+ bool need_wakeup = false;
if (timer->index == 1) {
need_wakeup = true;
}
(void)isc_task_purgerange(timer->task, timer, ISC_TIMEREVENT_FIRSTEVENT,
ISC_TIMEREVENT_LASTEVENT, NULL);
deschedule(timer);
+
UNLINK(manager->timers, timer, link);
UNLOCK(&manager->lock);
void
isc_timer_attach(isc_timer_t *timer, isc_timer_t **timerp) {
- /*
- * Attach *timerp to timer.
- */
-
REQUIRE(VALID_TIMER(timer));
REQUIRE(timerp != NULL && *timerp == NULL);
isc_refcount_increment(&timer->references);
isc_socketmgr_setstats
isc_task_getname
isc_task_gettag
+isc_task_run
isc_task_unsendrange
-isc_taskmgr_mode
-isc__taskmgr_pause
-isc__taskmgr_resume
+isc_taskmgr_attach
+isc_taskmgr_detach
isc_aes128_crypt
isc_aes192_crypt
isc_aes256_crypt
isc_taskmgr_create
isc_taskmgr_destroy
isc_taskmgr_excltask
-isc_taskmgr_mode
@IF NOTYET
isc_taskmgr_renderjson
@END NOTYET
isc_taskmgr_renderxml
@END LIBXML2
isc_taskmgr_setexcltask
-isc_taskmgr_setprivilegedmode
isc_taskpool_create
isc_taskpool_destroy
isc_taskpool_expand
isc_taskpool_gettask
-isc_taskpool_setprivilege
isc_taskpool_size
isc_thread_create
isc_thread_join
isc_mem_t *mctx = NULL;
isc_log_t *lctx = NULL;
+isc_nm_t *netmgr = NULL;
isc_taskmgr_t *taskmgr = NULL;
isc_task_t *maintask = NULL;
isc_timermgr_t *timermgr = NULL;
if (taskmgr != NULL) {
isc_taskmgr_destroy(&taskmgr);
}
+ if (netmgr != NULL) {
+ isc_nm_destroy(&netmgr);
+ }
if (timermgr != NULL) {
isc_timermgr_destroy(&timermgr);
}
isc_event_t *event = NULL;
ncpus = isc_os_ncpus();
- CHECK(isc_taskmgr_create(mctx, ncpus, 0, NULL, &taskmgr));
+ netmgr = isc_nm_start(mctx, ncpus);
+ CHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr));
CHECK(isc_task_create(taskmgr, 0, &maintask));
isc_taskmgr_setexcltask(taskmgr, maintask);
CHECK(isc_task_onshutdown(maintask, shutdown_managers, NULL));
./lib/isc/string.c C 1999,2000,2001,2003,2004,2005,2006,2007,2011,2012,2014,2015,2016,2018,2019,2020,2021
./lib/isc/symtab.c C 1996,1997,1998,1999,2000,2001,2004,2005,2007,2011,2012,2013,2016,2018,2019,2020,2021
./lib/isc/task.c C 1998,1999,2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021
-./lib/isc/task_p.h C 2018,2019,2020,2021
./lib/isc/taskpool.c C 1999,2000,2001,2004,2005,2007,2011,2012,2013,2016,2018,2019,2020,2021
./lib/isc/tests/aes_test.c C 2014,2016,2018,2019,2020,2021
./lib/isc/tests/buffer_test.c C 2014,2015,2016,2017,2018,2019,2020,2021