]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Split recursorThread() to reduce complexity
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Mon, 8 May 2023 08:58:29 +0000 (10:58 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Mon, 8 May 2023 08:58:29 +0000 (10:58 +0200)
pdns/recursordist/rec-main.cc

index c43e633a2dc6e0e771c8cc9d2cb88e5a2d0542d2..d7243ce21702423fce18f01dffd9f4e88815cbcf 100644 (file)
@@ -45,6 +45,8 @@
 
 #ifdef HAVE_LIBSODIUM
 #include <sodium.h>
+
+#include <cstddef>
 #endif
 
 #ifdef HAVE_SYSTEMD
@@ -2216,194 +2218,201 @@ private:
   const string name;
 };
 
-static void houseKeeping(void*)
+static void houseKeeping0(Logr::log_t log)
 {
-  auto log = g_slog->withName("housekeeping");
-  static thread_local bool t_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
-
-  try {
-    if (t_running) {
-      return;
-    }
-    t_running = true;
-
-    struct timeval now;
-    Utility::gettimeofday(&now);
-    t_Counters.updateSnap(now, g_regressionTestMode);
+  struct timeval now
+  {
+  };
+  Utility::gettimeofday(&now);
+  t_Counters.updateSnap(now, g_regressionTestMode);
 
-    // Below are the tasks that run for every recursorThread, including handler and taskThread
+  // Below are the tasks that run for every recursorThread, including handler and taskThread
 
-    static thread_local PeriodicTask pruneTCPTask{"pruneTCPTask", 5};
-    pruneTCPTask.runIfDue(now, [now]() {
-      t_tcp_manager.cleanup(now);
-    });
+  static thread_local PeriodicTask pruneTCPTask{"pruneTCPTask", 5};
+  pruneTCPTask.runIfDue(now, [now]() {
+    t_tcp_manager.cleanup(now);
+  });
 
-    const auto& info = RecThreadInfo::self();
+  const auto& info = RecThreadInfo::self();
 
-    // Threads handling packets process config changes in the input path, but not all threads process input packets
-    // distr threads only process TCP, so that may not happenn very often. So do all periodically.
-    static thread_local PeriodicTask exportConfigTask{"exportConfigTask", 30};
-    auto luaconfsLocal = g_luaconfs.getLocal();
-    exportConfigTask.runIfDue(now, [&luaconfsLocal]() {
-      checkProtobufExport(luaconfsLocal);
-      checkOutgoingProtobufExport(luaconfsLocal);
+  // Threads handling packets process config changes in the input path, but not all threads process input packets
+  // distr threads only process TCP, so that may not happenn very often. So do all periodically.
+  static thread_local PeriodicTask exportConfigTask{"exportConfigTask", 30};
+  auto luaconfsLocal = g_luaconfs.getLocal();
+  exportConfigTask.runIfDue(now, [&luaconfsLocal]() {
+    checkProtobufExport(luaconfsLocal);
+    checkOutgoingProtobufExport(luaconfsLocal);
 #ifdef HAVE_FSTRM
-      checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
-      checkFrameStreamExport(luaconfsLocal, luaconfsLocal->nodFrameStreamExportConfig, t_nodFrameStreamServersInfo);
+    checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
+    checkFrameStreamExport(luaconfsLocal, luaconfsLocal->nodFrameStreamExportConfig, t_nodFrameStreamServersInfo);
 #endif
-    });
+  });
 
-    // Below are the thread specific tasks for the handler and the taskThread
-    // Likley a few handler tasks could be moved to the taskThread
-    if (info.isTaskThread()) {
-      // TaskQueue is run always
-      runTasks(10, g_logCommonErrors);
-
-      static PeriodicTask ztcTask{"ZTC", 60};
-      static map<DNSName, RecZoneToCache::State> ztcStates;
-      ztcTask.runIfDue(now, [&luaconfsLocal]() {
-        RecZoneToCache::maintainStates(luaconfsLocal->ztcConfigs, ztcStates, luaconfsLocal->generation);
-        for (auto& ztc : luaconfsLocal->ztcConfigs) {
-          RecZoneToCache::ZoneToCache(ztc.second, ztcStates.at(ztc.first));
-        }
-      });
-    }
-    else if (info.isHandler()) {
-      if (g_packetCache) {
-        static PeriodicTask packetCacheTask{"packetCacheTask", 5};
-        packetCacheTask.runIfDue(now, []() {
-          g_packetCache->doPruneTo(g_maxPacketCacheEntries);
-        });
+  // Below are the thread specific tasks for the handler and the taskThread
+  // Likley a few handler tasks could be moved to the taskThread
+  if (info.isTaskThread()) {
+    // TaskQueue is run always
+    runTasks(10, g_logCommonErrors);
+
+    static PeriodicTask ztcTask{"ZTC", 60};
+    static map<DNSName, RecZoneToCache::State> ztcStates;
+    ztcTask.runIfDue(now, [&luaconfsLocal]() {
+      RecZoneToCache::maintainStates(luaconfsLocal->ztcConfigs, ztcStates, luaconfsLocal->generation);
+      for (const auto& ztc : luaconfsLocal->ztcConfigs) {
+        RecZoneToCache::ZoneToCache(ztc.second, ztcStates.at(ztc.first));
       }
-      static PeriodicTask recordCachePruneTask{"RecordCachePruneTask", 5};
-      recordCachePruneTask.runIfDue(now, []() {
-        g_recCache->doPrune(g_maxCacheEntries);
+    });
+  }
+  else if (info.isHandler()) {
+    if (g_packetCache) {
+      static PeriodicTask packetCacheTask{"packetCacheTask", 5};
+      packetCacheTask.runIfDue(now, []() {
+        g_packetCache->doPruneTo(g_maxPacketCacheEntries);
       });
+    }
+    static PeriodicTask recordCachePruneTask{"RecordCachePruneTask", 5};
+    recordCachePruneTask.runIfDue(now, []() {
+      g_recCache->doPrune(g_maxCacheEntries);
+    });
 
-      static PeriodicTask negCachePruneTask{"NegCachePrunteTask", 5};
-      negCachePruneTask.runIfDue(now, []() {
-        g_negCache->prune(g_maxCacheEntries / 8);
-      });
+    static PeriodicTask negCachePruneTask{"NegCachePrunteTask", 5};
+    negCachePruneTask.runIfDue(now, []() {
+      g_negCache->prune(g_maxCacheEntries / 8);
+    });
 
-      static PeriodicTask aggrNSECPruneTask{"AggrNSECPruneTask", 5};
-      aggrNSECPruneTask.runIfDue(now, [now]() {
-        if (g_aggressiveNSECCache) {
-          g_aggressiveNSECCache->prune(now.tv_sec);
-        }
-      });
+    static PeriodicTask aggrNSECPruneTask{"AggrNSECPruneTask", 5};
+    aggrNSECPruneTask.runIfDue(now, [now]() {
+      if (g_aggressiveNSECCache) {
+        g_aggressiveNSECCache->prune(now.tv_sec);
+      }
+    });
 
-      static PeriodicTask pruneNSpeedTask{"pruneNSSpeedTask", 30};
-      pruneNSpeedTask.runIfDue(now, [now]() {
-        SyncRes::pruneNSSpeeds(now.tv_sec - 300);
-      });
+    static PeriodicTask pruneNSpeedTask{"pruneNSSpeedTask", 30};
+    pruneNSpeedTask.runIfDue(now, [now]() {
+      SyncRes::pruneNSSpeeds(now.tv_sec - 300);
+    });
+
+    static PeriodicTask pruneEDNSTask{"pruneEDNSTask", 60};
+    pruneEDNSTask.runIfDue(now, [now]() {
+      SyncRes::pruneEDNSStatuses(now.tv_sec);
+    });
 
-      static PeriodicTask pruneEDNSTask{"pruneEDNSTask", 60};
-      pruneEDNSTask.runIfDue(now, [now]() {
-        SyncRes::pruneEDNSStatuses(now.tv_sec);
+    if (SyncRes::s_max_busy_dot_probes > 0) {
+      static PeriodicTask pruneDoTProbeMap{"pruneDoTProbeMapTask", 60};
+      pruneDoTProbeMap.runIfDue(now, [now]() {
+        SyncRes::pruneDoTProbeMap(now.tv_sec);
       });
+    }
 
-      if (SyncRes::s_max_busy_dot_probes > 0) {
-        static PeriodicTask pruneDoTProbeMap{"pruneDoTProbeMapTask", 60};
-        pruneDoTProbeMap.runIfDue(now, [now]() {
-          SyncRes::pruneDoTProbeMap(now.tv_sec);
-        });
-      }
+    static PeriodicTask pruneThrottledTask{"pruneThrottledTask", 5};
+    pruneThrottledTask.runIfDue(now, [now]() {
+      SyncRes::pruneThrottledServers(now.tv_sec);
+    });
 
-      static PeriodicTask pruneThrottledTask{"pruneThrottledTask", 5};
-      pruneThrottledTask.runIfDue(now, [now]() {
-        SyncRes::pruneThrottledServers(now.tv_sec);
-      });
+    static PeriodicTask pruneFailedServersTask{"pruneFailedServerTask", 5};
+    pruneFailedServersTask.runIfDue(now, [now]() {
+      SyncRes::pruneFailedServers(now.tv_sec - static_cast<time_t>(SyncRes::s_serverdownthrottletime * 10));
+    });
 
-      static PeriodicTask pruneFailedServersTask{"pruneFailedServerTask", 5};
-      pruneFailedServersTask.runIfDue(now, [now]() {
-        SyncRes::pruneFailedServers(now.tv_sec - SyncRes::s_serverdownthrottletime * 10);
-      });
+    static PeriodicTask pruneNonResolvingTask{"pruneNonResolvingTask", 5};
+    pruneNonResolvingTask.runIfDue(now, [now]() {
+      SyncRes::pruneNonResolving(now.tv_sec - SyncRes::s_nonresolvingnsthrottletime);
+    });
 
-      static PeriodicTask pruneNonResolvingTask{"pruneNonResolvingTask", 5};
-      pruneNonResolvingTask.runIfDue(now, [now]() {
-        SyncRes::pruneNonResolving(now.tv_sec - SyncRes::s_nonresolvingnsthrottletime);
-      });
+    static PeriodicTask pruneSaveParentSetTask{"pruneSaveParentSetTask", 60};
+    pruneSaveParentSetTask.runIfDue(now, [now]() {
+      SyncRes::pruneSaveParentsNSSets(now.tv_sec);
+    });
 
-      static PeriodicTask pruneSaveParentSetTask{"pruneSaveParentSetTask", 60};
-      pruneSaveParentSetTask.runIfDue(now, [now]() {
-        SyncRes::pruneSaveParentsNSSets(now.tv_sec);
-      });
+    // By default, refresh at 80% of max-cache-ttl with a minimum period of 10s
+    const unsigned int minRootRefreshInterval = 10;
+    static PeriodicTask rootUpdateTask{"rootUpdateTask", std::max(SyncRes::s_maxcachettl * 8 / 10, minRootRefreshInterval)};
+    rootUpdateTask.runIfDue(now, [now, &log, minRootRefreshInterval]() {
+      int res = 0;
+      if (!g_regressionTestMode) {
+        res = SyncRes::getRootNS(now, nullptr, 0, log);
+      }
+      if (res == 0) {
+        // Success, go back to the defaut period
+        rootUpdateTask.setPeriod(std::max(SyncRes::s_maxcachettl * 8 / 10, minRootRefreshInterval));
+      }
+      else {
+        // On failure, go to the middle of the remaining period (initially 80% / 8 = 10%) and shorten the interval on each
+        // failure by dividing the existing interval by 8, keeping the minimum interval at 10s.
+        // So with a 1 day period and failures we'll see a refresh attempt at 69120, 69120+11520, 69120+11520+1440, ...
+        rootUpdateTask.setPeriod(std::max<time_t>(rootUpdateTask.getPeriod() / 8, minRootRefreshInterval));
+      }
+    });
 
-      // By default, refresh at 80% of max-cache-ttl with a minimum period of 10s
-      const unsigned int minRootRefreshInterval = 10;
-      static PeriodicTask rootUpdateTask{"rootUpdateTask", std::max(SyncRes::s_maxcachettl * 8 / 10, minRootRefreshInterval)};
-      rootUpdateTask.runIfDue(now, [now, &log, minRootRefreshInterval]() {
-        int res = 0;
-        if (!g_regressionTestMode) {
-          res = SyncRes::getRootNS(now, nullptr, 0, log);
-        }
-        if (res == 0) {
-          // Success, go back to the defaut period
-          rootUpdateTask.setPeriod(std::max(SyncRes::s_maxcachettl * 8 / 10, minRootRefreshInterval));
-        }
-        else {
-          // On failure, go to the middle of the remaining period (initially 80% / 8 = 10%) and shorten the interval on each
-          // failure by dividing the existing interval by 8, keeping the minimum interval at 10s.
-          // So with a 1 day period and failures we'll see a refresh attempt at 69120, 69120+11520, 69120+11520+1440, ...
-          rootUpdateTask.setPeriod(std::max<time_t>(rootUpdateTask.getPeriod() / 8, minRootRefreshInterval));
-        }
-      });
+    static PeriodicTask secpollTask{"secpollTask", 3600};
+    static time_t t_last_secpoll;
+    secpollTask.runIfDue(now, [&log]() {
+      try {
+        doSecPoll(&t_last_secpoll, log);
+      }
+      catch (const std::exception& e) {
+        SLOG(g_log << Logger::Error << "Exception while performing security poll: " << e.what() << endl,
+             log->error(Logr::Error, e.what(), "Exception while performing security poll"));
+      }
+      catch (const PDNSException& e) {
+        SLOG(g_log << Logger::Error << "Exception while performing security poll: " << e.reason << endl,
+             log->error(Logr::Error, e.reason, "Exception while performing security poll"));
+      }
+      catch (const ImmediateServFailException& e) {
+        SLOG(g_log << Logger::Error << "Exception while performing security poll: " << e.reason << endl,
+             log->error(Logr::Error, e.reason, "Exception while performing security poll"));
+      }
+      catch (const PolicyHitException& e) {
+        SLOG(g_log << Logger::Error << "Policy hit while performing security poll" << endl,
+             log->info(Logr::Error, "Policy hit while performing security poll"));
+      }
+      catch (...) {
+        SLOG(g_log << Logger::Error << "Exception while performing security poll" << endl,
+             log->info(Logr::Error, "Exception while performing security poll"));
+      }
+    });
 
-      static PeriodicTask secpollTask{"secpollTask", 3600};
-      static time_t t_last_secpoll;
-      secpollTask.runIfDue(now, [&log]() {
+    const time_t taInterval = std::max(1, static_cast<int>(luaconfsLocal->trustAnchorFileInfo.interval) * 3600);
+    static PeriodicTask trustAnchorTask{"trustAnchorTask", taInterval};
+    if (!trustAnchorTask.hasRun()) {
+      // Loading the Lua config file already "refreshed" the TAs
+      trustAnchorTask.updateLastRun();
+    }
+    // interval might have ben updated
+    trustAnchorTask.setPeriod(taInterval);
+    trustAnchorTask.runIfDue(now, [&luaconfsLocal, &log]() {
+      if (!luaconfsLocal->trustAnchorFileInfo.fname.empty() && luaconfsLocal->trustAnchorFileInfo.interval != 0) {
+        SLOG(g_log << Logger::Debug << "Refreshing Trust Anchors from file" << endl,
+             log->info(Logr::Debug, "Refreshing Trust Anchors from file"));
         try {
-          doSecPoll(&t_last_secpoll, log);
-        }
-        catch (const std::exception& e) {
-          SLOG(g_log << Logger::Error << "Exception while performing security poll: " << e.what() << endl,
-               log->error(Logr::Error, e.what(), "Exception while performing security poll"));
-        }
-        catch (const PDNSException& e) {
-          SLOG(g_log << Logger::Error << "Exception while performing security poll: " << e.reason << endl,
-               log->error(Logr::Error, e.reason, "Exception while performing security poll"));
-        }
-        catch (const ImmediateServFailException& e) {
-          SLOG(g_log << Logger::Error << "Exception while performing security poll: " << e.reason << endl,
-               log->error(Logr::Error, e.reason, "Exception while performing security poll"));
-        }
-        catch (const PolicyHitException& e) {
-          SLOG(g_log << Logger::Error << "Policy hit while performing security poll" << endl,
-               log->info(Logr::Error, "Policy hit while performing security poll"));
+          map<DNSName, dsmap_t> dsAnchors;
+          if (updateTrustAnchorsFromFile(luaconfsLocal->trustAnchorFileInfo.fname, dsAnchors, log)) {
+            g_luaconfs.modify([&dsAnchors](LuaConfigItems& lci) {
+              lci.dsAnchors = dsAnchors;
+            });
+          }
         }
-        catch (...) {
-          SLOG(g_log << Logger::Error << "Exception while performing security poll" << endl,
-               log->info(Logr::Error, "Exception while performing security poll"));
+        catch (const PDNSException& pe) {
+          SLOG(g_log << Logger::Error << "Unable to update Trust Anchors: " << pe.reason << endl,
+               log->error(Logr::Error, pe.reason, "Unable to update Trust Anchors"));
         }
-      });
-
-      static PeriodicTask trustAnchorTask{"trustAnchorTask", std::max(1U, luaconfsLocal->trustAnchorFileInfo.interval) * 3600};
-      if (!trustAnchorTask.hasRun()) {
-        // Loading the Lua config file already "refreshed" the TAs
-        trustAnchorTask.updateLastRun();
       }
-      // interval might have ben updated
-      trustAnchorTask.setPeriod(std::max(1U, luaconfsLocal->trustAnchorFileInfo.interval) * 3600);
-      trustAnchorTask.runIfDue(now, [&luaconfsLocal, &log]() {
-        if (!luaconfsLocal->trustAnchorFileInfo.fname.empty() && luaconfsLocal->trustAnchorFileInfo.interval != 0) {
-          SLOG(g_log << Logger::Debug << "Refreshing Trust Anchors from file" << endl,
-               log->info(Logr::Debug, "Refreshing Trust Anchors from file"));
-          try {
-            map<DNSName, dsmap_t> dsAnchors;
-            if (updateTrustAnchorsFromFile(luaconfsLocal->trustAnchorFileInfo.fname, dsAnchors, log)) {
-              g_luaconfs.modify([&dsAnchors](LuaConfigItems& lci) {
-                lci.dsAnchors = dsAnchors;
-              });
-            }
-          }
-          catch (const PDNSException& pe) {
-            SLOG(g_log << Logger::Error << "Unable to update Trust Anchors: " << pe.reason << endl,
-                 log->error(Logr::Error, pe.reason, "Unable to update Trust Anchors"));
-          }
-        }
-      });
+    });
+  }
+  t_Counters.updateSnap(g_regressionTestMode);
+}
+
+static void houseKeeping(void* /* ignored */)
+{
+  auto log = g_slog->withName("housekeeping");
+  static thread_local bool t_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
+
+  try {
+    if (t_running) {
+      return;
     }
-    t_Counters.updateSnap(g_regressionTestMode);
+    t_running = true;
+    houseKeeping0(log);
     t_running = false;
   }
   catch (const PDNSException& ae) {
@@ -2420,6 +2429,117 @@ static void houseKeeping(void*)
   }
 }
 
+static void runLuaMaintenance(RecThreadInfo& threadInfo, time_t& last_lua_maintenance, time_t luaMaintenanceInterval)
+{
+  if (t_pdl != nullptr) {
+    // lua-dns-script directive is present, call the maintenance callback if needed
+    /* remember that the listener threads handle TCP queries */
+    if (threadInfo.isWorker() || threadInfo.isListener()) {
+      // Only on threads processing queries
+      if (g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
+        struct timeval start{};
+        Utility::gettimeofday(&start);
+        t_pdl->maintenance();
+        last_lua_maintenance = g_now.tv_sec;
+        struct timeval stop{};
+        Utility::gettimeofday(&stop);
+        t_Counters.at(rec::Counter::maintenanceUsec) += uSec(stop - start);
+        ++t_Counters.at(rec::Counter::maintenanceCalls);
+      }
+    }
+  }
+}
+
+static void runTCPMaintenance(RecThreadInfo& threadInfo, bool& listenOnTCP, unsigned int maxTcpClients)
+{
+  if (threadInfo.isListener()) {
+    if (listenOnTCP) {
+      if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
+        for (const auto fileDesc : threadInfo.tcpSockets) {
+          t_fdm->removeReadFD(fileDesc);
+        }
+        listenOnTCP = false;
+      }
+    }
+    else {
+      if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
+        for (const auto fileDesc : threadInfo.tcpSockets) {
+          t_fdm->addReadFD(fileDesc, handleNewTCPQuestion);
+        }
+        listenOnTCP = true;
+      }
+    }
+  }
+}
+
+static void recLoop()
+{
+  unsigned int maxTcpClients = ::arg().asNum("max-tcp-clients");
+  bool listenOnTCP{true};
+  time_t last_stat = 0;
+  time_t last_carbon = 0;
+  time_t last_lua_maintenance = 0;
+  time_t carbonInterval = ::arg().asNum("carbon-interval");
+  time_t luaMaintenanceInterval = ::arg().asNum("lua-maintenance-interval");
+
+  auto& threadInfo = RecThreadInfo::self();
+
+  while (!RecursorControlChannel::stop) {
+    while (MT->schedule(&g_now)) {
+      ; // MTasker letting the mthreads do their thing
+    }
+
+    // Use primes, it avoid not being scheduled in cases where the counter has a regular pattern.
+    // We want to call handler thread often, it gets scheduled about 2 times per second
+    if (((threadInfo.isHandler() || threadInfo.isTaskThread()) && s_counter % 11 == 0) || s_counter % 499 == 0) {
+      struct timeval start{};
+      Utility::gettimeofday(&start);
+      MT->makeThread(houseKeeping, nullptr);
+      if (!threadInfo.isTaskThread()) {
+        struct timeval stop{};
+        Utility::gettimeofday(&stop);
+        t_Counters.at(rec::Counter::maintenanceUsec) += uSec(stop - start);
+        ++t_Counters.at(rec::Counter::maintenanceCalls);
+      }
+    }
+
+    if (s_counter % 55 == 0) {
+      auto expired = t_fdm->getTimeouts(g_now);
+
+      for (const auto & exp : expired) {
+        auto conn = boost::any_cast<shared_ptr<TCPConnection>>(exp.second);
+        if (g_logCommonErrors) {
+          SLOG(g_log << Logger::Warning << "Timeout from remote TCP client " << conn->d_remote.toStringWithPort() << endl, // NOLINT: union access
+               g_slogtcpin->info(Logr::Warning, "Timeout from remote TCP client", "remote", Logging::Loggable(conn->d_remote)));
+        }
+        t_fdm->removeReadFD(exp.first);
+      }
+    }
+
+    s_counter++;
+
+    if (threadInfo.isHandler()) {
+      if (statsWanted || (s_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= s_statisticsInterval)) {
+        doStats();
+        last_stat = g_now.tv_sec;
+      }
+
+      Utility::gettimeofday(&g_now, nullptr);
+
+      if ((g_now.tv_sec - last_carbon) >= carbonInterval) {
+        MT->makeThread(doCarbonDump, nullptr);
+        last_carbon = g_now.tv_sec;
+      }
+    }
+    runLuaMaintenance(threadInfo, last_lua_maintenance, luaMaintenanceInterval);
+
+    t_fdm->run(&g_now);
+    // 'run' updates g_now for us
+
+    runTCPMaintenance(threadInfo, listenOnTCP, maxTcpClients);
+  }
+}
+
 static void recursorThread()
 {
   auto log = g_slog->withName("runtime");
@@ -2444,7 +2564,7 @@ static void recursorThread()
       if (threadInfo.isHandler()) {
         if (!primeHints()) {
           threadInfo.setExitCode(EXIT_FAILURE);
-          RecursorControlChannel::stop = 1;
+          RecursorControlChannel::stop = true;
           SLOG(g_log << Logger::Critical << "Priming cache failed, stopping" << endl,
                log->info(Logr::Critical, "Priming cache failed, stopping"));
         }
@@ -2454,8 +2574,9 @@ static void recursorThread()
     }
 
 #ifdef NOD_ENABLED
-    if (threadInfo.isWorker())
+    if (threadInfo.isWorker()) {
       setupNODThread(log);
+    }
 #endif /* NOD_ENABLED */
 
     /* the listener threads handle TCP queries */
@@ -2476,12 +2597,14 @@ static void recursorThread()
     }
 
     unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numWorkers();
-    if (ringsize) {
+    if (ringsize != 0) {
       t_remotes = std::make_unique<addrringbuf_t>();
-      if (RecThreadInfo::weDistributeQueries())
+      if (RecThreadInfo::weDistributeQueries()) {
         t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numDistributors());
-      else
+      }
+      else {
         t_remotes->set_capacity(ringsize);
+      }
       t_servfailremotes = std::make_unique<addrringbuf_t>();
       t_servfailremotes->set_capacity(ringsize);
       t_bogusremotes = std::make_unique<addrringbuf_t>();
@@ -2555,15 +2678,6 @@ static void recursorThread()
       t_fdm->addReadFD(g_rcc.d_fd, handleRCC); // control channel
     }
 
-    unsigned int maxTcpClients = ::arg().asNum("max-tcp-clients");
-
-    bool listenOnTCP{true};
-
-    time_t last_stat = 0;
-    time_t last_carbon = 0, last_lua_maintenance = 0;
-    time_t carbonInterval = ::arg().asNum("carbon-interval");
-    time_t luaMaintenanceInterval = ::arg().asNum("lua-maintenance-interval");
-
 #ifdef HAVE_SYSTEMD
     if (threadInfo.isHandler()) {
       // There is a race, as some threads might not be ready yet to do work.
@@ -2572,92 +2686,8 @@ static void recursorThread()
       sd_notify(0, "READY=1");
     }
 #endif
-    while (!RecursorControlChannel::stop) {
-      while (MT->schedule(&g_now))
-        ; // MTasker letting the mthreads do their thing
-
-      // Use primes, it avoid not being scheduled in cases where the counter has a regular pattern.
-      // We want to call handler thread often, it gets scheduled about 2 times per second
-      if (((threadInfo.isHandler() || threadInfo.isTaskThread()) && s_counter % 11 == 0) || s_counter % 499 == 0) {
-        struct timeval start;
-        Utility::gettimeofday(&start);
-        MT->makeThread(houseKeeping, nullptr);
-        if (!threadInfo.isTaskThread()) {
-          struct timeval stop;
-          Utility::gettimeofday(&stop);
-          t_Counters.at(rec::Counter::maintenanceUsec) += uSec(stop - start);
-          ++t_Counters.at(rec::Counter::maintenanceCalls);
-        }
-      }
-
-      if (!(s_counter % 55)) {
-        typedef vector<pair<int, FDMultiplexer::funcparam_t>> expired_t;
-        expired_t expired = t_fdm->getTimeouts(g_now);
-
-        for (expired_t::iterator i = expired.begin(); i != expired.end(); ++i) {
-          shared_ptr<TCPConnection> conn = boost::any_cast<shared_ptr<TCPConnection>>(i->second);
-          if (g_logCommonErrors)
-            SLOG(g_log << Logger::Warning << "Timeout from remote TCP client " << conn->d_remote.toStringWithPort() << endl,
-                 g_slogtcpin->info(Logr::Warning, "Timeout from remote TCP client", "remote", Logging::Loggable(conn->d_remote)));
-          t_fdm->removeReadFD(i->first);
-        }
-      }
-
-      s_counter++;
-
-      if (threadInfo.isHandler()) {
-        if (statsWanted || (s_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= s_statisticsInterval)) {
-          doStats();
-          last_stat = g_now.tv_sec;
-        }
-
-        Utility::gettimeofday(&g_now, nullptr);
 
-        if ((g_now.tv_sec - last_carbon) >= carbonInterval) {
-          MT->makeThread(doCarbonDump, 0);
-          last_carbon = g_now.tv_sec;
-        }
-      }
-      if (t_pdl != nullptr) {
-        // lua-dns-script directive is present, call the maintenance callback if needed
-        /* remember that the listener threads handle TCP queries */
-        if (threadInfo.isWorker() || threadInfo.isListener()) {
-          // Only on threads processing queries
-          if (g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
-            struct timeval start;
-            Utility::gettimeofday(&start);
-            t_pdl->maintenance();
-            last_lua_maintenance = g_now.tv_sec;
-            struct timeval stop;
-            Utility::gettimeofday(&stop);
-            t_Counters.at(rec::Counter::maintenanceUsec) += uSec(stop - start);
-            ++t_Counters.at(rec::Counter::maintenanceCalls);
-          }
-        }
-      }
-
-      t_fdm->run(&g_now);
-      // 'run' updates g_now for us
-
-      if (threadInfo.isListener()) {
-        if (listenOnTCP) {
-          if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
-            for (const auto fd : threadInfo.tcpSockets) {
-              t_fdm->removeReadFD(fd);
-            }
-            listenOnTCP = false;
-          }
-        }
-        else {
-          if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
-            for (const auto fd : threadInfo.tcpSockets) {
-              t_fdm->addReadFD(fd, handleNewTCPQuestion);
-            }
-            listenOnTCP = true;
-          }
-        }
-      }
-    }
+    recLoop();
   }
   catch (PDNSException& ae) {
     SLOG(g_log << Logger::Error << "Exception: " << ae.reason << endl,