deferredAdd_t deferredAdds;
struct ThreadPipeSet pipes;
std::thread thread;
+ MT_t* mt{nullptr};
+ uint64_t numberOfDistributedQueries{0};
/* handle the web server, carbon, statistics and the control channel */
bool isHandler{false};
/* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
#endif
static uint16_t s_minUdpSourcePort;
static uint16_t s_maxUdpSourcePort;
+static double s_balancingFactor;
RecursorControlChannel s_rcc; // only active in the handler thread
RecursorStats g_stats;
distributeAsyncFunction(data, boost::bind(doProcessUDPQuestion, data, fromaddr, dest, tv, fd));
}
else {
+ ++s_threadInfos[t_id].numberOfDistributedQueries;
doProcessUDPQuestion(data, fromaddr, dest, tv, fd);
}
}
g_log<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
" packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
+ size_t idx = 0;
+ for (const auto& threadInfo : s_threadInfos) {
+ if(threadInfo.isWorker) {
+ g_log<<Logger::Notice<<"Thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
+ ++idx;
+ }
+ }
+
time_t now = time(0);
if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
g_log<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
{
- const auto& targetInfo = s_threadInfos[target];
+ auto& targetInfo = s_threadInfos[target];
if(!targetInfo.isWorker) {
g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
exit(1);
}
}
+ ++targetInfo.numberOfDistributedQueries;
+
return true;
}
+static unsigned int getWorkerLoad(size_t workerIdx)
+{
+ const auto mt = s_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
+ if (mt != nullptr) {
+ return mt->numProcesses();
+ }
+ return 0;
+}
+
+static unsigned int selectWorker(unsigned int hash)
+{
+ if (s_balancingFactor == 0) {
+ return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+ }
+
+ /* we start with one, representing the query we are currently handling */
+ double currentLoad = 1;
+ std::vector<unsigned int> load(g_numWorkerThreads);
+ for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
+ load[idx] = getWorkerLoad(idx);
+ currentLoad += load[idx];
+ // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
+ }
+
+ double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
+ // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
+
+ unsigned int worker = hash % g_numWorkerThreads;
+ /* at least one server has to be below the average load */
+ while(load[worker] > targetLoad) {
+ // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
+ worker = (worker + 1) % g_numWorkerThreads;
+ }
+
+ return /* skip handler */ 1 + g_numDistributorThreads + worker;
+}
+
// This function is only called by the distributor threads, when pdns-distributes-queries is set
void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
{
}
unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
- unsigned int target = /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+ unsigned int target = selectWorker(hash);
ThreadMSG* tmsg = new ThreadMSG();
tmsg->func = func;
g_statisticsInterval = ::arg().asNum("statistics-interval");
+ s_balancingFactor = ::arg().asDouble("distribution-load-factor");
+
#ifdef SO_REUSEPORT
g_reusePort = ::arg().mustDo("reuseport");
#endif
}
MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
+ threadInfo.mt = MT.get();
#ifdef HAVE_PROTOBUF
/* start protobuf export threads if needed */
::arg().set("udp-source-port-avoid", "List of comma separated UDP port number to avoid")="11211";
::arg().set("rng", "Specify random number generator to use. Valid values are auto,sodium,openssl,getrandom,arc4random,urandom.")="auto";
::arg().set("public-suffix-list-file", "Path to the Public Suffix List file, if any")="";
+ ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
#ifdef NOD_ENABLED
::arg().set("new-domain-tracking", "Track newly observed domains (i.e. never seen before).")="no";
::arg().set("new-domain-log", "Log newly observed domains.")="yes";
Use this setting when running inside a supervisor that handles logging (like systemd).
**Note**: do not use this setting in combination with `daemon`_ as all logging will disappear.
+.. _setting-distribution-load-factor:
+
+``distribution-load-factor``
+----------------------------
+.. versionadded:: 4.1.12
+
+- Double
+- Default: 0.0
+
+If `pdns-distributes-queries`_ is set and this setting is set to another value
+than 0, the distributor thread will use a bounded load-balancing algorithm while
+distributing queries to worker threads, making sure that no thread is assigned
+more queries than distribution-load-factor times the average number of queries
+currently processed by all the workers.
+For example, with a value of 1.25, no server should get more than 125 % of the
+average load. This helps making sure that all the workers have roughly the same
+share of queries, even if the incoming traffic is very skewed, with a larger
+number of requests asking for the same qname.
+
.. _setting-distributor-threads:
``distributor-threads``