From: Remi Gacogne Date: Fri, 13 Dec 2019 09:34:06 +0000 (+0100) Subject: dnsdist: Add unit tests for the load-balancing policies X-Git-Tag: auth-4.3.0-beta2~1^2~14 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b6e2689548ec85cb619391bce7edc8bd643ad697;p=thirdparty%2Fpdns.git dnsdist: Add unit tests for the load-balancing policies --- diff --git a/pdns/dnsdist-lbpolicies.hh b/pdns/dnsdist-lbpolicies.hh new file mode 100644 index 0000000000..ce763860ad --- /dev/null +++ b/pdns/dnsdist-lbpolicies.hh @@ -0,0 +1,75 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+ */ +#pragma once + +struct dnsdist_ffi_servers_list_t; +struct dnsdist_ffi_server_t; +struct dnsdist_ffi_dnsquestion_t; + +struct DownstreamState; + +struct ServerPolicy +{ + template using NumberedVector = std::vector >; + using NumberedServerVector = NumberedVector>; + typedef std::function(const NumberedServerVector& servers, const DNSQuestion*)> policyfunc_t; + typedef std::function ffipolicyfunc_t; + + ServerPolicy(const std::string& name_, policyfunc_t policy_, bool isLua_): name(name_), policy(policy_), isLua(isLua_) + { + } + ServerPolicy(const std::string& name_, ffipolicyfunc_t policy_): name(name_), ffipolicy(policy_), isLua(true), isFFI(true) + { + } + ServerPolicy() + { + } + + string name; + policyfunc_t policy; + ffipolicyfunc_t ffipolicy; + bool isLua{false}; + bool isFFI{false}; + + std::string toString() const { + return string("ServerPolicy") + (isLua ? " (Lua)" : "") + " \"" + name + "\""; + } +}; + +struct ServerPool; + +using pools_t=map>; +std::shared_ptr getPool(const pools_t& pools, const std::string& poolName); +std::shared_ptr createPoolIfNotExists(pools_t& pools, const string& poolName); +void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr policy); +void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr server); +void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr server); + +ServerPolicy::NumberedServerVector getDownstreamCandidates(const map>& pools, const std::string& poolName); + +std::shared_ptr firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); + +std::shared_ptr leastOutstanding(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); +std::shared_ptr wrandom(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); +std::shared_ptr whashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); +std::shared_ptr chashed(const ServerPolicy::NumberedServerVector& 
servers, const DNSQuestion* dq); +std::shared_ptr roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index 63972b50f2..cef2742d45 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -326,6 +326,9 @@ void setupLuaConfig(bool client, bool configCheck) // create but don't connect the socket in client or check-config modes ret=std::make_shared(serverAddr, sourceAddr, sourceItf, sourceItfName, numberOfSockets, !(client || configCheck)); + if (!(client || configCheck)) { + infolog("Added downstream server %s", serverAddr.toStringWithPort()); + } if(vars.count("qps")) { int qpsVal=std::stoi(boost::get(vars["qps"])); diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 95cd7ef10d..c27c132dec 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -142,7 +142,6 @@ bool g_servFailOnNoPolicy{false}; bool g_truncateTC{false}; bool g_fixupCase{false}; bool g_preserveTrailingData{false}; -bool g_roundrobinFailOnNoServer{false}; std::set g_capabilitiesToRetain; @@ -708,353 +707,10 @@ catch(...) 
errlog("UDP responder thread died because of an exception: %s", "unknown"); } -bool DownstreamState::reconnect() -{ - std::unique_lock tl(connectLock, std::try_to_lock); - if (!tl.owns_lock()) { - /* we are already reconnecting */ - return false; - } - - connected = false; - for (auto& fd : sockets) { - if (fd != -1) { - if (sockets.size() > 1) { - std::lock_guard lock(socketsLock); - mplexer->removeReadFD(fd); - } - /* shutdown() is needed to wake up recv() in the responderThread */ - shutdown(fd, SHUT_RDWR); - close(fd); - fd = -1; - } - if (!IsAnyAddress(remote)) { - fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0); - if (!IsAnyAddress(sourceAddr)) { - SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1); - if (!sourceItfName.empty()) { -#ifdef SO_BINDTODEVICE - int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length()); - if (res != 0) { - infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror()); - } -#endif - } - - SBind(fd, sourceAddr); - } - try { - SConnect(fd, remote); - if (sockets.size() > 1) { - std::lock_guard lock(socketsLock); - mplexer->addReadFD(fd, [](int, boost::any) {}); - } - connected = true; - } - catch(const std::runtime_error& error) { - infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what()); - connected = false; - break; - } - } - } - - /* if at least one (re-)connection failed, close all sockets */ - if (!connected) { - for (auto& fd : sockets) { - if (fd != -1) { - if (sockets.size() > 1) { - std::lock_guard lock(socketsLock); - mplexer->removeReadFD(fd); - } - /* shutdown() is needed to wake up recv() in the responderThread */ - shutdown(fd, SHUT_RDWR); - close(fd); - fd = -1; - } - } - } - - return connected; -} -void DownstreamState::hash() -{ - vinfolog("Computing hashes for id=%s and weight=%d", id, weight); - auto w = weight; - WriteLock wl(&d_lock); - hashes.clear(); - while (w > 0) { - 
std::string uuid = boost::str(boost::format("%s-%d") % id % w); - unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb); - hashes.insert(wshash); - --w; - } -} - -void DownstreamState::setId(const boost::uuids::uuid& newId) -{ - id = newId; - // compute hashes only if already done - if (!hashes.empty()) { - hash(); - } -} - -void DownstreamState::setWeight(int newWeight) -{ - if (newWeight < 1) { - errlog("Error setting server's weight: downstream weight value must be greater than 0."); - return ; - } - weight = newWeight; - if (!hashes.empty()) { - hash(); - } -} - -DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_), name(remote_.toStringWithPort()), nameWithAddr(remote_.toStringWithPort()) -{ - pthread_rwlock_init(&d_lock, nullptr); - id = getUniqueID(); - threadStarted.clear(); - - mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); - - sockets.resize(numberOfSockets); - for (auto& fd : sockets) { - fd = -1; - } - - if (connect && !IsAnyAddress(remote)) { - reconnect(); - idStates.resize(g_maxOutstanding); - sw.start(); - infolog("Added downstream server %s", remote.toStringWithPort()); - } - -} - std::mutex g_luamutex; LuaContext g_lua; - -GlobalStateHolder g_policy; - -shared_ptr firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq) -{ - for(auto& d : servers) { - if(d.second->isUp() && d.second->qps.check()) - return d.second; - } - return leastOutstanding(servers, dq); -} - -// get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest -shared_ptr leastOutstanding(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq) -{ - if (servers.size() == 1 && 
servers[0].second->isUp()) { - return servers[0].second; - } - - vector, shared_ptr>> poss; - /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort, - which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */ - poss.reserve(servers.size()); - for(auto& d : servers) { - if(d.second->isUp()) { - poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second}); - } - } - if(poss.empty()) - return shared_ptr(); - nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; }); - return poss.begin()->second; -} - -shared_ptr valrandom(unsigned int val, const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq) -{ - vector>> poss; - int sum = 0; - int max = std::numeric_limits::max(); - - for(auto& d : servers) { // w=1, w=10 -> 1, 11 - if(d.second->isUp()) { - // Don't overflow sum when adding high weights - if(d.second->weight > max - sum) { - sum = max; - } else { - sum += d.second->weight; - } - - poss.push_back({sum, d.second}); - } - } - - // Catch poss & sum are empty to avoid SIGFPE - if(poss.empty()) - return shared_ptr(); - - int r = val % sum; - auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;}); - if(p==poss.end()) - return shared_ptr(); - return p->second; -} - -shared_ptr wrandom(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq) -{ - return valrandom(random(), servers, dq); -} - -uint32_t g_hashperturb; -double g_consistentHashBalancingFactor = 0; -shared_ptr whashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq) -{ - return valrandom(dq->qname->hash(g_hashperturb), servers, dq); -} - -shared_ptr chashed(const ServerPolicy::NumberedServerVector& servers, const 
DNSQuestion* dq) -{ - unsigned int qhash = dq->qname->hash(g_hashperturb); - unsigned int sel = std::numeric_limits::max(); - unsigned int min = std::numeric_limits::max(); - shared_ptr ret = nullptr, first = nullptr; - - double targetLoad = std::numeric_limits::max(); - if (g_consistentHashBalancingFactor > 0) { - /* we start with one, representing the query we are currently handling */ - double currentLoad = 1; - for (const auto& pair : servers) { - currentLoad += pair.second->outstanding; - } - targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor; - } - - for (const auto& d: servers) { - if (d.second->isUp() && d.second->outstanding <= targetLoad) { - // make sure hashes have been computed - if (d.second->hashes.empty()) { - d.second->hash(); - } - { - ReadLock rl(&(d.second->d_lock)); - const auto& server = d.second; - // we want to keep track of the last hash - if (min > *(server->hashes.begin())) { - min = *(server->hashes.begin()); - first = server; - } - - auto hash_it = server->hashes.lower_bound(qhash); - if (hash_it != server->hashes.end()) { - if (*hash_it < sel) { - sel = *hash_it; - ret = server; - } - } - } - } - } - if (ret != nullptr) { - return ret; - } - if (first != nullptr) { - return first; - } - return shared_ptr(); -} - -shared_ptr roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq) -{ - ServerPolicy::NumberedServerVector poss; - - for(auto& d : servers) { - if(d.second->isUp()) { - poss.push_back(d); - } - } - - const auto *res=&poss; - if(poss.empty() && !g_roundrobinFailOnNoServer) - res = &servers; - - if(res->empty()) - return shared_ptr(); - - static unsigned int counter; - - return (*res)[(counter++) % res->size()].second; -} - ComboAddress g_serverControl{"127.0.0.1:5199"}; -std::shared_ptr createPoolIfNotExists(pools_t& pools, const string& poolName) -{ - std::shared_ptr pool; - pools_t::iterator it = pools.find(poolName); - if (it != pools.end()) { - pool = it->second; - } - 
else { - if (!poolName.empty()) - vinfolog("Creating pool %s", poolName); - pool = std::make_shared(); - pools.insert(std::pair >(poolName, pool)); - } - return pool; -} - -void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr policy) -{ - std::shared_ptr pool = createPoolIfNotExists(pools, poolName); - if (!poolName.empty()) { - vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name); - } else { - vinfolog("Setting default pool server selection policy to %s", policy->name); - } - pool->policy = policy; -} - -void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr server) -{ - std::shared_ptr pool = createPoolIfNotExists(pools, poolName); - if (!poolName.empty()) { - vinfolog("Adding server to pool %s", poolName); - } else { - vinfolog("Adding server to default pool"); - } - pool->addServer(server); -} - -void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr server) -{ - std::shared_ptr pool = getPool(pools, poolName); - - if (!poolName.empty()) { - vinfolog("Removing server from pool %s", poolName); - } - else { - vinfolog("Removing server from default pool"); - } - - pool->removeServer(server); -} - -std::shared_ptr getPool(const pools_t& pools, const std::string& poolName) -{ - pools_t::const_iterator it = pools.find(poolName); - - if (it == pools.end()) { - throw std::out_of_range("No pool named " + poolName); - } - - return it->second; -} - -ServerPolicy::NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName) -{ - std::shared_ptr pool = getPool(pools, poolName); - return pool->getServers(); -} static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent, bool raw) { diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 7b469983dc..ce27a4e0fe 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -39,6 +39,7 @@ #include "dnscrypt.hh" #include "dnsdist-cache.hh" #include "dnsdist-dynbpf.hh" +#include 
"dnsdist-lbpolicies.hh" #include "dnsname.hh" #include "doh.hh" #include "ednsoptions.hh" @@ -907,38 +908,6 @@ public: mutable std::atomic d_matches{0}; }; -struct dnsdist_ffi_servers_list_t; -struct dnsdist_ffi_server_t; -struct dnsdist_ffi_dnsquestion_t; - -struct ServerPolicy -{ - template using NumberedVector = std::vector >; - using NumberedServerVector = NumberedVector>; - typedef std::function(const NumberedServerVector& servers, const DNSQuestion*)> policyfunc_t; - typedef std::function ffipolicyfunc_t; - - ServerPolicy(const std::string& name_, policyfunc_t policy_, bool isLua_): name(name_), policy(policy_), isLua(isLua_) - { - } - ServerPolicy(const std::string& name_, ffipolicyfunc_t policy_): name(name_), ffipolicy(policy_), isLua(true), isFFI(true) - { - } - ServerPolicy() - { - } - - string name; - policyfunc_t policy; - ffipolicyfunc_t ffipolicy; - bool isLua{false}; - bool isFFI{false}; - - std::string toString() const { - return string("ServerPolicy") + (isLua ? " (Lua)" : "") + " \"" + name + "\""; - } -}; - struct ServerPool { ServerPool() @@ -1030,10 +999,6 @@ private: pthread_rwlock_t d_lock; bool d_useECS{false}; }; -using pools_t=map>; -void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr policy); -void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr server); -void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr server); struct CarbonConfig { @@ -1136,17 +1101,7 @@ struct LocalHolders struct dnsheader; void controlThread(int fd, ComboAddress local); -std::shared_ptr getPool(const pools_t& pools, const std::string& poolName); -std::shared_ptr createPoolIfNotExists(pools_t& pools, const string& poolName); -ServerPolicy::NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName); - -std::shared_ptr firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); - -std::shared_ptr leastOutstanding(const 
ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); -std::shared_ptr wrandom(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); -std::shared_ptr whashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); -std::shared_ptr chashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); -std::shared_ptr roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq); +vector> setupLua(bool client, const std::string& config); struct WebserverConfig { diff --git a/pdns/dnsdistdist/Makefile.am b/pdns/dnsdistdist/Makefile.am index 6e49348621..1a48b590ad 100644 --- a/pdns/dnsdistdist/Makefile.am +++ b/pdns/dnsdistdist/Makefile.am @@ -124,6 +124,7 @@ dnsdist_SOURCES = \ dns.cc dns.hh \ dnscrypt.cc dnscrypt.hh \ dnsdist.cc dnsdist.hh \ + dnsdist-backend.cc \ dnsdist-dynbpf.cc dnsdist-dynbpf.hh \ dnsdist-cache.cc dnsdist-cache.hh \ dnsdist-carbon.cc \ @@ -134,6 +135,7 @@ dnsdist_SOURCES = \ dnsdist-healthchecks.cc dnsdist-healthchecks.hh \ dnsdist-idstate.cc \ dnsdist-kvs.hh dnsdist-kvs.cc \ + dnsdist-lbpolicies.cc dnsdist-lbpolicies.hh \ dnsdist-lua.hh dnsdist-lua.cc \ dnsdist-lua-actions.cc \ dnsdist-lua-bindings.cc \ @@ -206,6 +208,7 @@ testrunner_SOURCES = \ test-dnsdist_cc.cc \ test-dnsdistdynblocks_hh.cc \ test-dnsdistkvs_cc.cc \ + test-dnsdistlbpolicies_cc.cc \ test-dnsdistpacketcache_cc.cc \ test-dnsdistrings_cc.cc \ test-dnsdistrules_cc.cc \ @@ -215,10 +218,12 @@ testrunner_SOURCES = \ cachecleaner.hh \ circular_buffer.hh \ dnsdist.hh \ + dnsdist-backend.cc \ dnsdist-cache.cc dnsdist-cache.hh \ dnsdist-dynblocks.cc dnsdist-dynblocks.hh \ dnsdist-ecs.cc dnsdist-ecs.hh \ dnsdist-kvs.cc dnsdist-kvs.hh \ + dnsdist-lbpolicies.cc dnsdist-lbpolicies.hh \ dnsdist-rings.hh \ dnsdist-xpf.cc dnsdist-xpf.hh \ dnscrypt.cc dnscrypt.hh \ @@ -243,6 +248,7 @@ testrunner_SOURCES = \ statnode.cc statnode.hh \ threadname.hh threadname.cc \ testrunner.cc \ + uuid-utils.hh 
uuid-utils.cc \ xpf.cc xpf.hh dnsdist_LDFLAGS = \ diff --git a/pdns/dnsdistdist/dnsdist-backend.cc b/pdns/dnsdistdist/dnsdist-backend.cc new file mode 100644 index 0000000000..af9439dc08 --- /dev/null +++ b/pdns/dnsdistdist/dnsdist-backend.cc @@ -0,0 +1,149 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+ */ + +#include "dnsdist.hh" +#include "dolog.hh" + +bool DownstreamState::reconnect() +{ + std::unique_lock tl(connectLock, std::try_to_lock); + if (!tl.owns_lock()) { + /* we are already reconnecting */ + return false; + } + + connected = false; + for (auto& fd : sockets) { + if (fd != -1) { + if (sockets.size() > 1) { + std::lock_guard lock(socketsLock); + mplexer->removeReadFD(fd); + } + /* shutdown() is needed to wake up recv() in the responderThread */ + shutdown(fd, SHUT_RDWR); + close(fd); + fd = -1; + } + if (!IsAnyAddress(remote)) { + fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0); + if (!IsAnyAddress(sourceAddr)) { + SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1); + if (!sourceItfName.empty()) { +#ifdef SO_BINDTODEVICE + int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length()); + if (res != 0) { + infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror()); + } +#endif + } + + SBind(fd, sourceAddr); + } + try { + SConnect(fd, remote); + if (sockets.size() > 1) { + std::lock_guard lock(socketsLock); + mplexer->addReadFD(fd, [](int, boost::any) {}); + } + connected = true; + } + catch(const std::runtime_error& error) { + infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what()); + connected = false; + break; + } + } + } + + /* if at least one (re-)connection failed, close all sockets */ + if (!connected) { + for (auto& fd : sockets) { + if (fd != -1) { + if (sockets.size() > 1) { + std::lock_guard lock(socketsLock); + mplexer->removeReadFD(fd); + } + /* shutdown() is needed to wake up recv() in the responderThread */ + shutdown(fd, SHUT_RDWR); + close(fd); + fd = -1; + } + } + } + + return connected; +} +void DownstreamState::hash() +{ + vinfolog("Computing hashes for id=%s and weight=%d", id, weight); + auto w = weight; + WriteLock wl(&d_lock); + hashes.clear(); + while (w > 0) { + std::string uuid = 
boost::str(boost::format("%s-%d") % id % w); + unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb); + hashes.insert(wshash); + --w; + } +} + +void DownstreamState::setId(const boost::uuids::uuid& newId) +{ + id = newId; + // compute hashes only if already done + if (!hashes.empty()) { + hash(); + } +} + +void DownstreamState::setWeight(int newWeight) +{ + if (newWeight < 1) { + errlog("Error setting server's weight: downstream weight value must be greater than 0."); + return ; + } + weight = newWeight; + if (!hashes.empty()) { + hash(); + } +} + +DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_), name(remote_.toStringWithPort()), nameWithAddr(remote_.toStringWithPort()) +{ + pthread_rwlock_init(&d_lock, nullptr); + id = getUniqueID(); + threadStarted.clear(); + + mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); + + sockets.resize(numberOfSockets); + for (auto& fd : sockets) { + fd = -1; + } + + if (connect && !IsAnyAddress(remote)) { + reconnect(); + idStates.resize(g_maxOutstanding); + sw.start(); + } + +} diff --git a/pdns/dnsdistdist/dnsdist-lbpolicies.cc b/pdns/dnsdistdist/dnsdist-lbpolicies.cc new file mode 100644 index 0000000000..acb8f27780 --- /dev/null +++ b/pdns/dnsdistdist/dnsdist-lbpolicies.cc @@ -0,0 +1,243 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. 
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "dnsdist.hh"
+#include "dnsdist-lbpolicies.hh"
+#include "dolog.hh"
+
+GlobalStateHolder<ServerPolicy> g_policy;
+/* when set, roundrobin() returns no server at all instead of falling back to
+   the full list when every server is down */
+bool g_roundrobinFailOnNoServer{false};
+
+/* get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest.
+   Only servers that are 'up' are considered, an empty pointer is returned when there is none.
+   The DNSQuestion is not used. */
+shared_ptr<DownstreamState> leastOutstanding(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+  if (servers.size() == 1 && servers[0].second->isUp()) {
+    return servers[0].second;
+  }
+
+  vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
+  /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
+     which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
+  poss.reserve(servers.size());
+  for (const auto& d : servers) {
+    if (d.second->isUp()) {
+      poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
+    }
+  }
+  if (poss.empty()) {
+    return shared_ptr<DownstreamState>();
+  }
+  /* only the smallest snapshot is needed, so a single pass is enough */
+  const auto best = std::min_element(poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
+  return best->second;
+}
+
+/* return the first server, in list order, that is 'up' and whose qps.check()
+   passes; when there is none, fall back to leastOutstanding() */
+shared_ptr<DownstreamState> firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+  for (const auto& d : servers) {
+    if (d.second->isUp() && d.second->qps.check()) {
+      return d.second;
+    }
+  }
+  return leastOutstanding(servers, dq);
+}
+
+/* weighted selection: every 'up' server owns a slice of [0, sum) as wide as its
+   weight, and val, reduced modulo sum, designates the slice to use.
+   Returns an empty pointer when no server is up or when the weights add up to zero. */
+static shared_ptr<DownstreamState> valrandom(unsigned int val, const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+  vector<pair<int, shared_ptr<DownstreamState>>> poss;
+  int sum = 0;
+  int max = std::numeric_limits<int>::max();
+
+  for (const auto& d : servers) {      // w=1, w=10 -> 1, 11
+    if (d.second->isUp()) {
+      // Don't overflow sum when adding high weights
+      if (d.second->weight > max - sum) {
+        sum = max;
+      } else {
+        sum += d.second->weight;
+      }
+
+      poss.push_back({sum, d.second});
+    }
+  }
+
+  // Catch poss & sum are empty to avoid SIGFPE
+  if (poss.empty() || sum == 0) {
+    return shared_ptr<DownstreamState>();
+  }
+
+  int r = val % sum;
+  /* first entry whose cumulated weight is strictly above r */
+  auto p = upper_bound(poss.begin(), poss.end(), r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first; });
+  if (p == poss.end()) {
+    return shared_ptr<DownstreamState>();
+  }
+  return p->second;
+}
+
+/* weighted selection driven by random() */
+shared_ptr<DownstreamState> wrandom(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+  return valrandom(random(), servers, dq);
+}
+
+uint32_t g_hashperturb;
+/* chashed() only limits the load of a server when this is greater than 0 */
+double g_consistentHashBalancingFactor = 0;
+
+/* weighted selection driven by the hash of the query name, perturbed by
+   g_hashperturb. dq and dq->qname are dereferenced, so they must not be null. */
+shared_ptr<DownstreamState> whashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+  return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
+}
+
+/* consistent hashing: among the eligible servers, select the one owning the
+   smallest hash that is not below the hash of the query name; when there is no
+   such hash, wrap around to the server owning the smallest hash of all.
+   A server is eligible when it is 'up' and, if g_consistentHashBalancingFactor is
+   greater than 0, when its outstanding count does not exceed the average load
+   multiplied by that factor.
+   Returns an empty pointer when no eligible server has any hash.
+   dq and dq->qname are dereferenced, so they must not be null. */
+shared_ptr<DownstreamState> chashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+  unsigned int qhash = dq->qname->hash(g_hashperturb);
+  unsigned int sel = std::numeric_limits<unsigned int>::max();
+  unsigned int min = std::numeric_limits<unsigned int>::max();
+  shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
+
+  double targetLoad = std::numeric_limits<double>::max();
+  if (g_consistentHashBalancingFactor > 0) {
+    /* we start with one, representing the query we are currently handling */
+    double currentLoad = 1;
+    for (const auto& entry : servers) {
+      currentLoad += entry.second->outstanding;
+    }
+    targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor;
+  }
+
+  for (const auto& d: servers) {
+    if (d.second->isUp() && d.second->outstanding <= targetLoad) {
+      // make sure hashes have been computed
+      if (d.second->hashes.empty()) {
+        d.second->hash();
+      }
+      {
+        ReadLock rl(&(d.second->d_lock));
+        const auto& server = d.second;
+        /* hash() is opaque from here and might leave the set empty,
+           skip the server rather than dereference begin() */
+        if (server->hashes.empty()) {
+          continue;
+        }
+        // we want to keep track of the smallest hash, used when wrapping around
+        if (min > *(server->hashes.begin())) {
+          min = *(server->hashes.begin());
+          first = server;
+        }
+
+        auto hash_it = server->hashes.lower_bound(qhash);
+        if (hash_it != server->hashes.end()) {
+          if (*hash_it < sel) {
+            sel = *hash_it;
+            ret = server;
+          }
+        }
+      }
+    }
+  }
+  if (ret != nullptr) {
+    return ret;
+  }
+  if (first != nullptr) {
+    return first;
+  }
+  return shared_ptr<DownstreamState>();
+}
+
+/* rotate over the servers that are 'up', using a counter shared by all calls.
+   When no server is up, rotate over the full list instead, unless
+   g_roundrobinFailOnNoServer is set. Returns an empty pointer when there is
+   nothing to pick from. The DNSQuestion is not used. */
+shared_ptr<DownstreamState> roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+  ServerPolicy::NumberedServerVector poss;
+
+  for (const auto& d : servers) {
+    if (d.second->isUp()) {
+      poss.push_back(d);
+    }
+  }
+
+  const auto *res = &poss;
+  if (poss.empty() && !g_roundrobinFailOnNoServer) {
+    res = &servers;
+  }
+
+  if (res->empty()) {
+    return shared_ptr<DownstreamState>();
+  }
+
+  /* NOTE(review): plain static, not an atomic - confirm how callers serialize access */
+  static unsigned int counter;
+
+  return (*res)[(counter++) % res->size()].second;
+}
+
+/* return the servers of the named pool, getPool() throws std::out_of_range if
+   the pool does not exist */
+ServerPolicy::NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
+{
+  std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
+  return pool->getServers();
+}
+
+/* return the named pool, creating and registering an empty one if it does not
+   exist yet. The creation is only logged for a non-empty pool name. */
+std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
+{
+  std::shared_ptr<ServerPool> pool;
+  pools_t::iterator it = pools.find(poolName);
+  if (it != pools.end()) {
+    pool = it->second;
+  }
+  else {
+    if (!poolName.empty()) {
+      vinfolog("Creating pool %s", poolName);
+    }
+    pool = std::make_shared<ServerPool>();
+    pools.emplace(poolName, pool);
+  }
+  return pool;
+}
+
+/* set the policy of the named pool, creating the pool if needed.
+   policy is dereferenced for logging, so it must not be null. */
+void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
+{
+  std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
+  if (!poolName.empty()) {
+    vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
+  } else {
+    vinfolog("Setting default pool server selection policy to %s", policy->name);
+  }
+  pool->policy = policy;
+}
+
+/* add the server to the named pool, creating the pool if needed */
+void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
+{
+  std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
+  if (!poolName.empty()) {
+    vinfolog("Adding server to pool %s", poolName);
+  } else {
+    vinfolog("Adding server to default pool");
+  }
+  pool->addServer(server);
+}
+
+/* remove the server from the named pool, getPool() throws std::out_of_range if
+   the pool does not exist */
+void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
+{
+  std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
+
+  if (!poolName.empty()) {
+    vinfolog("Removing server from pool %s", poolName);
+  }
+  else {
+    vinfolog("Removing server from default pool");
+  }
+
+  pool->removeServer(server);
+}
+
+/* look up a pool by name, throws std::out_of_range if there is no such pool */
+std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
+{
+  pools_t::const_iterator it = pools.find(poolName);
+
+  if (it == pools.end()) {
+    throw std::out_of_range("No pool named " + poolName);
+  }
+
+  return it->second;
+}
diff --git a/pdns/dnsdistdist/dnsdist-lbpolicies.hh b/pdns/dnsdistdist/dnsdist-lbpolicies.hh
new file mode 120000
index 0000000000..020353fc0d
--- /dev/null
+++ b/pdns/dnsdistdist/dnsdist-lbpolicies.hh
@@ -0,0 +1 @@
+../dnsdist-lbpolicies.hh
\ No newline at end of file
diff --git a/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc b/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc
new file mode 100644
index 0000000000..1e8418c70f
--- /dev/null
+++ b/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc
@@ -0,0 +1,362 @@
+
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+
+#include <thread>
+#include <boost/test/unit_test.hpp>
+
+#include "dnsdist.hh"
+#include "dnsdist-lua-ffi.hh"
+#include "dolog.hh"
+
+uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
+std::mutex g_luamutex;
+
+/* Call the policy the way its flags require: a Lua policy runs with g_luamutex
+   held, an FFI policy returns an index into 'servers' (servers.at() throws
+   std::out_of_range for a bad index), and a non-Lua policy is called directly */
+static std::shared_ptr<DownstreamState> getSelectedBackendFromPolicy(const ServerPolicy& policy, const ServerPolicy::NumberedServerVector& servers, DNSQuestion& dq)
+{
+  std::shared_ptr<DownstreamState> selectedBackend{nullptr};
+
+  if (policy.isLua) {
+    if (!policy.isFFI) {
+      std::lock_guard<std::mutex> lock(g_luamutex);
+      selectedBackend = policy.policy(servers, &dq);
+    }
+    else {
+      dnsdist_ffi_dnsquestion_t dnsq(&dq);
+      dnsdist_ffi_servers_list_t serversList(servers);
+      unsigned int selected = 0;
+      {
+        std::lock_guard<std::mutex> lock(g_luamutex);
+        selected = policy.ffipolicy(&serversList, &dnsq);
+      }
+      selectedBackend = servers.at(selected).second;
+    }
+  }
+  else {
+    selectedBackend = policy.policy(servers, &dq);
+  }
+
+  return selectedBackend;
+}
+
+/* Build a DNSQuestion (type A, class IN, not TCP) for providedName, or for
+   "powerdns.com." when no name is given. The question is handed pointers to
+   function-local statics and to the caller's name; presumably it keeps them,
+   so the name should outlive the returned object */
+static DNSQuestion getDQ(const DNSName* providedName = nullptr)
+{
+  static const DNSName qname("powerdns.com.");
+  static const ComboAddress lc("127.0.0.1:53");
+  static const ComboAddress rem("192.0.2.1:42");
+  static struct timespec queryRealTime;
+  static struct dnsheader dh;
+
+  memset(&dh, 0, sizeof(dh));
+  uint16_t qtype = QType::A;
+  uint16_t qclass = QClass::IN;
+  size_t bufferSize = 0;
+  size_t queryLen = 0;
+  bool isTcp = false;
+  gettime(&queryRealTime, true);
+
+  /* the wire length has to be the one of the name actually used */
+  const DNSName* name = providedName ? providedName : &qname;
+  DNSQuestion dq(name, qtype, qclass, name->wirelength(), &lc, &rem, &dh, bufferSize, queryLen, isTcp, &queryRealTime);
+  return dq;
+}
+
+BOOST_AUTO_TEST_SUITE(dnsdistlbpolicies)
+
+/* firstAvailable: nothing while every server is down, otherwise the first 'up' server in list order */
+BOOST_AUTO_TEST_CASE(test_firstAvailable) {
+  auto dq = getDQ();
+
+  ServerPolicy pol{"firstAvailable", firstAvailable, false};
+  ServerPolicy::NumberedServerVector servers;
+  servers.push_back({ 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2.1:53")) });
+
+  /* servers start as 'down' */
+  auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+  BOOST_CHECK(server == nullptr);
+
+  /* mark the server as 'up' */
+  servers.at(0).second->setUp();
+  server = getSelectedBackendFromPolicy(pol, servers, dq);
+  BOOST_CHECK(server != nullptr);
+
+  /* add a second server, we should still get the first one */
+  servers.push_back({ 2, std::make_shared<DownstreamState>(ComboAddress("192.0.2.2:53")) });
+  server = getSelectedBackendFromPolicy(pol, servers, dq);
+  BOOST_REQUIRE(server != nullptr);
+  BOOST_CHECK(server == servers.at(0).second);
+
+  /* mark the first server as 'down', second as 'up' */
+  servers.at(0).second->setDown();
+  servers.at(1).second->setUp();
+  server = getSelectedBackendFromPolicy(pol, servers, dq);
+  BOOST_REQUIRE(server != nullptr);
+  BOOST_CHECK(server == servers.at(1).second);
+}
+
+/* leastOutstanding: only 'up' servers are eligible, and the one with the fewest outstanding queries wins */
+BOOST_AUTO_TEST_CASE(test_leastOutstanding) {
+  auto dq = getDQ();
+
+  ServerPolicy pol{"leastOutstanding", leastOutstanding, false};
+  ServerPolicy::NumberedServerVector servers;
+  servers.push_back({ 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2.1:53")) });
+
+  /* servers start as 'down' */
+  auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+  BOOST_CHECK(server == nullptr);
+
+  /* mark the server as 'up' */
+  servers.at(0).second->setUp();
+  server = getSelectedBackendFromPolicy(pol, servers, dq);
+  BOOST_CHECK(server != nullptr);
+
+  /* add a second server, we should still get the first one */
+  servers.push_back({ 2, std::make_shared<DownstreamState>(ComboAddress("192.0.2.2:53")) });
+  server = getSelectedBackendFromPolicy(pol, servers, dq);
+  BOOST_REQUIRE(server != nullptr);
+  BOOST_CHECK(server == servers.at(0).second);
+
+  /* mark the first server as 'down', second as 'up' */
+  servers.at(0).second->setDown();
+  servers.at(1).second->setUp();
+  server = getSelectedBackendFromPolicy(pol, servers, dq);
+  BOOST_REQUIRE(server != nullptr);
+  BOOST_CHECK(server == servers.at(1).second);
+
+  /* mark both servers as 'up', increase the outstanding count of the first one */
+  servers.at(0).second->setUp();
+  servers.at(0).second->outstanding = 42;
+  servers.at(1).second->setUp();
+  server = getSelectedBackendFromPolicy(pol, servers, dq);
+  BOOST_REQUIRE(server != nullptr);
+  BOOST_CHECK(server == servers.at(1).second);
+}
+
+/* wrandom: 1000 selections over ten servers of weight 1 must give every server between half and
+   twice an even share, then a server of weight 100 must get between half and twice its weighted share */
+BOOST_AUTO_TEST_CASE(test_wrandom) {
+  auto dq = getDQ();
+
+  ServerPolicy pol{"wrandom", wrandom, false};
+  ServerPolicy::NumberedServerVector servers;
+  std::map<std::shared_ptr<DownstreamState>, uint64_t> serversMap;
+  for (size_t idx = 1; idx <= 10; idx++) {
+    servers.push_back({ idx + 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2." + std::to_string(idx) + ":53")) });
+    serversMap[servers.at(idx - 1).second] = 0;
+    servers.at(idx - 1).second->setUp();
+  }
+
+  for (size_t idx = 0; idx < 1000; idx++) {
+    auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+    BOOST_REQUIRE(serversMap.count(server) == 1);
+    ++serversMap[server];
+  }
+  uint64_t total = 0;
+  for (const auto& entry : serversMap) {
+    BOOST_CHECK_GT(entry.second, 0);
+    BOOST_CHECK_GT(entry.second, (1000 / servers.size() / 2));
+    BOOST_CHECK_LT(entry.second, (1000 / servers.size() * 2));
+    total += entry.second;
+  }
+  BOOST_CHECK_EQUAL(total, 1000);
+
+  /* reset */
+  for (auto& entry : serversMap) {
+    entry.second = 0;
+    BOOST_CHECK_EQUAL(entry.first->weight, 1);
+  }
+  /* change the weight of the last server to 100, default is 1 */
+  servers.at(servers.size()-1).second->weight = 100;
+
+  for (size_t idx = 0; idx < 1000; idx++) {
+    auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+    BOOST_REQUIRE(serversMap.count(server) == 1);
+    ++serversMap[server];
+  }
+
+  total = 0;
+  uint64_t totalW = 0;
+  for (const auto& entry : serversMap) {
+    total += entry.second;
+    totalW += entry.first->weight;
+  }
+  BOOST_CHECK_EQUAL(total, 1000);
+  auto last = servers.at(servers.size()-1).second;
+  const auto got = serversMap[last];
+  float expected = (1000 * 1.0 * last->weight) / totalW;
+  BOOST_CHECK_GT(got, expected / 2);
+  BOOST_CHECK_LT(got, expected * 2);
+}
+
+/* whashed: 1000 distinct names must be spread over ten servers of weight 1 within half and twice an
+   even share, a given name must always reach the same server, and a weight of 100 must be honoured */
+BOOST_AUTO_TEST_CASE(test_whashed) {
+  std::vector<DNSName> names;
+  names.reserve(1000);
+  for (size_t idx = 0; idx < 1000; idx++) {
+    names.push_back(DNSName("powerdns-" + std::to_string(idx) + ".com."));
+  }
+
+  ServerPolicy pol{"whashed", whashed, false};
+  ServerPolicy::NumberedServerVector servers;
+  std::map<std::shared_ptr<DownstreamState>, uint64_t> serversMap;
+  for (size_t idx = 1; idx <= 10; idx++) {
+    servers.push_back({ idx + 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2." + std::to_string(idx) + ":53")) });
+    serversMap[servers.at(idx - 1).second] = 0;
+    servers.at(idx - 1).second->setUp();
+  }
+
+  for (const auto& name : names) {
+    auto dq = getDQ(&name);
+    auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+    BOOST_REQUIRE(serversMap.count(server) == 1);
+    ++serversMap[server];
+  }
+  uint64_t total = 0;
+  for (const auto& entry : serversMap) {
+    BOOST_CHECK_GT(entry.second, 0);
+    BOOST_CHECK_GT(entry.second, (names.size() / servers.size() / 2));
+    BOOST_CHECK_LT(entry.second, (names.size() / servers.size() * 2));
+    total += entry.second;
+  }
+  BOOST_CHECK_EQUAL(total, names.size());
+
+  /* reset */
+  for (auto& entry : serversMap) {
+    entry.second = 0;
+    BOOST_CHECK_EQUAL(entry.first->weight, 1);
+  }
+
+  /* request 1000 times the same name, we should go to the same server every time */
+  {
+    auto dq = getDQ(&names.at(0));
+    auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+    for (size_t idx = 0; idx < 1000; idx++) {
+      BOOST_CHECK(getSelectedBackendFromPolicy(pol, servers, dq) == server);
+    }
+  }
+
+  /* reset */
+  for (auto& entry : serversMap) {
+    entry.second = 0;
+    BOOST_CHECK_EQUAL(entry.first->weight, 1);
+  }
+  /* change the weight of the last server to 100, default is 1 */
+  servers.at(servers.size()-1).second->setWeight(100);
+
+  for (const auto& name : names) {
+    auto dq = getDQ(&name);
+    auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+    BOOST_REQUIRE(serversMap.count(server) == 1);
+    ++serversMap[server];
+  }
+
+  total = 0;
+  uint64_t totalW = 0;
+  for (const auto& entry : serversMap) {
+    total += entry.second;
+    totalW += entry.first->weight;
+  }
+  BOOST_CHECK_EQUAL(total, names.size());
+  auto last = servers.at(servers.size()-1).second;
+  const auto got = serversMap[last];
+  float expected = (names.size() * 1.0 * last->weight) / totalW;
+  BOOST_CHECK_GT(got, expected / 2);
+  BOOST_CHECK_LT(got, expected * 2);
+}
+
+/* chashed: same expectations as for whashed, with ten servers of weight 1000 and then a last
+   server of weight 100000. g_verbose is turned off for the duration of the test and restored after. */
+BOOST_AUTO_TEST_CASE(test_chashed) {
+  bool existingVerboseValue = g_verbose;
+  g_verbose = false;
+
+  std::vector<DNSName> names;
+  names.reserve(1000);
+  for (size_t idx = 0; idx < 1000; idx++) {
+    names.push_back(DNSName("powerdns-" + std::to_string(idx) + ".com."));
+  }
+
+  ServerPolicy pol{"chashed", chashed, false};
+  ServerPolicy::NumberedServerVector servers;
+  std::map<std::shared_ptr<DownstreamState>, uint64_t> serversMap;
+  for (size_t idx = 1; idx <= 10; idx++) {
+    servers.push_back({ idx + 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2." + std::to_string(idx) + ":53")) });
+    serversMap[servers.at(idx - 1).second] = 0;
+    servers.at(idx - 1).second->setUp();
+    /* we need to have a weight of at least 1000 to get an optimal repartition with the consistent hashing algo */
+    servers.at(idx - 1).second->setWeight(1000);
+    /* make sure that the hashes have been computed */
+    servers.at(idx - 1).second->hash();
+  }
+
+  for (const auto& name : names) {
+    auto dq = getDQ(&name);
+    auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+    BOOST_REQUIRE(serversMap.count(server) == 1);
+    ++serversMap[server];
+  }
+  uint64_t total = 0;
+  for (const auto& entry : serversMap) {
+    BOOST_CHECK_GT(entry.second, 0);
+    BOOST_CHECK_GT(entry.second, (names.size() / servers.size() / 2));
+    BOOST_CHECK_LT(entry.second, (names.size() / servers.size() * 2));
+    total += entry.second;
+  }
+  BOOST_CHECK_EQUAL(total, names.size());
+
+  /* reset */
+  for (auto& entry : serversMap) {
+    entry.second = 0;
+    BOOST_CHECK_EQUAL(entry.first->weight, 1000);
+  }
+
+  /* request 1000 times the same name, we should go to the same server every time */
+  {
+    auto dq = getDQ(&names.at(0));
+    auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+    for (size_t idx = 0; idx < 1000; idx++) {
+      BOOST_CHECK(getSelectedBackendFromPolicy(pol, servers, dq) == server);
+    }
+  }
+
+  /* reset */
+  for (auto& entry : serversMap) {
+    entry.second = 0;
+    BOOST_CHECK_EQUAL(entry.first->weight, 1000);
+  }
+  /* change the weight of the last server to 100000, others stay at 1000 */
+  servers.at(servers.size()-1).second->setWeight(100000);
+
+  for (const auto& name : names) {
+    auto dq = getDQ(&name);
+    auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+    BOOST_REQUIRE(serversMap.count(server) == 1);
+    ++serversMap[server];
+  }
+
+  total = 0;
+  uint64_t totalW = 0;
+  for (const auto& entry : serversMap) {
+    total += entry.second;
+    totalW += entry.first->weight;
+  }
+  BOOST_CHECK_EQUAL(total, names.size());
+  auto last = servers.at(servers.size()-1).second;
+  const auto got = serversMap[last];
+  float expected = (names.size() * 1.0 * last->weight) / totalW;
+  BOOST_CHECK_GT(got, expected / 2);
+  BOOST_CHECK_LT(got, expected * 2);
+
+  g_verbose = existingVerboseValue;
+}
+
+BOOST_AUTO_TEST_SUITE_END()