]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
rec: Add support for several distributor threads
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 3 Jul 2018 07:33:46 +0000 (09:33 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 30 Jul 2018 13:08:56 +0000 (15:08 +0200)
pdns/pdns_recursor.cc
pdns/recursordist/docs/settings.rst

index 1bd772c1b56a6398641c6cb04a2568468e2750cf..ab69e74c81bf1ec51236dd08ad1578d72266084a 100644 (file)
 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
 
 static thread_local std::shared_ptr<RecursorLua4> t_pdl;
-static thread_local int t_id = -1;
+static thread_local unsigned int t_id = 0;
 static thread_local std::shared_ptr<Regex> t_traceRegex;
 static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
 #ifdef HAVE_PROTOBUF
@@ -127,32 +127,50 @@ thread_local std::shared_ptr<nod::NODDB> t_nodDBp;
 #endif /* NOD_ENABLED */
 __thread struct timeval g_now; // timestamp, updated (too) frequently
 
+typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
+
 // for communicating with our threads
-struct ThreadPipeSet
-{
-  int writeToThread;
-  int readToThread;
-  int writeFromThread;
-  int readFromThread;
-  int writeQueriesToThread; // this one is non-blocking
-  int readQueriesToThread;
+// effectively readonly after startup
+struct RecThreadInfo
+{
+  struct ThreadPipeSet
+  {
+    int writeToThread{-1};
+    int readToThread{-1};
+    int writeFromThread{-1};
+    int readFromThread{-1};
+    int writeQueriesToThread{-1}; // this one is non-blocking
+    int readQueriesToThread{-1};
+  };
+
+  /* FD corresponding to listening sockets if we have one socket per
+     listener (with reuseport), otherwise all listeners share the
+     same FD and g_deferredAdds is then used instead */
+  deferredAdd_t deferredAdds;
+  struct ThreadPipeSet pipes;
+  std::thread thread;
+  /* handle the web server, carbon, statistics and the control channel */
+  bool isHandler{false};
+  /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
+  bool isListener{false};
+  /* process queries */
+  bool isWorker{false};
 };
 
-/* the TID of the thread handling the web server, carbon, statistics and the control channel */
-static const int s_handlerThreadID = -1;
-/* when pdns-distributes-queries is set, the TID of the thread handling, hashing and distributing new queries
-   to the other threads */
-static const int s_distributorThreadID = 0;
+/* first we have the handler thread, t_id == 0 (some other
+   helper threads like SNMP might have t_id == 0 as well)
+   then the distributor threads if any
+   and finally the workers */
+static std::vector<RecThreadInfo> s_threadInfos;
+/* without reuseport, all listeners share the same sockets */
+static deferredAdd_t g_deferredAdds;
 
 typedef vector<int> tcpListenSockets_t;
 typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
-typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
 
 static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
-static vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
 static tcpListenSockets_t g_tcpListenSockets;   // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
 static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
-static std::unordered_map<unsigned int, deferredAdd_t> deferredAdds;
 static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
 static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
 static AtomicCounter counter;
@@ -166,6 +184,7 @@ static uint32_t g_disthashseed;
 static unsigned int g_maxTCPPerClient;
 static unsigned int g_networkTimeoutMsec;
 static unsigned int g_maxMThreads;
+static unsigned int g_numDistributorThreads;
 static unsigned int g_numWorkerThreads;
 static int g_tcpTimeout;
 static uint16_t g_udpTruncationThreshold;
@@ -174,9 +193,8 @@ static std::atomic<bool> statsWanted;
 static std::atomic<bool> g_quiet;
 static bool g_logCommonErrors;
 static bool g_anyToTcp;
-static bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
+static bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
 static bool g_reusePort{false};
-static bool g_useOneSocketPerThread;
 static bool g_gettagNeedsEDNSOptions{false};
 static time_t g_statisticsInterval;
 static bool g_useIncomingECS;
@@ -195,7 +213,7 @@ static std::set<uint16_t> s_avoidUdpSourcePorts;
 static uint16_t s_minUdpSourcePort;
 static uint16_t s_maxUdpSourcePort;
 
-RecursorControlChannel s_rcc; // only active in thread 0
+RecursorControlChannel s_rcc; // only active in the handler thread
 RecursorStats g_stats;
 string s_programname="pdns_recursor";
 string s_pidfname;
@@ -310,6 +328,24 @@ int getMTaskerTID()
   return MT->getTid();
 }
 
+static bool isDistributorThread()
+{
+  if (t_id == 0) {
+    return false;
+  }
+
+  return g_weDistributeQueries && s_threadInfos.at(t_id).isListener;
+}
+
+static bool isHandlerThread()
+{
+  if (t_id == 0) {
+    return true;
+  }
+
+  return s_threadInfos.at(t_id).isHandler;
+}
+
 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
 
 // -1 is error, 0 is timeout, 1 is success
@@ -2208,7 +2244,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
   }
 }
 
