#include "netmgr-int.h"
#include "netmgr_p.h"
#include "openssl_shim.h"
+#include "trampoline_p.h"
#include "uv-compat.h"
/*%
isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0);
+
+static void
+isc__nm_threadpool_initialize(uint32_t workers);
+static void
+isc__nm_work_cb(uv_work_t *req);
+static void
+isc__nm_after_work_cb(uv_work_t *req, int status);
+
/*%<
* Issue a 'handle closed' callback on the socket.
*/
}
#endif /* WIN32 */
+static void
+isc__nm_threadpool_initialize(uint32_t workers) {
+ char buf[11];
+ int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf,
+ &(size_t){ sizeof(buf) });
+ if (r == UV_ENOENT) {
+ snprintf(buf, sizeof(buf), "%" PRIu32, workers);
+ uv_os_setenv("UV_THREADPOOL_SIZE", buf);
+ }
+}
+
void
isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
isc_nm_t *mgr = NULL;
isc__nm_winsock_initialize();
#endif /* WIN32 */
+ isc__nm_threadpool_initialize(workers);
+
mgr = isc_mem_get(mctx, sizeof(*mgr));
*mgr = (isc_nm_t){ .nworkers = workers };
atomic_init(&mgr->workers_paused, 0);
atomic_init(&mgr->paused, false);
atomic_init(&mgr->closing, false);
- atomic_init(&mgr->idle, false);
- atomic_init(&mgr->keepalive, false);
atomic_init(&mgr->recv_tcp_buffer_size, 0);
atomic_init(&mgr->send_tcp_buffer_size, 0);
atomic_init(&mgr->recv_udp_buffer_size, 0);
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
if (process_all_queues(worker)) {
- /* If we didn't process all the events, we need to enqueue
+ /*
+ * If we didn't process all the events, we need to enqueue
* async_cb to be run in the next iteration of the uv_loop
*/
uv_async_send(handle);
}
}
+static isc_threadresult_t
+isc__nm_work_run(isc_threadarg_t arg) {
+ isc__nm_work_t *work = (isc__nm_work_t *)arg;
+
+ work->cb(work->data);
+
+ return ((isc_threadresult_t)0);
+}
+
+static void
+isc__nm_work_cb(uv_work_t *req) {
+ isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);
+
+ if (isc_tid_v == SIZE_MAX) {
+ isc__trampoline_t *trampoline_arg =
+ isc__trampoline_get(isc__nm_work_run, work);
+ (void)isc__trampoline_run(trampoline_arg);
+ } else {
+ (void)isc__nm_work_run((isc_threadarg_t)work);
+ }
+}
+
+static void
+isc__nm_after_work_cb(uv_work_t *req, int status) {
+ isc_result_t result = ISC_R_SUCCESS;
+ isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);
+ isc_nm_t *netmgr = work->netmgr;
+
+ if (status != 0) {
+ result = isc__nm_uverr2result(status);
+ }
+
+ work->after_cb(work->data, result);
+
+ isc_mem_put(netmgr->mctx, work, sizeof(*work));
+
+ isc_nm_detach(&netmgr);
+}
+
+void
+isc_nm_work_offload(isc_nm_t *netmgr, isc_nm_workcb_t work_cb,
+ isc_nm_after_workcb_t after_work_cb, void *data) {
+ isc__networker_t *worker = NULL;
+ isc__nm_work_t *work = NULL;
+ int r;
+
+ REQUIRE(isc__nm_in_netthread());
+ REQUIRE(VALID_NM(netmgr));
+
+ worker = &netmgr->workers[isc_nm_tid()];
+
+ work = isc_mem_get(netmgr->mctx, sizeof(*work));
+ *work = (isc__nm_work_t){
+ .cb = work_cb,
+ .after_cb = after_work_cb,
+ .data = data,
+ };
+
+ isc_nm_attach(netmgr, &work->netmgr);
+
+ uv_req_set_data((uv_req_t *)&work->req, work);
+
+ r = uv_queue_work(&worker->loop, &work->req, isc__nm_work_cb,
+ isc__nm_after_work_cb);
+ RUNTIME_CHECK(r == 0);
+}
+
#ifdef NETMGR_TRACE
/*
* Dump all active sockets in netmgr. We output to stderr
nmsocket_type_totext(sock->type),
isc_refcount_current(&sock->references));
fprintf(stderr,
- "Parent %p, listener %p, server %p, statichandle = %p\n",
+ "Parent %p, listener %p, server %p, statichandle = "
+ "%p\n",
sock->parent, sock->listener, sock->server, sock->statichandle);
fprintf(stderr, "Flags:%s%s%s%s%s\n",
atomic_load(&sock->active) ? " active" : "",