#include <sys/stat.h>
#include <unistd.h>
+#include "recpacketcache.hh"
#include "ws-recursor.hh"
#include <pthread.h>
-#include "recpacketcache.hh"
#include "utility.hh"
#include "dns_random.hh"
#ifdef HAVE_LIBSODIUM
typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
static thread_local std::shared_ptr<RecursorLua4> t_pdl;
-static thread_local unsigned int t_id;
+static thread_local int t_id = -1;
static thread_local std::shared_ptr<Regex> t_traceRegex;
static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
int readToThread;
int writeFromThread;
int readFromThread;
+ int writeQueriesToThread; // this one is non-blocking
+ int readQueriesToThread;
};
-typedef vector<int> tcpListenSockets_t;
+/* the TID of the thread handling the web server, carbon, statistics and the control channel */
+static const int s_handlerThreadID = -1;
+/* when pdns-distributes-queries is set, the TID of the thread handling, hashing and distributing new queries
+ to the other threads */
+static const int s_distributorThreadID = 0;
+
+typedef std::map<int, std::set<int>> tcpListenSockets_t;
typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
+
typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads needs this to be setup
static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new thread needs to be setup with this
static size_t g_tcpMaxQueriesPerConn;
+static size_t s_maxUDPQueriesPerRound;
static uint64_t g_latencyStatSize;
static uint32_t g_disthashseed;
static unsigned int g_maxTCPPerClient;
vector<pair<uint16_t, string> > d_ednsOpts;
std::vector<std::string> d_policyTags;
LuaContext::LuaObject d_data;
+ uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
+ bool d_variable{false};
};
MT_t* getMT()
return theArg;
}
-unsigned int getRecursorThreadId()
+int getRecursorThreadId() // returns the calling thread's thread-local t_id; signed because t_id is initialised to -1
{
return t_id;
}
Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
+ message.setServerIdentity(SyncRes::s_serverID);
message.setEDNSSubnet(ednssubnet, ednssubnet.isIpv4() ? maskV4 : maskV6);
message.setRequestorId(requestorId);
message.setDeviceId(deviceId);
}
}
-static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, const uint16_t maxAnswerSize)
+static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, uint32_t ttlCap, const uint16_t maxAnswerSize)
{
- pw.startRecord(rec.d_name, rec.d_type, rec.d_ttl, rec.d_class, rec.d_place);
+ pw.startRecord(rec.d_name, rec.d_type, (rec.d_ttl > ttlCap ? ttlCap : rec.d_ttl), rec.d_class, rec.d_place);
if(rec.d_type != QType::OPT) // their TTL ain't real
minTTL = min(minTTL, rec.d_ttl);
if (luaconfsLocal->protobufServer) {
Netmask requestorNM(dc->d_remote, dc->d_remote.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
+ pbMessage.setServerIdentity(SyncRes::s_serverID);
pbMessage.update(dc->d_uuid, &requestor, &dc->d_local, dc->d_tcp, dc->d_mdp.d_header.id);
pbMessage.setEDNSSubnet(dc->d_ednssubnet.source, dc->d_ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
pbMessage.setQuestion(dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
pw.getHeader()->rd=dc->d_mdp.d_header.rd;
pw.getHeader()->cd=dc->d_mdp.d_header.cd;
- uint32_t minTTL=std::numeric_limits<uint32_t>::max();
+ /* This is the lowest TTL seen in the records of the response,
+ so we can't cache it for longer than this value.
+ If we have a TTL cap, this value can't be larger than the
+ cap no matter what. */
+ uint32_t minTTL = dc->d_ttlCap;
SyncRes sr(dc->d_now);
}
bool tracedQuery=false; // we could consider letting Lua know about this too
- bool variableAnswer = false;
+ bool variableAnswer = dc->d_variable;
bool shouldNotValidate = false;
/* preresolve expects res (dq.rcode) to be set to RCode::NoError by default */
res = RCode::ServFail;
}
+ dq.validationState = sr.getValidationState();
+
// During lookup, an NSDNAME or NSIP trigger was hit in RPZ
if (res == -2) { // XXX This block should be macro'd, it is repeated post-resolve.
appliedPolicy = sr.d_appliedPolicy;
continue;
}
- if (!addRecordToPacket(pw, *i, minTTL, maxanswersize)) {
+ if (!addRecordToPacket(pw, *i, minTTL, dc->d_ttlCap, maxanswersize)) {
needCommit = false;
break;
}
OPT record. This MUST also occur when a truncated response (using
the DNS header's TC bit) is returned."
*/
- if (addRecordToPacket(pw, makeOpt(edo.d_packetsize, 0, edo.d_Z), minTTL, maxanswersize)) {
+ if (addRecordToPacket(pw, makeOpt(edo.d_packetsize, 0, edo.d_Z), minTTL, dc->d_ttlCap, maxanswersize)) {
pw.commit();
}
}
}
if(sendmsg(dc->d_socket, &msgh, 0) < 0 && g_logCommonErrors)
L<<Logger::Warning<<"Sending UDP reply to client "<<dc->d_remote.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
+
if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
string((const char*)&*packet.begin(), packet.size()),
}
- sr.d_outqueries ? t_RC->cacheMisses++ : t_RC->cacheHits++;
+ if (sr.d_outqueries || sr.d_authzonequeries) {
+ t_RC->cacheMisses++;
+ }
+ else {
+ t_RC->cacheHits++;
+ }
if(spent < 0.001)
g_stats.answers0_1++;
}
catch(...) {
L<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
+ delete dc;
}
g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
dc->d_tcp=true;
dc->setRemote(&conn->d_remote);
ComboAddress dest;
- memset(&dest, 0, sizeof(dest));
+ dest.reset();
dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
socklen_t len = dest.getSocklen();
getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
}
#endif
- if(needECS || (t_pdl && t_pdl->d_gettag)) {
+ if(needECS || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
try {
std::map<uint16_t, EDNSOptionView> ednsOptions;
dc->d_ecsParsed = true;
dc->d_ecsFound = getQNameAndSubnet(std::string(conn->data, conn->qlen), &qname, &qtype, &qclass, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
- if(t_pdl && t_pdl->d_gettag) {
+ if(t_pdl) {
try {
- dc->d_tag = t_pdl->gettag(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId);
+ if (t_pdl->d_gettag_ffi) {
+ dc->d_tag = t_pdl->gettag_ffi(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, dc->d_ttlCap, dc->d_variable);
+ }
+ else if (t_pdl->d_gettag) {
+ dc->d_tag = t_pdl->gettag(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId);
+ }
}
- catch(std::exception& e) {
+ catch(const std::exception& e) {
if(g_logCommonErrors)
L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
}
}
}
- catch(std::exception& e)
+ catch(const std::exception& e)
{
if(g_logCommonErrors)
L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
EDNSSubnetOpts ednssubnet;
bool ecsFound = false;
bool ecsParsed = false;
+ uint32_t ttlCap = std::numeric_limits<uint32_t>::max();
+ bool variable = false;
try {
DNSName qname;
uint16_t qtype=0;
*/
#endif
- if(needECS || (t_pdl && t_pdl->d_gettag)) {
+ if(needECS || (t_pdl && (t_pdl->d_gettag || t_pdl->d_gettag_ffi))) {
try {
std::map<uint16_t, EDNSOptionView> ednsOptions;
ecsFound = getQNameAndSubnet(question, &qname, &qtype, &qclass, &ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
qnameParsed = true;
ecsParsed = true;
- if(t_pdl && t_pdl->d_gettag) {
+ if(t_pdl) {
try {
- ctag=t_pdl->gettag(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId);
+ if (t_pdl->d_gettag_ffi) {
+ ctag = t_pdl->gettag_ffi(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, ttlCap, variable);
+ }
+ else if (t_pdl->d_gettag) {
+ ctag=t_pdl->gettag(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId);
+ }
}
- catch(std::exception& e) {
+ catch(const std::exception& e) {
if(g_logCommonErrors)
L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
}
}
}
- catch(std::exception& e)
+ catch(const std::exception& e)
{
if(g_logCommonErrors)
L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
bool cacheHit = false;
RecProtoBufMessage pbMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
#ifdef HAVE_PROTOBUF
+ pbMessage.setServerIdentity(SyncRes::s_serverID);
if(luaconfsLocal->protobufServer) {
if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) {
protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, fromaddr, destaddr, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
}
#endif /* HAVE_PROTOBUF */
+ /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
+ but that would mean the hash is never computed. If some script later decides to mark the answer
+ as cacheable again, we would cache it with a wrong tag, so better safe than sorry. */
if (qnameParsed) {
cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &qhash, &pbMessage));
}
dc->d_ecsFound = ecsFound;
dc->d_ecsParsed = ecsParsed;
dc->d_ednssubnet = ednssubnet;
+ dc->d_ttlCap = ttlCap;
+ dc->d_variable = variable;
#ifdef HAVE_PROTOBUF
if (luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
dc->d_uuid = uniqueId;
fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), data, sizeof(data), &fromaddr);
- for(;;)
+ for(size_t counter = 0; counter < s_maxUDPQueriesPerRound; counter++)
if((len=recvmsg(fd, &msgh, 0)) >= 0) {
firstQuery = false;
struct timeval tv={0,0};
HarvestTimestamp(&msgh, &tv);
ComboAddress dest;
- memset(&dest, 0, sizeof(dest)); // this makes sure we ignore this address if not returned by recvmsg above
+ dest.reset(); // this makes sure we ignore this address if not returned by recvmsg above
auto loc = rplookup(g_listenSocketsAddresses, fd);
if(HarvestDestinationAddress(&msgh, &dest)) {
// but.. need to get port too
ComboAddress sin;
- memset((char *)&sin,0, sizeof(sin));
+ sin.reset();
sin.sin4.sin_family = AF_INET;
if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
sin.sin6.sin6_family = AF_INET6;
setSocketSendBuffer(fd, 65000);
listen(fd, 128);
deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion));
- g_tcpListenSockets.push_back(fd);
+ g_tcpListenSockets[threadId].insert(fd);
// we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
// - fd is not that which we know here, but returned from accept()
if(sin.sin4.sin_family == AF_INET)
ComboAddress sin;
- memset(&sin, 0, sizeof(sin));
+ sin.reset();
sin.sin4.sin_family = AF_INET;
if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
sin.sin6.sin6_family = AF_INET6;
static void houseKeeping(void *)
{
- static thread_local time_t last_stat, last_rootupdate, last_prune, last_secpoll;
+ static thread_local time_t last_rootupdate, last_prune, last_secpoll;
static thread_local int cleanCounter=0;
static thread_local bool s_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
try {
last_rootupdate=now.tv_sec;
}
- if(!t_id) {
- if(g_statisticsInterval > 0 && now.tv_sec - last_stat >= g_statisticsInterval) {
- doStats();
- last_stat=time(0);
- }
+ if(t_id == s_distributorThreadID) {
if(now.tv_sec - last_secpoll >= 3600) {
try {
doSecPoll(&last_secpoll);
}
- catch(...) {}
+ catch(std::exception& e)
+ {
+ L<<Logger::Error<<"Exception while performing security poll: "<<e.what()<<endl;
+ }
+ catch(PDNSException& e)
+ {
+ L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
+ }
+ catch(ImmediateServFailException &e)
+ {
+ L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
+ }
+ catch(...)
+ {
+ L<<Logger::Error<<"Exception while performing security poll"<<endl;
+ }
+
}
}
s_running=false;
tps.readFromThread = fd[0];
tps.writeFromThread = fd[1];
+ if(pipe(fd) < 0)
+ unixDie("Creating pipe for inter-thread communications");
+ tps.readQueriesToThread = fd[0];
+ tps.writeQueriesToThread = fd[1];
+
+ if (!setNonBlocking(tps.writeQueriesToThread)) {
+ unixDie("Making pipe for inter-thread communications non-blocking");
+ }
+
g_pipes.push_back(tps);
}
}
bool wantAnswer;
};
-void broadcastFunction(const pipefunc_t& func, bool skipSelf)
+void broadcastFunction(const pipefunc_t& func)
{
- unsigned int n = 0;
+ /* This function might be called during startup, before t_id has been set,
+ for the initialization of ACLs and domain maps; that is fine because the
+ default value of t_id is the same as the handler thread's ID */
+ if (t_id != s_handlerThreadID) {
+ L<<Logger::Error<<"broadcastFunction() has been called by a worker ("<<t_id<<")"<<endl;
+ exit(1);
+ }
+
+ /* the distributor will call itself below, but if we are the handler thread,
+ call the function ourselves to update the ACL or domain maps for example */
+ func();
+
+ int n = 0;
for(ThreadPipeSet& tps : g_pipes)
{
if(n++ == t_id) {
- if(!skipSelf)
- func(); // don't write to ourselves!
+ func(); // don't write to ourselves!
continue;
}
unixDie("write to thread pipe returned wrong size or error");
}
- string* resp;
+ string* resp = nullptr;
if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
unixDie("read from thread pipe returned wrong size or error");
if(resp) {
-// cerr <<"got response: " << *resp << endl;
delete resp;
+ resp = nullptr;
}
}
}
+/* This function is only called by the distributor thread, when pdns-distributes-queries is set.
+ It hashes the packet to choose a target in g_pipes (index 1 or higher, never the distributor's
+ own index 0), wraps func in a newly allocated ThreadMSG with wantAnswer = false, and writes the
+ pointer to the target's writeQueriesToThread pipe. If that write fails with EAGAIN or EWOULDBLOCK
+ the message is freed and g_stats.queryPipeFullDrops is incremented; a short write or any other
+ failure ends in unixDie(). */
void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
{
+ if (t_id != s_distributorThreadID) { // refuse to run on any thread other than the distributor
+ L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
+ exit(1);
+ }
+
unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
unsigned int target = 1 + (hash % (g_pipes.size()-1));
- if(target == t_id) {
- func();
- return;
+ if(target == static_cast<unsigned int>(s_distributorThreadID)) { // sanity check: target is computed as 1 + ..., so index 0 is not expected here
+ L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
+ exit(1);
}
+
ThreadPipeSet& tps = g_pipes[target];
ThreadMSG* tmsg = new ThreadMSG();
tmsg->func = func;
tmsg->wantAnswer = false;
- if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
+ ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg)); // only the pointer value is sent; this pipe end is declared as non-blocking
+ if (written > 0) {
+ if (static_cast<size_t>(written) != sizeof(tmsg)) { // short write: the pointer was only partially written
+ delete tmsg;
+ unixDie("write to thread pipe returned wrong size or error");
+ }
+ }
+ else {
+ int error = errno; // NOTE(review): presumably copied so that the delete below cannot overwrite errno - confirm
delete tmsg;
- unixDie("write to thread pipe returned wrong size or error");
+ if (error == EAGAIN || error == EWOULDBLOCK) { // the write would have blocked: drop this query and count the drop
+ g_stats.queryPipeFullDrops++;
+ } else {
+ unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
+ }
}
}
{
ThreadMSG* tmsg = nullptr;
- if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread
+ if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
unixDie("read from thread pipe returned wrong size or error");
}
}
-template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
-{
- unsigned int n = 0;
+/*
+ This function should only be called by the handler to gather metrics, wipe the cache,
+ reload the Lua script (not the Lua config) or change the current trace regex,
+ and by the SNMP thread to gather metrics. */
+template<class T> T broadcastAccFunction(const boost::function<T*()>& func)
+{
+ /* the SNMP thread uses id -1 too */
+ if (t_id != s_handlerThreadID) {
+ L<<Logger::Error<<"broadcastAccFunction has been called by a worker ("<<t_id<<")"<<endl;
+ exit(1);
+ }
+
T ret=T();
for(ThreadPipeSet& tps : g_pipes)
{
- if(n++ == t_id) {
- if(!skipSelf) {
- T* resp = (T*)func(); // don't write to ourselves!
- if(resp) {
- //~ cerr <<"got direct: " << *resp << endl;
- ret += *resp;
- delete resp;
- }
- }
- continue;
- }
-
ThreadMSG* tmsg = new ThreadMSG();
tmsg->func = boost::bind(voider<T>, func);
tmsg->wantAnswer = true;
unixDie("write to thread pipe returned wrong size or error");
}
- T* resp;
+ T* resp = nullptr;
if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
unixDie("read from thread pipe returned wrong size or error");
if(resp) {
- //~ cerr <<"got response: " << *resp << endl;
ret += *resp;
delete resp;
+ resp = nullptr;
}
}
return ret;
}
-template string broadcastAccFunction(const boost::function<string*()>& fun, bool skipSelf); // explicit instantiation
-template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun, bool skipSelf); // explicit instantiation
-template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun, bool skipSelf); // explicit instantiation
-template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun, bool skipSelf); // explicit instantiation
+template string broadcastAccFunction(const boost::function<string*()>& fun); // explicit instantiation
+template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
+template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
+template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
{
}
}
-static void* recursorThread(void*);
+static void* recursorThread(int tid, bool worker);
static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
{
if(l_initialized) { // only reload configuration file on second call
string configname=::arg()["config-dir"]+"/recursor.conf";
+ if(::arg()["config-name"]!="") {
+ configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
+ }
cleanSlashes(configname);
if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
g_maxCacheEntries = ::arg().asNum("max-cache-entries");
g_maxPacketCacheEntries = ::arg().asNum("max-packetcache-entries");
-
+
+ luaConfigDelayedThreads delayedLuaThreads;
try {
- loadRecursorLuaConfig(::arg()["lua-config-file"], ::arg().mustDo("daemon"));
+ loadRecursorLuaConfig(::arg()["lua-config-file"], delayedLuaThreads);
}
catch (PDNSException &e) {
L<<Logger::Error<<"Cannot load Lua configuration: "<<e.reason<<endl;
g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
g_numWorkerThreads = ::arg().asNum("threads");
+ if (g_numWorkerThreads < 1) {
+ L<<Logger::Warning<<"Asked to run with 0 threads, raising to 1 instead"<<endl;
+ g_numWorkerThreads = 1;
+ }
+
g_numThreads = g_numWorkerThreads + g_weDistributeQueries;
g_maxMThreads = ::arg().asNum("max-mthreads");
else {
makeUDPServerSockets(0);
makeTCPServerSockets(0);
+
+ if (!g_weDistributeQueries) {
+ /* we are not distributing queries and we don't have reuseport,
+ so every thread will be listening on all the TCP sockets */
+ for (unsigned int threadId = 1; threadId < g_numWorkerThreads; threadId++) {
+ g_tcpListenSockets[threadId] = g_tcpListenSockets[0];
+ }
+ }
}
SyncRes::parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
L.toConsole(Logger::Critical);
daemonize();
- loadRecursorLuaConfig(::arg()["lua-config-file"], false);
}
signal(SIGUSR1,usr1Handler);
signal(SIGUSR2,usr2Handler);
Utility::dropUserPrivs(newuid);
+ startLuaConfigDelayedThreads(delayedLuaThreads);
+
makeThreadPipes();
g_tcpTimeout=::arg().asNum("client-tcp-timeout");
g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
+ s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
if (::arg().mustDo("snmp-agent")) {
g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
g_snmpAgent->run();
}
+ /* This thread handles the web server, carbon, statistics and the control channel */
+ std::thread handlerThread(recursorThread, s_handlerThreadID, false);
+
const auto cpusMap = parseCPUMap();
+
+ std::vector<std::thread> workers(g_numThreads);
if(g_numThreads == 1) {
L<<Logger::Warning<<"Operating unthreaded"<<endl;
#ifdef HAVE_SYSTEMD
sd_notify(0, "READY=1");
#endif
setCPUMap(cpusMap, 0, pthread_self());
- recursorThread(0);
+ recursorThread(0, true);
}
else {
- pthread_t tid;
L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
for(unsigned int n=0; n < g_numThreads; ++n) {
- pthread_create(&tid, 0, recursorThread, (void*)(long)n);
+ workers[n] = std::thread(recursorThread, n, true);
- setCPUMap(cpusMap, n, tid);
+ setCPUMap(cpusMap, n, workers[n].native_handle());
}
- void* res;
#ifdef HAVE_SYSTEMD
sd_notify(0, "READY=1");
#endif
- pthread_join(tid, &res);
+ workers.back().join();
}
return 0;
}
-static void* recursorThread(void* ptr)
+static void* recursorThread(int n, bool worker)
try
{
- t_id=(int) (long) ptr;
+ t_id=n;
SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
SyncRes::setDomainMap(g_initialDomainMap);
t_allowFrom = g_initialAllowFrom;
#endif
L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
- try {
- if(!::arg()["lua-dns-script"].empty()) {
- t_pdl = std::make_shared<RecursorLua4>(::arg()["lua-dns-script"]);
- L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
+ if(worker && (!g_weDistributeQueries || t_id != s_distributorThreadID)) {
+ try {
+ if(!::arg()["lua-dns-script"].empty()) {
+ t_pdl = std::make_shared<RecursorLua4>(::arg()["lua-dns-script"]);
+ L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
+ }
+ }
+ catch(std::exception &e) {
+ L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
+ _exit(99);
}
- }
- catch(std::exception &e) {
- L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
- _exit(99);
}
unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
PacketID pident;
t_fdm=getMultiplexer();
- if(!t_id) {
+
+ if(!worker) {
if(::arg().mustDo("webserver")) {
L<<Logger::Warning << "Enabling web server" << endl;
try {
}
L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
}
-
- t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
-
- if(g_useOneSocketPerThread) {
- for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
- t_fdm->addReadFD(i->first, i->second);
- }
- }
else {
- if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens
- for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
+ t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
+ t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest);
+
+ if(g_useOneSocketPerThread) {
+ for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
t_fdm->addReadFD(i->first, i->second);
}
}
+ else {
+ if(!g_weDistributeQueries || t_id == s_distributorThreadID) { // if we distribute queries, only t_id = 0 listens
+ for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
+ t_fdm->addReadFD(i->first, i->second);
+ }
+ }
+ }
}
registerAllStats();
- if(!t_id) {
+
+ if(!worker) {
t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
}
bool listenOnTCP(true);
+ time_t last_stat = 0;
time_t last_carbon=0;
time_t carbonInterval=::arg().asNum("carbon-interval");
counter.store(0); // used to periodically execute certain tasks
counter++;
- if(!t_id && statsWanted) {
- doStats();
- }
+ if(!worker) {
+ if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) {
+ doStats();
+ last_stat = g_now.tv_sec;
+ }
- Utility::gettimeofday(&g_now, 0);
+ Utility::gettimeofday(&g_now, 0);
- if(!t_id && (g_now.tv_sec - last_carbon >= carbonInterval)) {
- MT->makeThread(doCarbonDump, 0);
- last_carbon = g_now.tv_sec;
+ if((g_now.tv_sec - last_carbon) >= carbonInterval) {
+ MT->makeThread(doCarbonDump, 0);
+ last_carbon = g_now.tv_sec;
+ }
}
t_fdm->run(&g_now);
// 'run' updates g_now for us
- if(!g_weDistributeQueries || !t_id) { // if pdns distributes queries, only tid 0 should do this
+ if(worker && (!g_weDistributeQueries || t_id == s_distributorThreadID)) { // if pdns distributes queries, only tid 0 should do this
if(listenOnTCP) {
if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
- for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
- t_fdm->removeReadFD(*i);
+ for(const auto fd : g_tcpListenSockets[t_id]) {
+ t_fdm->removeReadFD(fd);
+ }
listenOnTCP=false;
}
}
else {
if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
- for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
- t_fdm->addReadFD(*i, handleNewTCPQuestion);
+ for(const auto fd : g_tcpListenSockets[t_id]) {
+ t_fdm->addReadFD(fd, handleNewTCPQuestion);
+ }
listenOnTCP=true;
}
}
::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
- ::arg().set("server-id", "Returned when queried for 'server.id' TXT or NSID, defaults to hostname")="";
+ ::arg().set("server-id", "Returned when queried for 'id.server' TXT or NSID, defaults to hostname")="";
::arg().set("stats-ringbuffer-entries", "maximum number of packets to store statistics for")="10000";
::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
::arg().set("max-qperq", "Maximum outgoing queries per query")="50";
::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
+ ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
::arg().set("include-dir","Include *.conf files from this directory")="";
::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";