to the other threads */
static const int s_distributorThreadID = 0;
-typedef vector<int> tcpListenSockets_t;
+typedef std::map<int, std::set<int>> tcpListenSockets_t;
typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
+
typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads needs this to be setup
static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new thread needs to be setup with this
static size_t g_tcpMaxQueriesPerConn;
+static size_t s_maxUDPQueriesPerRound;
static uint64_t g_latencyStatSize;
static uint32_t g_disthashseed;
static unsigned int g_maxTCPPerClient;
string d_requestorId;
string d_deviceId;
#endif
+ std::string d_query;
EDNSSubnetOpts d_ednssubnet;
bool d_ecsFound{false};
bool d_ecsParsed{false};
int d_socket;
unsigned int d_tag{0};
uint32_t d_qhash{0};
- string d_query;
shared_ptr<TCPConnection> d_tcpConnection;
vector<pair<uint16_t, string> > d_ednsOpts;
std::vector<std::string> d_policyTags;
LuaContext::LuaObject d_data;
uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
+ uint16_t d_ecsBegin{0};
+ uint16_t d_ecsEnd{0};
bool d_variable{false};
};
return theArg;
}
-unsigned int getRecursorThreadId()
+int getRecursorThreadId()
{
- return static_cast<unsigned int>(t_id);
+ return t_id;
}
int getMTaskerTID()
Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
+ message.setServerIdentity(SyncRes::s_serverID);
message.setEDNSSubnet(ednssubnet, ednssubnet.isIpv4() ? maskV4 : maskV6);
message.setRequestorId(requestorId);
message.setDeviceId(deviceId);
if (luaconfsLocal->protobufServer) {
Netmask requestorNM(dc->d_remote, dc->d_remote.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
+ pbMessage.setServerIdentity(SyncRes::s_serverID);
pbMessage.update(dc->d_uuid, &requestor, &dc->d_local, dc->d_tcp, dc->d_mdp.d_header.id);
pbMessage.setEDNSSubnet(dc->d_ednssubnet.source, dc->d_ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
pbMessage.setQuestion(dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
L<<Logger::Warning<<"Sending UDP reply to client "<<dc->d_remote.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
- t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
+ t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, dc->d_query, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
string((const char*)&*packet.begin(), packet.size()),
g_now.tv_sec,
pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
min(minTTL,SyncRes::s_packetcachettl),
+ dc->d_ecsBegin,
+ dc->d_ecsEnd,
&pbMessage);
}
// else cerr<<"Not putting in packet cache: "<<sr.wasVariable()<<endl;
}
catch(...) {
L<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
+ delete dc;
}
g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
dc->d_tcp=true;
dc->setRemote(&conn->d_remote);
ComboAddress dest;
- memset(&dest, 0, sizeof(dest));
+ dest.reset();
dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
socklen_t len = dest.getSocklen();
getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
dc->d_uuid = (*t_uuidGenerator)();
}
+ const struct dnsheader* dh = (const struct dnsheader*) conn->data;
if(luaconfsLocal->protobufServer) {
try {
- const struct dnsheader* dh = (const struct dnsheader*) conn->data;
if (!luaconfsLocal->protobufTaggedOnly) {
protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, conn->d_remote, dest, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
}
}
#endif
+ if(t_pdl) {
+ if(t_pdl->ipfilter(dc->d_remote, dc->d_local, *dh)) {
+ delete dc;
+ if(!g_quiet)
+ L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<conn->d_remote.toStringWithPort()<<" based on policy"<<endl;
+ g_stats.policyDrops++;
+ return;
+ }
+ }
+
if(dc->d_mdp.d_header.qr) {
delete dc;
g_stats.ignoredCount++;
EDNSSubnetOpts ednssubnet;
bool ecsFound = false;
bool ecsParsed = false;
+ uint16_t ecsBegin = 0;
+ uint16_t ecsEnd = 0;
uint32_t ttlCap = std::numeric_limits<uint32_t>::max();
bool variable = false;
try {
bool cacheHit = false;
RecProtoBufMessage pbMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
#ifdef HAVE_PROTOBUF
+ pbMessage.setServerIdentity(SyncRes::s_serverID);
if(luaconfsLocal->protobufServer) {
if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) {
protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, fromaddr, destaddr, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
as cacheable we would cache it with a wrong tag, so better safe than sorry. */
if (qnameParsed) {
- cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &qhash, &pbMessage));
+ cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &qhash, &ecsBegin, &ecsEnd, &pbMessage));
}
else {
- cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, g_now.tv_sec, &response, &age, &qhash, &pbMessage));
+ cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, &qtype, &qclass, g_now.tv_sec, &response, &age, &qhash, &ecsBegin, &ecsEnd, &pbMessage));
}
if (cacheHit) {
dc->d_data = data;
dc->d_ecsFound = ecsFound;
dc->d_ecsParsed = ecsParsed;
+ dc->d_ecsBegin = ecsBegin;
+ dc->d_ecsEnd = ecsEnd;
dc->d_ednssubnet = ednssubnet;
dc->d_ttlCap = ttlCap;
dc->d_variable = variable;
fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), data, sizeof(data), &fromaddr);
- for(;;)
+ for(size_t counter = 0; counter < s_maxUDPQueriesPerRound; counter++)
if((len=recvmsg(fd, &msgh, 0)) >= 0) {
firstQuery = false;
struct timeval tv={0,0};
HarvestTimestamp(&msgh, &tv);
ComboAddress dest;
- memset(&dest, 0, sizeof(dest)); // this makes sure we ignore this address if not returned by recvmsg above
+ dest.reset(); // this makes sure we ignore this address if not returned by recvmsg above
auto loc = rplookup(g_listenSocketsAddresses, fd);
if(HarvestDestinationAddress(&msgh, &dest)) {
// but.. need to get port too
ComboAddress sin;
- memset((char *)&sin,0, sizeof(sin));
+ sin.reset();
sin.sin4.sin_family = AF_INET;
if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
sin.sin6.sin6_family = AF_INET6;
setSocketSendBuffer(fd, 65000);
listen(fd, 128);
deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion));
- g_tcpListenSockets.push_back(fd);
+ g_tcpListenSockets[threadId].insert(fd);
// we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
// - fd is not that which we know here, but returned from accept()
if(sin.sin4.sin_family == AF_INET)
ComboAddress sin;
- memset(&sin, 0, sizeof(sin));
+ sin.reset();
sin.sin4.sin_family = AF_INET;
if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
sin.sin6.sin6_family = AF_INET6;
}
}
-// This function is only called by the distributor thread, when pdns-distributes-queries is set
-void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
{
- if (t_id != s_distributorThreadID) {
- L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
- exit(1);
- }
-
- unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
- unsigned int target = 1 + (hash % (g_pipes.size()-1));
-
if(target == static_cast<unsigned int>(s_distributorThreadID)) {
L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
exit(1);
}
ThreadPipeSet& tps = g_pipes[target];
- ThreadMSG* tmsg = new ThreadMSG();
- tmsg->func = func;
- tmsg->wantAnswer = false;
-
ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
if (written > 0) {
if (static_cast<size_t>(written) != sizeof(tmsg)) {
}
else {
int error = errno;
- delete tmsg;
if (error == EAGAIN || error == EWOULDBLOCK) {
- g_stats.queryPipeFullDrops++;
+ /* the pipe is full, sorry */
+ return false;
} else {
+ delete tmsg;
unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
}
}
+
+ return true;
+}
+
+// This function is only called by the distributor thread, when pdns-distributes-queries is set
+void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+{
+ if (t_id != s_distributorThreadID) {
+ L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
+ exit(1);
+ }
+
+ unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
+ unsigned int target = 1 + (hash % (g_pipes.size()-1));
+
+ ThreadMSG* tmsg = new ThreadMSG();
+ tmsg->func = func;
+ tmsg->wantAnswer = false;
+
+ if (!trySendingQueryToWorker(target, tmsg)) {
+ /* if this function failed but did not raise an exception, it means that the pipe
+ was full, let's try another one */
+ unsigned int newTarget = 0;
+ do {
+ newTarget = 1 + dns_random(g_pipes.size()-1);
+ } while (newTarget == target);
+
+ if (!trySendingQueryToWorker(newTarget, tmsg)) {
+ g_stats.queryPipeFullDrops++;
+ delete tmsg;
+ }
+ }
}
static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
exit(1);
}
+ g_signatureInceptionSkew = ::arg().asNum("signature-inception-skew");
+ if (g_signatureInceptionSkew < 0) {
+ L<<Logger::Error<<"A negative value for 'signature-inception-skew' is not allowed"<<endl;
+ exit(1);
+ }
+
g_dnssecLogBogus = ::arg().mustDo("dnssec-log-bogus");
g_maxNSEC3Iterations = ::arg().asNum("nsec3-max-iterations");
g_maxCacheEntries = ::arg().asNum("max-cache-entries");
g_maxPacketCacheEntries = ::arg().asNum("max-packetcache-entries");
-
+
+ luaConfigDelayedThreads delayedLuaThreads;
try {
- loadRecursorLuaConfig(::arg()["lua-config-file"], ::arg().mustDo("daemon"));
+ loadRecursorLuaConfig(::arg()["lua-config-file"], delayedLuaThreads);
}
catch (PDNSException &e) {
L<<Logger::Error<<"Cannot load Lua configuration: "<<e.reason<<endl;
else {
makeUDPServerSockets(0);
makeTCPServerSockets(0);
+
+ if (!g_weDistributeQueries) {
+ /* we are not distributing queries and we don't have reuseport,
+ so every thread will be listening on all the TCP sockets */
+ for (unsigned int threadId = 1; threadId < g_numWorkerThreads; threadId++) {
+ g_tcpListenSockets[threadId] = g_tcpListenSockets[0];
+ }
+ }
}
SyncRes::parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
L.toConsole(Logger::Critical);
daemonize();
- loadRecursorLuaConfig(::arg()["lua-config-file"], false);
}
signal(SIGUSR1,usr1Handler);
signal(SIGUSR2,usr2Handler);
Utility::dropUserPrivs(newuid);
+ startLuaConfigDelayedThreads(delayedLuaThreads);
+
makeThreadPipes();
g_tcpTimeout=::arg().asNum("client-tcp-timeout");
g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
+ s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
if (::arg().mustDo("snmp-agent")) {
g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
#endif
L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
- try {
- if(!::arg()["lua-dns-script"].empty()) {
- t_pdl = std::make_shared<RecursorLua4>(::arg()["lua-dns-script"]);
- L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
+ if(worker) {
+ try {
+ if(!::arg()["lua-dns-script"].empty()) {
+ t_pdl = std::make_shared<RecursorLua4>(::arg()["lua-dns-script"]);
+ L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
+ }
+ }
+ catch(std::exception &e) {
+ L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
+ _exit(99);
}
- }
- catch(std::exception &e) {
- L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
- _exit(99);
}
unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
if(worker && (!g_weDistributeQueries || t_id == s_distributorThreadID)) { // if pdns distributes queries, only tid 0 should do this
if(listenOnTCP) {
if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
- for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
- t_fdm->removeReadFD(*i);
+ for(const auto fd : g_tcpListenSockets[t_id]) {
+ t_fdm->removeReadFD(fd);
+ }
listenOnTCP=false;
}
}
else {
if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
- for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
- t_fdm->addReadFD(*i, handleNewTCPQuestion);
+ for(const auto fd : g_tcpListenSockets[t_id]) {
+ t_fdm->addReadFD(fd, handleNewTCPQuestion);
+ }
listenOnTCP=true;
}
}
::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
::arg().set("dnssec", "DNSSEC mode: off/process-no-validate (default)/process/log-fail/validate")="process-no-validate";
::arg().set("dnssec-log-bogus", "Log DNSSEC bogus validations")="no";
+  ::arg().set("signature-inception-skew", "Allow the signature inception to be off by this number of seconds")="0";
::arg().set("daemon","Operate as a daemon")="no";
::arg().setSwitch("write-pid","Write a PID file")="yes";
::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="6";
::arg().set("max-qperq", "Maximum outgoing queries per query")="50";
::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
+ ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
::arg().set("include-dir","Include *.conf files from this directory")="";
::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";