From 3f2b0bed494c5c428a9a004129607f6a69380a03 Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Thu, 21 Jan 2021 17:25:51 +0100 Subject: [PATCH] dnsdist: Don't apply QPS to backend server on cache hits --- pdns/dnsdist.cc | 2 +- pdns/dnsdist.hh | 65 ++++++---- pdns/dnsdistdist/dnsdist-lbpolicies.cc | 3 +- pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc | 37 ++++++ regression-tests.dnsdist/test_Routing.py | 116 ++++++++++++++++++ 5 files changed, 198 insertions(+), 25 deletions(-) diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 0731ee9a95..de81b75317 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -1267,7 +1267,7 @@ ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& addXPF(dq, selectedBackend->xpfRRCode); } - selectedBackend->queries++; + selectedBackend->incQueriesCount(); return ProcessQueryResult::PassToBackend; } catch (const std::exception& e){ diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 56cbcceefa..dfb1a61cdf 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -446,25 +446,40 @@ public: } bool check(unsigned int rate, unsigned int burst) const // this is not quite fair + { + if (checkOnly(rate, burst)) { + addHit(); + return true; + } + + return false; + } + + bool checkOnly(unsigned int rate, unsigned int burst) const // this is not quite fair { auto delta = d_prev.udiffAndSet(); - if(delta > 0.0) // time, frequently, does go backwards.. + if (delta > 0.0) { // time, frequently, does go backwards.. d_tokens += 1.0 * rate * (delta/1000000.0); + } - if(d_tokens > burst) { + if (d_tokens > burst) { d_tokens = burst; } - bool ret=false; - if(d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise - ret=true; - --d_tokens; + bool ret = false; + if (d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise + ret = true; } return ret; } + virtual void addHit() const + { + --d_tokens; + } + bool seenSince(const struct timespec& cutOff) const { return cutOff < d_prev.d_start; @@ -492,35 +507,32 @@ public: return d_passthrough ? 0 : d_rate; } - int getPassed() const + bool check() const // this is not quite fair { - return d_passed; - } + if (d_passthrough) { + return true; + } - int getBlocked() const - { - return d_blocked; + return BasicQPSLimiter::check(d_rate, d_burst); } - bool check() const // this is not quite fair + bool checkOnly() const { if (d_passthrough) { return true; } - bool ret = BasicQPSLimiter::check(d_rate, d_burst); - if (ret) { - d_passed++; - } - else { - d_blocked++; - } + return BasicQPSLimiter::checkOnly(d_rate, d_burst); + } - return ret; + void addHit() const override + { + if (!d_passthrough) { + --d_tokens; + } } + private: - mutable unsigned int d_passed{0}; - mutable unsigned int d_blocked{0}; unsigned int d_rate; unsigned int d_burst; bool d_passthrough{true}; @@ -994,6 +1006,13 @@ struct DownstreamState tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (nbQueries / 100.0); tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0); } + + void incQueriesCount() + { + ++queries; + qps.addHit(); + } + private: std::string name; std::string nameWithAddr; diff --git a/pdns/dnsdistdist/dnsdist-lbpolicies.cc b/pdns/dnsdistdist/dnsdist-lbpolicies.cc index 1d75ca0600..882cbfb2a1 100644 --- a/pdns/dnsdistdist/dnsdist-lbpolicies.cc +++ b/pdns/dnsdistdist/dnsdist-lbpolicies.cc @@ -59,8 +59,9 @@ shared_ptr leastOutstanding(const ServerPolicy::NumberedServerV shared_ptr firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq) { for(auto& d : servers) { - if(d.second->isUp() && d.second->qps.check()) + if (d.second->isUp() && d.second->qps.checkOnly()) { return d.second; + } } return leastOutstanding(servers, dq); } diff --git a/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc b/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc index ce76c8bab1..ca17ca8e45 100644 --- a/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc +++ b/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc @@ -179,6 +179,43 @@ BOOST_AUTO_TEST_CASE(test_firstAvailable) { benchPolicy(pol); } +BOOST_AUTO_TEST_CASE(test_firstAvailableWithOrderAndQPS) { + auto dq = getDQ(); + size_t qpsLimit = 10; + + ServerPolicy pol{"firstAvailable", firstAvailable, false}; + ServerPolicy::NumberedServerVector servers; + servers.push_back({ 1, std::make_shared(ComboAddress("192.0.2.1:53")) }); + servers.push_back({ 2, std::make_shared(ComboAddress("192.0.2.2:53")) }); + /* Second server has a higher order, so most queries should be routed to the first (remember that + we need to keep them ordered!). + However the first server has a QPS limit at 10 qps, so any query above that should be routed + to the second server. */ + servers.at(0).second->order = 1; + servers.at(1).second->order = 2; + servers.at(0).second->qps = QPSLimiter(qpsLimit, qpsLimit); + /* mark the servers as 'up' */ + servers.at(0).second->setUp(); + servers.at(1).second->setUp(); + + /* the first queries under the QPS limit should be + sent to the first server */ + for (size_t idx = 0; idx < qpsLimit; idx++) { + auto server = pol.getSelectedBackend(servers, dq); + BOOST_REQUIRE(server != nullptr); + BOOST_CHECK(server == servers.at(0).second); + server->incQueriesCount(); + } + + /* then to the second server */ + for (size_t idx = 0; idx < 100; idx++) { + auto server = pol.getSelectedBackend(servers, dq); + BOOST_REQUIRE(server != nullptr); + BOOST_CHECK(server == servers.at(1).second); + server->incQueriesCount(); + } +} + BOOST_AUTO_TEST_CASE(test_roundRobin) { auto dq = getDQ(); diff --git a/regression-tests.dnsdist/test_Routing.py b/regression-tests.dnsdist/test_Routing.py index 77c0c231d7..8567a3a8d8 100644 --- a/regression-tests.dnsdist/test_Routing.py +++ b/regression-tests.dnsdist/test_Routing.py @@ -397,6 +397,122 @@ class TestRoutingOrder(DNSDistTest): self.assertEquals(self._responsesCounter['TCP Responder'], 0) self.assertEquals(self._responsesCounter['TCP Responder 2'], numberOfQueries) +class TestFirstAvailableQPSPacketCacheHits(DNSDistTest): + + _verboseMode = True + _testServer2Port = 5351 + _config_params = ['_testServerPort', '_testServer2Port'] + _config_template = """ + setServerPolicy(firstAvailable) + s1 = newServer{address="127.0.0.1:%s", order=2} + s1:setUp() + s2 = newServer{address="127.0.0.1:%s", order=1, qps=10} + s2:setUp() + pc = newPacketCache(100, {maxTTL=86400, minTTL=1}) + getPool(""):setCache(pc) + """ + + @classmethod + def startResponders(cls): + print("Launching responders..") + cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue]) + cls._UDPResponder.setDaemon(True) + cls._UDPResponder.start() + cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue]) + cls._UDPResponder2.setDaemon(True) + cls._UDPResponder2.start() + + cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue]) + cls._TCPResponder.setDaemon(True) + cls._TCPResponder.start() + + cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue]) + cls._TCPResponder2.setDaemon(True) + cls._TCPResponder2.start() + + def testOrderQPSCacheHits(self): + """ + Routing: firstAvailable policy with QPS limit and packet cache + + Send 50 A queries for "order-qps-cache.routing.tests.powerdns.com.", + then 10 A queries for "order-qps-cache-2.routing.tests.powerdns.com." (uncached) + check that dnsdist routes all of the (uncached) queries to the second backend, because it has the lower order value, + and the QPS should only be counted for cache misses. + """ + numberOfQueries = 50 + name = 'order-qps-cache.routing.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 60, + dns.rdataclass.IN, + dns.rdatatype.A, + '192.0.2.1') + response.answer.append(rrset) + + # first queries to fill the cache + (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(receivedResponse, response) + (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(receivedResponse, response) + + for _ in range(numberOfQueries): + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertEquals(receivedResponse, response) + + numberOfQueries = 10 + name = 'order-qps-cache-2.routing.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 60, + dns.rdataclass.IN, + dns.rdatatype.A, + '192.0.2.1') + response.answer.append(rrset) + + # first queries to fill the cache + (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(receivedResponse, response) + (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(receivedResponse, response) + + for _ in range(numberOfQueries): + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertEquals(receivedResponse, response) + + # 4 queries should made it through, 2 UDP and 2 TCP + for k,v in self._responsesCounter.items(): + print(k) + print(v) + + if 'UDP Responder' in self._responsesCounter: + self.assertEquals(self._responsesCounter['UDP Responder'], 0) + self.assertEquals(self._responsesCounter['UDP Responder 2'], 2) + if 'TCP Responder' in self._responsesCounter: + self.assertEquals(self._responsesCounter['TCP Responder'], 0) + self.assertEquals(self._responsesCounter['TCP Responder 2'], 2) + class TestRoutingNoServer(DNSDistTest): _config_template = """ -- 2.47.2