]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add asynchronous work API to the network manager
authorOndřej Surý <ondrej@isc.org>
Thu, 27 May 2021 07:45:07 +0000 (09:45 +0200)
committerOndřej Surý <ondrej@sury.org>
Mon, 31 May 2021 12:52:05 +0000 (14:52 +0200)
The libuv has a support for running long running tasks in the dedicated
threadpools, so it doesn't affect networking IO.

This commit adds isc_nm_work_enqueue() wrapper that would wraps around
the libuv API and runs it on top of associated worker loop.

The only limitation is that the function must be called from inside
network manager thread, so the call to the function should be wrapped
inside a (bound) task.

lib/isc/include/isc/netmgr.h
lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/win32/libisc.def.in

index ca40fc6bcdbd14e6ba4358c6369fdbe006ef1b56..b8f19bf15a9fe19ec9ec6e0c319abe0e9a380a11 100644 (file)
@@ -68,6 +68,12 @@ typedef void (*isc_nm_opaquecb_t)(void *arg);
  * callbacks.
  */
 
+typedef void (*isc_nm_workcb_t)(void *arg);
+typedef void (*isc_nm_after_workcb_t)(void *arg, isc_result_t result);
+/*%<
+ * Callback functions for libuv threadpool work (see uv_work_t)
+ */
+
 void
 isc_nm_attach(isc_nm_t *mgr, isc_nm_t **dst);
 void
@@ -529,6 +535,19 @@ isc_nm_task_enqueue(isc_nm_t *mgr, isc_task_t *task, int threadid);
  *     maximum number of 'workers' as specifed in isc_nm_start()
  */
 
+void
+isc_nm_work_offload(isc_nm_t *mgr, isc_nm_workcb_t work_cb,
+                   isc_nm_after_workcb_t after_work_cb, void *data);
+/*%<
+ * Schedules a job to be handled by the libuv thread pool (see uv_work_t).
+ * The function specified in `work_cb` will be run by a thread in the
+ * thread pool; when complete, the `after_work_cb` function will run.
+ *
+ * Requires:
+ * \li 'mgr' is a valid netmgr object.
+ * \li We are currently running in a network manager thread.
+ */
+
 void
 isc__nm_force_tid(int tid);
 /*%<
index dece53bfec3e6c790b20882cd28f460f56637010..9e5445664bb57168f392f8007735f6385dfd96bb 100644 (file)
@@ -646,6 +646,17 @@ typedef union {
        isc__netievent_tlsconnect_t nitc;
 } isc__netievent_storage_t;
 
+/*
+ * Work item for a uv_work threadpool.
+ */
+typedef struct isc__nm_work {
+       isc_nm_t *netmgr;
+       uv_work_t req;
+       isc_nm_workcb_t cb;
+       isc_nm_after_workcb_t after_cb;
+       void *data;
+} isc__nm_work_t;
+
 /*
  * Network manager
  */
index 9692be6a4bb5e844b4f86d71223e5a2695a3d512..0b708b57d5658847cc2da0651c733534a91f830c 100644 (file)
@@ -40,6 +40,7 @@
 #include "netmgr-int.h"
 #include "netmgr_p.h"
 #include "openssl_shim.h"
+#include "trampoline_p.h"
 #include "uv-compat.h"
 
 /*%
@@ -199,6 +200,14 @@ static void
 isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0);
 static void
 isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0);
+
+static void
+isc__nm_threadpool_initialize(uint32_t workers);
+static void
+isc__nm_work_cb(uv_work_t *req);
+static void
+isc__nm_after_work_cb(uv_work_t *req, int status);
+
 /*%<
  * Issue a 'handle closed' callback on the socket.
  */
@@ -256,6 +265,17 @@ isc__nm_winsock_destroy(void) {
 }
 #endif /* WIN32 */
 
+static void
+isc__nm_threadpool_initialize(uint32_t workers) {
+       char buf[11];
+       int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf,
+                            &(size_t){ sizeof(buf) });
+       if (r == UV_ENOENT) {
+               snprintf(buf, sizeof(buf), "%" PRIu32, workers);
+               uv_os_setenv("UV_THREADPOOL_SIZE", buf);
+       }
+}
+
 void
 isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
        isc_nm_t *mgr = NULL;
@@ -267,6 +287,8 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
        isc__nm_winsock_initialize();
 #endif /* WIN32 */
 
+       isc__nm_threadpool_initialize(workers);
+
        mgr = isc_mem_get(mctx, sizeof(*mgr));
        *mgr = (isc_nm_t){ .nworkers = workers };
 
