--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+struct dnsdist_ffi_servers_list_t;
+struct dnsdist_ffi_server_t;
+struct dnsdist_ffi_dnsquestion_t;
+
+struct DownstreamState;
+
+struct ServerPolicy
+{
+ template <class T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
+ using NumberedServerVector = NumberedVector<shared_ptr<DownstreamState>>;
+ typedef std::function<shared_ptr<DownstreamState>(const NumberedServerVector& servers, const DNSQuestion*)> policyfunc_t;
+ typedef std::function<unsigned int(dnsdist_ffi_servers_list_t* servers, dnsdist_ffi_dnsquestion_t* dq)> ffipolicyfunc_t;
+
+ ServerPolicy(const std::string& name_, policyfunc_t policy_, bool isLua_): name(name_), policy(policy_), isLua(isLua_)
+ {
+ }
+ ServerPolicy(const std::string& name_, ffipolicyfunc_t policy_): name(name_), ffipolicy(policy_), isLua(true), isFFI(true)
+ {
+ }
+ ServerPolicy()
+ {
+ }
+
+ string name;
+ policyfunc_t policy;
+ ffipolicyfunc_t ffipolicy;
+ bool isLua{false};
+ bool isFFI{false};
+
+ std::string toString() const {
+ return string("ServerPolicy") + (isLua ? " (Lua)" : "") + " \"" + name + "\"";
+ }
+};
+
+struct ServerPool;
+
+using pools_t=map<std::string,std::shared_ptr<ServerPool>>;
+std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName);
+std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName);
+void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy);
+void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server);
+void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server);
+
+ServerPolicy::NumberedServerVector getDownstreamCandidates(const map<std::string,std::shared_ptr<ServerPool>>& pools, const std::string& poolName);
+
+std::shared_ptr<DownstreamState> firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
+
+std::shared_ptr<DownstreamState> leastOutstanding(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
+std::shared_ptr<DownstreamState> wrandom(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
+std::shared_ptr<DownstreamState> whashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
+std::shared_ptr<DownstreamState> chashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
+std::shared_ptr<DownstreamState> roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
// create but don't connect the socket in client or check-config modes
ret=std::make_shared<DownstreamState>(serverAddr, sourceAddr, sourceItf, sourceItfName, numberOfSockets, !(client || configCheck));
+ if (!(client || configCheck)) {
+ infolog("Added downstream server %s", serverAddr.toStringWithPort());
+ }
if(vars.count("qps")) {
int qpsVal=std::stoi(boost::get<string>(vars["qps"]));
bool g_truncateTC{false};
bool g_fixupCase{false};
bool g_preserveTrailingData{false};
-bool g_roundrobinFailOnNoServer{false};
std::set<std::string> g_capabilitiesToRetain;
errlog("UDP responder thread died because of an exception: %s", "unknown");
}
-bool DownstreamState::reconnect()
-{
- std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
- if (!tl.owns_lock()) {
- /* we are already reconnecting */
- return false;
- }
-
- connected = false;
- for (auto& fd : sockets) {
- if (fd != -1) {
- if (sockets.size() > 1) {
- std::lock_guard<std::mutex> lock(socketsLock);
- mplexer->removeReadFD(fd);
- }
- /* shutdown() is needed to wake up recv() in the responderThread */
- shutdown(fd, SHUT_RDWR);
- close(fd);
- fd = -1;
- }
- if (!IsAnyAddress(remote)) {
- fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
- if (!IsAnyAddress(sourceAddr)) {
- SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
- if (!sourceItfName.empty()) {
-#ifdef SO_BINDTODEVICE
- int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
- if (res != 0) {
- infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
- }
-#endif
- }
-
- SBind(fd, sourceAddr);
- }
- try {
- SConnect(fd, remote);
- if (sockets.size() > 1) {
- std::lock_guard<std::mutex> lock(socketsLock);
- mplexer->addReadFD(fd, [](int, boost::any) {});
- }
- connected = true;
- }
- catch(const std::runtime_error& error) {
- infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
- connected = false;
- break;
- }
- }
- }
-
- /* if at least one (re-)connection failed, close all sockets */
- if (!connected) {
- for (auto& fd : sockets) {
- if (fd != -1) {
- if (sockets.size() > 1) {
- std::lock_guard<std::mutex> lock(socketsLock);
- mplexer->removeReadFD(fd);
- }
- /* shutdown() is needed to wake up recv() in the responderThread */
- shutdown(fd, SHUT_RDWR);
- close(fd);
- fd = -1;
- }
- }
- }
-
- return connected;
-}
-void DownstreamState::hash()
-{
- vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
- auto w = weight;
- WriteLock wl(&d_lock);
- hashes.clear();
- while (w > 0) {
- std::string uuid = boost::str(boost::format("%s-%d") % id % w);
- unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
- hashes.insert(wshash);
- --w;
- }
-}
-
-void DownstreamState::setId(const boost::uuids::uuid& newId)
-{
- id = newId;
- // compute hashes only if already done
- if (!hashes.empty()) {
- hash();
- }
-}
-
-void DownstreamState::setWeight(int newWeight)
-{
- if (newWeight < 1) {
- errlog("Error setting server's weight: downstream weight value must be greater than 0.");
- return ;
- }
- weight = newWeight;
- if (!hashes.empty()) {
- hash();
- }
-}
-
-DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_), name(remote_.toStringWithPort()), nameWithAddr(remote_.toStringWithPort())
-{
- pthread_rwlock_init(&d_lock, nullptr);
- id = getUniqueID();
- threadStarted.clear();
-
- mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
-
- sockets.resize(numberOfSockets);
- for (auto& fd : sockets) {
- fd = -1;
- }
-
- if (connect && !IsAnyAddress(remote)) {
- reconnect();
- idStates.resize(g_maxOutstanding);
- sw.start();
- infolog("Added downstream server %s", remote.toStringWithPort());
- }
-
-}
-
std::mutex g_luamutex;
LuaContext g_lua;
-
-GlobalStateHolder<ServerPolicy> g_policy;
-
-shared_ptr<DownstreamState> firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
-{
- for(auto& d : servers) {
- if(d.second->isUp() && d.second->qps.check())
- return d.second;
- }
- return leastOutstanding(servers, dq);
-}
-
-// get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
-shared_ptr<DownstreamState> leastOutstanding(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
-{
- if (servers.size() == 1 && servers[0].second->isUp()) {
- return servers[0].second;
- }
-
- vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
- /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
- which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
- poss.reserve(servers.size());
- for(auto& d : servers) {
- if(d.second->isUp()) {
- poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
- }
- }
- if(poss.empty())
- return shared_ptr<DownstreamState>();
- nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
- return poss.begin()->second;
-}
-
-shared_ptr<DownstreamState> valrandom(unsigned int val, const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
-{
- vector<pair<int, shared_ptr<DownstreamState>>> poss;
- int sum = 0;
- int max = std::numeric_limits<int>::max();
-
- for(auto& d : servers) { // w=1, w=10 -> 1, 11
- if(d.second->isUp()) {
- // Don't overflow sum when adding high weights
- if(d.second->weight > max - sum) {
- sum = max;
- } else {
- sum += d.second->weight;
- }
-
- poss.push_back({sum, d.second});
- }
- }
-
- // Catch poss & sum are empty to avoid SIGFPE
- if(poss.empty())
- return shared_ptr<DownstreamState>();
-
- int r = val % sum;
- auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
- if(p==poss.end())
- return shared_ptr<DownstreamState>();
- return p->second;
-}
-
-shared_ptr<DownstreamState> wrandom(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
-{
- return valrandom(random(), servers, dq);
-}
-
-uint32_t g_hashperturb;
-double g_consistentHashBalancingFactor = 0;
-shared_ptr<DownstreamState> whashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
-{
- return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
-}
-
-shared_ptr<DownstreamState> chashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
-{
- unsigned int qhash = dq->qname->hash(g_hashperturb);
- unsigned int sel = std::numeric_limits<unsigned int>::max();
- unsigned int min = std::numeric_limits<unsigned int>::max();
- shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
-
- double targetLoad = std::numeric_limits<double>::max();
- if (g_consistentHashBalancingFactor > 0) {
- /* we start with one, representing the query we are currently handling */
- double currentLoad = 1;
- for (const auto& pair : servers) {
- currentLoad += pair.second->outstanding;
- }
- targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor;
- }
-
- for (const auto& d: servers) {
- if (d.second->isUp() && d.second->outstanding <= targetLoad) {
- // make sure hashes have been computed
- if (d.second->hashes.empty()) {
- d.second->hash();
- }
- {
- ReadLock rl(&(d.second->d_lock));
- const auto& server = d.second;
- // we want to keep track of the last hash
- if (min > *(server->hashes.begin())) {
- min = *(server->hashes.begin());
- first = server;
- }
-
- auto hash_it = server->hashes.lower_bound(qhash);
- if (hash_it != server->hashes.end()) {
- if (*hash_it < sel) {
- sel = *hash_it;
- ret = server;
- }
- }
- }
- }
- }
- if (ret != nullptr) {
- return ret;
- }
- if (first != nullptr) {
- return first;
- }
- return shared_ptr<DownstreamState>();
-}
-
-shared_ptr<DownstreamState> roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
-{
- ServerPolicy::NumberedServerVector poss;
-
- for(auto& d : servers) {
- if(d.second->isUp()) {
- poss.push_back(d);
- }
- }
-
- const auto *res=&poss;
- if(poss.empty() && !g_roundrobinFailOnNoServer)
- res = &servers;
-
- if(res->empty())
- return shared_ptr<DownstreamState>();
-
- static unsigned int counter;
-
- return (*res)[(counter++) % res->size()].second;
-}
-
ComboAddress g_serverControl{"127.0.0.1:5199"};
-std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
-{
- std::shared_ptr<ServerPool> pool;
- pools_t::iterator it = pools.find(poolName);
- if (it != pools.end()) {
- pool = it->second;
- }
- else {
- if (!poolName.empty())
- vinfolog("Creating pool %s", poolName);
- pool = std::make_shared<ServerPool>();
- pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
- }
- return pool;
-}
-
-void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
-{
- std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
- if (!poolName.empty()) {
- vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
- } else {
- vinfolog("Setting default pool server selection policy to %s", policy->name);
- }
- pool->policy = policy;
-}
-
-void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
-{
- std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
- if (!poolName.empty()) {
- vinfolog("Adding server to pool %s", poolName);
- } else {
- vinfolog("Adding server to default pool");
- }
- pool->addServer(server);
-}
-
-void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
-{
- std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
-
- if (!poolName.empty()) {
- vinfolog("Removing server from pool %s", poolName);
- }
- else {
- vinfolog("Removing server from default pool");
- }
-
- pool->removeServer(server);
-}
-
-std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
-{
- pools_t::const_iterator it = pools.find(poolName);
-
- if (it == pools.end()) {
- throw std::out_of_range("No pool named " + poolName);
- }
-
- return it->second;
-}
-
-ServerPolicy::NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
-{
- std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
- return pool->getServers();
-}
static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent, bool raw)
{
#include "dnscrypt.hh"
#include "dnsdist-cache.hh"
#include "dnsdist-dynbpf.hh"
+#include "dnsdist-lbpolicies.hh"
#include "dnsname.hh"
#include "doh.hh"
#include "ednsoptions.hh"
mutable std::atomic<uint64_t> d_matches{0};
};
-struct dnsdist_ffi_servers_list_t;
-struct dnsdist_ffi_server_t;
-struct dnsdist_ffi_dnsquestion_t;
-
-struct ServerPolicy
-{
- template <class T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
- using NumberedServerVector = NumberedVector<shared_ptr<DownstreamState>>;
- typedef std::function<shared_ptr<DownstreamState>(const NumberedServerVector& servers, const DNSQuestion*)> policyfunc_t;
- typedef std::function<unsigned int(dnsdist_ffi_servers_list_t* servers, dnsdist_ffi_dnsquestion_t* dq)> ffipolicyfunc_t;
-
- ServerPolicy(const std::string& name_, policyfunc_t policy_, bool isLua_): name(name_), policy(policy_), isLua(isLua_)
- {
- }
- ServerPolicy(const std::string& name_, ffipolicyfunc_t policy_): name(name_), ffipolicy(policy_), isLua(true), isFFI(true)
- {
- }
- ServerPolicy()
- {
- }
-
- string name;
- policyfunc_t policy;
- ffipolicyfunc_t ffipolicy;
- bool isLua{false};
- bool isFFI{false};
-
- std::string toString() const {
- return string("ServerPolicy") + (isLua ? " (Lua)" : "") + " \"" + name + "\"";
- }
-};
-
struct ServerPool
{
ServerPool()
pthread_rwlock_t d_lock;
bool d_useECS{false};
};
-using pools_t=map<std::string,std::shared_ptr<ServerPool>>;
-void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy);
-void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server);
-void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server);
struct CarbonConfig
{
struct dnsheader;
void controlThread(int fd, ComboAddress local);
-std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName);
-std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName);
-ServerPolicy::NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName);
-
-std::shared_ptr<DownstreamState> firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
-
-std::shared_ptr<DownstreamState> leastOutstanding(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
-std::shared_ptr<DownstreamState> wrandom(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
-std::shared_ptr<DownstreamState> whashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
-std::shared_ptr<DownstreamState> chashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
-std::shared_ptr<DownstreamState> roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
+vector<std::function<void(void)>> setupLua(bool client, const std::string& config);
struct WebserverConfig
{
dns.cc dns.hh \
dnscrypt.cc dnscrypt.hh \
dnsdist.cc dnsdist.hh \
+ dnsdist-backend.cc \
dnsdist-dynbpf.cc dnsdist-dynbpf.hh \
dnsdist-cache.cc dnsdist-cache.hh \
dnsdist-carbon.cc \
dnsdist-healthchecks.cc dnsdist-healthchecks.hh \
dnsdist-idstate.cc \
dnsdist-kvs.hh dnsdist-kvs.cc \
+ dnsdist-lbpolicies.cc dnsdist-lbpolicies.hh \
dnsdist-lua.hh dnsdist-lua.cc \
dnsdist-lua-actions.cc \
dnsdist-lua-bindings.cc \
test-dnsdist_cc.cc \
test-dnsdistdynblocks_hh.cc \
test-dnsdistkvs_cc.cc \
+ test-dnsdistlbpolicies_cc.cc \
test-dnsdistpacketcache_cc.cc \
test-dnsdistrings_cc.cc \
test-dnsdistrules_cc.cc \
cachecleaner.hh \
circular_buffer.hh \
dnsdist.hh \
+ dnsdist-backend.cc \
dnsdist-cache.cc dnsdist-cache.hh \
dnsdist-dynblocks.cc dnsdist-dynblocks.hh \
dnsdist-ecs.cc dnsdist-ecs.hh \
dnsdist-kvs.cc dnsdist-kvs.hh \
+ dnsdist-lbpolicies.cc dnsdist-lbpolicies.hh \
dnsdist-rings.hh \
dnsdist-xpf.cc dnsdist-xpf.hh \
dnscrypt.cc dnscrypt.hh \
statnode.cc statnode.hh \
threadname.hh threadname.cc \
testrunner.cc \
+ uuid-utils.hh uuid-utils.cc \
xpf.cc xpf.hh
dnsdist_LDFLAGS = \
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "dnsdist.hh"
+#include "dolog.hh"
+
+bool DownstreamState::reconnect()
+{
+ std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
+ if (!tl.owns_lock()) {
+ /* we are already reconnecting */
+ return false;
+ }
+
+ connected = false;
+ for (auto& fd : sockets) {
+ if (fd != -1) {
+ if (sockets.size() > 1) {
+ std::lock_guard<std::mutex> lock(socketsLock);
+ mplexer->removeReadFD(fd);
+ }
+ /* shutdown() is needed to wake up recv() in the responderThread */
+ shutdown(fd, SHUT_RDWR);
+ close(fd);
+ fd = -1;
+ }
+ if (!IsAnyAddress(remote)) {
+ fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
+ if (!IsAnyAddress(sourceAddr)) {
+ SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
+ if (!sourceItfName.empty()) {
+#ifdef SO_BINDTODEVICE
+ int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
+ if (res != 0) {
+ infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
+ }
+#endif
+ }
+
+ SBind(fd, sourceAddr);
+ }
+ try {
+ SConnect(fd, remote);
+ if (sockets.size() > 1) {
+ std::lock_guard<std::mutex> lock(socketsLock);
+ mplexer->addReadFD(fd, [](int, boost::any) {});
+ }
+ connected = true;
+ }
+ catch(const std::runtime_error& error) {
+ infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
+ connected = false;
+ break;
+ }
+ }
+ }
+
+ /* if at least one (re-)connection failed, close all sockets */
+ if (!connected) {
+ for (auto& fd : sockets) {
+ if (fd != -1) {
+ if (sockets.size() > 1) {
+ std::lock_guard<std::mutex> lock(socketsLock);
+ mplexer->removeReadFD(fd);
+ }
+ /* shutdown() is needed to wake up recv() in the responderThread */
+ shutdown(fd, SHUT_RDWR);
+ close(fd);
+ fd = -1;
+ }
+ }
+ }
+
+ return connected;
+}
+void DownstreamState::hash()
+{
+ vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
+ auto w = weight;
+ WriteLock wl(&d_lock);
+ hashes.clear();
+ while (w > 0) {
+ std::string uuid = boost::str(boost::format("%s-%d") % id % w);
+ unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
+ hashes.insert(wshash);
+ --w;
+ }
+}
+
+void DownstreamState::setId(const boost::uuids::uuid& newId)
+{
+ id = newId;
+ // compute hashes only if already done
+ if (!hashes.empty()) {
+ hash();
+ }
+}
+
+void DownstreamState::setWeight(int newWeight)
+{
+ if (newWeight < 1) {
+ errlog("Error setting server's weight: downstream weight value must be greater than 0.");
+ return ;
+ }
+ weight = newWeight;
+ if (!hashes.empty()) {
+ hash();
+ }
+}
+
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_), name(remote_.toStringWithPort()), nameWithAddr(remote_.toStringWithPort())
+{
+ pthread_rwlock_init(&d_lock, nullptr);
+ id = getUniqueID();
+ threadStarted.clear();
+
+ mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+
+ sockets.resize(numberOfSockets);
+ for (auto& fd : sockets) {
+ fd = -1;
+ }
+
+ if (connect && !IsAnyAddress(remote)) {
+ reconnect();
+ idStates.resize(g_maxOutstanding);
+ sw.start();
+ }
+
+}
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "dnsdist.hh"
+#include "dnsdist-lbpolicies.hh"
+#include "dolog.hh"
+
+GlobalStateHolder<ServerPolicy> g_policy;
+bool g_roundrobinFailOnNoServer{false};
+
+// get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
+shared_ptr<DownstreamState> leastOutstanding(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+ if (servers.size() == 1 && servers[0].second->isUp()) {
+ return servers[0].second;
+ }
+
+ vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
+ /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
+ which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
+ poss.reserve(servers.size());
+ for(auto& d : servers) {
+ if(d.second->isUp()) {
+ poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
+ }
+ }
+ if(poss.empty())
+ return shared_ptr<DownstreamState>();
+ nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
+ return poss.begin()->second;
+}
+
+shared_ptr<DownstreamState> firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+ for(auto& d : servers) {
+ if(d.second->isUp() && d.second->qps.check())
+ return d.second;
+ }
+ return leastOutstanding(servers, dq);
+}
+
+static shared_ptr<DownstreamState> valrandom(unsigned int val, const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+ vector<pair<int, shared_ptr<DownstreamState>>> poss;
+ int sum = 0;
+ int max = std::numeric_limits<int>::max();
+
+ for(auto& d : servers) { // w=1, w=10 -> 1, 11
+ if(d.second->isUp()) {
+ // Don't overflow sum when adding high weights
+ if(d.second->weight > max - sum) {
+ sum = max;
+ } else {
+ sum += d.second->weight;
+ }
+
+ poss.push_back({sum, d.second});
+ }
+ }
+
+ // Catch poss & sum are empty to avoid SIGFPE
+ if(poss.empty())
+ return shared_ptr<DownstreamState>();
+
+ int r = val % sum;
+ auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
+ if(p==poss.end())
+ return shared_ptr<DownstreamState>();
+ return p->second;
+}
+
+shared_ptr<DownstreamState> wrandom(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+ return valrandom(random(), servers, dq);
+}
+
+uint32_t g_hashperturb;
+double g_consistentHashBalancingFactor = 0;
+shared_ptr<DownstreamState> whashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+ return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
+}
+
+shared_ptr<DownstreamState> chashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+ unsigned int qhash = dq->qname->hash(g_hashperturb);
+ unsigned int sel = std::numeric_limits<unsigned int>::max();
+ unsigned int min = std::numeric_limits<unsigned int>::max();
+ shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
+
+ double targetLoad = std::numeric_limits<double>::max();
+ if (g_consistentHashBalancingFactor > 0) {
+ /* we start with one, representing the query we are currently handling */
+ double currentLoad = 1;
+ for (const auto& pair : servers) {
+ currentLoad += pair.second->outstanding;
+ }
+ targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor;
+ }
+
+ for (const auto& d: servers) {
+ if (d.second->isUp() && d.second->outstanding <= targetLoad) {
+ // make sure hashes have been computed
+ if (d.second->hashes.empty()) {
+ d.second->hash();
+ }
+ {
+ ReadLock rl(&(d.second->d_lock));
+ const auto& server = d.second;
+ // we want to keep track of the last hash
+ if (min > *(server->hashes.begin())) {
+ min = *(server->hashes.begin());
+ first = server;
+ }
+
+ auto hash_it = server->hashes.lower_bound(qhash);
+ if (hash_it != server->hashes.end()) {
+ if (*hash_it < sel) {
+ sel = *hash_it;
+ ret = server;
+ }
+ }
+ }
+ }
+ }
+ if (ret != nullptr) {
+ return ret;
+ }
+ if (first != nullptr) {
+ return first;
+ }
+ return shared_ptr<DownstreamState>();
+}
+
+shared_ptr<DownstreamState> roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+ ServerPolicy::NumberedServerVector poss;
+
+ for(auto& d : servers) {
+ if(d.second->isUp()) {
+ poss.push_back(d);
+ }
+ }
+
+ const auto *res=&poss;
+ if(poss.empty() && !g_roundrobinFailOnNoServer)
+ res = &servers;
+
+ if(res->empty())
+ return shared_ptr<DownstreamState>();
+
+ static unsigned int counter;
+
+ return (*res)[(counter++) % res->size()].second;
+}
+
+ServerPolicy::NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
+{
+ std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
+ return pool->getServers();
+}
+
+std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
+{
+ std::shared_ptr<ServerPool> pool;
+ pools_t::iterator it = pools.find(poolName);
+ if (it != pools.end()) {
+ pool = it->second;
+ }
+ else {
+ if (!poolName.empty())
+ vinfolog("Creating pool %s", poolName);
+ pool = std::make_shared<ServerPool>();
+ pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
+ }
+ return pool;
+}
+
+void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
+{
+ std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
+ if (!poolName.empty()) {
+ vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
+ } else {
+ vinfolog("Setting default pool server selection policy to %s", policy->name);
+ }
+ pool->policy = policy;
+}
+
+void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
+{
+ std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
+ if (!poolName.empty()) {
+ vinfolog("Adding server to pool %s", poolName);
+ } else {
+ vinfolog("Adding server to default pool");
+ }
+ pool->addServer(server);
+}
+
+void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
+{
+ std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
+
+ if (!poolName.empty()) {
+ vinfolog("Removing server from pool %s", poolName);
+ }
+ else {
+ vinfolog("Removing server from default pool");
+ }
+
+ pool->removeServer(server);
+}
+
+std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
+{
+ pools_t::const_iterator it = pools.find(poolName);
+
+ if (it == pools.end()) {
+ throw std::out_of_range("No pool named " + poolName);
+ }
+
+ return it->second;
+}
--- /dev/null
+../dnsdist-lbpolicies.hh
\ No newline at end of file
--- /dev/null
+
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+
+#include <thread>
+#include <boost/test/unit_test.hpp>
+
+#include "dnsdist.hh"
+#include "dnsdist-lua-ffi.hh"
+#include "dolog.hh"
+
+uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
+std::mutex g_luamutex;
+
+static std::shared_ptr<DownstreamState> getSelectedBackendFromPolicy(const ServerPolicy& policy, const ServerPolicy::NumberedServerVector& servers, DNSQuestion& dq)
+{
+ std::shared_ptr<DownstreamState> selectedBackend{nullptr};
+
+ if (policy.isLua) {
+ if (!policy.isFFI) {
+ std::lock_guard<std::mutex> lock(g_luamutex);
+ selectedBackend = policy.policy(servers, &dq);
+ }
+ else {
+ dnsdist_ffi_dnsquestion_t dnsq(&dq);
+ dnsdist_ffi_servers_list_t serversList(servers);
+ unsigned int selected = 0;
+ {
+ std::lock_guard<std::mutex> lock(g_luamutex);
+ selected = policy.ffipolicy(&serversList, &dnsq);
+ }
+ selectedBackend = servers.at(selected).second;
+ }
+ }
+ else {
+ selectedBackend = policy.policy(servers, &dq);
+ }
+
+ return selectedBackend;
+}
+
+static DNSQuestion getDQ(const DNSName* providedName = nullptr)
+{
+ static const DNSName qname("powerdns.com.");
+ static const ComboAddress lc("127.0.0.1:53");
+ static const ComboAddress rem("192.0.2.1:42");
+ static struct timespec queryRealTime;
+ static struct dnsheader dh;
+
+ memset(&dh, 0, sizeof(dh));
+ uint16_t qtype = QType::A;
+ uint16_t qclass = QClass::IN;
+ size_t bufferSize = 0;
+ size_t queryLen = 0;
+ bool isTcp = false;
+ gettime(&queryRealTime, true);
+
+ DNSQuestion dq(providedName ? providedName : &qname, qtype, qclass, qname.wirelength(), &lc, &rem, &dh, bufferSize, queryLen, isTcp, &queryRealTime);
+ return dq;
+}
+
+BOOST_AUTO_TEST_SUITE(dnsdistlbpolicies)
+
+BOOST_AUTO_TEST_CASE(test_firstAvailable) {
+ auto dq = getDQ();
+
+ ServerPolicy pol{"firstAvailable", firstAvailable, false};
+ ServerPolicy::NumberedServerVector servers;
+ servers.push_back({ 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2.1:53")) });
+
+ /* servers start as 'down' */
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_CHECK(server == nullptr);
+
+ /* mark the server as 'up' */
+ servers.at(0).second->setUp();
+ server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_CHECK(server != nullptr);
+
+ /* add a second server, we should still get the first one */
+ servers.push_back({ 2, std::make_shared<DownstreamState>(ComboAddress("192.0.2.2:53")) });
+ server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(server != nullptr);
+ BOOST_CHECK(server == servers.at(0).second);
+
+ /* mark the first server as 'down', second as 'up' */
+ servers.at(0).second->setDown();
+ servers.at(1).second->setUp();
+ server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(server != nullptr);
+ BOOST_CHECK(server == servers.at(1).second);
+}
+
+BOOST_AUTO_TEST_CASE(test_leastOutstanding) {
+ auto dq = getDQ();
+
+ ServerPolicy pol{"leastOutstanding", leastOutstanding, false};
+ ServerPolicy::NumberedServerVector servers;
+ servers.push_back({ 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2.1:53")) });
+
+ /* servers start as 'down' */
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_CHECK(server == nullptr);
+
+ /* mark the server as 'up' */
+ servers.at(0).second->setUp();
+ server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_CHECK(server != nullptr);
+
+ /* add a second server, we should still get the first one */
+ servers.push_back({ 2, std::make_shared<DownstreamState>(ComboAddress("192.0.2.2:53")) });
+ server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(server != nullptr);
+ BOOST_CHECK(server == servers.at(0).second);
+
+ /* mark the first server as 'down', second as 'up' */
+ servers.at(0).second->setDown();
+ servers.at(1).second->setUp();
+ server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(server != nullptr);
+ BOOST_CHECK(server == servers.at(1).second);
+
+ /* mark both servers as 'up', increase the outstanding count of the first one */
+ servers.at(0).second->setUp();
+ servers.at(0).second->outstanding = 42;
+ servers.at(1).second->setUp();
+ server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(server != nullptr);
+ BOOST_CHECK(server == servers.at(1).second);
+}
+
+BOOST_AUTO_TEST_CASE(test_wrandom) {
+ auto dq = getDQ();
+
+ ServerPolicy pol{"wrandom", wrandom, false};
+ ServerPolicy::NumberedServerVector servers;
+ std::map<std::shared_ptr<DownstreamState>, uint64_t> serversMap;
+ for (size_t idx = 1; idx <= 10; idx++) {
+ servers.push_back({ idx + 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2." + std::to_string(idx) + ":53")) });
+ serversMap[servers.at(idx - 1).second] = 0;
+ servers.at(idx - 1).second->setUp();
+ }
+
+ for (size_t idx = 0; idx < 1000; idx++) {
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(serversMap.count(server) == 1);
+ ++serversMap[server];
+ }
+ uint64_t total = 0;
+ for (const auto& entry : serversMap) {
+ BOOST_CHECK_GT(entry.second, 0);
+ BOOST_CHECK_GT(entry.second, (1000 / servers.size() / 2));
+ BOOST_CHECK_LT(entry.second, (1000 / servers.size() * 2));
+ total += entry.second;
+ }
+ BOOST_CHECK_EQUAL(total, 1000);
+
+ /* reset */
+ for (auto& entry : serversMap) {
+ entry.second = 0;
+ BOOST_CHECK_EQUAL(entry.first->weight, 1);
+ }
+
+ /* reset */
+ for (auto& entry : serversMap) {
+ entry.second = 0;
+ BOOST_CHECK_EQUAL(entry.first->weight, 1);
+ }
+ /* change the weight of the last server to 100, default is 1 */
+ servers.at(servers.size()-1).second->weight = 100;
+
+ for (size_t idx = 0; idx < 1000; idx++) {
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(serversMap.count(server) == 1);
+ ++serversMap[server];
+ }
+
+ total = 0;
+ uint64_t totalW = 0;
+ for (const auto& entry : serversMap) {
+ total += entry.second;
+ totalW += entry.first->weight;
+ }
+ BOOST_CHECK_EQUAL(total, 1000);
+ auto last = servers.at(servers.size()-1).second;
+ const auto got = serversMap[last];
+ float expected = (1000 * 1.0 * last->weight) / totalW;
+ BOOST_CHECK_GT(got, expected / 2);
+ BOOST_CHECK_LT(got, expected * 2);
+}
+
+BOOST_AUTO_TEST_CASE(test_whashed) {
+ std::vector<DNSName> names;
+ names.reserve(1000);
+ for (size_t idx = 0; idx < 1000; idx++) {
+ names.push_back(DNSName("powerdns-" + std::to_string(idx) + ".com."));
+ }
+
+ ServerPolicy pol{"whashed", whashed, false};
+ ServerPolicy::NumberedServerVector servers;
+ std::map<std::shared_ptr<DownstreamState>, uint64_t> serversMap;
+ for (size_t idx = 1; idx <= 10; idx++) {
+ servers.push_back({ idx + 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2." + std::to_string(idx) + ":53")) });
+ serversMap[servers.at(idx - 1).second] = 0;
+ servers.at(idx - 1).second->setUp();
+ }
+
+ for (const auto& name : names) {
+ auto dq = getDQ(&name);
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(serversMap.count(server) == 1);
+ ++serversMap[server];
+ }
+ uint64_t total = 0;
+ for (const auto& entry : serversMap) {
+ BOOST_CHECK_GT(entry.second, 0);
+ BOOST_CHECK_GT(entry.second, (names.size() / servers.size() / 2));
+ BOOST_CHECK_LT(entry.second, (names.size() / servers.size() * 2));
+ total += entry.second;
+ }
+ BOOST_CHECK_EQUAL(total, names.size());
+
+ /* reset */
+ for (auto& entry : serversMap) {
+ entry.second = 0;
+ BOOST_CHECK_EQUAL(entry.first->weight, 1);
+ }
+
+ /* request 1000 times the same name, we should go to the same server every time */
+ {
+ auto dq = getDQ(&names.at(0));
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ for (size_t idx = 0; idx < 1000; idx++) {
+ BOOST_CHECK(getSelectedBackendFromPolicy(pol, servers, dq) == server);
+ }
+ }
+
+ /* reset */
+ for (auto& entry : serversMap) {
+ entry.second = 0;
+ BOOST_CHECK_EQUAL(entry.first->weight, 1);
+ }
+ /* change the weight of the last server to 100, default is 1 */
+ servers.at(servers.size()-1).second->setWeight(100);
+
+ for (const auto& name : names) {
+ auto dq = getDQ(&name);
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(serversMap.count(server) == 1);
+ ++serversMap[server];
+ }
+
+ total = 0;
+ uint64_t totalW = 0;
+ for (const auto& entry : serversMap) {
+ total += entry.second;
+ totalW += entry.first->weight;
+ }
+ BOOST_CHECK_EQUAL(total, names.size());
+ auto last = servers.at(servers.size()-1).second;
+ const auto got = serversMap[last];
+ float expected = (names.size() * 1.0 * last->weight) / totalW;
+ BOOST_CHECK_GT(got, expected / 2);
+ BOOST_CHECK_LT(got, expected * 2);
+}
+
+BOOST_AUTO_TEST_CASE(test_chashed) {
+ bool existingVerboseValue = g_verbose;
+ g_verbose = false;
+
+ std::vector<DNSName> names;
+ names.reserve(1000);
+ for (size_t idx = 0; idx < 1000; idx++) {
+ names.push_back(DNSName("powerdns-" + std::to_string(idx) + ".com."));
+ }
+
+ ServerPolicy pol{"chashed", chashed, false};
+ ServerPolicy::NumberedServerVector servers;
+ std::map<std::shared_ptr<DownstreamState>, uint64_t> serversMap;
+ for (size_t idx = 1; idx <= 10; idx++) {
+ servers.push_back({ idx + 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2." + std::to_string(idx) + ":53")) });
+ serversMap[servers.at(idx - 1).second] = 0;
+ servers.at(idx - 1).second->setUp();
+ /* we need to have a weight of at least 1000 to get an optimal repartition with the consistent hashing algo */
+ servers.at(idx - 1).second->setWeight(1000);
+ /* make sure that the hashes have been computed */
+ servers.at(idx - 1).second->hash();
+ }
+
+ for (const auto& name : names) {
+ auto dq = getDQ(&name);
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(serversMap.count(server) == 1);
+ ++serversMap[server];
+ }
+ uint64_t total = 0;
+ for (const auto& entry : serversMap) {
+ BOOST_CHECK_GT(entry.second, 0);
+ BOOST_CHECK_GT(entry.second, (names.size() / servers.size() / 2));
+ BOOST_CHECK_LT(entry.second, (names.size() / servers.size() * 2));
+ total += entry.second;
+ }
+ BOOST_CHECK_EQUAL(total, names.size());
+
+ /* reset */
+ for (auto& entry : serversMap) {
+ entry.second = 0;
+ BOOST_CHECK_EQUAL(entry.first->weight, 1000);
+ }
+
+ /* request 1000 times the same name, we should go to the same server every time */
+ {
+ auto dq = getDQ(&names.at(0));
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ for (size_t idx = 0; idx < 1000; idx++) {
+ BOOST_CHECK(getSelectedBackendFromPolicy(pol, servers, dq) == server);
+ }
+ }
+
+ /* reset */
+ for (auto& entry : serversMap) {
+ entry.second = 0;
+ BOOST_CHECK_EQUAL(entry.first->weight, 1000);
+ }
+ /* change the weight of the last server to 100000, others stay at 1000 */
+ servers.at(servers.size()-1).second->setWeight(100000);
+
+ for (const auto& name : names) {
+ auto dq = getDQ(&name);
+ auto server = getSelectedBackendFromPolicy(pol, servers, dq);
+ BOOST_REQUIRE(serversMap.count(server) == 1);
+ ++serversMap[server];
+ }
+
+ total = 0;
+ uint64_t totalW = 0;
+ for (const auto& entry : serversMap) {
+ total += entry.second;
+ totalW += entry.first->weight;
+ }
+ BOOST_CHECK_EQUAL(total, names.size());
+ auto last = servers.at(servers.size()-1).second;
+ const auto got = serversMap[last];
+ float expected = (names.size() * 1.0 * last->weight) / totalW;
+ BOOST_CHECK_GT(got, expected / 2);
+ BOOST_CHECK_LT(got, expected * 2);
+
+ g_verbose = existingVerboseValue;
+}
+
+BOOST_AUTO_TEST_SUITE_END()