deferredAdd_t deferredAdds;
struct ThreadPipeSet pipes;
std::thread thread;
+ MT_t* mt{nullptr};
+ uint64_t numberOfDistributedQueries{0};
/* handle the web server, carbon, statistics and the control channel */
bool isHandler{false};
/* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
#endif
static uint16_t s_minUdpSourcePort;
static uint16_t s_maxUdpSourcePort;
+static double s_balancingFactor;
RecursorControlChannel s_rcc; // only active in the handler thread
RecursorStats g_stats;
if(!tries)
throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
+ setReceiveSocketErrors(ret, family);
setNonBlocking(ret);
return ret;
}
maxanswersize = min(static_cast<uint16_t>(edo.d_packetsize >= 512 ? edo.d_packetsize : 512), g_udpTruncationThreshold);
}
ednsOpts = edo.d_options;
- haveEDNS=true;
maxanswersize -= 11; // EDNS header size
for (const auto& o : edo.d_options) {
distributeAsyncFunction(data, boost::bind(doProcessUDPQuestion, data, fromaddr, dest, tv, fd));
}
else {
+ ++s_threadInfos[t_id].numberOfDistributedQueries;
doProcessUDPQuestion(data, fromaddr, dest, tv, fd);
}
}
g_log<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
" packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
+ size_t idx = 0;
+ for (const auto& threadInfo : s_threadInfos) {
+ if(threadInfo.isWorker) {
+ g_log<<Logger::Notice<<"Thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
+ ++idx;
+ }
+ }
+
time_t now = time(0);
if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
g_log<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
{
- const auto& targetInfo = s_threadInfos[target];
+ auto& targetInfo = s_threadInfos[target];
if(!targetInfo.isWorker) {
g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
exit(1);
}
}
+ ++targetInfo.numberOfDistributedQueries;
+
return true;
}
+static unsigned int getWorkerLoad(size_t workerIdx)
+{
+ const auto mt = s_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
+ if (mt != nullptr) {
+ return mt->numProcesses();
+ }
+ return 0;
+}
+
+static unsigned int selectWorker(unsigned int hash)
+{
+ if (s_balancingFactor == 0) {
+ return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+ }
+
+ /* we start with one, representing the query we are currently handling */
+ double currentLoad = 1;
+ std::vector<unsigned int> load(g_numWorkerThreads);
+ for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
+ load[idx] = getWorkerLoad(idx);
+ currentLoad += load[idx];
+ // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
+ }
+
+ double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
+ // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
+
+ unsigned int worker = hash % g_numWorkerThreads;
+ /* at least one server has to be at or below the average load */
+ if (load[worker] > targetLoad) {
+ ++g_stats.rebalancedQueries;
+ do {
+ // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
+ worker = (worker + 1) % g_numWorkerThreads;
+ }
+ while(load[worker] > targetLoad);
+ }
+
+ return /* skip handler */ 1 + g_numDistributorThreads + worker;
+}
+
// This function is only called by the distributor threads, when pdns-distributes-queries is set
void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
{
}
unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
- unsigned int target = /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+ unsigned int target = selectWorker(hash);
ThreadMSG* tmsg = new ThreadMSG();
tmsg->func = func;
static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
{
- string remote;
- string msg=s_rcc.recv(&remote);
- RecursorControlParser rcp;
- RecursorControlParser::func_t* command;
+ try {
+ string remote;
+ string msg=s_rcc.recv(&remote);
+ RecursorControlParser rcp;
+ RecursorControlParser::func_t* command;
- string answer=rcp.getAnswer(msg, &command);
+ string answer=rcp.getAnswer(msg, &command);
- // If we are inside a chroot, we need to strip
- if (!arg()["chroot"].empty()) {
- size_t len = arg()["chroot"].length();
- remote = remote.substr(len);
- }
+ // If we are inside a chroot, we need to strip
+ if (!arg()["chroot"].empty()) {
+ size_t len = arg()["chroot"].length();
+ remote = remote.substr(len);
+ }
- try {
s_rcc.send(answer, &remote);
command();
}
- catch(std::exception& e) {
+ catch(const std::exception& e) {
g_log<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
}
- catch(PDNSException& ae) {
+ catch(const PDNSException& ae) {
g_log<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
}
}
}
SyncRes::s_minimumTTL = ::arg().asNum("minimum-ttl-override");
+ SyncRes::s_minimumECSTTL = ::arg().asNum("ecs-minimum-ttl-override");
SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
SyncRes::s_ecsipv4limit = ::arg().asNum("ecs-ipv4-bits");
SyncRes::s_ecsipv6limit = ::arg().asNum("ecs-ipv6-bits");
+ SyncRes::clearECSStats();
+ SyncRes::s_ecsipv4cachelimit = ::arg().asNum("ecs-ipv4-cache-bits");
+ SyncRes::s_ecsipv6cachelimit = ::arg().asNum("ecs-ipv6-cache-bits");
+ SyncRes::s_ecscachelimitttl = ::arg().asNum("ecs-cache-limit-ttl");
if (!::arg().isEmpty("ecs-scope-zero-address")) {
ComboAddress scopeZero(::arg()["ecs-scope-zero-address"]);
g_statisticsInterval = ::arg().asNum("statistics-interval");
+ s_balancingFactor = ::arg().asDouble("distribution-load-factor");
+ if (s_balancingFactor != 0.0 && s_balancingFactor < 1.0) {
+ s_balancingFactor = 0.0;
+ g_log<<Logger::Warning<<"Asked to run with a distribution-load-factor below 1.0, disabling it instead"<<endl;
+ }
+
#ifdef SO_REUSEPORT
g_reusePort = ::arg().mustDo("reuseport");
#endif
g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
+ blacklistStats(StatComponent::API, ::arg()["stats-api-blacklist"]);
+ blacklistStats(StatComponent::Carbon, ::arg()["stats-carbon-blacklist"]);
+ blacklistStats(StatComponent::RecControl, ::arg()["stats-rec-control-blacklist"]);
+ blacklistStats(StatComponent::SNMP, ::arg()["stats-snmp-blacklist"]);
+
if (::arg().mustDo("snmp-agent")) {
g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
g_snmpAgent->run();
}
MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
+ threadInfo.mt = MT.get();
#ifdef HAVE_PROTOBUF
/* start protobuf export threads if needed */
g_argc = argc;
g_argv = argv;
g_stats.startupTime=time(0);
+ Utility::srandom();
versionSetProduct(ProductRecursor);
reportBasicTypes();
reportOtherTypes();
::arg().set("webserver-port", "Port of webserver to listen on") = "8082";
::arg().set("webserver-password", "Password required for accessing the webserver") = "";
::arg().set("webserver-allow-from","Webserver access is only allowed from these subnets")="127.0.0.1,::1";
+ ::arg().set("webserver-loglevel", "Amount of logging in the webserver (none, normal, detailed)") = "normal";
::arg().set("carbon-ourname", "If set, overrides our reported hostname for carbon stats")="";
::arg().set("carbon-server", "If set, send metrics in carbon (graphite) format to this server IP address")="";
::arg().set("carbon-interval", "Number of seconds between carbon (graphite) updates")="30";
::arg().set("latency-statistic-size","Number of latency values to calculate the qa-latency average")="10000";
::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
::arg().set("ecs-ipv4-bits", "Number of bits of IPv4 address to pass for EDNS Client Subnet")="24";
+ ::arg().set("ecs-ipv4-cache-bits", "Maximum number of bits of IPv4 mask to cache ECS response")="24";
::arg().set("ecs-ipv6-bits", "Number of bits of IPv6 address to pass for EDNS Client Subnet")="56";
+ ::arg().set("ecs-ipv6-cache-bits", "Maximum number of bits of IPv6 mask to cache ECS response")="56";
+ ::arg().set("ecs-minimum-ttl-override", "Set under adverse conditions, a minimum TTL for records in ECS-specific answers")="0";
+ ::arg().set("ecs-cache-limit-ttl", "Minimum TTL to cache ECS response")="0";
::arg().set("edns-subnet-whitelist", "List of netmasks and domains that we should enable EDNS subnet for")="";
::arg().set("ecs-add-for", "List of client netmasks for which EDNS Client Subnet will be added")="0.0.0.0/0, ::/0, " LOCAL_NETS_INVERSE;
::arg().set("ecs-scope-zero-address", "Address to send to whitelisted authoritative servers for incoming queries with ECS prefix-length source of 0")="";
::arg().setSwitch("snmp-agent", "If set, register as an SNMP agent")="no";
::arg().set("snmp-master-socket", "If set and snmp-agent is set, the socket to use to register to the SNMP master")="";
+ std::string defaultBlacklistedStats = "cache-bytes, packetcache-bytes, special-memory-usage";
+ for (size_t idx = 0; idx < 32; idx++) {
+ defaultBlacklistedStats += ", ecs-v4-response-bits-" + std::to_string(idx + 1);
+ }
+ for (size_t idx = 0; idx < 128; idx++) {
+ defaultBlacklistedStats += ", ecs-v6-response-bits-" + std::to_string(idx + 1);
+ }
+ ::arg().set("stats-api-blacklist", "List of statistics that are disabled when retrieving the complete list of statistics via the API")=defaultBlacklistedStats;
+ ::arg().set("stats-carbon-blacklist", "List of statistics that are prevented from being exported via Carbon")=defaultBlacklistedStats;
+ ::arg().set("stats-rec-control-blacklist", "List of statistics that are prevented from being exported via rec_control get-all")=defaultBlacklistedStats;
+ ::arg().set("stats-snmp-blacklist", "List of statistics that are prevented from being exported via SNMP")=defaultBlacklistedStats;
+
::arg().set("tcp-fast-open", "Enable TCP Fast Open support on the listening sockets, using the supplied numerical value as the queue size")="0";
::arg().set("nsec3-max-iterations", "Maximum number of iterations allowed for an NSEC3 record")="2500";
::arg().set("udp-source-port-avoid", "List of comma separated UDP port number to avoid")="11211";
::arg().set("rng", "Specify random number generator to use. Valid values are auto,sodium,openssl,getrandom,arc4random,urandom.")="auto";
::arg().set("public-suffix-list-file", "Path to the Public Suffix List file, if any")="";
+ ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
#ifdef NOD_ENABLED
::arg().set("new-domain-tracking", "Track newly observed domains (i.e. never seen before).")="no";
::arg().set("new-domain-log", "Log newly observed domains.")="yes";