-static void makeTCPServerSockets(unsigned int threadId)
+static void makeTCPServerSockets(deferredAdd_t& deferredAdds)
 {
   int fd;
   vector<string>locals;
@@ -2283,7 +2319,7 @@ static void makeTCPServerSockets(unsigned int threadId)
     setNonBlocking(fd);
     setSocketSendBuffer(fd, 65000);
     listen(fd, 128);
-    deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion));
+    deferredAdds.push_back(make_pair(fd, handleNewTCPQuestion));
     g_tcpListenSockets.push_back(fd);
     // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
     //  - fd is not that which we know here, but returned from accept()
@@ -2294,7 +2330,7 @@ static void makeTCPServerSockets(unsigned int threadId)
   }
 }
 
-static void makeUDPServerSockets(unsigned int threadId)
+static void makeUDPServerSockets(deferredAdd_t& deferredAdds)
 {
   int one=1;
   vector<string>locals;
@@ -2359,7 +2395,7 @@ static void makeUDPServerSockets(unsigned int threadId)
 
     setNonBlocking(fd);
 
-    deferredAdds[threadId].push_back(make_pair(fd, handleNewUDPQuestion));
+    deferredAdds.push_back(make_pair(fd, handleNewUDPQuestion));
     g_listenSocketsAddresses[fd]=sin;  // this is written to only from the startup thread, not from the workers
     if(sin.sin4.sin_family == AF_INET)
       g_log<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
@@ -2474,7 +2510,7 @@ static void houseKeeping(void *)
         last_rootupdate=now.tv_sec;
     }
 
