]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Don't apply QPS to backend server on cache hits 9999/head
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 21 Jan 2021 16:25:51 +0000 (17:25 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 21 Jan 2021 16:25:51 +0000 (17:25 +0100)
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/dnsdist-lbpolicies.cc
pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc
regression-tests.dnsdist/test_Routing.py

index 0731ee9a95a175d36e8c48b59f6138826d19c980..de81b75317170fc974028fdc199f487e9d68cd7e 100644 (file)
@@ -1267,7 +1267,7 @@ ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders&
       addXPF(dq, selectedBackend->xpfRRCode);
     }
 
-    selectedBackend->queries++;
+    selectedBackend->incQueriesCount();
     return ProcessQueryResult::PassToBackend;
   }
   catch (const std::exception& e){
index 56cbcceefa0d8d79404ec6eeac4d9b7f3fbea17f..dfb1a61cdfe64c3f4cb15b41123ca4a4aebb3176 100644 (file)
@@ -446,25 +446,40 @@ public:
   }
 
   bool check(unsigned int rate, unsigned int burst) const // this is not quite fair
+  {
+    if (checkOnly(rate, burst)) {
+      addHit();
+      return true;
+    }
+
+    return false;
+  }
+
+  bool checkOnly(unsigned int rate, unsigned int burst) const // this is not quite fair
   {
     auto delta = d_prev.udiffAndSet();
 
-    if(delta > 0.0) // time, frequently, does go backwards..
+    if (delta > 0.0) { // time, frequently, does go backwards..
       d_tokens += 1.0 * rate * (delta/1000000.0);
+    }
 
-    if(d_tokens > burst) {
+    if (d_tokens > burst) {
       d_tokens = burst;
     }
 
-    bool ret=false;
-    if(d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
-      ret=true;
-      --d_tokens;
+    bool ret = false;
+    if (d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
+      ret = true;
     }
 
     return ret;
   }
 
+  virtual void addHit() const
+  {
+    --d_tokens;
+  }
+
   bool seenSince(const struct timespec& cutOff) const
   {
     return cutOff < d_prev.d_start;
@@ -492,35 +507,32 @@ public:
     return d_passthrough ? 0 : d_rate;
   }
 
-  int getPassed() const
+  bool check() const // this is not quite fair
   {
-    return d_passed;
-  }
+    if (d_passthrough) {
+      return true;
+    }
 
-  int getBlocked() const
-  {
-    return d_blocked;
+    return BasicQPSLimiter::check(d_rate, d_burst);
   }
 
-  bool check() const // this is not quite fair
+  bool checkOnly() const
   {
     if (d_passthrough) {
       return true;
     }
 
-    bool ret = BasicQPSLimiter::check(d_rate, d_burst);
-    if (ret) {
-      d_passed++;
-    }
-    else {
-      d_blocked++;
-    }
+    return BasicQPSLimiter::checkOnly(d_rate, d_burst);
+  }
 
-    return ret;
+  void addHit() const override
+  {
+    if (!d_passthrough) {
+      --d_tokens;
+    }
   }
+
 private:
-  mutable unsigned int d_passed{0};
-  mutable unsigned int d_blocked{0};
   unsigned int d_rate;
   unsigned int d_burst;
   bool d_passthrough{true};
@@ -994,6 +1006,13 @@ struct DownstreamState
     tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (nbQueries / 100.0);
     tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0);
   }
+
+  void incQueriesCount()
+  {
+    ++queries;
+    qps.addHit();
+  }
+
 private:
   std::string name;
   std::string nameWithAddr;
