]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: add route policy of first ordered then weighted
authorOliver Chen <oliver.chen@nokia-sbell.com>
Fri, 13 Jun 2025 11:20:48 +0000 (11:20 +0000)
committerOliver Chen <oliver.chen@nokia-sbell.com>
Fri, 13 Jun 2025 11:20:48 +0000 (11:20 +0000)
User may require two levels of routing policy to select downstream
servers. First choose the least ordered, then distribute queries
according to weights among the same ordered servers. It also added
special filtering on selecting servers for query restart. If user
sets the required tag in the restarted query then the policy will
not select server(s) that had been tried before.

pdns/dnsdistdist/dnsdist-lbpolicies.cc
pdns/dnsdistdist/dnsdist-lbpolicies.hh
pdns/dnsdistdist/dnsdist-settings-definitions.yml
pdns/dnsdistdist/docs/guides/serverselection.rst
pdns/dnsdistdist/docs/reference/config.rst
pdns/dnsdistdist/docs/reference/yaml-settings.rst
regression-tests.dnsdist/test_Routing.py

index b28025d8b8dc69386aed7cdddcc35946776d7de7..2da5fe7675ff2f4148752bdbe4f0f4fd57fe6e16 100644 (file)
@@ -259,6 +259,38 @@ shared_ptr<DownstreamState> roundrobin(const ServerPolicy::NumberedServerVector&
   return servers.at(candidates.at((counter++) % candidates.size()) - 1).second;
 }
 
+shared_ptr<DownstreamState> orderedWrandUntag(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
+{
+  if (servers.empty()) {
+    return shared_ptr<DownstreamState>();
+  }
+
+  ServerPolicy::NumberedServerVector candidates;
+  candidates.reserve(servers.size());
+
+  int curOrder = std::numeric_limits<int>::max();
+  unsigned int startIndex = 0;
+  unsigned int curNumber = 1;
+
+  for (auto& d : servers) {
+    if (d.second->isUp() && d.second->d_config.order <= curOrder && (!dq->ids.qTag || dq->ids.qTag->count(d.second->getNameWithAddr()) == 0)) {
+      if (d.second->d_config.order < curOrder) {
+          curOrder = d.second->d_config.order;
+          startIndex = candidates.end() - candidates.begin();
+          curNumber = 1;
+      }
+      candidates.push_back(ServerPolicy::NumberedServer(curNumber++, d.second));
+    }
+  }
+
+  if (candidates.empty()) {
+    return shared_ptr<DownstreamState>();
+  }
+
+  ServerPolicy::NumberedServerVector selected(candidates.begin() + startIndex, candidates.end());
+  return wrandom(selected, dq);
+}
+
 std::shared_ptr<const ServerPolicy::NumberedServerVector> getDownstreamCandidates(const std::string& poolName)
 {
   std::shared_ptr<ServerPool> pool = getPool(poolName);
@@ -415,6 +447,7 @@ const std::vector<std::shared_ptr<ServerPolicy>>& getBuiltInPolicies()
     std::make_shared<ServerPolicy>("wrandom", wrandom, false),
     std::make_shared<ServerPolicy>("whashed", whashed, false),
     std::make_shared<ServerPolicy>("chashed", chashed, false),
+    std::make_shared<ServerPolicy>("orderedWrandUntag", orderedWrandUntag, false),
     std::make_shared<ServerPolicy>("leastOutstanding", leastOutstanding, false)};
   return s_policies;
 }
