destination = dest;
}
- if (RecThreadInfo::s_weDistributeQueries) {
+ if (RecThreadInfo::weDistributeQueries()) {
std::string localdata = data;
distributeAsyncFunction(data, [localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace]() mutable {
return doProcessUDPQuestion(localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace);
static unsigned int getWorkerLoad(size_t workerIdx)
{
- const auto mt = RecThreadInfo::info(/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx).mt;
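+ // worker thread ids start after the handler and distributor threads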
+ const auto mt = RecThreadInfo::info(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + workerIdx).mt;
if (mt != nullptr) {
return mt->numProcesses();
}
static unsigned int selectWorker(unsigned int hash)
{
if (g_balancingFactor == 0) {
- return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + (hash % RecThreadInfo::s_numWorkerThreads);
+ return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + (hash % RecThreadInfo::numWorkers());
}
/* we start with one, representing the query we are currently handling */
double currentLoad = 1;
- std::vector<unsigned int> load(RecThreadInfo::s_numWorkerThreads);
- for (size_t idx = 0; idx < RecThreadInfo::s_numWorkerThreads; idx++) {
+ std::vector<unsigned int> load(RecThreadInfo::numWorkers());
+ for (size_t idx = 0; idx < RecThreadInfo::numWorkers(); idx++) {
load[idx] = getWorkerLoad(idx);
currentLoad += load[idx];
- // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
}
- double targetLoad = (currentLoad / RecThreadInfo::s_numWorkerThreads) * g_balancingFactor;
- // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
+ double targetLoad = (currentLoad / RecThreadInfo::numWorkers()) * g_balancingFactor;
- unsigned int worker = hash % RecThreadInfo::s_numWorkerThreads;
+ unsigned int worker = hash % RecThreadInfo::numWorkers();
/* at least one server has to be at or below the average load */
if (load[worker] > targetLoad) {
++g_stats.rebalancedQueries;
do {
- // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
- worker = (worker + 1) % RecThreadInfo::s_numWorkerThreads;
+ worker = (worker + 1) % RecThreadInfo::numWorkers();
} while (load[worker] > targetLoad);
}
- return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + worker;
+ return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + worker;
}
// This function is only called by the distributor threads, when pdns-distributes-queries is set
was full, let's try another one */
unsigned int newTarget = 0;
do {
- newTarget = /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + dns_random(RecThreadInfo::s_numWorkerThreads);
+ newTarget = RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + dns_random(RecThreadInfo::numWorkers());
} while (newTarget == target);
if (!trySendingQueryToWorker(newTarget, tmsg)) {
static void checkOrFixFDS()
{
unsigned int availFDs = getFilenumLimit();
- unsigned int wantFDs = g_maxMThreads * RecThreadInfo::s_numWorkerThreads + 25; // even healthier margin then before
- wantFDs += RecThreadInfo::s_numWorkerThreads * TCPOutConnectionManager::s_maxIdlePerThread;
+ unsigned int wantFDs = g_maxMThreads * RecThreadInfo::numWorkers() + 25; // even healthier margin than before
+ wantFDs += RecThreadInfo::numWorkers() * TCPOutConnectionManager::s_maxIdlePerThread;
if (wantFDs > availFDs) {
unsigned int hardlimit = getFilenumLimit(true);
g_log << Logger::Warning << "Raised soft limit on number of filedescriptors to " << wantFDs << " to match max-mthreads and threads settings" << endl;
}
else {
- int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / RecThreadInfo::s_numWorkerThreads;
+ int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / RecThreadInfo::numWorkers();
g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl;
g_maxMThreads = newval;
setFilenumLimit(hardlimit);
}
/* thread 0 is the handler / SNMP, worker threads start at 1 */
- for (unsigned int n = 0; n <= RecThreadInfo::numThreads(); ++n) {
+ for (unsigned int n = 0; n < RecThreadInfo::numRecursorThreads(); ++n) {
auto& threadInfo = RecThreadInfo::info(n);
int fd[2];
}
/* this needs to be done before parseACLs(), which call broadcastFunction() */
- RecThreadInfo::s_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
- if (RecThreadInfo::s_weDistributeQueries) {
+ RecThreadInfo::setWeDistributeQueries(::arg().mustDo("pdns-distributes-queries"));
+ if (RecThreadInfo::weDistributeQueries()) {
g_log << Logger::Warning << "PowerDNS Recursor itself will distribute queries over threads" << endl;
}
}
g_paddingTag = ::arg().asNum("edns-padding-tag");
- RecThreadInfo::s_numDistributorThreads = ::arg().asNum("distributor-threads");
- RecThreadInfo::s_numWorkerThreads = ::arg().asNum("threads");
- if (RecThreadInfo::s_numWorkerThreads < 1) {
+ RecThreadInfo::setNumDistributorThreads(::arg().asNum("distributor-threads"));
+ RecThreadInfo::setNumWorkerThreads(::arg().asNum("threads"));
+ if (RecThreadInfo::numWorkers() < 1) {
g_log << Logger::Warning << "Asked to run with 0 threads, raising to 1 instead" << endl;
- RecThreadInfo::s_numWorkerThreads = 1;
+ RecThreadInfo::setNumWorkerThreads(1);
}
g_maxMThreads = ::arg().asNum("max-mthreads");
g_reusePort = ::arg().mustDo("reuseport");
#endif
- RecThreadInfo::infos().resize(RecThreadInfo::numThreads() + /* handler */ 1);
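+ // make room for the handler, distributor, worker and task threads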
+ RecThreadInfo::infos().resize(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() + RecThreadInfo::numTaskThreads());
if (g_reusePort) {
- if (RecThreadInfo::s_weDistributeQueries) {
+ if (RecThreadInfo::weDistributeQueries()) {
/* first thread is the handler, then distributors */
- for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
+ for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) {
auto& info = RecThreadInfo::info(threadId);
auto& deferredAdds = info.deferredAdds;
auto& tcpSockets = info.tcpSockets;
}
else {
/* first thread is the handler, there is no distributor here and workers are accepting queries */
- for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
+ for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) {
auto& info = RecThreadInfo::info(threadId);
auto& deferredAdds = info.deferredAdds;
auto& tcpSockets = info.tcpSockets;
/* every listener (so distributor if g_weDistributeQueries, workers otherwise)
needs to listen to the shared sockets */
- if (RecThreadInfo::s_weDistributeQueries) {
+ if (RecThreadInfo::weDistributeQueries()) {
/* first thread is the handler, then distributors */
- for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
+ for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) {
RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
}
}
else {
/* first thread is the handler, there is no distributor here and workers are accepting queries */
- for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
+ for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) {
RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
}
}
unsigned int currentThreadId = 1;
const auto cpusMap = parseCPUMap();
- if (RecThreadInfo::numThreads() == 1) {
+ if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
g_log << Logger::Warning << "Operating unthreaded" << endl;
#ifdef HAVE_SYSTEMD
sd_notify(0, "READY=1");
auto& handlerInfo = RecThreadInfo::info(0);
handlerInfo.setHandler();
handlerInfo.start(0, "web+stat");
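+ // in unthreaded mode the thread ids are: 0 = handler, 1 = the single worker, 2 = task thread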
+ auto& taskInfo = RecThreadInfo::info(2);
+ taskInfo.setTaskThread();
+ taskInfo.start(2, "tasks");
setCPUMap(cpusMap, currentThreadId, pthread_self());
if (handlerInfo.exitCode != 0) {
ret = handlerInfo.exitCode;
}
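+ // also wait for the task thread and propagate a non-zero exit code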
+ taskInfo.thread.join();
+ if (taskInfo.exitCode != 0) {
+ ret = taskInfo.exitCode;
+ }
}
else {
- if (RecThreadInfo::s_weDistributeQueries) {
- for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
- RecThreadInfo::info(currentThreadId + n).setListener();
+ // Set up the RecThreadInfo objects
+ unsigned int tmp = currentThreadId;
+ if (RecThreadInfo::weDistributeQueries()) {
+ for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
+ RecThreadInfo::info(tmp++).setListener();
}
}
- for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
- auto& info = RecThreadInfo::info(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n);
- info.setListener(!RecThreadInfo::s_weDistributeQueries);
+ for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
+ auto& info = RecThreadInfo::info(tmp++);
+ info.setListener(!RecThreadInfo::weDistributeQueries());
info.setWorker();
}
+ for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
+ auto& info = RecThreadInfo::info(tmp++);
+ info.setTaskThread();
+ }
- if (RecThreadInfo::s_weDistributeQueries) {
- g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numDistributorThreads << " distributor threads" << endl;
- for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
+ if (RecThreadInfo::weDistributeQueries()) {
+ g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl;
+ for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
auto& info = RecThreadInfo::info(currentThreadId);
info.start(currentThreadId++, "distr");
setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
}
}
- g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numWorkerThreads << " worker threads" << endl;
+ g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl;
- for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
+ for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
auto& info = RecThreadInfo::info(currentThreadId);
info.start(currentThreadId++, "worker");
setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
}
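+ // launch the task thread(s) after the distributor and worker threads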
+ for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
+ auto& info = RecThreadInfo::info(currentThreadId);
+ info.start(currentThreadId++, "taskThread");
+ setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
+ }
+
#ifdef HAVE_SYSTEMD
sd_notify(0, "READY=1");
#endif
past = now;
past.tv_sec -= 5;
if (t_last_prune < past) {
- t_packetCache->doPruneTo(g_maxPacketCacheEntries / (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads));
+ t_packetCache->doPruneTo(g_maxPacketCacheEntries / (RecThreadInfo::numWorkers() + RecThreadInfo::numDistributors()));
time_t limit;
if (!((t_cleanCounter++) % 40)) { // this is a full scan!
}
}
- unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numWorkerThreads;
+ unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numWorkers();
if (ringsize) {
t_remotes = std::make_unique<addrringbuf_t>();
- if (RecThreadInfo::s_weDistributeQueries)
- t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numDistributorThreads);
+ if (RecThreadInfo::weDistributeQueries())
+ t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numDistributors());
else
t_remotes->set_capacity(ringsize);
t_servfailremotes = std::make_unique<addrringbuf_t>();
return listener;
}
+ bool isTaskThread() const
+ {
+ return taskThread;
+ }
+
void setHandler()
{
handler = true;
listener = flag;
}
+ void setTaskThread()
+ {
+ taskThread = true;
+ }
+
void start(unsigned int id, const string& name)
{
thread = std::thread([id, name] {
t_id = id;
}
- // FD corresponding to TCP sockets this thread is listening on.
- // These FDs are also in deferredAdds when we have one socket per
- // listener, and in g_deferredAdds instead.
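+ // there is exactly one handler thread (web server, carbon, statistics, control channel)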
+ static unsigned int numHandlers()
+ {
+ return 1;
+ }
+
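+ // currently a single thread runs the async tasks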
+ static unsigned int numTaskThreads()
+ {
+ return 1;
+ }
+
+ static unsigned int numWorkers()
+ {
+ return s_numWorkerThreads;
+ }
+
+ static unsigned int numDistributors()
+ {
+ return s_numDistributorThreads;
+ }
+
+ static bool weDistributeQueries()
+ {
+ return s_weDistributeQueries;
+ }
+
+ static void setWeDistributeQueries(bool flag)
+ {
+ s_weDistributeQueries = flag;
+ }
+
+ static void setNumWorkerThreads(unsigned int n)
+ {
+ s_numWorkerThreads = n;
+ }
+
+ static void setNumDistributorThreads(unsigned int n)
+ {
+ s_numDistributorThreads = n;
+ }
+
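+ // total number of threads; ids are assigned in order: handler, distributors, workers, task threads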
+ static unsigned int numRecursorThreads()
+ {
+ return numHandlers() + numDistributors() + numWorkers() + numTaskThreads();
+ }
+
+ // FDs corresponding to the TCP sockets this thread is listening on.
+ // These FDs are also in deferredAdds when we have one socket per
+ // listener, or in g_deferredAdds otherwise.
std::set<int> tcpSockets;
// FD corresponding to listening sockets if we have one socket per
// listener (with reuseport), otherwise all listeners share the
uint64_t numberOfDistributedQueries{0};
int exitCode{0};
- static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
- static unsigned int s_numDistributorThreads;
- static unsigned int s_numWorkerThreads;
-
- static unsigned int numThreads()
- {
- return s_numDistributorThreads + s_numWorkerThreads;
- }
-
private:
- /* handle the web server, carbon, statistics and the control channel */
+ // handle the web server, carbon, statistics and the control channel
bool handler{false};
- /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
+ // accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set)
bool listener{false};
- /* process queries */
+ // process queries
bool worker{false};
+ // run async tasks: from the TaskQueue and ZoneToCache
+ bool taskThread{false};
+
static thread_local unsigned int t_id;
static std::vector<RecThreadInfo> s_threadInfos;
+ static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
+ static unsigned int s_numDistributorThreads;
+ static unsigned int s_numWorkerThreads;
};
struct ThreadMSG