-    if(t_id == s_distributorThreadID) {
+    if(isHandlerThread()) {
 
       if(now.tv_sec - last_secpoll >= 3600) {
        try {
@@ -2496,7 +2532,6 @@ static void houseKeeping(void *)
         {
           g_log<<Logger::Error<<"Exception while performing security poll"<<endl;
         }
-
       }
     }
     s_running=false;
@@ -2511,30 +2546,32 @@ static void houseKeeping(void *)
 
 static void makeThreadPipes()
 {
-  for(unsigned int n=0; n < g_numThreads; ++n) {
-    struct ThreadPipeSet tps;
+  /* thread 0 is the handler / SNMP, we start at 1 */
+  for(unsigned int n = 1; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
+    auto& threadInfos = s_threadInfos.at(n);
+
     int fd[2];
     if(pipe(fd) < 0)
       unixDie("Creating pipe for inter-thread communications");
 
-    tps.readToThread = fd[0];
-    tps.writeToThread = fd[1];
+    threadInfos.pipes.readToThread = fd[0];
+    threadInfos.pipes.writeToThread = fd[1];
 
     if(pipe(fd) < 0)
       unixDie("Creating pipe for inter-thread communications");
-    tps.readFromThread = fd[0];
-    tps.writeFromThread = fd[1];
+
+    threadInfos.pipes.readFromThread = fd[0];
+    threadInfos.pipes.writeFromThread = fd[1];
 
     if(pipe(fd) < 0)
       unixDie("Creating pipe for inter-thread communications");
-    tps.readQueriesToThread = fd[0];
-    tps.writeQueriesToThread = fd[1];
 
-    if (!setNonBlocking(tps.writeQueriesToThread)) {
+    threadInfos.pipes.readQueriesToThread = fd[0];
+    threadInfos.pipes.writeQueriesToThread = fd[1];
+
+    if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
       unixDie("Making pipe for inter-thread communications non-blocking");
     }
-
-    g_pipes.push_back(tps);
   }
 }
 
@@ -2546,21 +2583,21 @@ struct ThreadMSG
 
 void broadcastFunction(const pipefunc_t& func)
 {
-  /* This function might be called before t_id are set during startup
-     for the initialization of ACLs and domain maps, but the default is the same
-     than the handler thread */
-  if (t_id != s_handlerThreadID) {
-    g_log<<Logger::Error<<"broadcastFunction() has been called by a worker ("<<t_id<<")"<<endl;
-    exit(1);
+  /* This function might be called by the worker with t_id 0 during startup
+     for the initialization of ACLs and domain maps. After that it should only
+     be called by the handler. */
+
+  if (s_threadInfos.empty() && isHandlerThread()) {
+    /* the handler and  distributors will call themselves below, but
+       during startup we get called while s_threadInfos has not been
+       populated yet to update the ACL or domain maps, so we need to
+       handle that case.
+    */
+    func();
   }
 
-  /* the distributor will call itself below, but if we are the handler thread,
-     call the function ourselves to update the ACL or domain maps for example */
-  func();
-
-  int n = 0;
-  for(ThreadPipeSet& tps : g_pipes)
-  {
+  unsigned int n = 0;
+  for (const auto& threadInfo : s_threadInfos) {
     if(n++ == t_id) {
       func(); // don't write to ourselves!
       continue;
@@ -2569,13 +2606,14 @@ void broadcastFunction(const pipefunc_t& func)
     ThreadMSG* tmsg = new ThreadMSG();
     tmsg->func = func;
     tmsg->wantAnswer = true;
-    if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
+    if(write(threadInfo.pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
       delete tmsg;
+
       unixDie("write to thread pipe returned wrong size or error");
     }
 
     string* resp = nullptr;
-    if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
+    if(read(threadInfo.pipes.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
       unixDie("read from thread pipe returned wrong size or error");
 
     if(resp) {
@@ -2585,23 +2623,24 @@ void broadcastFunction(const pipefunc_t& func)
   }
 }
 
-// This function is only called by the distributor thread, when pdns-distributes-queries is set
+// This function is only called by the distributor threads, when pdns-distributes-queries is set
 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
 {
-  if (t_id != s_distributorThreadID) {
+  if (!isDistributorThread()) {
     g_log<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
     exit(1);
   }
 
   unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
-  unsigned int target = 1 + (hash % (g_pipes.size()-1));
+  unsigned int target = /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
 
-  if(target == static_cast<unsigned int>(s_distributorThreadID)) {
-    g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
+  const auto& targetInfo = s_threadInfos[target];
+  if(!targetInfo.isWorker) {
+    g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
     exit(1);
   }
 
-  ThreadPipeSet& tps = g_pipes[target];
+  const auto& tps = targetInfo.pipes;
   ThreadMSG* tmsg = new ThreadMSG();
   tmsg->func = func;
   tmsg->wantAnswer = false;
@@ -2645,7 +2684,8 @@ static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
       g_log<<Logger::Error<<"PIPE function we executed created PDNS exception: "<<e.reason<<endl; // but what if they wanted an answer.. we send 0
   }
   if(tmsg->wantAnswer) {
-    if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
+    const auto& threadInfo = s_threadInfos.at(t_id);
+    if(write(threadInfo.pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
       delete tmsg;
       unixDie("write to thread pipe returned wrong size or error");
     }
@@ -2684,15 +2724,19 @@ vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a,
   and by the SNMP thread to gather metrics. */
 template<class T> T broadcastAccFunction(const boost::function<T*()>& func)
 {
-  /* the SNMP thread uses id -1 too */
-  if (t_id != s_handlerThreadID) {
+  if (!isHandlerThread()) {
     g_log<<Logger::Error<<"broadcastAccFunction has been called by a worker ("<<t_id<<")"<<endl;
     exit(1);
   }
 
+  unsigned int n = 0;
   T ret=T();
-  for(ThreadPipeSet& tps : g_pipes)
-  {
+  for (const auto& threadInfo : s_threadInfos) {
+    if (n++ == t_id) {
+      continue;
+    }
+
+    const auto& tps = threadInfo.pipes;
     ThreadMSG* tmsg = new ThreadMSG();
     tmsg->func = boost::bind(voider<T>, func);
     tmsg->wantAnswer = true;
@@ -3015,7 +3059,7 @@ static void checkOrFixFDS()
   }
 }
 
-static void* recursorThread(int tid, bool worker);
+static void* recursorThread(unsigned int tid);
 
 static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
 {
@@ -3317,9 +3361,10 @@ static int serviceMain(int argc, char*argv[])
 
   g_quiet=::arg().mustDo("quiet");
 
+  /* this needs to be done before parseACLs(), which call broadcastFunction() */
   g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
   if(g_weDistributeQueries) {
-      g_log<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
+    g_log<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
   }
 
   setupDelegationOnly();
@@ -3412,13 +3457,14 @@ static int serviceMain(int argc, char*argv[])
 
   g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
 
+  g_numDistributorThreads = ::arg().asNum("distributor-threads");
   g_numWorkerThreads = ::arg().asNum("threads");
   if (g_numWorkerThreads < 1) {
     g_log<<Logger::Warning<<"Asked to run with 0 threads, raising to 1 instead"<<endl;
     g_numWorkerThreads = 1;
   }
 
-  g_numThreads = g_numWorkerThreads + g_weDistributeQueries;
+  g_numThreads = g_numDistributorThreads + g_numWorkerThreads;
   g_maxMThreads = ::arg().asNum("max-mthreads");
 
   g_gettagNeedsEDNSOptions = ::arg().mustDo("gettag-needs-edns-options");
@@ -3429,17 +3475,31 @@ static int serviceMain(int argc, char*argv[])
   g_reusePort = ::arg().mustDo("reuseport");
 #endif
 
-  g_useOneSocketPerThread = (!g_weDistributeQueries && g_reusePort);
+  s_threadInfos.resize(g_numDistributorThreads + g_numWorkerThreads + /* handler */ 1);
 
-  if (g_useOneSocketPerThread) {
-    for (unsigned int threadId = 0; threadId < g_numWorkerThreads; threadId++) {
-      makeUDPServerSockets(threadId);
-      makeTCPServerSockets(threadId);
+  if (g_reusePort) {
+    if (g_weDistributeQueries) {
+      /* first thread is the handler, then distributors */
+      for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
+        auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
+        makeUDPServerSockets(deferredAdds);
+        makeTCPServerSockets(deferredAdds);
+      }
+    }
+    else {
+      /* first thread is the handler, there is no distributor here and workers are accepting queries */
+      for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
+        auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
+        makeUDPServerSockets(deferredAdds);
+        makeTCPServerSockets(deferredAdds);
+      }
     }
   }
   else {
-    makeUDPServerSockets(0);
-    makeTCPServerSockets(0);
+    /* we don't have reuseport so we can only open one socket per
+       listening addr:port and everyone will listen on it */
+    makeUDPServerSockets(g_deferredAdds);
+    makeTCPServerSockets(g_deferredAdds);
   }
 
 #ifdef NOD_ENABLED
@@ -3552,39 +3612,69 @@ static int serviceMain(int argc, char*argv[])
     s_avoidUdpSourcePorts.insert(port);
   }
 
-  /* This thread handles the web server, carbon, statistics and the control channel */
-  std::thread handlerThread(recursorThread, s_handlerThreadID, false);
-
+  unsigned int currentThreadId = 1;
   const auto cpusMap = parseCPUMap();
 
-  std::vector<std::thread> workers(g_numThreads);
   if(g_numThreads == 1) {
     g_log<<Logger::Warning<<"Operating unthreaded"<<endl;
 #ifdef HAVE_SYSTEMD
     sd_notify(0, "READY=1");
 #endif
-    setCPUMap(cpusMap, 0, pthread_self());
-    recursorThread(0, true);
+
+    /* This thread handles the web server, carbon, statistics and the control channel */
+    auto& handlerInfos = s_threadInfos.at(0);
+    handlerInfos.isHandler = true;
+    handlerInfos.thread = std::thread(recursorThread, 0);
+
+    setCPUMap(cpusMap, currentThreadId, pthread_self());
+
+    auto& infos = s_threadInfos.at(currentThreadId);
+    infos.isListener = true;
+    infos.isWorker = true;
+    recursorThread(currentThreadId++);
   }
   else {
-    g_log<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
-    for(unsigned int n=0; n < g_numThreads; ++n) {
-      workers[n] = std::thread(recursorThread, n, true);
+    g_log<<Logger::Warning<<"Launching "<< g_numWorkerThreads <<" worker threads"<<endl;
+
+    if (g_weDistributeQueries) {
+      g_log<<Logger::Warning<<"Launching "<< g_numDistributorThreads <<" distributor threads"<<endl;
+      for(unsigned int n=0; n < g_numDistributorThreads; ++n) {
+        auto& infos = s_threadInfos.at(currentThreadId);
+        infos.isListener = true;
+        infos.thread = std::thread(recursorThread, currentThreadId++);
 
-      setCPUMap(cpusMap, n, workers[n].native_handle());
+        setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
+      }
     }
+
+    for(unsigned int n=0; n < g_numWorkerThreads; ++n) {
+      auto& infos = s_threadInfos.at(currentThreadId);
+      infos.isListener = g_weDistributeQueries ? false : true;
+      infos.isWorker = true;
+      infos.thread = std::thread(recursorThread, currentThreadId++);
+
+      setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
+    }
+
 #ifdef HAVE_SYSTEMD
     sd_notify(0, "READY=1");
 #endif
-    workers.back().join();
+
+    /* This thread handles the web server, carbon, statistics and the control channel */
+    auto& infos = s_threadInfos.at(0);
+    infos.isHandler = true;
+    infos.thread = std::thread(recursorThread, 0);
+
+    s_threadInfos.at(0).thread.join();
   }
   return 0;
 }
 
-static void* recursorThread(int n, bool worker)
+static void* recursorThread(unsigned int n)
 try
 {
   t_id=n;
+  auto& threadInfo = s_threadInfos.at(t_id);
   SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
   SyncRes::setDomainMap(g_initialDomainMap);
   t_allowFrom = g_initialAllowFrom;
@@ -3620,8 +3710,8 @@ try
   unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
   if(ringsize) {
     t_remotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
-    if(g_weDistributeQueries)  // if so, only 1 thread does recvfrom
-      t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries"));
+    if(g_weDistributeQueries)
+      t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / g_numDistributorThreads);
     else
       t_remotes->set_capacity(ringsize);
     t_servfailremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
@@ -3652,7 +3742,7 @@ try
 
   t_fdm=getMultiplexer();
 
-  if(!worker) {
+  if(threadInfo.isHandler) {
     if(::arg().mustDo("webserver")) {
       g_log<<Logger::Warning << "Enabling web server" << endl;
       try {
@@ -3666,18 +3756,21 @@ try
     g_log<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
   }
   else {
-    t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
-    t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest);
 
-    if(g_useOneSocketPerThread) {
-      for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
-        t_fdm->addReadFD(i->first, i->second);
+    t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
+    t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
+
+    if (threadInfo.isListener) {
+      if (g_reusePort) {
+        /* then every listener has its own FDs */
+        for(const auto deferred : threadInfo.deferredAdds) {
+          t_fdm->addReadFD(deferred.first, deferred.second);
+        }
       }
-    }
-    else {
-      if(!g_weDistributeQueries || t_id == s_distributorThreadID) { // if we distribute queries, only t_id = 0 listens
-        for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
-          t_fdm->addReadFD(i->first, i->second);
+      else {
+        /* otherwise all listeners are listening on the same ones */
+        for(const auto deferred : g_deferredAdds) {
+          t_fdm->addReadFD(deferred.first, deferred.second);
         }
       }
     }
@@ -3685,7 +3778,7 @@ try
 
   registerAllStats();
 
-  if(!worker) {
+  if(threadInfo.isHandler) {
     t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
   }
 
@@ -3719,7 +3812,7 @@ try
 
     counter++;
 
-    if(!worker) {
+    if(threadInfo.isHandler) {
       if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) {
         doStats();
         last_stat = g_now.tv_sec;
@@ -3734,7 +3827,7 @@ try
     }
     if (t_pdl != nullptr) {
       // lua-dns-script directive is present, call the maintenance callback if needed
-      if (worker && (!g_weDistributeQueries || t_id != s_distributorThreadID)) {
+      if (threadInfo.isWorker) {
         // Only on threads processing queries
         if(g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
           t_pdl->maintenance();
@@ -3746,18 +3839,20 @@ try
     t_fdm->run(&g_now);
     // 'run' updates g_now for us
 
-    if(worker && (!g_weDistributeQueries || t_id == s_distributorThreadID)) { // if pdns distributes queries, only tid 0 should do this
+    if(threadInfo.isListener) {
       if(listenOnTCP) {
        if(TCPConnection::getCurrentConnections() > maxTcpClients) {  // shutdown, too many connections
-         for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
-           t_fdm->removeReadFD(*i);
+         for(const auto fd : g_tcpListenSockets) {
+           t_fdm->removeReadFD(fd);
+          }
          listenOnTCP=false;
        }
       }
       else {
        if(TCPConnection::getCurrentConnections() <= maxTcpClients) {  // reenable
-         for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
-           t_fdm->addReadFD(*i, handleNewTCPQuestion);
+         for(const auto fd : g_tcpListenSockets) {
+           t_fdm->addReadFD(fd, handleNewTCPQuestion);
+          }
          listenOnTCP=true;
        }
       }
@@ -3810,6 +3905,7 @@ int main(int argc, char **argv)
     ::arg().set("setuid","If set, change user id to this uid for more security")="";
     ::arg().set("network-timeout", "Wait this number of milliseconds for network i/o")="1500";
     ::arg().set("threads", "Launch this number of threads")="2";
+    ::arg().set("distributor-threads", "Launch this number of distributor threads, distributing queries to other threads")="0";
     ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; // if we un-experimental this, need to fix openssl rand seeding for multiple PIDs!
     ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")="";
     ::arg().set("api-config-dir", "Directory where REST API stores config and zones") = "";
@@ -3963,8 +4059,21 @@ int main(int argc, char **argv)
 
     ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
 
-    if(::arg().asNum("threads")==1)
-      ::arg().set("pdns-distributes-queries")="no";
+    if(::arg().asNum("threads")==1) {
+      if (::arg().mustDo("pdns-distributes-queries")) {
+        g_log<<Logger::Warning<<"Only one thread, no need to distribute queries ourselves"<<endl;
+        ::arg().set("pdns-distributes-queries")="no";
+      }
+    }
+
+    if(::arg().mustDo("pdns-distributes-queries") && ::arg().asNum("distributor-threads") <= 0) {
+      g_log<<Logger::Warning<<"Asked to run with pdns-distributes-queries set but no distributor threads, raising to 1"<<endl;
+      ::arg().set("distributor-threads")="1";
+    }
+
+    if (!::arg().mustDo("pdns-distributes-queries")) {
+      ::arg().set("distributor-threads")="0";
+    }
 
     if(::arg().mustDo("help")) {
       cout<<"syntax:"<<endl<<endl;
index a92db4f1d8b26223ba60053a2c2b9052f83f957f..caeb21f1eaa8d11d2e8d86ed88b3ee6e9c069617 100644 (file)
@@ -219,6 +219,10 @@ The number of worker threads is determined by the :ref:`setting-threads` setting
 If :ref:`setting-pdns-distributes-queries` is set, an additional thread is started, assigned the id 0,
 and is the only one listening on client sockets and accepting queries, distributing them to the other worker threads afterwards.
 
+Starting with version 4.2.0, the thread handling the control channel, the webserver and other internal stuff has been assigned
+id 0 and more than one distributor thread can be started using the :ref:`setting-distributor-threads` setting, so the distributor
+threads if any are assigned id 1 and counting, and the other threads follow behind.
+
 This parameter is only available on OS that provides the `pthread_setaffinity_np()` function.
 
 .. _setting-daemon:
@@ -263,6 +267,19 @@ Do not log to syslog, only to stdout.
 Use this setting when running inside a supervisor that handles logging (like systemd).
 **Note**: do not use this setting in combination with `daemon`_ as all logging will disappear.
 
+.. _setting-distributor-threads:
+
+``distributor-threads``
+-----------
+.. versionadded:: 4.2.0
+
+-  Integer
+-  Default: 1 if `pdns-distributes-queries`_ is set, 0 otherwise
+
+If `pdns-distributes-queries`_ is set, spawn this number of distributor threads on startup. Distributor threads
+handle incoming queries and distribute them to other threads based on a hash of the query, to maximize the cache hit
+ratio.
+
 .. _setting-dnssec:
 
 ``dnssec``
@@ -931,7 +948,9 @@ Maximum number of seconds to cache a 'server failure' answer in the packet cache
 -  Boolean
 -  Default: yes
 
-If set, PowerDNS will have only 1 thread listening on client sockets, and distribute work by itself over threads.
+If set, PowerDNS will have only 1 thread listening on client sockets, and distribute work by itself over threads by using a hash of the query,
+maximizing the cache hit ratio. Starting with version 4.2.0, more than one distributing thread can be started using the `distributor-threads`_
+setting.
 Improves performance on Linux.
 
 .. _setting-query-local-address: