static bool g_gettagNeedsEDNSOptions{false};
static time_t g_statisticsInterval;
static bool g_useIncomingECS;
+static bool g_useKernelTimestamp;
std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
#ifdef NOD_ENABLED
static bool g_nodEnabled;
uint16_t g_outgoingEDNSBufsize;
bool g_logRPZChanges{false};
+// Used in the Syncres to not throttle certain servers
+GlobalStateHolder<SuffixMatchNode> g_dontThrottleNames;
+GlobalStateHolder<NetmaskGroup> g_dontThrottleNetmasks;
+
#define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
#define LOCAL_NETS_INVERSE "!127.0.0.0/8, !10.0.0.0/8, !100.64.0.0/10, !169.254.0.0/16, !192.168.0.0/16, !172.16.0.0/12, !::1/128, !fc00::/7, !fe80::/10"
// Bad Nets taken from both:
boost::uuids::uuid d_uuid;
string d_requestorId;
string d_deviceId;
+ struct timeval d_kernelTimestamp{0,0};
#endif
std::string d_query;
std::vector<std::string> d_policyTags;
{
}
- typedef set<int> socks_t;
- socks_t d_socks;
-
// returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
int getSocket(const ComboAddress& toaddr, int* fd)
{
return -1;
}
- d_socks.insert(*fd);
d_numsocks++;
return 0;
}
- void returnSocket(int fd)
- {
- socks_t::iterator i=d_socks.find(fd);
- if(i==d_socks.end()) {
- throw PDNSException("Trying to return a socket (fd="+std::to_string(fd)+") not in the pool");
- }
- returnSocketLocked(i);
- }
-
// return a socket to the pool, or simply erase it
- void returnSocketLocked(socks_t::iterator& i)
+ void returnSocket(int fd)
{
- if(i==d_socks.end()) {
- throw PDNSException("Trying to return a socket not in the pool");
- }
try {
- t_fdm->removeReadFD(*i);
+ t_fdm->removeReadFD(fd);
}
- catch(FDMultiplexerException& e) {
+ catch(const FDMultiplexerException& e) {
// we sometimes return a socket that has not yet been assigned to t_fdm
}
+
try {
- closesocket(*i);
+ closesocket(fd);
}
catch(const PDNSException& e) {
g_log<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
}
- d_socks.erase(i++);
--d_numsocks;
}
+private:
+
// returns -1 for errors which might go away, throws for ones that won't
static int makeClientSocket(int family)
{
pbMessage->setAppliedPolicyType(appliedPolicy.d_type);
}
pbMessage->setPolicyTags(dc->d_policyTags);
- pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
+ if (g_useKernelTimestamp && dc->d_kernelTimestamp.tv_sec) {
+ pbMessage->setQueryTime(dc->d_kernelTimestamp.tv_sec, dc->d_kernelTimestamp.tv_usec);
+ }
+ else {
+ pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
+ }
pbMessage->setRequestorId(dq.requestorId);
pbMessage->setDeviceId(dq.deviceId);
#ifdef NOD_ENABLED
else {
dc->d_tcpConnection->state=TCPConnection::BYTE0;
Utility::gettimeofday(&g_now, 0); // needs to be updated
- t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
- t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
+ struct timeval ttd = g_now;
+ ttd.tv_sec += g_tcpTimeout;
+
+ t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
}
}
}
std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
tc->state=TCPConnection::BYTE0;
- t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
+ struct timeval ttd;
+ Utility::gettimeofday(&ttd, 0);
+ ttd.tv_sec += g_tcpTimeout;
- struct timeval now;
- Utility::gettimeofday(&now, 0);
- t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
+ t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
}
}
static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, struct timeval tv, int fd)
{
gettimeofday(&g_now, 0);
- struct timeval diff = g_now - tv;
- double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);
+ if (tv.tv_sec) {
+ struct timeval diff = g_now - tv;
+ double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);
- if(tv.tv_sec && delta > 1000.0) {
- g_stats.tooOldDrops++;
- return 0;
+ if(delta > 1000.0) {
+ g_stats.tooOldDrops++;
+ return nullptr;
+ }
}
++g_stats.qcounter;
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
pbMessage->setEDNSSubnet(ednssubnet.source, ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
- pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
+ if (g_useKernelTimestamp && tv.tv_sec) {
+ pbMessage->setQueryTime(tv.tv_sec, tv.tv_usec);
+ }
+ else {
+ pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
+ }
pbMessage->setRequestorId(requestorId);
pbMessage->setDeviceId(deviceId);
protobufLogResponse(*pbMessage);
}
dc->d_requestorId = requestorId;
dc->d_deviceId = deviceId;
+ dc->d_kernelTimestamp = tv;
#endif
MT->makeThread(startDoResolve, (void*) dc.release()); // deletes dc
throw PDNSException("SO_REUSEPORT: "+stringerror());
}
#endif
- socklen_t socklen=sin.getSocklen();
+
+ if (sin.isIPv4()) {
+ try {
+ setSocketIgnorePMTU(fd);
+ }
+ catch(const std::exception& e) {
+ g_log<<Logger::Warning<<"Failed to set IP_MTU_DISCOVER on UDP server socket: "<<e.what()<<endl;
+ }
+ }
+
+ socklen_t socklen=sin.getSocklen();
if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
throw PDNSException("Resolver binding to server socket on port "+ std::to_string(st.port) +" for "+ st.host+": "+stringerror());
size_t idx = 0;
for (const auto& threadInfo : s_threadInfos) {
if(threadInfo.isWorker) {
- g_log<<Logger::Notice<<"Thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
+ g_log<<Logger::Notice<<"stats: thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
++idx;
}
}
}
try {
- if(s_running)
+ if(s_running) {
return;
+ }
s_running=true;
struct timeval now;
Utility::gettimeofday(&now, 0);
if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
- DTime dt;
- dt.setTimeval(now);
t_RC->doPrune(g_maxCacheEntries / g_numThreads); // this function is local to a thread, so fine anyhow
t_packetCache->doPruneTo(g_maxPacketCacheEntries / g_numWorkerThreads);
g_log<<Logger::Error<<"Unable to update Trust Anchors: "<<pe.reason<<endl;
}
}
- s_running=false;
}
+ s_running=false;
}
catch(PDNSException& ae)
{
static void makeThreadPipes()
{
+ auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
+ if (pipeBufferSize > 0) {
+ g_log<<Logger::Info<<"Resizing the buffer of the distribution pipe to "<<pipeBufferSize<<endl;
+ }
+
/* thread 0 is the handler / SNMP, we start at 1 */
for(unsigned int n = 1; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
auto& threadInfos = s_threadInfos.at(n);
threadInfos.pipes.readQueriesToThread = fd[0];
threadInfos.pipes.writeQueriesToThread = fd[1];
+ if (pipeBufferSize > 0) {
+ if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) {
+ g_log<<Logger::Warning<<"Error resizing the buffer of the distribution pipe for thread "<<n<<" to "<<pipeBufferSize<<": "<<strerror(errno)<<endl;
+ auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread);
+ if (existingSize > 0) {
+ g_log<<Logger::Warning<<"The current size of the distribution pipe's buffer for thread "<<n<<" is "<<existingSize<<endl;
+ }
+ }
+ }
+
if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
unixDie("Making pipe for inter-thread communications non-blocking");
}
template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
+template ThreadTimes broadcastAccFunction(const boost::function<ThreadTimes*()>& fun);
static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
{
g_statisticsInterval = ::arg().asNum("statistics-interval");
+ {
+ SuffixMatchNode dontThrottleNames;
+ vector<string> parts;
+ stringtok(parts, ::arg()["dont-throttle-names"]);
+ for (const auto &p : parts) {
+ dontThrottleNames.add(DNSName(p));
+ }
+ g_dontThrottleNames.setState(dontThrottleNames);
+
+ NetmaskGroup dontThrottleNetmasks;
+ stringtok(parts, ::arg()["dont-throttle-netmasks"]);
+ for (const auto &p : parts) {
+ dontThrottleNetmasks.addMask(Netmask(p));
+ }
+ g_dontThrottleNetmasks.setState(dontThrottleNetmasks);
+ }
+
s_balancingFactor = ::arg().asDouble("distribution-load-factor");
if (s_balancingFactor != 0.0 && s_balancingFactor < 1.0) {
s_balancingFactor = 0.0;
g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
+ g_useKernelTimestamp = ::arg().mustDo("protobuf-use-kernel-timestamp");
+
blacklistStats(StatComponent::API, ::arg()["stats-api-blacklist"]);
blacklistStats(StatComponent::Carbon, ::arg()["stats-carbon-blacklist"]);
blacklistStats(StatComponent::RecControl, ::arg()["stats-rec-control-blacklist"]);
::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
+ ::arg().set("dont-throttle-names", "Do not throttle nameservers with this name or suffix")="";
+ ::arg().set("dont-throttle-netmasks", "Do not throttle nameservers with this IP netmask")="";
::arg().set("hint-file", "If set, load root hints from this file")="";
::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
+ ::arg().set("protobuf-use-kernel-timestamp", "Compute the latency of queries in protobuf messages by using the timestamp set by the kernel when the query was received (when available)")="";
+ ::arg().set("distribution-pipe-buffer-size", "Size in bytes of the internal buffer of the pipe used by the distributor to pass incoming queries to a worker thread")="0";
::arg().set("include-dir","Include *.conf files from this directory")="";
::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";