return servers.at(candidates.at((counter++) % candidates.size()) - 1).second;
}
+shared_ptr<DownstreamState> orderedWrandUntag(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+ if (servers.empty()) {
+ return shared_ptr<DownstreamState>();
+ }
+
+ ServerPolicy::NumberedServerVector candidates;
+ candidates.reserve(servers.size());
+
+ int curOrder = std::numeric_limits<int>::max();
+ unsigned int startIndex = 0;
+ unsigned int curNumber = 1;
+
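+  // Single pass: collect "up" servers that are not tagged in the query with their
+  // name-with-address. startIndex marks where the lowest-order group seen so far
+  // begins in 'candidates', so entries collected for higher orders are discarded
+  // when the group is sliced out below.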
+  for (const auto& d : servers) {
+ if (d.second->isUp() && d.second->d_config.order <= curOrder && (!dq->ids.qTag || dq->ids.qTag->count(d.second->getNameWithAddr()) == 0)) {
+ if (d.second->d_config.order < curOrder) {
+ curOrder = d.second->d_config.order;
+        startIndex = candidates.size();
+ curNumber = 1;
+ }
+ candidates.push_back(ServerPolicy::NumberedServer(curNumber++, d.second));
+ }
+ }
+
+ if (candidates.empty()) {
+ return shared_ptr<DownstreamState>();
+ }
+
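+  // Apply the weighted-random policy to the lowest-order group only.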
+ ServerPolicy::NumberedServerVector selected(candidates.begin() + startIndex, candidates.end());
+ return wrandom(selected, dq);
+}
+
std::shared_ptr<const ServerPolicy::NumberedServerVector> getDownstreamCandidates(const std::string& poolName)
{
std::shared_ptr<ServerPool> pool = getPool(poolName);
std::make_shared<ServerPolicy>("wrandom", wrandom, false),
std::make_shared<ServerPolicy>("whashed", whashed, false),
std::make_shared<ServerPolicy>("chashed", chashed, false),
+ std::make_shared<ServerPolicy>("orderedWrandUntag", orderedWrandUntag, false),
std::make_shared<ServerPolicy>("leastOutstanding", leastOutstanding, false)};
return s_policies;
}
class ServerPolicy
{
public:
+ template <class T>
+ using Numbered = std::pair<unsigned int, T>;
+ using NumberedServer = Numbered<shared_ptr<DownstreamState>>;
template <class T>
using NumberedVector = std::vector<std::pair<unsigned int, T>>;
using NumberedServerVector = NumberedVector<shared_ptr<DownstreamState>>;
std::shared_ptr<DownstreamState> chashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
std::shared_ptr<DownstreamState> chashedFromHash(const ServerPolicy::NumberedServerVector& servers, size_t hash);
std::shared_ptr<DownstreamState> roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
+std::shared_ptr<DownstreamState> orderedWrandUntag(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
#include <unordered_map>
- name: "weight"
type: "u32"
default: 1
- description: "The weight of this server, used by the `wrandom`, `whashed` and `chashed` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647"
+ description: "The weight of this server, used by the `wrandom`, `whashed`, `chashed` and `orderedWrandUntag` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647"
- name: "udp_timeout"
type: "u8"
default: 0
lua-name: "setWeightedBalancingFactor"
internal-field-name: "d_weightedBalancingFactor"
runtime-configurable: false
- description: "Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``whashed`` or ``wrandom`` load-balancing policy. Default is 0, which disables the bounded-load algorithm"
+  description: "Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``whashed``, ``wrandom`` or ``orderedWrandUntag`` load-balancing policy. Default is 0, which disables the bounded-load algorithm"
- name: "consistent_hashing_balancing_factor"
type: "f64"
default: 0.0
The last available policy is ``roundrobin``, which indiscriminately sends each query to the next server that is up.
If all servers are down, the policy will still select one server by default. Setting :func:`setRoundRobinFailOnNoServer` to ``true`` will change this behavior.
+``orderedWrandUntag``
+~~~~~~~~~~~~~~~~~~~~~
+
+``orderedWrandUntag`` is another weighted policy with additional server filtering:
+
+- select the group of server(s) with the lowest ``order`` passed to :func:`newServer`.
+- filter out any server whose :func:`Server:getNameWithAddr` string is present as a tag key in the query, as set via :func:`DNSQuestion:setTag`. This can be used to restart a query with a different server: the user is responsible for setting the required tag in a Lua action before calling :func:`DNSResponse:restart`. Initial queries are not affected by this filtering unless a Lua action intentionally sets such a tag (see the sketch after this list).
+- the ``wrandom`` policy is then applied to the server(s) selected above.
+
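+A typical use is retrying a query that failed on one backend against a different one.
+The following is a minimal sketch of that workflow, not a drop-in configuration: it
+assumes a way to retrieve the backend that produced the response, shown here as a
+hypothetical ``getSelectedBackend()`` accessor, and relies on
+:func:`DNSQuestion:setRestartable`, :func:`DNSQuestion:setTag` and :func:`DNSResponse:restart`:
+
+.. code-block:: lua
+
+  setServerPolicy(orderedWrandUntag)
+
+  -- mark every query as restartable so its response may be retried
+  function markRestartable(dq)
+    dq:setRestartable()
+    return DNSAction.None
+  end
+  addAction(AllRule(), LuaAction(markRestartable))
+
+  -- on ServFail, tag the query with the answering backend's name-with-address
+  -- so that orderedWrandUntag skips that backend, then restart the query
+  function retryElsewhere(dr)
+    local server = dr:getSelectedBackend() -- hypothetical accessor, see above
+    if dr.rcode == DNSRCode.SERVFAIL and server then
+      dr:setTag(server:getNameWithAddr(), '1')
+      dr:restart()
+    end
+    return DNSResponseAction.None
+  end
+  addResponseAction(AllRule(), LuaResponseAction(retryElsewhere))
+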
Lua server policies
-------------------
.. versionadded:: 1.5.0
Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight,
- and the actual number, when using the ``whashed`` or ``wrandom`` load-balancing policy.
+  and the actual number, when using the ``whashed``, ``wrandom`` or ``orderedWrandUntag`` load-balancing policy.
Default is 0, which disables the bounded-load algorithm.
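+
+  For example, to cap each server at 1.5 times its weighted share of outstanding
+  queries (a sketch; pick a factor suited to your traffic):
+
+  .. code-block:: lua
+
+    setWeightedBalancingFactor(1.5)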
.. function:: showPoolServerPolicy(pool)
``qps`` ``number`` "Limit the number of queries per second to ``number``, when using the `firstAvailable` policy"
``order`` ``number`` "The order of this server, used by the `leastOutstanding` and `firstAvailable` policies"
- ``weight`` ``number`` "The weight of this server, used by the `wrandom`, `whashed` and `chashed` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647."
+ ``weight`` ``number`` "The weight of this server, used by the `wrandom`, `whashed`, `chashed` and `orderedWrandUntag` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647."
``udpTimeout`` ``number`` "The timeout (in seconds) of a UDP query attempt"
``pool`` ``string|{string}`` "The pools this server belongs to (unset or empty string means default pool) as a string or table of strings"
``retries`` ``number`` "The number of TCP connection attempts to the backend, for a given query"
- **use_proxy_protocol**: Boolean ``(false)`` - Add a proxy protocol header to the query, passing along the client's IP address and port along with the original destination address and port
- **queries_per_second**: Unsigned integer ``(0)`` - Limit the number of queries per second to ``number``, when using the ``firstAvailable`` policy
- **order**: Unsigned integer ``(1)`` - The order of this server, used by the `leastOutstanding` and `firstAvailable` policies
-- **weight**: Unsigned integer ``(1)`` - The weight of this server, used by the `wrandom`, `whashed` and `chashed` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647
+- **weight**: Unsigned integer ``(1)`` - The weight of this server, used by the `wrandom`, `whashed`, `chashed` and `orderedWrandUntag` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647
- **udp_timeout**: Unsigned integer ``(0)`` - The udp backend query timeout value in seconds, default: 0. Supported values are a minimum of 1, and a maximum of 255. Value greater than 0 will override global UDP timeout setting
- **pools**: Sequence of String ``("")`` - List of pools to place this backend into. By default a server is placed in the default ("") pool
- **tcp**: :ref:`OutgoingTcpConfiguration <yaml-settings-OutgoingTcpConfiguration>` - TCP-related settings for a backend
- **default_policy**: String ``(leastOutstanding)`` - Set the default server selection policy
- **servfail_on_no_server**: Boolean ``(false)`` - If set, return a ServFail when no servers are available, instead of the default behaviour of dropping the query
- **round_robin_servfail_on_no_server**: Boolean ``(false)`` - By default the roundrobin load-balancing policy will still try to select a backend even if all backends are currently down. Setting this to true will make the policy fail and return that no server is available instead
-- **weighted_balancing_factor**: Double ``(0.0)`` - Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``whashed`` or ``wrandom`` load-balancing policy. Default is 0, which disables the bounded-load algorithm
+- **weighted_balancing_factor**: Double ``(0.0)`` - Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``whashed``, ``wrandom`` or ``orderedWrandUntag`` load-balancing policy. Default is 0, which disables the bounded-load algorithm
- **consistent_hashing_balancing_factor**: Double ``(0.0)`` - Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``chashed`` consistent hashing load-balancing policy. Default is 0, which disables the bounded-load algorithm
- **custom_policies**: Sequence of :ref:`CustomLoadBalancingPolicyConfiguration <yaml-settings-CustomLoadBalancingPolicyConfiguration>` - Custom load-balancing policies implemented in Lua
- **hash_perturbation**: Unsigned integer ``(0)`` - Set the hash perturbation value to be used in the ``whashed`` policy instead of a random one, allowing to have consistent ``whashed`` results on different instances
- **cookie_hashing**: Boolean ``(false)`` - If true, EDNS Cookie values will be hashed, resulting in separate entries for different cookies in the packet cache. This is required if the backend is sending answers with EDNS Cookies, otherwise a client might receive an answer with the wrong cookie
- **maximum_entry_size**: Unsigned integer ``(4096)`` - The maximum size, in bytes, of a DNS packet that can be inserted into the packet cache
- **options_to_skip**: Sequence of String ``("")`` - Extra list of EDNS option codes to skip when hashing the packet (if ``cookie_hashing`` above is false, EDNS cookie option number will be added to this list internally)
-- **payload_ranks**: Sequence of Unsigned integer ``([])`` - List of payload size used when hashing the packet. The list will be sorted in ascend order and searched to find a lower bound value for the payload size in the packet. If found then it will be used for packet hashing. Values less than 512 or greater than ``maximum_entry_size`` above will be discarded. This option is to enable cache entry sharing between clients using different payload sizes when needed
+- **payload_ranks**: Sequence of Unsigned integer ``([])`` - List of payload sizes used when hashing the packet. The list will be sorted in ascending order and searched to find a lower bound value for the payload size in the packet. If found, it will be used for packet hashing. Values less than 512 or greater than ``maximum_entry_size`` above will be discarded. This option enables cache entry sharing between clients using different payload sizes, when needed
.. _yaml-settings-PoolConfiguration:
sender = getattr(self, method)
(_, receivedResponse) = sender(query, response=None, useQueue=False)
self.assertEqual(expectedResponse, receivedResponse)
+
+
+class QueryCounter:
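+    """Per-backend query counter whose callback answers each query with a fixed A record."""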
+
+ def __init__(self, name):
+ self.name = name
+ self.qcnt = 0
+
+ def __call__(self):
+ return self.qcnt
+
+ def reset(self):
+ self.qcnt = 0
+
+ def create_cb(self):
+ def callback(request):
+ self.qcnt += 1
+ response = dns.message.make_response(request)
+ rrset = dns.rrset.from_text(request.question[0].name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+ return response.to_wire()
+ return callback
+
+class TestRoutingOrderedWRandUntag(DNSDistTest):
+
+ _queryCounts = {}
+
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+ _testServer1Port = pickAvailablePort()
+ _testServer2Port = pickAvailablePort()
+ _testServer3Port = pickAvailablePort()
+ _testServer4Port = pickAvailablePort()
+ _serverPorts = [_testServer1Port, _testServer2Port, _testServer3Port, _testServer4Port]
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServer1Port', '_testServer2Port', '_testServer3Port', '_testServer4Port']
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%d")
+ setServerPolicy(orderedWrandUntag)
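+    -- two order groups (1 and 2), each with a weight-1 and a weight-2 server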
+ s11 = newServer{name="s11", address="127.0.0.1:%s", order=1, weight=1}
+ s11:setUp()
+ s12 = newServer{name="s12", address="127.0.0.1:%s", order=1, weight=2}
+ s12:setUp()
+ s21 = newServer{name="s21", address="127.0.0.1:%s", order=2, weight=1}
+ s21:setUp()
+ s22 = newServer{name="s22", address="127.0.0.1:%s", order=2, weight=2}
+ s22:setUp()
+ function setServerDown(name)
+ for _, s in ipairs(getServers()) do
+ if s.name == name then
+ s:setDown()
+ end
+ end
+ end
+ """
+
+ @classmethod
+ def startResponders(cls):
+ print("Launching responders..")
+
+ for i, name in enumerate(['s11', 's12', 's21', 's22']):
+ cls._queryCounts[name] = QueryCounter(name)
+ cb = cls._queryCounts[name].create_cb()
+ responder = threading.Thread(name=name, target=cls.UDPResponder, args=[cls._serverPorts[i], cls._toResponderQueue, cls._fromResponderQueue, False, cb])
+ responder.daemon = True
+ responder.start()
+
+ def setDown(self, name):
+ self.sendConsoleCommand("setServerDown('{}')".format(name))
+
+ def testDefault(self):
+ """
+ Routing: orderedWrandUntag
+
+        Send multiple A queries to "ordered.wrand.routing.tests.powerdns.com." and
+        check that dnsdist routes them to the lowest-order servers that are up,
+        distributing queries according to their weights.
+ """
+ numberOfQueries = 100
+ name = 'ordered.wrand.routing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ expectedResponse.answer.append(rrset)
+
+ # send 100 queries
+ for _ in range(numberOfQueries):
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEqual(expectedResponse, receivedResponse)
+
+        # only the order-1 servers receive queries, split according to their weights
+ self.assertGreater(self._queryCounts['s12'](), numberOfQueries * 0.50)
+ self.assertLess(self._queryCounts['s11'](), numberOfQueries * 0.50)
+ self.assertEqual(self._queryCounts['s21'](), 0)
+ self.assertEqual(self._queryCounts['s22'](), 0)
+
+ # reset counters
+ for name in ['s11', 's12', 's21', 's22']:
+ self._queryCounts[name].reset()
+
+ self.setDown('s11')
+
+ # send 100 queries
+ for _ in range(numberOfQueries):
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEqual(expectedResponse, receivedResponse)
+
+        # all queries should now go to 's12'
+ self.assertEqual(self._queryCounts['s11'](), 0)
+ self.assertEqual(self._queryCounts['s12'](), numberOfQueries)
+ self.assertEqual(self._queryCounts['s21'](), 0)
+ self.assertEqual(self._queryCounts['s22'](), 0)
+
+ # reset counters
+ for name in ['s11', 's12', 's21', 's22']:
+ self._queryCounts[name].reset()
+
+ self.setDown('s12')
+
+ # send 100 queries
+ for _ in range(numberOfQueries):
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEqual(expectedResponse, receivedResponse)
+
+        # queries should now be sent to the order-2 servers, split according to their weights
+ self.assertEqual(self._queryCounts['s11'](), 0)
+ self.assertEqual(self._queryCounts['s12'](), 0)
+ self.assertLess(self._queryCounts['s21'](), numberOfQueries * 0.50)
+ self.assertGreater(self._queryCounts['s22'](), numberOfQueries * 0.50)