@@ -280,8 +302,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
        atomic_init(&mgr->workers_paused, 0);
        atomic_init(&mgr->paused, false);
        atomic_init(&mgr->closing, false);
-       atomic_init(&mgr->idle, false);
-       atomic_init(&mgr->keepalive, false);
        atomic_init(&mgr->recv_tcp_buffer_size, 0);
        atomic_init(&mgr->send_tcp_buffer_size, 0);
        atomic_init(&mgr->recv_udp_buffer_size, 0);
@@ -818,7 +838,8 @@ async_cb(uv_async_t *handle) {
        isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
 
        if (process_all_queues(worker)) {
-               /* If we didn't process all the events, we need to enqueue
+               /*
+                * If we didn't process all the events, we need to enqueue
                 * async_cb to be run in the next iteration of the uv_loop
                 */
                uv_async_send(handle);
@@ -3242,6 +3263,73 @@ isc__nm_set_network_buffers(isc_nm_t *nm, uv_handle_t *handle) {
        }
 }
 
+static isc_threadresult_t
+isc__nm_work_run(isc_threadarg_t arg) {
+       isc__nm_work_t *work = (isc__nm_work_t *)arg;
+
+       work->cb(work->data);
+
+       return ((isc_threadresult_t)0);
+}
+
+static void
+isc__nm_work_cb(uv_work_t *req) {
+       isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);
+
+       if (isc_tid_v == SIZE_MAX) {
+               isc__trampoline_t *trampoline_arg =
+                       isc__trampoline_get(isc__nm_work_run, work);
+               (void)isc__trampoline_run(trampoline_arg);
+       } else {
+               (void)isc__nm_work_run((isc_threadarg_t)work);
+       }
+}
+
+static void
+isc__nm_after_work_cb(uv_work_t *req, int status) {
+       isc_result_t result = ISC_R_SUCCESS;
+       isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);
+       isc_nm_t *netmgr = work->netmgr;
+
+       if (status != 0) {
+               result = isc__nm_uverr2result(status);
+       }
+
+       work->after_cb(work->data, result);
+
+       isc_mem_put(netmgr->mctx, work, sizeof(*work));
+
+       isc_nm_detach(&netmgr);
+}
+
+void
+isc_nm_work_offload(isc_nm_t *netmgr, isc_nm_workcb_t work_cb,
+                   isc_nm_after_workcb_t after_work_cb, void *data) {
+       isc__networker_t *worker = NULL;
+       isc__nm_work_t *work = NULL;
+       int r;
+
+       REQUIRE(isc__nm_in_netthread());
+       REQUIRE(VALID_NM(netmgr));
+
+       worker = &netmgr->workers[isc_nm_tid()];
+
+       work = isc_mem_get(netmgr->mctx, sizeof(*work));
+       *work = (isc__nm_work_t){
+               .cb = work_cb,
+               .after_cb = after_work_cb,
+               .data = data,
+       };
+
+       isc_nm_attach(netmgr, &work->netmgr);
+
+       uv_req_set_data((uv_req_t *)&work->req, work);
+
+       r = uv_queue_work(&worker->loop, &work->req, isc__nm_work_cb,
+                         isc__nm_after_work_cb);
+       RUNTIME_CHECK(r == 0);
+}
+
 #ifdef NETMGR_TRACE
 /*
  * Dump all active sockets in netmgr. We output to stderr
@@ -3302,7 +3390,8 @@ nmsocket_dump(isc_nmsocket_t *sock) {
                nmsocket_type_totext(sock->type),
                isc_refcount_current(&sock->references));
        fprintf(stderr,
-               "Parent %p, listener %p, server %p, statichandle = %p\n",
+               "Parent %p, listener %p, server %p, statichandle = "
+               "%p\n",
                sock->parent, sock->listener, sock->server, sock->statichandle);
        fprintf(stderr, "Flags:%s%s%s%s%s\n",
                atomic_load(&sock->active) ? " active" : "",
index fdf402a572acb796d33739e1903a31d186a54950..664ff6c065d495dbc01060e04f0a414a064c1da4 100644 (file)
@@ -474,11 +474,13 @@ isc_nm_tcpdnsconnect
 isc_nm_gettimeouts
 isc_nm_setnetbuffers
 isc_nm_settimeouts
+isc_nm_task_enqueue
 isc_nm_tcpdns_keepalive
 isc_nm_tcpdns_sequential
 isc_nm_tid
 isc_nm_tlsdnsconnect
 isc_nm_udpconnect
+isc_nm_work_offload
 isc_nmsocket_close
 isc__nm_acquire_interlocked
 isc__nm_drop_interlocked