}
bool check(unsigned int rate, unsigned int burst) const // this is not quite fair
+ {
+ if (checkOnly(rate, burst)) {
+ addHit();
+ return true;
+ }
+
+ return false;
+ }
+
+ bool checkOnly(unsigned int rate, unsigned int burst) const // this is not quite fair
{
auto delta = d_prev.udiffAndSet();
- if(delta > 0.0) // time, frequently, does go backwards..
+ if (delta > 0.0) { // time, frequently, does go backwards..
d_tokens += 1.0 * rate * (delta/1000000.0);
+ }
- if(d_tokens > burst) {
+ if (d_tokens > burst) {
d_tokens = burst;
}
- bool ret=false;
- if(d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
- ret=true;
- --d_tokens;
+ bool ret = false;
+ if (d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
+ ret = true;
}
return ret;
}
+ virtual void addHit() const
+ {
+ --d_tokens;
+ }
+
bool seenSince(const struct timespec& cutOff) const
{
return cutOff < d_prev.d_start;
return d_passthrough ? 0 : d_rate;
}
- int getPassed() const
+ bool check() const // this is not quite fair
{
- return d_passed;
- }
+ if (d_passthrough) {
+ return true;
+ }
- int getBlocked() const
- {
- return d_blocked;
+ return BasicQPSLimiter::check(d_rate, d_burst);
}
- bool check() const // this is not quite fair
+ bool checkOnly() const
{
if (d_passthrough) {
return true;
}
- bool ret = BasicQPSLimiter::check(d_rate, d_burst);
- if (ret) {
- d_passed++;
- }
- else {
- d_blocked++;
- }
+ return BasicQPSLimiter::checkOnly(d_rate, d_burst);
+ }
- return ret;
+ void addHit() const override
+ {
+ if (!d_passthrough) {
+ --d_tokens;
+ }
}
+
private:
- mutable unsigned int d_passed{0};
- mutable unsigned int d_blocked{0};
unsigned int d_rate;
unsigned int d_burst;
bool d_passthrough{true};
tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (nbQueries / 100.0);
tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0);
}
+
+ void incQueriesCount()
+ {
+ ++queries;
+ qps.addHit();
+ }
+
private:
std::string name;
std::string nameWithAddr;
benchPolicy(pol);
}
+BOOST_AUTO_TEST_CASE(test_firstAvailableWithOrderAndQPS) {
+ auto dq = getDQ();
+ size_t qpsLimit = 10;
+
+ ServerPolicy pol{"firstAvailable", firstAvailable, false};
+ ServerPolicy::NumberedServerVector servers;
+ servers.push_back({ 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2.1:53")) });
+ servers.push_back({ 2, std::make_shared<DownstreamState>(ComboAddress("192.0.2.2:53")) });
+ /* Second server has a higher order, so most queries should be routed to the first (remember that
+ we need to keep them ordered!).
+ However the first server has a QPS limit at 10 qps, so any query above that should be routed
+ to the second server. */
+ servers.at(0).second->order = 1;
+ servers.at(1).second->order = 2;
+ servers.at(0).second->qps = QPSLimiter(qpsLimit, qpsLimit);
+ /* mark the servers as 'up' */
+ servers.at(0).second->setUp();
+ servers.at(1).second->setUp();
+
+ /* the first queries under the QPS limit should be
+ sent to the first server */
+ for (size_t idx = 0; idx < qpsLimit; idx++) {
+ auto server = pol.getSelectedBackend(servers, dq);
+ BOOST_REQUIRE(server != nullptr);
+ BOOST_CHECK(server == servers.at(0).second);
+ server->incQueriesCount();
+ }
+
+ /* then to the second server */
+ for (size_t idx = 0; idx < 100; idx++) {
+ auto server = pol.getSelectedBackend(servers, dq);
+ BOOST_REQUIRE(server != nullptr);
+ BOOST_CHECK(server == servers.at(1).second);
+ server->incQueriesCount();
+ }
+}
+
BOOST_AUTO_TEST_CASE(test_roundRobin) {
auto dq = getDQ();
self.assertEquals(self._responsesCounter['TCP Responder'], 0)
self.assertEquals(self._responsesCounter['TCP Responder 2'], numberOfQueries)
+class TestFirstAvailableQPSPacketCacheHits(DNSDistTest):
+
+ _verboseMode = True
+ _testServer2Port = 5351
+ _config_params = ['_testServerPort', '_testServer2Port']
+ _config_template = """
+ setServerPolicy(firstAvailable)
+ s1 = newServer{address="127.0.0.1:%s", order=2}
+ s1:setUp()
+ s2 = newServer{address="127.0.0.1:%s", order=1, qps=10}
+ s2:setUp()
+ pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
+ getPool(""):setCache(pc)
+ """
+
+ @classmethod
+ def startResponders(cls):
+ print("Launching responders..")
+ cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
+ cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.start()
+ cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
+ cls._UDPResponder2.setDaemon(True)
+ cls._UDPResponder2.start()
+
+ cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
+ cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.start()
+
+ cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
+ cls._TCPResponder2.setDaemon(True)
+ cls._TCPResponder2.start()
+
+ def testOrderQPSCacheHits(self):
+ """
+ Routing: firstAvailable policy with QPS limit and packet cache
+
+ Send 50 A queries for "order-qps-cache.routing.tests.powerdns.com.",
+ then 10 A queries for "order-qps-cache-2.routing.tests.powerdns.com." (uncached)
+ check that dnsdist routes all of the (uncached) queries to the second backend, because it has the lower order value,
+ and the QPS should only be counted for cache misses.
+ """
+ numberOfQueries = 50
+ name = 'order-qps-cache.routing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ # first queries to fill the cache
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+ for _ in range(numberOfQueries):
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, response)
+
+ numberOfQueries = 10
+ name = 'order-qps-cache-2.routing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ # first queries to fill the cache
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+ for _ in range(numberOfQueries):
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, response)
+
+ # 4 queries should made it through, 2 UDP and 2 TCP
+ for k,v in self._responsesCounter.items():
+ print(k)
+ print(v)
+
+ if 'UDP Responder' in self._responsesCounter:
+ self.assertEquals(self._responsesCounter['UDP Responder'], 0)
+ self.assertEquals(self._responsesCounter['UDP Responder 2'], 2)
+ if 'TCP Responder' in self._responsesCounter:
+ self.assertEquals(self._responsesCounter['TCP Responder'], 0)
+ self.assertEquals(self._responsesCounter['TCP Responder 2'], 2)
+
class TestRoutingNoServer(DNSDistTest):
_config_template = """