index 1d75ca0600a88d88d5921c9253e4b47dac212970..882cbfb2a1e4c78436b83247af91e1efaf7ee16d 100644 (file)
@@ -59,8 +59,9 @@ shared_ptr<DownstreamState> leastOutstanding(const ServerPolicy::NumberedServerV
 shared_ptr<DownstreamState> firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
 {
   for(auto& d : servers) {
-    if(d.second->isUp() && d.second->qps.check())
+    if (d.second->isUp() && d.second->qps.checkOnly()) {
       return d.second;
+    }
   }
   return leastOutstanding(servers, dq);
 }
index ce76c8bab1f9a631a8b2c3361c9097059ac37bce..ca17ca8e458fa20add410b245428c4d589141b75 100644 (file)
@@ -179,6 +179,43 @@ BOOST_AUTO_TEST_CASE(test_firstAvailable) {
   benchPolicy(pol);
 }
 
+BOOST_AUTO_TEST_CASE(test_firstAvailableWithOrderAndQPS) {
+  auto dq = getDQ();
+  size_t qpsLimit = 10;
+
+  ServerPolicy pol{"firstAvailable", firstAvailable, false};
+  ServerPolicy::NumberedServerVector servers;
+  servers.push_back({ 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2.1:53")) });
+  servers.push_back({ 2, std::make_shared<DownstreamState>(ComboAddress("192.0.2.2:53")) });
+  /* Second server has a higher order, so most queries should be routed to the first (remember that
+     we need to keep them ordered!).
+     However the first server has a QPS limit at 10 qps, so any query above that should be routed 
+     to the second server. */
+  servers.at(0).second->order = 1;
+  servers.at(1).second->order = 2;
+  servers.at(0).second->qps = QPSLimiter(qpsLimit, qpsLimit);
+  /* mark the servers as 'up' */
+  servers.at(0).second->setUp();
+  servers.at(1).second->setUp();
+
+  /* the first queries under the QPS limit should be
+     sent to the first server */
+  for (size_t idx = 0; idx < qpsLimit; idx++) {
+    auto server = pol.getSelectedBackend(servers, dq);
+    BOOST_REQUIRE(server != nullptr);
+    BOOST_CHECK(server == servers.at(0).second);
+    server->incQueriesCount();
+  }
+
+  /* then to the second server */
+  for (size_t idx = 0; idx < 100; idx++) {
+    auto server = pol.getSelectedBackend(servers, dq);
+    BOOST_REQUIRE(server != nullptr);
+    BOOST_CHECK(server == servers.at(1).second);
+    server->incQueriesCount();
+  }
+}
+
 BOOST_AUTO_TEST_CASE(test_roundRobin) {
   auto dq = getDQ();
 
index 77c0c231d7b10091f8b79c77fa535cd56d997de9..8567a3a8d879a4967b46ca9e2241058f0f08fc1f 100644 (file)
@@ -397,6 +397,122 @@ class TestRoutingOrder(DNSDistTest):
             self.assertEquals(self._responsesCounter['TCP Responder'], 0)
         self.assertEquals(self._responsesCounter['TCP Responder 2'], numberOfQueries)
 
+class TestFirstAvailableQPSPacketCacheHits(DNSDistTest):
+
+    _verboseMode = True
+    _testServer2Port = 5351
+    _config_params = ['_testServerPort', '_testServer2Port']
+    _config_template = """
+    setServerPolicy(firstAvailable)
+    s1 = newServer{address="127.0.0.1:%s", order=2}
+    s1:setUp()
+    s2 = newServer{address="127.0.0.1:%s", order=1, qps=10}
+    s2:setUp()
+    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
+    getPool(""):setCache(pc)
+    """
+
+    @classmethod
+    def startResponders(cls):
+        print("Launching responders..")
+        cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
+        cls._UDPResponder.setDaemon(True)
+        cls._UDPResponder.start()
+        cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
+        cls._UDPResponder2.setDaemon(True)
+        cls._UDPResponder2.start()
+
+        cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
+        cls._TCPResponder.setDaemon(True)
+        cls._TCPResponder.start()
+
+        cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
+        cls._TCPResponder2.setDaemon(True)
+        cls._TCPResponder2.start()
+
+    def testOrderQPSCacheHits(self):
+        """
+        Routing: firstAvailable policy with QPS limit and packet cache
+
+        Send 50 A queries for "order-qps-cache.routing.tests.powerdns.com.",
+        then 10 A queries for "order-qps-cache-2.routing.tests.powerdns.com." (uncached)
+        check that dnsdist routes all of the (uncached) queries to the second backend, because it has the lower order value,
+        and the QPS should only be counted for cache misses.
+        """
+        numberOfQueries = 50
+        name = 'order-qps-cache.routing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.1')
+        response.answer.append(rrset)
+
+        # first queries to fill the cache
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(receivedResponse, response)
+        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(receivedResponse, response)
+
+        for _ in range(numberOfQueries):
+            for method in ("sendUDPQuery", "sendTCPQuery"):
+                sender = getattr(self, method)
+                (_, receivedResponse) = sender(query, response=None, useQueue=False)
+                self.assertEquals(receivedResponse, response)
+
+        numberOfQueries = 10
+        name = 'order-qps-cache-2.routing.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.1')
+        response.answer.append(rrset)
+
+        # first queries to fill the cache
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(receivedResponse, response)
+        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(receivedResponse, response)
+
+        for _ in range(numberOfQueries):
+            for method in ("sendUDPQuery", "sendTCPQuery"):
+                sender = getattr(self, method)
+                (_, receivedResponse) = sender(query, response=None, useQueue=False)
+                self.assertEquals(receivedResponse, response)
+
+        # 4 queries should made it through, 2 UDP and 2 TCP
+        for k,v in self._responsesCounter.items():
+            print(k)
+            print(v)
+
+        if 'UDP Responder' in self._responsesCounter:
+            self.assertEquals(self._responsesCounter['UDP Responder'], 0)
+        self.assertEquals(self._responsesCounter['UDP Responder 2'], 2)
+        if 'TCP Responder' in self._responsesCounter:
+            self.assertEquals(self._responsesCounter['TCP Responder'], 0)
+        self.assertEquals(self._responsesCounter['TCP Responder 2'], 2)
+
 class TestRoutingNoServer(DNSDistTest):
 
     _config_template = """