]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
rec: Use a bounded load-balancing algo to distribute queries
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 20 Feb 2019 16:47:30 +0000 (17:47 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 27 Mar 2019 12:37:44 +0000 (13:37 +0100)
pdns/pdns_recursor.cc
pdns/recursordist/docs/settings.rst

index b8d1f8ade5e1a98c9c62ff27bfa7abbdd4aafe38..7437e718522df3e9c6f2a790a37f1da1890e1230 100644 (file)
@@ -161,6 +161,8 @@ struct RecThreadInfo
   deferredAdd_t deferredAdds;
   struct ThreadPipeSet pipes;
   std::thread thread;
+  MT_t* mt{nullptr};
+  uint64_t numberOfDistributedQueries{0};
   /* handle the web server, carbon, statistics and the control channel */
   bool isHandler{false};
   /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
@@ -226,6 +228,7 @@ static std::set<uint16_t> s_avoidUdpSourcePorts;
 #endif
 static uint16_t s_minUdpSourcePort;
 static uint16_t s_maxUdpSourcePort;
+static double s_balancingFactor;
 
 RecursorControlChannel s_rcc; // only active in the handler thread
 RecursorStats g_stats;
@@ -2385,6 +2388,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
             distributeAsyncFunction(data, boost::bind(doProcessUDPQuestion, data, fromaddr, dest, tv, fd));
           }
           else {
+            ++s_threadInfos[t_id].numberOfDistributedQueries;
             doProcessUDPQuestion(data, fromaddr, dest, tv, fd);
           }
         }
@@ -2633,6 +2637,14 @@ static void doStats(void)
     g_log<<Logger::Notice<<"stats: " <<  broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
     " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
 
+    size_t idx = 0;
+    for (const auto& threadInfo : s_threadInfos) {
+      if(threadInfo.isWorker) {
+        g_log<<Logger::Notice<<"Thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
+        ++idx;
+      }
+    }
+
     time_t now = time(0);
     if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
       g_log<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
@@ -2818,7 +2830,7 @@ void broadcastFunction(const pipefunc_t& func)
 
 static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 {
-  const auto& targetInfo = s_threadInfos[target];
+  auto& targetInfo = s_threadInfos[target];
   if(!targetInfo.isWorker) {
     g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
     exit(1);
@@ -2843,9 +2855,48 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
     }
   }
 
+  ++targetInfo.numberOfDistributedQueries;
+
   return true;
 }
 
+static unsigned int getWorkerLoad(size_t workerIdx)
+{
+  const auto mt = s_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
+  if (mt != nullptr) {
+    return mt->numProcesses();
+  }
+  return 0;
+}
+
+static unsigned int selectWorker(unsigned int hash)
+{
+  if (s_balancingFactor == 0) {
+    return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+  }
+
+  /* we start with one, representing the query we are currently handling */
+  double currentLoad = 1;
+  std::vector<unsigned int> load(g_numWorkerThreads);
+  for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
+    load[idx] = getWorkerLoad(idx);
+    currentLoad += load[idx];
+    // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
+  }
+
+  double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
+  // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
+
+  unsigned int worker = hash % g_numWorkerThreads;
+  /* at least one server has to be below the average load */
+  while(load[worker] > targetLoad) {
+    // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
+    worker = (worker + 1) % g_numWorkerThreads;
+  }
+
+  return /* skip handler */ 1 + g_numDistributorThreads + worker;
+}
+
 // This function is only called by the distributor threads, when pdns-distributes-queries is set
 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
 {
@@ -2855,7 +2906,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
   }
 
   unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
-  unsigned int target = /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+  unsigned int target = selectWorker(hash);
 
   ThreadMSG* tmsg = new ThreadMSG();
   tmsg->func = func;
@@ -3718,6 +3769,8 @@ static int serviceMain(int argc, char*argv[])
 
   g_statisticsInterval = ::arg().asNum("statistics-interval");
 
+  s_balancingFactor = ::arg().asDouble("distribution-load-factor");
+
 #ifdef SO_REUSEPORT
   g_reusePort = ::arg().mustDo("reuseport");
 #endif
@@ -4017,6 +4070,7 @@ try
   }
 
   MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
+  threadInfo.mt = MT.get();
 
 #ifdef HAVE_PROTOBUF
   /* start protobuf export threads if needed */
@@ -4316,6 +4370,7 @@ int main(int argc, char **argv)
     ::arg().set("udp-source-port-avoid", "List of comma separated UDP port number to avoid")="11211";
     ::arg().set("rng", "Specify random number generator to use. Valid values are auto,sodium,openssl,getrandom,arc4random,urandom.")="auto";
     ::arg().set("public-suffix-list-file", "Path to the Public Suffix List file, if any")="";
+    ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
 #ifdef NOD_ENABLED
     ::arg().set("new-domain-tracking", "Track newly observed domains (i.e. never seen before).")="no";
     ::arg().set("new-domain-log", "Log newly observed domains.")="yes";
index 5d9b529bfba408e4af43b8c375a41faefe1811de..431d7aa1d518a5a3b8c6a05577a969cadb5bf22b 100644 (file)
@@ -278,6 +278,25 @@ Do not log to syslog, only to stdout.
 Use this setting when running inside a supervisor that handles logging (like systemd).
 **Note**: do not use this setting in combination with `daemon`_ as all logging will disappear.
 
+.. _setting-distribution-load-factor:
+
+``distribution-load-factor``
+----------------------------
+.. versionadded:: 4.1.12
+
+-  Double
+-  Default: 0.0
+
+If `pdns-distributes-queries`_ is set and this setting is set to another value
+than 0, the distributor thread will use a bounded load-balancing algorithm while
+distributing queries to worker threads, making sure that no thread is assigned
+more queries than distribution-load-factor times the average number of queries
+currently processed by all the workers.
+For example, with a value of 1.25, no server should get more than 125 % of the
+average load. This helps making sure that all the workers have roughly the same
+share of queries, even if the incoming traffic is very skewed, with a larger
+number of requests asking for the same qname.
+
 .. _setting-distributor-threads:
 
 ``distributor-threads``