]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Split fast and slow task queues
authorOndřej Surý <ondrej@isc.org>
Thu, 11 Jan 2024 11:03:24 +0000 (12:03 +0100)
committerMichał Kępień <michal@isc.org>
Thu, 1 Feb 2024 20:47:29 +0000 (21:47 +0100)
Change the taskmgr (and thus netmgr) in a way that it supports fast and
slow task queues.  The fast queue is used for incoming DNS traffic and
it will pass the processing to the slow queue for sending outgoing DNS
messages and processing resolver messages.

In the future, more tasks might get moved to the slow queues, so the
cached and authoritative DNS traffic can be handled without being slowed
down by operations that take longer time to process.

lib/dns/resolver.c
lib/isc/include/isc/netmgr.h
lib/isc/netmgr/http.c
lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/netmgr/tcp.c
lib/isc/netmgr/tcpdns.c
lib/isc/netmgr/tlsdns.c
lib/isc/netmgr/tlsstream.c
lib/isc/netmgr/udp.c

index 4b3d1c0b40a284ad99929c4125550b9567e2db89..60cac293cb1667601668e18c2222bc6da30bdb83 100644 (file)
@@ -10408,8 +10408,8 @@ dns_resolver_create(dns_view_t *view, isc_taskmgr_t *taskmgr,
                 * Since we have a pool of tasks we bind them to task
                 * queues to spread the load evenly
                 */
-               result = isc_task_create_bound(taskmgr, 0,
-                                              &res->buckets[i].task, i);
+               result = isc_task_create_bound(
+                       taskmgr, 0, &res->buckets[i].task, ISC_NM_TASK_SLOW(i));
                if (result != ISC_R_SUCCESS) {
                        ntasks = i;
                        isc_mutex_destroy(&res->buckets[i].lock);
index eff33f6acb7ca71237d7bb2578f60974554ba556..d42cfe9a209e25470020a2b85792ef8714e16432 100644 (file)
@@ -750,6 +750,9 @@ isc_nm_verify_tls_peer_result_string(const isc_nmhandle_t *handle);
  *  \li 'handle' is a valid netmgr handle object.
  */
 
+#define ISC_NM_TASK_SLOW_OFFSET -2
+#define ISC_NM_TASK_SLOW(i)    (ISC_NM_TASK_SLOW_OFFSET - 1 - i)
+
 void
 isc_nm_task_enqueue(isc_nm_t *mgr, isc_task_t *task, int threadid);
 /*%<
index d7a33d5abe2041de47d9d9e7a9b3f55d37023f7a..2220edf3642fa0e51e523cd5b3a39ea39c34fefc 100644 (file)
@@ -2969,7 +2969,7 @@ isc__nm_http_set_max_streams(isc_nmsocket_t *listener,
 void
 isc_nm_http_set_endpoints(isc_nmsocket_t *listener,
                          isc_nm_http_endpoints_t *eps) {
-       size_t nworkers;
+       size_t nlisteners;
 
        REQUIRE(VALID_NMSOCK(listener));
        REQUIRE(listener->type == isc_nm_httplistener);
@@ -2977,8 +2977,8 @@ isc_nm_http_set_endpoints(isc_nmsocket_t *listener,
 
        atomic_store(&eps->in_use, true);
 
-       nworkers = (size_t)listener->mgr->nworkers;
-       for (size_t i = 0; i < nworkers; i++) {
+       nlisteners = (size_t)listener->mgr->nlisteners;
+       for (size_t i = 0; i < nlisteners; i++) {
                isc__netievent__http_eps_t *ievent =
                        isc__nm_get_netievent_httpendpoints(listener->mgr,
                                                            listener, eps);
@@ -3003,20 +3003,20 @@ isc__nm_async_httpendpoints(isc__networker_t *worker, isc__netievent_t *ev0) {
 static void
 http_init_listener_endpoints(isc_nmsocket_t *listener,
                             isc_nm_http_endpoints_t *epset) {
-       size_t nworkers;
+       size_t nlisteners;
 
        REQUIRE(VALID_NMSOCK(listener));
        REQUIRE(VALID_NM(listener->mgr));
        REQUIRE(VALID_HTTP_ENDPOINTS(epset));
 
-       nworkers = (size_t)listener->mgr->nworkers;
-       INSIST(nworkers > 0);
+       nlisteners = (size_t)listener->mgr->nlisteners;
+       INSIST(nlisteners > 0);
 
        listener->h2.listener_endpoints =
                isc_mem_get(listener->mgr->mctx,
-                           sizeof(isc_nm_http_endpoints_t *) * nworkers);
-       listener->h2.n_listener_endpoints = nworkers;
-       for (size_t i = 0; i < nworkers; i++) {
+                           sizeof(isc_nm_http_endpoints_t *) * nlisteners);
+       listener->h2.n_listener_endpoints = nlisteners;
+       for (size_t i = 0; i < nlisteners; i++) {
                listener->h2.listener_endpoints[i] = NULL;
                isc_nm_http_endpoints_attach(
                        epset, &listener->h2.listener_endpoints[i]);
index 364a933128dd05aa6d483e15f0ee7d8f0d9d66fe..6aca9ab92ca1f0f65e5ddd8202635650c19db317 100644 (file)
@@ -776,6 +776,7 @@ struct isc_nm {
        isc_refcount_t references;
        isc_mem_t *mctx;
        int nworkers;
+       int nlisteners;
        isc_mutex_t lock;
        isc_condition_t wkstatecond;
        isc_condition_t wkpausecond;
index b19d468820a3b3d2d1106a1cee17c6be380a7d9d..2310b4b90434a607d11d8411ebb4089412f9e235 100644 (file)
@@ -189,12 +189,12 @@ isc__nm_force_tid(int tid) {
 }
 
 static void
-isc__nm_threadpool_initialize(uint32_t workers) {
+isc__nm_threadpool_initialize(uint32_t nworkers) {
        char buf[11];
        int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf,
                             &(size_t){ sizeof(buf) });
        if (r == UV_ENOENT) {
-               snprintf(buf, sizeof(buf), "%" PRIu32, workers);
+               snprintf(buf, sizeof(buf), "%" PRIu32, nworkers);
                uv_os_setenv("UV_THREADPOOL_SIZE", buf);
        }
 }
@@ -212,11 +212,11 @@ isc__nm_threadpool_initialize(uint32_t workers) {
 #endif
 
 void
-isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
+isc__netmgr_create(isc_mem_t *mctx, uint32_t nworkers, isc_nm_t **netmgrp) {
        isc_nm_t *mgr = NULL;
        char name[32];
 
-       REQUIRE(workers > 0);
+       REQUIRE(nworkers > 0);
 
 #ifdef MAXIMAL_UV_VERSION
        if (uv_version() > MAXIMAL_UV_VERSION) {
@@ -234,10 +234,13 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
                            uv_version_string(), UV_VERSION_STRING);
        }
 
-       isc__nm_threadpool_initialize(workers);
+       isc__nm_threadpool_initialize(nworkers);
 
        mgr = isc_mem_get(mctx, sizeof(*mgr));
-       *mgr = (isc_nm_t){ .nworkers = workers };
+       *mgr = (isc_nm_t){
+               .nworkers = nworkers * 2,
+               .nlisteners = nworkers,
+       };
 
        isc_mem_attach(mctx, &mgr->mctx);
        isc_mutex_init(&mgr->lock);
@@ -272,11 +275,12 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
        atomic_init(&mgr->keepalive, 30000);
        atomic_init(&mgr->advertised, 30000);
 
-       isc_barrier_init(&mgr->pausing, workers);
-       isc_barrier_init(&mgr->resuming, workers);
+       isc_barrier_init(&mgr->pausing, mgr->nworkers);
+       isc_barrier_init(&mgr->resuming, mgr->nworkers);
 
-       mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t));
-       for (size_t i = 0; i < workers; i++) {
+       mgr->workers = isc_mem_get(mctx,
+                                  mgr->nworkers * sizeof(isc__networker_t));
+       for (int i = 0; i < mgr->nworkers; i++) {
                isc__networker_t *worker = &mgr->workers[i];
                int r;
 
@@ -310,7 +314,7 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
                mgr->workers_running++;
                isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread);
 
-               snprintf(name, sizeof(name), "isc-net-%04zu", i);
+               snprintf(name, sizeof(name), "isc-net-%04d", i);
                isc_thread_setname(worker->thread, name);
        }
 
@@ -817,9 +821,15 @@ isc_nm_task_enqueue(isc_nm_t *nm, isc_task_t *task, int threadid) {
        isc__networker_t *worker = NULL;
 
        if (threadid == -1) {
-               tid = (int)isc_random_uniform(nm->nworkers);
+               tid = (int)isc_random_uniform(nm->nlisteners);
+       } else if (threadid == ISC_NM_TASK_SLOW_OFFSET) {
+               tid = nm->nlisteners +
+                     (int)isc_random_uniform(nm->nworkers - nm->nlisteners);
+       } else if (threadid < ISC_NM_TASK_SLOW_OFFSET) {
+               tid = nm->nlisteners + (ISC_NM_TASK_SLOW(threadid) %
+                                       (nm->nworkers - nm->nlisteners));
        } else {
-               tid = threadid % nm->nworkers;
+               tid = threadid % nm->nlisteners;
        }
 
        worker = &nm->workers[tid];
@@ -3778,7 +3788,7 @@ isc__nm_async_settlsctx(isc__networker_t *worker, isc__netievent_t *ev0) {
 static void
 set_tlsctx_workers(isc_nmsocket_t *listener, isc_tlsctx_t *tlsctx) {
        /* Update the TLS context reference for every worker thread. */
-       for (size_t i = 0; i < (size_t)listener->mgr->nworkers; i++) {
+       for (size_t i = 0; i < (size_t)listener->mgr->nlisteners; i++) {
                isc__netievent__tlsctx_t *ievent =
                        isc__nm_get_netievent_settlsctx(listener->mgr, listener,
                                                        tlsctx);
index 2a644fed3ff30a00233c21f7ba268976decf8802..16b53cc57975800537a0ba63112f9c1063389494 100644 (file)
@@ -341,7 +341,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
                        isc__nm_connectcb(sock, req, result, false);
                } else {
                        isc__nmsocket_clearcb(sock);
-                       sock->tid = isc_random_uniform(mgr->nworkers);
+                       sock->tid = isc_random_uniform(mgr->nlisteners);
                        isc__nm_connectcb(sock, req, result, true);
                }
                atomic_store(&sock->closed, true);
@@ -362,7 +362,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
                isc__nm_put_netievent_tcpconnect(mgr, ievent);
        } else {
                atomic_init(&sock->active, false);
-               sock->tid = isc_random_uniform(mgr->nworkers);
+               sock->tid = isc_random_uniform(mgr->nlisteners);
                isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
                                       (isc__netievent_t *)ievent);
        }
@@ -457,7 +457,7 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_sockaddr_t *iface,
        isc__nmsocket_init(sock, mgr, isc_nm_tcplistener, iface);
 
        atomic_init(&sock->rchildren, 0);
-       sock->nchildren = mgr->nworkers;
+       sock->nchildren = mgr->nlisteners;
        children_size = sock->nchildren * sizeof(sock->children[0]);
        sock->children = isc_mem_get(mgr->mctx, children_size);
        memset(sock->children, 0, children_size);
index cabc90533d9b75764c41ad3e70f764611ad22da8..b2a0b1016d2cf781fbaa94b2a7e97af20959397c 100644 (file)
@@ -324,7 +324,7 @@ isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
                isc__nm_put_netievent_tcpdnsconnect(mgr, ievent);
        } else {
                atomic_init(&sock->active, false);
-               sock->tid = isc_random_uniform(mgr->nworkers);
+               sock->tid = isc_random_uniform(mgr->nlisteners);
                isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
                                       (isc__netievent_t *)ievent);
        }
@@ -422,7 +422,7 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_sockaddr_t *iface,
        isc__nmsocket_init(sock, mgr, isc_nm_tcpdnslistener, iface);
 
        atomic_init(&sock->rchildren, 0);
-       sock->nchildren = mgr->nworkers;
+       sock->nchildren = mgr->nlisteners;
        children_size = sock->nchildren * sizeof(sock->children[0]);
        sock->children = isc_mem_get(mgr->mctx, children_size);
        memset(sock->children, 0, children_size);
index cfc62eb5e918717509aa0021bed99715f5516443..feeb1a8d7df83264f14d10c68d482ec9a1a18dec 100644 (file)
@@ -419,7 +419,7 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
                isc__nm_put_netievent_tlsdnsconnect(mgr, ievent);
        } else {
                atomic_init(&sock->active, false);
-               sock->tid = isc_random_uniform(mgr->nworkers);
+               sock->tid = isc_random_uniform(mgr->nlisteners);
                isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
                                       (isc__netievent_t *)ievent);
        }
@@ -532,7 +532,7 @@ isc_nm_listentlsdns(isc_nm_t *mgr, isc_sockaddr_t *iface,
        isc__nmsocket_init(sock, mgr, isc_nm_tlsdnslistener, iface);
 
        atomic_init(&sock->rchildren, 0);
-       sock->nchildren = mgr->nworkers;
+       sock->nchildren = mgr->nlisteners;
        children_size = sock->nchildren * sizeof(sock->children[0]);
        sock->children = isc_mem_get(mgr->mctx, children_size);
        memset(sock->children, 0, children_size);
index 7b490719bba47559783040ab9af4ea41d5d8b369..a3fc6d203c30f7edf545bc1b030b8d73475b8e2b 100644 (file)
@@ -1264,18 +1264,18 @@ isc__nm_tls_verify_tls_peer_result_string(const isc_nmhandle_t *handle) {
 
 static void
 tls_init_listener_tlsctx(isc_nmsocket_t *listener, isc_tlsctx_t *ctx) {
-       size_t nworkers;
+       size_t nlisteners;
 
        REQUIRE(VALID_NM(listener->mgr));
        REQUIRE(ctx != NULL);
 
-       nworkers = (size_t)listener->mgr->nworkers;
-       INSIST(nworkers > 0);
+       nlisteners = (size_t)listener->mgr->nlisteners;
+       INSIST(nlisteners > 0);
 
        listener->tlsstream.listener_tls_ctx = isc_mem_get(
-               listener->mgr->mctx, sizeof(isc_tlsctx_t *) * nworkers);
-       listener->tlsstream.n_listener_tls_ctx = nworkers;
-       for (size_t i = 0; i < nworkers; i++) {
+               listener->mgr->mctx, sizeof(isc_tlsctx_t *) * nlisteners);
+       listener->tlsstream.n_listener_tls_ctx = nlisteners;
+       for (size_t i = 0; i < nlisteners; i++) {
                listener->tlsstream.listener_tls_ctx[i] = NULL;
                isc_tlsctx_attach(ctx,
                                  &listener->tlsstream.listener_tls_ctx[i]);
index 476c7992f6d22ecf4430734ed4b20187b8cf1df2..661de96ac69567d5883a9d00591e2470617da955 100644 (file)
@@ -157,14 +157,14 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nm_recv_cb_t cb,
        REQUIRE(VALID_NM(mgr));
 
        /*
-        * We are creating mgr->nworkers duplicated sockets, one
+        * We are creating mgr->nlisteners duplicated sockets, one
         * socket for each worker thread.
         */
        sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
        isc__nmsocket_init(sock, mgr, isc_nm_udplistener, iface);
 
        atomic_init(&sock->rchildren, 0);
-       sock->nchildren = mgr->nworkers;
+       sock->nchildren = mgr->nlisteners;
        children_size = sock->nchildren * sizeof(sock->children[0]);
        sock->children = isc_mem_get(mgr->mctx, children_size);
        memset(sock->children, 0, children_size);
@@ -1037,7 +1037,7 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
                isc__nm_put_netievent_udpconnect(mgr, event);
        } else {
                atomic_init(&sock->active, false);
-               sock->tid = isc_random_uniform(mgr->nworkers);
+               sock->tid = isc_random_uniform(mgr->nlisteners);
                isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
                                       (isc__netievent_t *)event);
        }