#endif /* NOD_ENABLED */
thread_local std::shared_ptr<RecursorLua4> t_pdl;
-thread_local unsigned int t_id = 0;
thread_local std::shared_ptr<Regex> t_traceRegex;
thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
}
if (!g_quiet || tracedQuery) {
- g_log << Logger::Warning << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] " << (dc->d_tcp ? "TCP " : "") << "question for '" << dc->d_mdp.d_qname << "|"
+ g_log << Logger::Warning << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] " << (dc->d_tcp ? "TCP " : "") << "question for '" << dc->d_mdp.d_qname << "|"
<< QType(dc->d_mdp.d_qtype) << "' from " << dc->getRemote();
if (!dc->d_ednssubnet.source.empty()) {
g_log << " (ecs " << dc->d_ednssubnet.source.toString() << ")";
// Now it always uses an integral number of microseconds, except for averages, which use doubles
uint64_t spentUsec = uSec(sr.getNow() - dc->d_now);
if (!g_quiet) {
- g_log << Logger::Error << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] answer to " << (dc->d_mdp.d_header.rd ? "" : "non-rd ") << "question '" << dc->d_mdp.d_qname << "|" << DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
+ g_log << Logger::Error << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] answer to " << (dc->d_mdp.d_header.rd ? "" : "non-rd ") << "question '" << dc->d_mdp.d_qname << "|" << DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
g_log << "': " << ntohs(pw.getHeader()->ancount) << " answers, " << ntohs(pw.getHeader()->arcount) << " additional, took " << sr.d_outqueries << " packets, " << sr.d_totUsec / 1000.0 << " netw ms, " << spentUsec / 1000.0 << " tot ms, " << sr.d_throttledqueries << " throttled, " << sr.d_timeouts << " timeouts, " << sr.d_tcpoutqueries << "/" << sr.d_dotoutqueries << " tcp/dot connections, rcode=" << res;
if (!shouldNotValidate && sr.isDNSSECValidationRequested()) {
static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, ComboAddress source, ComboAddress destination, struct timeval tv, int fd, std::vector<ProxyProtocolValue>& proxyProtocolValues, RecEventTrace& eventTrace)
{
- ++g_threadInfos[t_id].numberOfDistributedQueries;
+ ++(RecThreadInfo::self().numberOfDistributedQueries);
gettimeofday(&g_now, nullptr);
if (tv.tv_sec) {
struct timeval diff = g_now - tv;
eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
if (cacheHit) {
if (!g_quiet) {
- g_log << Logger::Notice << t_id << " question answered from packet cache tag=" << ctag << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl;
+ g_log << Logger::Notice << RecThreadInfo::id() << " question answered from packet cache tag=" << ctag << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl;
}
struct msghdr msgh;
struct iovec iov;
bool ipf = t_pdl->ipfilter(source, destination, *dh, eventTrace);
if (ipf) {
if (!g_quiet) {
- g_log << Logger::Notice << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << " based on policy" << endl;
+ g_log << Logger::Notice << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << " based on policy" << endl;
}
g_stats.policyDrops++;
return 0;
}
if (!g_quiet) {
- g_log << Logger::Notice << t_id << " got NOTIFY for " << qname.toLogString() << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl;
+ g_log << Logger::Notice << RecThreadInfo::id() << " got NOTIFY for " << qname.toLogString() << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl;
}
requestWipeCaches(qname);
if (MT->numProcesses() > g_maxMThreads) {
if (!g_quiet)
- g_log << Logger::Notice << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << ", over capacity" << endl;
+ g_log << Logger::Notice << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << ", over capacity" << endl;
g_stats.overCapacityDrops++;
return 0;
destination = dest;
}
- if (g_weDistributeQueries) {
+ if (RecThreadInfo::s_weDistributeQueries) {
std::string localdata = data;
distributeAsyncFunction(data, [localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace]() mutable {
return doProcessUDPQuestion(localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace);
static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
{
auto& targetInfo = g_threadInfos[target];
- if (!targetInfo.isWorker) {
+ if (!targetInfo.isWorker()) {
g_log << Logger::Error << "distributeAsyncFunction() tried to assign a query to a non-worker thread" << endl;
_exit(1);
}
static unsigned int getWorkerLoad(size_t workerIdx)
{
- const auto mt = g_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
+ const auto mt = g_threadInfos[/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx].mt;
if (mt != nullptr) {
return mt->numProcesses();
}
static unsigned int selectWorker(unsigned int hash)
{
if (g_balancingFactor == 0) {
- return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+ return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + (hash % RecThreadInfo::s_numWorkerThreads);
}
/* we start with one, representing the query we are currently handling */
double currentLoad = 1;
- std::vector<unsigned int> load(g_numWorkerThreads);
- for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
+ std::vector<unsigned int> load(RecThreadInfo::s_numWorkerThreads);
+ for (size_t idx = 0; idx < RecThreadInfo::s_numWorkerThreads; idx++) {
load[idx] = getWorkerLoad(idx);
currentLoad += load[idx];
// cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
}
- double targetLoad = (currentLoad / g_numWorkerThreads) * g_balancingFactor;
+ double targetLoad = (currentLoad / RecThreadInfo::s_numWorkerThreads) * g_balancingFactor;
// cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
- unsigned int worker = hash % g_numWorkerThreads;
+ unsigned int worker = hash % RecThreadInfo::s_numWorkerThreads;
/* at least one server has to be at or below the average load */
if (load[worker] > targetLoad) {
++g_stats.rebalancedQueries;
do {
// cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
- worker = (worker + 1) % g_numWorkerThreads;
+ worker = (worker + 1) % RecThreadInfo::s_numWorkerThreads;
} while (load[worker] > targetLoad);
}
- return /* skip handler */ 1 + g_numDistributorThreads + worker;
+ return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + worker;
}
// This function is only called by the distributor threads, when pdns-distributes-queries is set
void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
{
- if (!isDistributorThread()) {
- g_log << Logger::Error << "distributeAsyncFunction() has been called by a worker (" << t_id << ")" << endl;
+ if (!RecThreadInfo::self().isDistributor()) {
+ g_log << Logger::Error << "distributeAsyncFunction() has been called by a worker (" << RecThreadInfo::id() << ")" << endl;
_exit(1);
}
was full, let's try another one */
unsigned int newTarget = 0;
do {
- newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads);
+ newTarget = /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + dns_random(RecThreadInfo::s_numWorkerThreads);
} while (newTarget == target);
if (!trySendingQueryToWorker(newTarget, tmsg)) {
#include "validate-recursor.hh"
#include "pubsuffix.hh"
#include "opensslsigners.hh"
-#include "threadname.hh"
#include "ws-recursor.hh"
#include "rec-taskqueue.hh"
#include "secpoll-recursor.hh"
#endif /* NOD_ENABLED */
std::atomic<bool> statsWanted;
-unsigned int g_numWorkerThreads;
uint32_t g_disthashseed;
-bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
bool g_useIncomingECS;
uint16_t g_xpfRRCode{0};
NetmaskGroup g_proxyProtocolACL;
std::shared_ptr<NetmaskGroup> g_initialAllowNotifyFrom; // new threads need this to be setup
std::shared_ptr<notifyset_t> g_initialAllowNotifyFor; // new threads need this to be setup
bool g_logRPZChanges{false};
-unsigned int g_numDistributorThreads;
-unsigned int g_numThreads;
static time_t s_statisticsInterval;
bool g_addExtendedResolutionDNSErrors;
static std::atomic<uint32_t> s_counter;
and finally the workers */
std::vector<RecThreadInfo> g_threadInfos;
+bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
+unsigned int RecThreadInfo::s_numDistributorThreads;
+unsigned int RecThreadInfo::s_numWorkerThreads;
+thread_local unsigned int RecThreadInfo::t_id;
+
ArgvMap& arg()
{
static ArgvMap theArg;
static void checkOrFixFDS()
{
unsigned int availFDs = getFilenumLimit();
- unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads + 25; // even healthier margin then before
- wantFDs += g_numWorkerThreads * TCPOutConnectionManager::s_maxIdlePerThread;
+ unsigned int wantFDs = g_maxMThreads * RecThreadInfo::s_numWorkerThreads + 25; // even healthier margin then before
+ wantFDs += RecThreadInfo::s_numWorkerThreads * TCPOutConnectionManager::s_maxIdlePerThread;
if (wantFDs > availFDs) {
unsigned int hardlimit = getFilenumLimit(true);
g_log << Logger::Warning << "Raised soft limit on number of filedescriptors to " << wantFDs << " to match max-mthreads and threads settings" << endl;
}
else {
- int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / g_numWorkerThreads;
+ int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / RecThreadInfo::s_numWorkerThreads;
g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl;
g_maxMThreads = newval;
setFilenumLimit(hardlimit);
}
/* thread 0 is the handler / SNMP, worker threads start at 1 */
- for (unsigned int n = 0; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
+ for (unsigned int n = 0; n <= (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads); ++n) {
auto& threadInfos = g_threadInfos.at(n);
int fd[2];
size_t idx = 0;
for (const auto& threadInfo : g_threadInfos) {
- if (threadInfo.isWorker) {
+ if (threadInfo.isWorker()) {
g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.numberOfDistributedQueries << " queries" << endl;
++idx;
}
for the initialization of ACLs and domain maps. After that it should only
be called by the handler. */
- if (g_threadInfos.empty() && isHandlerThread()) {
+ if (g_threadInfos.empty() && RecThreadInfo::id() == 0) {
/* the handler and distributors will call themselves below, but
during startup we get called while g_threadInfos has not been
populated yet to update the ACL or domain maps, so we need to
unsigned int n = 0;
for (const auto& threadInfo : g_threadInfos) {
- if (n++ == t_id) {
+ if (n++ == RecThreadInfo::id()) {
func(); // don't write to ourselves!
continue;
}
template <class T>
T broadcastAccFunction(const boost::function<T*()>& func)
{
- if (!isHandlerThread()) {
- g_log << Logger::Error << "broadcastAccFunction has been called by a worker (" << t_id << ")" << endl;
+ if (!RecThreadInfo::self().isHandler()) {
+ g_log << Logger::Error << "broadcastAccFunction has been called by a worker (" << RecThreadInfo::id() << ")" << endl;
_exit(1);
}
unsigned int n = 0;
T ret = T();
for (const auto& threadInfo : g_threadInfos) {
- if (n++ == t_id) {
+ if (n++ == RecThreadInfo::id()) {
continue;
}
}
/* this needs to be done before parseACLs(), which call broadcastFunction() */
- g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
- if (g_weDistributeQueries) {
+ RecThreadInfo::s_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
+ if (RecThreadInfo::s_weDistributeQueries) {
g_log << Logger::Warning << "PowerDNS Recursor itself will distribute queries over threads" << endl;
}
}
g_paddingTag = ::arg().asNum("edns-padding-tag");
- g_numDistributorThreads = ::arg().asNum("distributor-threads");
- g_numWorkerThreads = ::arg().asNum("threads");
- if (g_numWorkerThreads < 1) {
+ RecThreadInfo::s_numDistributorThreads = ::arg().asNum("distributor-threads");
+ RecThreadInfo::s_numWorkerThreads = ::arg().asNum("threads");
+ if (RecThreadInfo::s_numWorkerThreads < 1) {
g_log << Logger::Warning << "Asked to run with 0 threads, raising to 1 instead" << endl;
- g_numWorkerThreads = 1;
+ RecThreadInfo::s_numWorkerThreads = 1;
}
- g_numThreads = g_numDistributorThreads + g_numWorkerThreads;
g_maxMThreads = ::arg().asNum("max-mthreads");
int64_t maxInFlight = ::arg().asNum("max-concurrent-requests-per-tcp-connection");
g_reusePort = ::arg().mustDo("reuseport");
#endif
- g_threadInfos.resize(g_numDistributorThreads + g_numWorkerThreads + /* handler */ 1);
+ g_threadInfos.resize(RecThreadInfo::s_numDistributorThreads + RecThreadInfo::s_numWorkerThreads + /* handler */ 1);
if (g_reusePort) {
- if (g_weDistributeQueries) {
+ if (RecThreadInfo::s_weDistributeQueries) {
/* first thread is the handler, then distributors */
- for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
+ for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds;
auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets;
makeUDPServerSockets(deferredAdds);
}
else {
/* first thread is the handler, there is no distributor here and workers are accepting queries */
- for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
+ for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds;
auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets;
makeUDPServerSockets(deferredAdds);
/* every listener (so distributor if g_weDistributeQueries, workers otherwise)
needs to listen to the shared sockets */
- if (g_weDistributeQueries) {
+ if (RecThreadInfo::s_weDistributeQueries) {
/* first thread is the handler, then distributors */
- for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
+ for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
g_threadInfos.at(threadId).tcpSockets = tcpSockets;
}
}
else {
/* first thread is the handler, there is no distributor here and workers are accepting queries */
- for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
+ for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
g_threadInfos.at(threadId).tcpSockets = tcpSockets;
}
}
unsigned int currentThreadId = 1;
const auto cpusMap = parseCPUMap();
- if (g_numThreads == 1) {
+ if (RecThreadInfo::numThreads() == 1) {
g_log << Logger::Warning << "Operating unthreaded" << endl;
#ifdef HAVE_SYSTEMD
sd_notify(0, "READY=1");
#endif
/* This thread handles the web server, carbon, statistics and the control channel */
- auto& handlerInfos = g_threadInfos.at(0);
- handlerInfos.isHandler = true;
- handlerInfos.thread = std::thread(recursorThread, 0, "web+stat");
+ auto& handlerInfo = g_threadInfos.at(0);
+ handlerInfo.setHandler();
+ handlerInfo.start(0, "web+stat");
setCPUMap(cpusMap, currentThreadId, pthread_self());
- auto& infos = g_threadInfos.at(currentThreadId);
- infos.isListener = true;
- infos.isWorker = true;
- recursorThread(currentThreadId++, "worker");
+ auto& info = g_threadInfos.at(currentThreadId);
+ info.setListener();
+ info.setWorker();
+ info.setThreadId(currentThreadId++);
+ recursorThread();
- handlerInfos.thread.join();
- if (handlerInfos.exitCode != 0) {
- ret = handlerInfos.exitCode;
+ handlerInfo.thread.join();
+ if (handlerInfo.exitCode != 0) {
+ ret = handlerInfo.exitCode;
}
}
else {
-
- if (g_weDistributeQueries) {
- for (unsigned int n = 0; n < g_numDistributorThreads; ++n) {
- auto& infos = g_threadInfos.at(currentThreadId + n);
- infos.isListener = true;
+ if (RecThreadInfo::s_weDistributeQueries) {
+ for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
+ g_threadInfos.at(currentThreadId + n).setListener();
}
}
- for (unsigned int n = 0; n < g_numWorkerThreads; ++n) {
- auto& infos = g_threadInfos.at(currentThreadId + (g_weDistributeQueries ? g_numDistributorThreads : 0) + n);
- infos.isListener = !g_weDistributeQueries;
- infos.isWorker = true;
+ for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
+ auto& info = g_threadInfos.at(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n);
+ info.setListener(!RecThreadInfo::s_weDistributeQueries);
+ info.setWorker();
}
- if (g_weDistributeQueries) {
- g_log << Logger::Warning << "Launching " << g_numDistributorThreads << " distributor threads" << endl;
- for (unsigned int n = 0; n < g_numDistributorThreads; ++n) {
- auto& infos = g_threadInfos.at(currentThreadId);
- infos.thread = std::thread(recursorThread, currentThreadId++, "distr");
- setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
+ if (RecThreadInfo::s_weDistributeQueries) {
+ g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numDistributorThreads << " distributor threads" << endl;
+ for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
+ auto& info = g_threadInfos.at(currentThreadId);
+ info.start(currentThreadId++, "distr");
+ setCPUMap(cpusMap, currentThreadId, info.thread.native_handle());
}
}
- g_log << Logger::Warning << "Launching " << g_numWorkerThreads << " worker threads" << endl;
+ g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numWorkerThreads << " worker threads" << endl;
- for (unsigned int n = 0; n < g_numWorkerThreads; ++n) {
- auto& infos = g_threadInfos.at(currentThreadId);
- infos.thread = std::thread(recursorThread, currentThreadId++, "worker");
- setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
+ for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
+ auto& info = g_threadInfos.at(currentThreadId);
+ info.start(currentThreadId++, "worker");
+ setCPUMap(cpusMap, currentThreadId, info.thread.native_handle());
}
#ifdef HAVE_SYSTEMD
#endif
/* This thread handles the web server, carbon, statistics and the control channel */
- auto& infos = g_threadInfos.at(0);
- infos.isHandler = true;
- infos.thread = std::thread(recursorThread, 0, "web+stat");
+ auto& info = g_threadInfos.at(0);
+ info.setHandler();
+ info.start(0, "web+stat");
for (auto& ti : g_threadInfos) {
ti.thread.join();
g_log << Logger::Error << "PIPE function we executed created PDNS exception: " << e.reason << endl; // but what if they wanted an answer.. we send 0
}
if (tmsg->wantAnswer) {
- const auto& threadInfo = g_threadInfos.at(t_id);
- if (write(threadInfo.pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
+ if (write(RecThreadInfo::self().pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
delete tmsg;
unixDie("write to thread pipe returned wrong size or error");
}
past = now;
past.tv_sec -= 5;
if (t_last_prune < past) {
- t_packetCache->doPruneTo(g_maxPacketCacheEntries / (g_numWorkerThreads + g_numDistributorThreads));
+ t_packetCache->doPruneTo(g_maxPacketCacheEntries / (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads));
time_t limit;
if (!((t_cleanCounter++) % 40)) { // this is a full scan!
Utility::gettimeofday(&t_last_prune, nullptr);
}
- if (isHandlerThread()) {
+ if (RecThreadInfo::self().isHandler()) {
if (now.tv_sec - s_last_ZTC_prune > 60) {
s_last_ZTC_prune = now.tv_sec;
static map<DNSName, RecZoneToCache::State> ztcStates;
RecZoneToCache::ZoneToCache(ztc.second, ztcStates.at(ztc.first));
}
}
-
if (now.tv_sec - s_last_RC_prune > 5) {
g_recCache->doPrune(g_maxCacheEntries);
g_negCache->prune(g_maxCacheEntries / 10);
}
}
-void* recursorThread(unsigned int n, const string& threadName)
-try {
- t_id = n;
- auto& threadInfo = g_threadInfos.at(t_id);
-
- static string threadPrefix = "pdns-r/";
- setThreadName(threadPrefix + threadName);
-
- SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
- SyncRes::setDomainMap(g_initialDomainMap);
- t_allowFrom = g_initialAllowFrom;
- t_allowNotifyFrom = g_initialAllowNotifyFrom;
- t_allowNotifyFor = g_initialAllowNotifyFor;
- t_udpclientsocks = std::make_unique<UDPClientSocks>();
- t_tcpClientCounts = std::make_unique<tcpClientCounts_t>();
-
- if (threadInfo.isHandler) {
- if (!primeHints()) {
- threadInfo.exitCode = EXIT_FAILURE;
- RecursorControlChannel::stop = 1;
- g_log << Logger::Critical << "Priming cache failed, stopping" << endl;
- return nullptr;
+void* recursorThread()
+{
+ try {
+ auto& threadInfo = RecThreadInfo::self();
+ {
+ SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
+ SyncRes::setDomainMap(g_initialDomainMap);
+ t_allowFrom = g_initialAllowFrom;
+ t_allowNotifyFrom = g_initialAllowNotifyFrom;
+ t_allowNotifyFor = g_initialAllowNotifyFor;
+ t_udpclientsocks = std::make_unique<UDPClientSocks>();
+ t_tcpClientCounts = std::make_unique<tcpClientCounts_t>();
+
+ if (threadInfo.isHandler()) {
+ if (!primeHints()) {
+ threadInfo.exitCode = EXIT_FAILURE;
+ RecursorControlChannel::stop = 1;
+ g_log << Logger::Critical << "Priming cache failed, stopping" << endl;
+ return nullptr;
+ }
+ g_log << Logger::Debug << "Done priming cache with root hints" << endl;
+ }
}
- g_log << Logger::Debug << "Done priming cache with root hints" << endl;
- }
- t_packetCache = std::make_unique<RecursorPacketCache>();
+ t_packetCache = std::make_unique<RecursorPacketCache>();
#ifdef NOD_ENABLED
- if (threadInfo.isWorker)
- setupNODThread();
+ if (threadInfo.isWorker())
+ setupNODThread();
#endif /* NOD_ENABLED */
- /* the listener threads handle TCP queries */
- if (threadInfo.isWorker || threadInfo.isListener) {
- try {
- if (!::arg()["lua-dns-script"].empty()) {
- t_pdl = std::make_shared<RecursorLua4>();
- t_pdl->loadFile(::arg()["lua-dns-script"]);
- g_log << Logger::Warning << "Loaded 'lua' script from '" << ::arg()["lua-dns-script"] << "'" << endl;
+ /* the listener threads handle TCP queries */
+ if (threadInfo.isWorker() || threadInfo.isListener()) {
+ try {
+ if (!::arg()["lua-dns-script"].empty()) {
+ t_pdl = std::make_shared<RecursorLua4>();
+ t_pdl->loadFile(::arg()["lua-dns-script"]);
+ g_log << Logger::Warning << "Loaded 'lua' script from '" << ::arg()["lua-dns-script"] << "'" << endl;
+ }
+ }
+ catch (std::exception& e) {
+ g_log << Logger::Error << "Failed to load 'lua' script from '" << ::arg()["lua-dns-script"] << "': " << e.what() << endl;
+ _exit(99);
}
}
- catch (std::exception& e) {
- g_log << Logger::Error << "Failed to load 'lua' script from '" << ::arg()["lua-dns-script"] << "': " << e.what() << endl;
- _exit(99);
- }
- }
- unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
- if (ringsize) {
- t_remotes = std::make_unique<addrringbuf_t>();
- if (g_weDistributeQueries)
- t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / g_numDistributorThreads);
- else
- t_remotes->set_capacity(ringsize);
- t_servfailremotes = std::make_unique<addrringbuf_t>();
- t_servfailremotes->set_capacity(ringsize);
- t_bogusremotes = std::make_unique<addrringbuf_t>();
- t_bogusremotes->set_capacity(ringsize);
- t_largeanswerremotes = std::make_unique<addrringbuf_t>();
- t_largeanswerremotes->set_capacity(ringsize);
- t_timeouts = std::make_unique<addrringbuf_t>();
- t_timeouts->set_capacity(ringsize);
-
- t_queryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
- t_queryring->set_capacity(ringsize);
- t_servfailqueryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
- t_servfailqueryring->set_capacity(ringsize);
- t_bogusqueryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
- t_bogusqueryring->set_capacity(ringsize);
- }
- MT = std::make_unique<MT_t>(::arg().asNum("stack-size"));
- threadInfo.mt = MT.get();
-
- /* start protobuf export threads if needed */
- auto luaconfsLocal = g_luaconfs.getLocal();
- checkProtobufExport(luaconfsLocal);
- checkOutgoingProtobufExport(luaconfsLocal);
+ unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numWorkerThreads;
+ if (ringsize) {
+ t_remotes = std::make_unique<addrringbuf_t>();
+ if (RecThreadInfo::s_weDistributeQueries)
+ t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numDistributorThreads);
+ else
+ t_remotes->set_capacity(ringsize);
+ t_servfailremotes = std::make_unique<addrringbuf_t>();
+ t_servfailremotes->set_capacity(ringsize);
+ t_bogusremotes = std::make_unique<addrringbuf_t>();
+ t_bogusremotes->set_capacity(ringsize);
+ t_largeanswerremotes = std::make_unique<addrringbuf_t>();
+ t_largeanswerremotes->set_capacity(ringsize);
+ t_timeouts = std::make_unique<addrringbuf_t>();
+ t_timeouts->set_capacity(ringsize);
+
+ t_queryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
+ t_queryring->set_capacity(ringsize);
+ t_servfailqueryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
+ t_servfailqueryring->set_capacity(ringsize);
+ t_bogusqueryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
+ t_bogusqueryring->set_capacity(ringsize);
+ }
+ MT = std::make_unique<MT_t>(::arg().asNum("stack-size"));
+ threadInfo.mt = MT.get();
+
+ /* start protobuf export threads if needed */
+ auto luaconfsLocal = g_luaconfs.getLocal();
+ checkProtobufExport(luaconfsLocal);
+ checkOutgoingProtobufExport(luaconfsLocal);
#ifdef HAVE_FSTRM
- checkFrameStreamExport(luaconfsLocal);
+ checkFrameStreamExport(luaconfsLocal);
#endif
- PacketID pident;
+ PacketID pident;
- t_fdm = getMultiplexer();
+ t_fdm = getMultiplexer();
- RecursorWebServer* rws = nullptr;
+ RecursorWebServer* rws = nullptr;
- t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
+ t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
- if (threadInfo.isHandler) {
- if (::arg().mustDo("webserver")) {
- g_log << Logger::Warning << "Enabling web server" << endl;
- try {
- rws = new RecursorWebServer(t_fdm);
- }
- catch (const PDNSException& e) {
- g_log << Logger::Error << "Unable to start the internal web server: " << e.reason << endl;
- _exit(99);
+ if (threadInfo.isHandler()) {
+ if (::arg().mustDo("webserver")) {
+ g_log << Logger::Warning << "Enabling web server" << endl;
+ try {
+ rws = new RecursorWebServer(t_fdm);
+ }
+ catch (const PDNSException& e) {
+ g_log << Logger::Error << "Unable to start the internal web server: " << e.reason << endl;
+ _exit(99);
+ }
}
+ g_log << Logger::Info << "Enabled '" << t_fdm->getName() << "' multiplexer" << endl;
}
- g_log << Logger::Info << "Enabled '" << t_fdm->getName() << "' multiplexer" << endl;
- }
- else {
- t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
+ else {
+ t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
- if (threadInfo.isListener) {
- if (g_reusePort) {
- /* then every listener has its own FDs */
- for (const auto& deferred : threadInfo.deferredAdds) {
- t_fdm->addReadFD(deferred.first, deferred.second);
+ if (threadInfo.isListener()) {
+ if (g_reusePort) {
+ /* then every listener has its own FDs */
+ for (const auto& deferred : threadInfo.deferredAdds) {
+ t_fdm->addReadFD(deferred.first, deferred.second);
+ }
}
- }
- else {
- /* otherwise all listeners are listening on the same ones */
- for (const auto& deferred : g_deferredAdds) {
- t_fdm->addReadFD(deferred.first, deferred.second);
+ else {
+ /* otherwise all listeners are listening on the same ones */
+ for (const auto& deferred : g_deferredAdds) {
+ t_fdm->addReadFD(deferred.first, deferred.second);
+ }
}
}
}
- }
- registerAllStats();
+ registerAllStats();
- if (threadInfo.isHandler) {
- t_fdm->addReadFD(g_rcc.d_fd, handleRCC); // control channel
- }
+ if (threadInfo.isHandler()) {
+ t_fdm->addReadFD(g_rcc.d_fd, handleRCC); // control channel
+ }
- unsigned int maxTcpClients = ::arg().asNum("max-tcp-clients");
+ unsigned int maxTcpClients = ::arg().asNum("max-tcp-clients");
- bool listenOnTCP(true);
+ bool listenOnTCP(true);
- time_t last_stat = 0;
- time_t last_carbon = 0, last_lua_maintenance = 0;
- time_t carbonInterval = ::arg().asNum("carbon-interval");
- time_t luaMaintenanceInterval = ::arg().asNum("lua-maintenance-interval");
- s_counter.store(0); // used to periodically execute certain tasks
+ time_t last_stat = 0;
+ time_t last_carbon = 0, last_lua_maintenance = 0;
+ time_t carbonInterval = ::arg().asNum("carbon-interval");
+ time_t luaMaintenanceInterval = ::arg().asNum("lua-maintenance-interval");
+ s_counter.store(0); // used to periodically execute certain tasks
- while (!RecursorControlChannel::stop) {
- while (MT->schedule(&g_now))
- ; // MTasker letting the mthreads do their thing
+ while (!RecursorControlChannel::stop) {
+ while (MT->schedule(&g_now))
+ ; // MTasker letting the mthreads do their thing
- // Use primes, it avoid not being scheduled in cases where the counter has a regular pattern.
- // We want to call handler thread often, it gets scheduled about 2 times per second
- if ((threadInfo.isHandler && s_counter % 11 == 0) || s_counter % 499 == 0) {
- MT->makeThread(houseKeeping, 0);
- }
+ // Use primes, it avoid not being scheduled in cases where the counter has a regular pattern.
+ // We want to call handler thread often, it gets scheduled about 2 times per second
+ if ((threadInfo.isHandler() && s_counter % 11 == 0) || s_counter % 499 == 0) {
+ MT->makeThread(houseKeeping, 0);
+ }
- if (!(s_counter % 55)) {
- typedef vector<pair<int, FDMultiplexer::funcparam_t>> expired_t;
- expired_t expired = t_fdm->getTimeouts(g_now);
+ if (!(s_counter % 55)) {
+ typedef vector<pair<int, FDMultiplexer::funcparam_t>> expired_t;
+ expired_t expired = t_fdm->getTimeouts(g_now);
- for (expired_t::iterator i = expired.begin(); i != expired.end(); ++i) {
- shared_ptr<TCPConnection> conn = boost::any_cast<shared_ptr<TCPConnection>>(i->second);
- if (g_logCommonErrors)
- g_log << Logger::Warning << "Timeout from remote TCP client " << conn->d_remote.toStringWithPort() << endl;
- t_fdm->removeReadFD(i->first);
+ for (expired_t::iterator i = expired.begin(); i != expired.end(); ++i) {
+ shared_ptr<TCPConnection> conn = boost::any_cast<shared_ptr<TCPConnection>>(i->second);
+ if (g_logCommonErrors)
+ g_log << Logger::Warning << "Timeout from remote TCP client " << conn->d_remote.toStringWithPort() << endl;
+ t_fdm->removeReadFD(i->first);
+ }
}
- }
- s_counter++;
+ s_counter++;
- if (threadInfo.isHandler) {
- if (statsWanted || (s_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= s_statisticsInterval)) {
- doStats();
- last_stat = g_now.tv_sec;
- }
+ if (threadInfo.isHandler()) {
+ if (statsWanted || (s_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= s_statisticsInterval)) {
+ doStats();
+ last_stat = g_now.tv_sec;
+ }
- Utility::gettimeofday(&g_now, nullptr);
+ Utility::gettimeofday(&g_now, nullptr);
- if ((g_now.tv_sec - last_carbon) >= carbonInterval) {
- MT->makeThread(doCarbonDump, 0);
- last_carbon = g_now.tv_sec;
+ if ((g_now.tv_sec - last_carbon) >= carbonInterval) {
+ MT->makeThread(doCarbonDump, 0);
+ last_carbon = g_now.tv_sec;
+ }
}
- }
- if (t_pdl != nullptr) {
- // lua-dns-script directive is present, call the maintenance callback if needed
- /* remember that the listener threads handle TCP queries */
- if (threadInfo.isWorker || threadInfo.isListener) {
- // Only on threads processing queries
- if (g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
- t_pdl->maintenance();
- last_lua_maintenance = g_now.tv_sec;
+ if (t_pdl != nullptr) {
+ // lua-dns-script directive is present, call the maintenance callback if needed
+ /* remember that the listener threads handle TCP queries */
+ if (threadInfo.isWorker() || threadInfo.isListener()) {
+ // Only on threads processing queries
+ if (g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
+ t_pdl->maintenance();
+ last_lua_maintenance = g_now.tv_sec;
+ }
}
}
- }
- t_fdm->run(&g_now);
- // 'run' updates g_now for us
+ t_fdm->run(&g_now);
+ // 'run' updates g_now for us
- if (threadInfo.isListener) {
- if (listenOnTCP) {
- if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
- for (const auto fd : threadInfo.tcpSockets) {
- t_fdm->removeReadFD(fd);
+ if (threadInfo.isListener()) {
+ if (listenOnTCP) {
+ if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
+ for (const auto fd : threadInfo.tcpSockets) {
+ t_fdm->removeReadFD(fd);
+ }
+ listenOnTCP = false;
}
- listenOnTCP = false;
}
- }
- else {
- if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
- for (const auto fd : threadInfo.tcpSockets) {
- t_fdm->addReadFD(fd, handleNewTCPQuestion);
+ else {
+ if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
+ for (const auto fd : threadInfo.tcpSockets) {
+ t_fdm->addReadFD(fd, handleNewTCPQuestion);
+ }
+ listenOnTCP = true;
}
- listenOnTCP = true;
}
}
}
+ delete rws;
+ delete t_fdm;
+ return nullptr;
+ }
+ catch (PDNSException& ae) {
+ g_log << Logger::Error << "Exception: " << ae.reason << endl;
+ return nullptr;
+ }
+ catch (std::exception& e) {
+ g_log << Logger::Error << "STL Exception: " << e.what() << endl;
+ return nullptr;
+ }
+ catch (...) {
+ g_log << Logger::Error << "any other exception in main: " << endl;
+ return nullptr;
}
- delete rws;
- delete t_fdm;
- return 0;
-}
-catch (PDNSException& ae) {
- g_log << Logger::Error << "Exception: " << ae.reason << endl;
- return 0;
-}
-catch (std::exception& e) {
- g_log << Logger::Error << "STL Exception: " << e.what() << endl;
- return 0;
-}
-catch (...) {
- g_log << Logger::Error << "any other exception in main: " << endl;
- return 0;
}
int main(int argc, char** argv)
try {
if (fname.empty()) {
t_pdl.reset();
- g_log << Logger::Info << t_id << " Unloaded current lua script" << endl;
+ g_log << Logger::Info << RecThreadInfo::id() << " Unloaded current lua script" << endl;
return new RecursorControlChannel::Answer{0, string("unloaded\n")};
}
else {
t_pdl = std::make_shared<RecursorLua4>();
int err = t_pdl->loadFile(fname);
if (err != 0) {
- string msg = std::to_string(t_id) + " Retaining current script, could not read '" + fname + "': " + stringerror(err);
+ string msg = std::to_string(RecThreadInfo::id()) + " Retaining current script, could not read '" + fname + "': " + stringerror(err);
g_log << Logger::Error << msg << endl;
return new RecursorControlChannel::Answer{1, msg + "\n"};
}
}
}
catch (std::exception& e) {
- g_log << Logger::Error << t_id << " Retaining current script, error from '" << fname << "': " << e.what() << endl;
+ g_log << Logger::Error << RecThreadInfo::id() << " Retaining current script, error from '" << fname << "': " << e.what() << endl;
return new RecursorControlChannel::Answer{1, string("retaining current script, error from '" + fname + "': " + e.what() + "\n")};
}
- g_log << Logger::Warning << t_id << " (Re)loaded lua script from '" << fname << "'" << endl;
+ g_log << Logger::Warning << RecThreadInfo::id() << " (Re)loaded lua script from '" << fname << "'" << endl;
return new RecursorControlChannel::Answer{0, string("(re)loaded '" + fname + "'\n")};
}