index d975988742ee93bfc380c5789e48c5912a9cf18b..c8770580bd364939aea62a0c5bf5294ce20c2e50 100644 (file)
@@ -33,6 +33,9 @@ struct PerThreadPoliciesState;
 class ServerPolicy
 {
 public:
+  template <class T>
+  using Numbered = std::pair<unsigned int, T>;
+  using NumberedServer = Numbered<shared_ptr<DownstreamState>>;
   template <class T>
   using NumberedVector = std::vector<std::pair<unsigned int, T>>;
   using NumberedServerVector = NumberedVector<shared_ptr<DownstreamState>>;
@@ -106,6 +109,7 @@ std::shared_ptr<DownstreamState> whashedFromHash(const ServerPolicy::NumberedSer
 std::shared_ptr<DownstreamState> chashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
 std::shared_ptr<DownstreamState> chashedFromHash(const ServerPolicy::NumberedServerVector& servers, size_t hash);
 std::shared_ptr<DownstreamState> roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
+std::shared_ptr<DownstreamState> orderedWrandUntag(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
 
 #include <unordered_map>
 
index 84858a572b1abedc0f47f4f1c61f3da20f8d9397..fe3faad27e82ce62ccccadc32008dc1a03dae35e 100644 (file)
@@ -1349,7 +1349,7 @@ backend:
     - name: "weight"
       type: "u32"
       default: 1
-      description: "The weight of this server, used by the `wrandom`, `whashed` and `chashed` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647"
+      description: "The weight of this server, used by the `wrandom`, `whashed`, `chashed` and `orderedWrandUntag` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647"
     - name: "udp_timeout"
       type: "u8"
       default: 0
@@ -2079,7 +2079,7 @@ load_balancing_policies:
       lua-name: "setWeightedBalancingFactor"
       internal-field-name: "d_weightedBalancingFactor"
       runtime-configurable: false
-      description: "Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``whashed`` or ``wrandom`` load-balancing policy. Default is 0, which disables the bounded-load algorithm"
+      description: "Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``whashed`` or ``wrandom`` or ``orderedWrandUntag`` load-balancing policy. Default is 0, which disables the bounded-load algorithm"
     - name: "consistent_hashing_balancing_factor"
       type: "f64"
       default: 0.0
index 574dfb0f0785862a55c2e3ec44e83357a4d32d39..c7f1112cefb4aed73a0ba7ac0f12dacd01a49b98 100644 (file)
@@ -85,6 +85,15 @@ For example, if we have two servers, with respective weights of 1 and 4, we expe
 The last available policy is ``roundrobin``, which indiscriminately sends each query to the next server that is up.
 If all servers are down, the policy will still select one server by default. Setting :func:`setRoundRobinFailOnNoServer` to ``true`` will change this behavior.
 
+``orderedWrandUntag``
+~~~~~~~~~~~~~~~~~~~~~
+
+``orderedWrandUntag`` is another weighted policy with additional server filtering:
+
+- select the group of server(s) with the lowest ``order`` passed to :func:`newServer`.
+- filter out server(s) that were tagged with key string of :func:`Server:getNameWithAddr` in the query that was set by :func:`DNSQuestion:setTag`. This can be useful to restart a query with a different server, the user is responsible to set the required tag in lua action before calling :func:`DNSResponse:restart`. Initial queries are not impacted by this filtering if no other intentional lua action to set the tag.
+- policy ``wrandom`` is then applied to the selected server(s) above.
+
 Lua server policies
 -------------------
 
@@ -300,7 +309,7 @@ Functions
   .. versionadded: 1.5.0
 
   Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight,
-  and the actual number, when using the ``whashed`` or ``wrandom`` load-balancing policy.
+  and the actual number, when using the ``whashed`` or ``wrandom`` or ``orderedWrandUntag`` load-balancing policy.
   Default is 0, which disables the bounded-load algorithm.
 
 .. function:: showPoolServerPolicy(pool)
index 8adffc1fb681bba229456f402c410ea50fe3ea88..a3934abd2829cb68761a6c326db3343286aa1e3b 100644 (file)
@@ -694,7 +694,7 @@ Servers
 
     ``qps``                                  ``number``            "Limit the number of queries per second to ``number``, when using the `firstAvailable` policy"
     ``order``                                ``number``            "The order of this server, used by the `leastOutstanding` and `firstAvailable` policies"
-    ``weight``                               ``number``            "The weight of this server, used by the `wrandom`, `whashed` and `chashed` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647."
+    ``weight``                               ``number``            "The weight of this server, used by the `wrandom`, `whashed`, `chashed` and `orderedWrandUntag` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647."
     ``udpTimeout``                           ``number``            "The timeout (in seconds) of a UDP query attempt"
     ``pool``                                 ``string|{string}``   "The pools this server belongs to (unset or empty string means default pool) as a string or table of strings"
     ``retries``                              ``number``            "The number of TCP connection attempts to the backend, for a given query"
index 7b39250dc60430735b46c644a255a67915f5dad2..58dc8bf1b588b19b2d3058819b043fe8a3db0cec 100644 (file)
@@ -77,7 +77,7 @@ Generic settings for backends
 - **use_proxy_protocol**: Boolean ``(false)`` - Add a proxy protocol header to the query, passing along the client's IP address and port along with the original destination address and port
 - **queries_per_second**: Unsigned integer ``(0)`` - Limit the number of queries per second to ``number``, when using the ``firstAvailable`` policy
 - **order**: Unsigned integer ``(1)`` - The order of this server, used by the `leastOutstanding` and `firstAvailable` policies
-- **weight**: Unsigned integer ``(1)`` - The weight of this server, used by the `wrandom`, `whashed` and `chashed` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647
+- **weight**: Unsigned integer ``(1)`` - The weight of this server, used by the `wrandom`, `whashed`, `chashed` and `orderedWrandUntag` policies, default: 1. Supported values are a minimum of 1, and a maximum of 2147483647
 - **udp_timeout**: Unsigned integer ``(0)`` - The udp backend query timeout value in seconds, default: 0. Supported values are a minimum of 1, and a maximum of 255. Value greater than 0 will override global UDP timeout setting
 - **pools**: Sequence of String ``("")`` - List of pools to place this backend into. By default a server is placed in the default ("") pool
 - **tcp**: :ref:`OutgoingTcpConfiguration <yaml-settings-OutgoingTcpConfiguration>` - TCP-related settings for a backend
@@ -631,7 +631,7 @@ Setting for load-balancing policies
 - **default_policy**: String ``(leastOutstanding)`` - Set the default server selection policy
 - **servfail_on_no_server**: Boolean ``(false)`` - If set, return a ServFail when no servers are available, instead of the default behaviour of dropping the query
 - **round_robin_servfail_on_no_server**: Boolean ``(false)`` - By default the roundrobin load-balancing policy will still try to select a backend even if all backends are currently down. Setting this to true will make the policy fail and return that no server is available instead
-- **weighted_balancing_factor**: Double ``(0.0)`` - Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``whashed`` or ``wrandom`` load-balancing policy. Default is 0, which disables the bounded-load algorithm
+- **weighted_balancing_factor**: Double ``(0.0)`` - Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``whashed`` or ``wrandom`` or ``orderedWrandUntag`` load-balancing policy. Default is 0, which disables the bounded-load algorithm
 - **consistent_hashing_balancing_factor**: Double ``(0.0)`` - Set the maximum imbalance between the number of outstanding queries intended for a given server, based on its weight, and the actual number, when using the ``chashed`` consistent hashing load-balancing policy. Default is 0, which disables the bounded-load algorithm
 - **custom_policies**: Sequence of :ref:`CustomLoadBalancingPolicyConfiguration <yaml-settings-CustomLoadBalancingPolicyConfiguration>` - Custom load-balancing policies implemented in Lua
 - **hash_perturbation**: Unsigned integer ``(0)`` - Set the hash perturbation value to be used in the ``whashed`` policy instead of a random one, allowing to have consistent ``whashed`` results on different instances
@@ -744,7 +744,7 @@ Packet-cache settings
 - **cookie_hashing**: Boolean ``(false)`` - If true, EDNS Cookie values will be hashed, resulting in separate entries for different cookies in the packet cache. This is required if the backend is sending answers with EDNS Cookies, otherwise a client might receive an answer with the wrong cookie
 - **maximum_entry_size**: Unsigned integer ``(4096)`` - The maximum size, in bytes, of a DNS packet that can be inserted into the packet cache
 - **options_to_skip**: Sequence of String ``("")`` - Extra list of EDNS option codes to skip when hashing the packet (if ``cookie_hashing`` above is false, EDNS cookie option number will be added to this list internally)
-- **payload_ranks**: Sequence of Unsigned integer ``([])`` - List of payload size used when hashing the packet. The list will be sorted in ascend order and searched to find a lower bound value for the payload size in the packet. If found then it will be used for packet hashing. Values less than 512 or greater than ``maximum_entry_size`` above will be discarded. This option is to enable cache entry sharing between clients using different payload sizes when needed
+- **payload_ranks**: Sequence of Unsigned integer ``([])`` - List of payload size used when hashing the packet. The list will be sorted in ascending order and searched to find a lower bound value for the payload size in the packet. If found then it will be used for packet hashing. Values less than 512 or greater than ``maximum_entry_size`` above will be discarded. This option is to enable cache entry sharing between clients using different payload sizes when needed
 
 
 .. _yaml-settings-PoolConfiguration:
index cabbf21ef984439cfa64ebe5e9f26f46df800014..c8e65cf5402fb9a41110a583fa8ee1b7e5560546 100644 (file)
@@ -1003,3 +1003,142 @@ class TestRoutingLuaFFILBNoServer(DNSDistTest):
             sender = getattr(self, method)
             (_, receivedResponse) = sender(query, response=None, useQueue=False)
             self.assertEqual(expectedResponse, receivedResponse)
+
+
+class QueryCounter:
+
+    def __init__(self, name):
+        self.name = name
+        self.qcnt = 0
+
+    def __call__(self):
+        return self.qcnt
+
+    def reset(self):
+        self.qcnt = 0
+
+    def create_cb(self):
+        def callback(request):
+            self.qcnt += 1
+            response = dns.message.make_response(request)
+            rrset = dns.rrset.from_text(request.question[0].name,
+                                3600,
+                                dns.rdataclass.IN,
+                                dns.rdatatype.A,
+                                '127.0.0.1')
+            response.answer.append(rrset)
+            return response.to_wire()
+        return callback
+
+class TestRoutingOrderedWRandUntag(DNSDistTest):
+
+    _queryCounts = {}
+
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _testServer1Port = pickAvailablePort()
+    _testServer2Port = pickAvailablePort()
+    _testServer3Port = pickAvailablePort()
+    _testServer4Port = pickAvailablePort()
+    _serverPorts = [_testServer1Port, _testServer2Port, _testServer3Port, _testServer4Port]
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServer1Port', '_testServer2Port', '_testServer3Port', '_testServer4Port']
+    _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%d")
+    setServerPolicy(orderedWrandUntag)
+    s11 = newServer{name="s11", address="127.0.0.1:%s", order=1, weight=1}
+    s11:setUp()
+    s12 = newServer{name="s12", address="127.0.0.1:%s", order=1, weight=2}
+    s12:setUp()
+    s21 = newServer{name="s21", address="127.0.0.1:%s", order=2, weight=1}
+    s21:setUp()
+    s22 = newServer{name="s22", address="127.0.0.1:%s", order=2, weight=2}
+    s22:setUp()
+    function setServerDown(name)
+        for _, s in ipairs(getServers()) do
+            if s.name == name then
+                s:setDown()
+            end
+        end
+    end
+    """
+
+    @classmethod
+    def startResponders(cls):
+        print("Launching responders..")
+
+        for i, name in enumerate(['s11', 's12', 's21', 's22']):
+            cls._queryCounts[name] = QueryCounter(name)
+            cb = cls._queryCounts[name].create_cb()
+            responder = threading.Thread(name=name, target=cls.UDPResponder, args=[cls._serverPorts[i], cls._toResponderQueue, cls._fromResponderQueue, False, cb])
+            responder.daemon = True
+            responder.start()
+
+    def setDown(self, name):
+        self.sendConsoleCommand("setServerDown('{}')".format(name))
+
+    def testDefault(self):
+        """
+        Routing: orderedWrandUntag
+
+        Send multiple A queries to "ordered.wrand.routing.tests.powerdns.com.",
+        check that dnsdist routes based on order first then weighted.
+        """
+        numberOfQueries = 100
+        name = 'ordered.wrand.routing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        expectedResponse.answer.append(rrset)
+
+        # send 100 queries
+        for _ in range(numberOfQueries):
+            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEqual(expectedResponse, receivedResponse)
+
+        # Only order 1 servers get queries and weighted
+        self.assertGreater(self._queryCounts['s12'](),  numberOfQueries * 0.50)
+        self.assertLess(self._queryCounts['s11'](),  numberOfQueries * 0.50)
+        self.assertEqual(self._queryCounts['s21'](),  0)
+        self.assertEqual(self._queryCounts['s22'](),  0)
+
+        # reset counters
+        for name in ['s11', 's12', 's21', 's22']:
+            self._queryCounts[name].reset()
+
+        self.setDown('s11')
+
+        # send 100 queries
+        for _ in range(numberOfQueries):
+            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEqual(expectedResponse, receivedResponse)
+
+        # queries shall arrive 's12' only
+        self.assertEqual(self._queryCounts['s11'](),  0)
+        self.assertEqual(self._queryCounts['s12'](),  numberOfQueries)
+        self.assertEqual(self._queryCounts['s21'](),  0)
+        self.assertEqual(self._queryCounts['s22'](),  0)
+
+        # reset counters
+        for name in ['s11', 's12', 's21', 's22']:
+            self._queryCounts[name].reset()
+
+        self.setDown('s12')
+
+        # send 100 queries
+        for _ in range(numberOfQueries):
+            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+            self.assertTrue(receivedResponse)
+            self.assertEqual(expectedResponse, receivedResponse)
+
+        # queries now shall be sent to order 2 servers and weighted
+        self.assertEqual(self._queryCounts['s11'](),  0)
+        self.assertEqual(self._queryCounts['s12'](),  0)
+        self.assertLess(self._queryCounts['s21'](),  numberOfQueries * 0.50)
+        self.assertGreater(self._queryCounts['s22'](),  numberOfQueries * 0.50)