#!/usr/bin/env python
from datetime import datetime, timedelta
-import dns
import os
-import subprocess
import threading
import time
-import unittest
+import dns
from dnsdisttests import DNSDistTest
+class TestAdvancedAllow(DNSDistTest):
+
+ _config_template = """
+ addAction(makeRule("allowed.advanced.tests.powerdns.com."), AllowAction())
+ addAction(AllRule(), DropAction())
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedAllow(self):
+ """
+ Advanced: Allowed qname is not dropped
+
+ A query for allowed.advanced.tests.powerdns.com. should be allowed
+ while others should be dropped.
+ """
+ name = 'allowed.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ def testAdvancedAllowDropped(self):
+ """
+ Advanced: Not allowed qname is dropped
+
+ A query for notallowed.advanced.tests.powerdns.com. should be dropped.
+ """
+ name = 'notallowed.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
class TestAdvancedFixupCase(DNSDistTest):
_config_template = """
_config_template = """
addDisableValidationRule("setcd.advanced.tests.powerdns.com.")
+ addAction(makeRule("setcdviaaction.advanced.tests.powerdns.com."), DisableValidationAction())
newServer{address="127.0.0.1:%s"}
"""
"""
name = 'setcd.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
- expectedQuery = dns.message.make_query(name.lower(), 'A', 'IN')
+ expectedQuery = dns.message.make_query(name, 'A', 'IN')
+ expectedQuery.flags |= dns.flags.CD
+
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEquals(expectedQuery, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEquals(expectedQuery, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ def testAdvancedSetCDViaAction(self):
+ """
+ Advanced: Set CD via Action
+
+ Send a query with CD cleared,
+ check that dnsdist set the CD flag.
+ """
+ name = 'setcdviaaction.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedQuery = dns.message.make_query(name, 'A', 'IN')
expectedQuery.flags |= dns.flags.CD
response = dns.message.make_response(query)
self.assertEquals(query, receivedQuery)
self.assertEquals(response, receivedResponse)
+class TestAdvancedClearRD(DNSDistTest):
+
+ _config_template = """
+ addNoRecurseRule("clearrd.advanced.tests.powerdns.com.")
+ addAction(makeRule("clearrdviaaction.advanced.tests.powerdns.com."), NoRecurseAction())
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedClearRD(self):
+ """
+ Advanced: Clear RD
+
+ Send a query with RD set,
+ check that dnsdist clears the RD flag.
+ """
+ name = 'clearrd.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedQuery = dns.message.make_query(name, 'A', 'IN')
+ expectedQuery.flags &= ~dns.flags.RD
+
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEquals(expectedQuery, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEquals(expectedQuery, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ def testAdvancedClearRDViaAction(self):
+ """
+ Advanced: Clear RD via Action
+
+ Send a query with RD set,
+ check that dnsdist clears the RD flag.
+ """
+ name = 'clearrdviaaction.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedQuery = dns.message.make_query(name, 'A', 'IN')
+ expectedQuery.flags &= ~dns.flags.RD
+
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEquals(expectedQuery, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEquals(expectedQuery, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ def testAdvancedKeepRD(self):
+ """
+ Advanced: Preserve RD canary
+
+ Send a query with RD for a canary domain,
+ check that dnsdist does not clear the RD flag.
+ """
+ name = 'keeprd.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
class TestAdvancedSpoof(DNSDistTest):
_config_template = """
- addDomainSpoof("spoof.tests.powerdns.com.", "192.0.2.1", "2001:DB8::1")
- addDomainCNAMESpoof("cnamespoof.tests.powerdns.com.", "cname.tests.powerdns.com.")
+ addDomainSpoof("spoof.advanced.tests.powerdns.com.", "192.0.2.1", "2001:DB8::1")
+ addDomainCNAMESpoof("cnamespoof.advanced.tests.powerdns.com.", "cname.advanced.tests.powerdns.com.")
+ addAction(makeRule("spoofaction.advanced.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
+ addAction(makeRule("cnamespoofaction.advanced.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.advanced.tests.powerdns.com."))
newServer{address="127.0.0.1:%s"}
"""
"""
Advanced: Spoof A
- Send an A query to "spoof.tests.powerdns.com.",
+ Send an A query to "spoof.advanced.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'spoof.tests.powerdns.com.'
+ name = 'spoof.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
"""
Advanced: Spoof AAAA
- Send an AAAA query to "spoof.tests.powerdns.com.",
+ Send an AAAA query to "spoof.advanced.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'spoof.tests.powerdns.com.'
+ name = 'spoof.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
"""
Advanced: Spoof CNAME
- Send an A query for "cnamespoof.tests.powerdns.com.",
+ Send an A query for "cnamespoof.advanced.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'cnamespoof.tests.powerdns.com.'
+ name = 'cnamespoof.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
60,
dns.rdataclass.IN,
dns.rdatatype.CNAME,
- 'cname.tests.powerdns.com.')
+ 'cname.advanced.tests.powerdns.com.')
+ expectedResponse.answer.append(rrset)
+
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ def testSpoofActionA(self):
+ """
+ Advanced: Spoof A via Action
+
+ Send an A query to "spoofaction.advanced.tests.powerdns.com.",
+ check that dnsdist sends a spoofed result.
+ """
+ name = 'spoofaction.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ expectedResponse.answer.append(rrset)
+
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ def testSpoofActionAAAA(self):
+ """
+ Advanced: Spoof AAAA via Action
+
+ Send an AAAA query to "spoofaction.advanced.tests.powerdns.com.",
+ check that dnsdist sends a spoofed result.
+ """
+ name = 'spoofaction.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'AAAA', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.AAAA,
+ '2001:DB8::1')
+ expectedResponse.answer.append(rrset)
+
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ def testSpoofActionCNAME(self):
+ """
+ Advanced: Spoof CNAME via Action
+
+ Send an A query for "cnamespoofaction.advanced.tests.powerdns.com.",
+ check that dnsdist sends a spoofed result.
+ """
+ name = 'cnamespoofaction.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ 'cnameaction.advanced.tests.powerdns.com.')
expectedResponse.answer.append(rrset)
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
_config_template = """
newServer{address="127.0.0.1:%s", pool="real"}
- addPoolRule("pool.tests.powerdns.com", "real")
+ addPoolRule("pool.advanced.tests.powerdns.com", "real")
+ addAction(makeRule("poolaction.advanced.tests.powerdns.com"), PoolAction("real"))
+ addQPSPoolRule("qpspool.advanced.tests.powerdns.com", 10, "abuse")
+ addAction(makeRule("qpspoolaction.advanced.tests.powerdns.com"), QPSPoolAction(10, "abuse"))
"""
def testPolicyPool(self):
"""
Advanced: Set pool by qname
- Send an A query to "pool.tests.powerdns.com.",
+ Send an A query to "pool.advanced.tests.powerdns.com.",
check that dnsdist routes the query to the "real" pool.
"""
- name = 'pool.tests.powerdns.com.'
+ name = 'pool.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ def testPolicyPoolAction(self):
+ """
+ Advanced: Set pool by qname via PoolAction
+
+ Send an A query to "poolaction.advanced.tests.powerdns.com.",
+ check that dnsdist routes the query to the "real" pool.
+ """
+ name = 'pool.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
"""
Advanced: Set pool by qname canary
- Send an A query to "notpool.tests.powerdns.com.",
+ Send an A query to "notpool.advanced.tests.powerdns.com.",
check that dnsdist sends no response (no servers
in the default pool).
"""
- name = 'notpool.tests.powerdns.com.'
+ name = 'notpool.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
(_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
self.assertEquals(receivedResponse, None)
+class TestAdvancedQPSPoolRouting(DNSDistTest):
+ _config_template = """
+ newServer{address="127.0.0.1:%s", pool="regular"}
+ addQPSPoolRule("qpspool.advanced.tests.powerdns.com", 10, "regular")
+ addAction(makeRule("qpspoolaction.advanced.tests.powerdns.com"), QPSPoolAction(10, "regular"))
+ """
+
+ def testQPSPool(self):
+ """
+ Advanced: Set pool by QPS
+
+ Send queries to "qpspool.advanced.tests.powerdns.com."
+ check that dnsdist does not route the query to the "regular" pool
+ when the max QPS has been reached.
+ """
+ maxQPS = 10
+ name = 'qpspool.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ for _ in range(maxQPS):
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # we should now be sent to the "abuse" pool which is empty,
+ # so the queries should be dropped
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
+ time.sleep(1)
+
+ # again, over TCP this time
+ for _ in range(maxQPS):
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
+ def testQPSPoolAction(self):
+ """
+ Advanced: Set pool by QPS via action
+
+ Send queries to "qpspoolaction.advanced.tests.powerdns.com."
+ check that dnsdist does not route the query to the "regular" pool
+ when the max QPS has been reached.
+ """
+ maxQPS = 10
+ name = 'qpspoolaction.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ for _ in range(maxQPS):
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # we should now be sent to the "abuse" pool which is empty,
+ # so the queries should be dropped
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
+ time.sleep(1)
+
+ # again, over TCP this time
+ for _ in range(maxQPS):
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
+
class TestAdvancedRoundRobinLB(DNSDistTest):
_testServer2Port = 5351
"""
Advanced: Round Robin
- Send 100 A queries to "rr.tests.powerdns.com.",
+ Send 100 A queries to "rr.advanced.tests.powerdns.com.",
check that dnsdist routes half of it to each backend.
"""
numberOfQueries = 10
- name = 'rr.tests.powerdns.com.'
+ name = 'rr.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
# the round robin counter is shared for UDP and TCP,
# so we need to do UDP then TCP to have a clean count
- for idx in range(numberOfQueries):
+ for _ in range(numberOfQueries):
(receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
receivedQuery.id = query.id
self.assertEquals(query, receivedQuery)
self.assertEquals(response, receivedResponse)
- for idx in range(numberOfQueries):
+ for _ in range(numberOfQueries):
(receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
receivedQuery.id = query.id
self.assertEquals(query, receivedQuery)
"""
Advanced: Round Robin with one server down
- Send 100 A queries to "rr.tests.powerdns.com.",
+ Send 100 A queries to "rr.advanced.tests.powerdns.com.",
check that dnsdist routes all of it to the only backend up.
"""
numberOfQueries = 10
- name = 'rr.tests.powerdns.com.'
+ name = 'rr.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
# the round robin counter is shared for UDP and TCP,
# so we need to do UDP then TCP to have a clean count
- for idx in range(numberOfQueries):
+ for _ in range(numberOfQueries):
(receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
receivedQuery.id = query.id
self.assertEquals(query, receivedQuery)
self.assertEquals(response, receivedResponse)
- for idx in range(numberOfQueries):
+ for _ in range(numberOfQueries):
(receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
receivedQuery.id = query.id
self.assertEquals(query, receivedQuery)
self.assertEquals(response, receivedResponse)
total = 0
- for key in TestAdvancedRoundRobinLB._responsesCounter:
- value = TestAdvancedRoundRobinLB._responsesCounter[key]
+ for key in TestAdvancedRoundRobinLBOneDown._responsesCounter:
+ value = TestAdvancedRoundRobinLBOneDown._responsesCounter[key]
self.assertTrue(value == numberOfQueries or value == 0)
total += value
receivedQuery.id = query.id
self.assertEquals(query, receivedQuery)
self.assertEquals(response, receivedResponse)
- self.assertTrue((end - begin) > timedelta(0, 1));
+ self.assertTrue((end - begin) > timedelta(0, 1))
begin = datetime.now()
(receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
receivedQuery.id = query.id
self.assertEquals(query, receivedQuery)
self.assertEquals(response, receivedResponse)
- self.assertTrue((end - begin) < timedelta(0, 1));
+ self.assertTrue((end - begin) < timedelta(0, 1))
class TestAdvancedLuaSpoof(DNSDistTest):
end
end
function spoof2rule(dq)
- return DNSAction.Spoof, "spoofedcname.tests.powerdns.com."
+ return DNSAction.Spoof, "spoofedcname.advanced.tests.powerdns.com."
end
- addLuaAction("luaspoof1.tests.powerdns.com.", spoof1rule)
- addLuaAction("luaspoof2.tests.powerdns.com.", spoof2rule)
+ addLuaAction("luaspoof1.advanced.tests.powerdns.com.", spoof1rule)
+ addLuaAction("luaspoof2.advanced.tests.powerdns.com.", spoof2rule)
newServer{address="127.0.0.1:%s"}
"""
"""
Advanced: Spoofing an A via Lua
- Send an A query to "luaspoof1.tests.powerdns.com.",
+ Send an A query to "luaspoof1.advanced.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'luaspoof1.tests.powerdns.com.'
+ name = 'luaspoof1.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
"""
Advanced: Spoofing an AAAA via Lua
- Send an AAAA query to "luaspoof1.tests.powerdns.com.",
+ Send an AAAA query to "luaspoof1.advanced.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'luaspoof1.tests.powerdns.com.'
+ name = 'luaspoof1.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
"""
Advanced: Spoofing an A with a CNAME via Lua
- Send an A query to "luaspoof2.tests.powerdns.com.",
+ Send an A query to "luaspoof2.advanced.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'luaspoof2.tests.powerdns.com.'
+ name = 'luaspoof2.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
60,
dns.rdataclass.IN,
dns.rdatatype.CNAME,
- 'spoofedcname.tests.powerdns.com.')
+ 'spoofedcname.advanced.tests.powerdns.com.')
expectedResponse.answer.append(rrset)
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
"""
Advanced: Spoofing an AAAA with a CNAME via Lua
- Send an AAAA query to "luaspoof2.tests.powerdns.com.",
+ Send an AAAA query to "luaspoof2.advanced.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'luaspoof2.tests.powerdns.com.'
+ name = 'luaspoof2.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
60,
dns.rdataclass.IN,
dns.rdatatype.CNAME,
- 'spoofedcname.tests.powerdns.com.')
+ 'spoofedcname.advanced.tests.powerdns.com.')
expectedResponse.answer.append(rrset)
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
"""
Advanced: Truncate ANY over TCP
- Send an ANY query to "anytruncatetcp.tests.powerdns.com.",
+ Send an ANY query to "anytruncatetcp.advanced.tests.powerdns.com.",
should be truncated over TCP, not over UDP (yes, it makes no sense,
deal with it).
"""
- name = 'anytruncatetcp.tests.powerdns.com.'
+ name = 'anytruncatetcp.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'ANY', 'IN')
response = dns.message.make_response(query)
We send an A query over UDP and TCP, and check that the
response is OK.
"""
- name = 'andnot.tests.powerdns.com.'
+ name = 'andnot.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
We send a TXT query over UDP and TCP, and check that the
response is OK for TCP and 'not implemented' for UDP.
"""
- name = 'andnot.tests.powerdns.com.'
+ name = 'andnot.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'TXT', 'IN')
expectedResponse = dns.message.make_response(query)
We send an AAAA query over UDP and TCP, and check that the
response is 'not implemented' for UDP and OK for TCP.
"""
- name = 'aorudp.tests.powerdns.com.'
+ name = 'aorudp.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
We send an A query over UDP and TCP, and check that the
response is 'not implemented' for both.
"""
- name = 'aorudp.tests.powerdns.com.'
+ name = 'aorudp.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
expectedResponse = dns.message.make_response(query)
_config_template = """
pc = newPacketCache(5, 86400, 1)
getPool(""):setCache(pc)
- addAction(makeRule("nocache.tests.powerdns.com."), SkipCacheAction())
+ addAction(makeRule("nocache.advanced.tests.powerdns.com."), SkipCacheAction())
newServer{address="127.0.0.1:%s"}
"""
def testCached(self):
the first one.
"""
numberOfQueries = 10
- name = 'cached.tests.powerdns.com.'
+ name = 'cached.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
self.assertEquals(query, receivedQuery)
self.assertEquals(receivedResponse, response)
- for idx in range(numberOfQueries):
+ for _ in range(numberOfQueries):
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
receivedResponse.id = response.id
self.assertEquals(receivedResponse, response)
"""
Advanced: SkipCacheAction
- dnsdist is configured to not cache entries for nocache.tests.powerdns.com.
+ dnsdist is configured to not cache entries for nocache.advanced.tests.powerdns.com.
we are sending several requests and checking that the backend get them all.
"""
- name = 'nocache.tests.powerdns.com.'
+ name = 'nocache.advanced.tests.powerdns.com.'
numberOfQueries = 10
query = dns.message.make_query(name, 'AAAA', 'IN')
response = dns.message.make_response(query)
'::1')
response.answer.append(rrset)
- for idx in range(numberOfQueries):
+ for _ in range(numberOfQueries):
(receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
self.assertTrue(receivedQuery)
self.assertTrue(receivedResponse)
"""
ttl = 2
misses = 0
- name = 'cacheexpiration.tests.powerdns.com.'
+ name = 'cacheexpiration.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
self.assertEquals(total, misses)
+ def testCacheExpirationDifferentSets(self):
+ """
+ Advanced: Cache expiration with different sets
+
+ dnsdist is configured to cache entries, we are sending one request
+ (cache miss) whose response has a long and a very short TTL,
+ checking that the next requests are cached. Then we wait for the
+ short TTL to expire, check that the
+ next request is a miss but the following one a hit.
+ """
+ ttl = 2
+ misses = 0
+ name = 'cacheexpirationdifferentsets.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'AAAA', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ ttl,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ 'cname.cacheexpirationdifferentsets.advanced.tests.powerdns.com.')
+ response.answer.append(rrset)
+ rrset = dns.rrset.from_text('cname.cacheexpirationdifferentsets.advanced.tests.powerdns.com.',
+ ttl + 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.2.0.1')
+ response.additional.append(rrset)
+
+ # first query to fill the cache
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ receivedResponse.id = response.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+ misses += 1
+
+ # next queries should hit the cache
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ receivedResponse.id = response.id
+ self.assertEquals(receivedResponse, response)
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ receivedResponse.id = response.id
+ self.assertEquals(receivedResponse, response)
+
+ # now we wait a bit for the cache entry to expire
+ time.sleep(ttl + 1)
+
+ # next query should be a miss, fill the cache again
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ receivedResponse.id = response.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+ misses += 1
+
+ # following queries should hit the cache again
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ receivedResponse.id = response.id
+ self.assertEquals(receivedResponse, response)
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ receivedResponse.id = response.id
+ self.assertEquals(receivedResponse, response)
+
+ total = 0
+ for key in TestAdvancedCaching._responsesCounter:
+ total += TestAdvancedCaching._responsesCounter[key]
+
+ self.assertEquals(total, misses)
+
def testCacheDecreaseTTL(self):
"""
Advanced: Cache decreases TTL
"""
ttl = 600
misses = 0
- name = 'cachedecreasettl.tests.powerdns.com.'
+ name = 'cachedecreasettl.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
matches.
"""
ttl = 600
- name = 'cachedifferentcase.tests.powerdns.com.'
- differentCaseName = 'CacheDifferentCASE.tests.powerdns.com.'
+ name = 'cachedifferentcase.advanced.tests.powerdns.com.'
+ differentCaseName = 'CacheDifferentCASE.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
differentCaseQuery = dns.message.make_query(differentCaseName, 'AAAA', 'IN')
response = dns.message.make_response(query)
Payload size is not served from the cache.
"""
misses = 0
- name = 'cachedifferentedns.tests.powerdns.com.'
+ name = 'cachedifferentedns.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512)
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
misses += 1
total = 0
- for key in TestAdvancedCaching._responsesCounter:
- total += TestAdvancedCaching._responsesCounter[key]
+ for key in TestAdvancedCachingWithExistingEDNS._responsesCounter:
+ total += TestAdvancedCachingWithExistingEDNS._responsesCounter[key]
self.assertEquals(total, misses)
+
+class TestAdvancedLogAction(DNSDistTest):
+
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ addAction(AllRule(), LogAction("dnsdist.log", false))
+ """
+ def testAdvancedLogAction(self):
+ """
+ Advanced: Log all queries
+
+ """
+ count = 50
+ name = 'logaction.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ for _ in range(count):
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ receivedResponse.id = response.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ self.assertTrue(os.path.isfile('dnsdist.log'))
+ self.assertTrue(os.stat('dnsdist.log').st_size > 0)
+
+class TestAdvancedDNSSEC(DNSDistTest):
+
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ addAction(DNSSECRule(), DropAction())
+ """
+ def testAdvancedDNSSECDrop(self):
+ """
+ Advanced: DNSSEC Rule
+
+ """
+ name = 'dnssec.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ doquery = dns.message.make_query(name, 'A', 'IN', want_dnssec=True)
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ receivedResponse.id = response.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ receivedResponse.id = response.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ (_, receivedResponse) = self.sendUDPQuery(doquery, response)
+ self.assertEquals(receivedResponse, None)
+ (_, receivedResponse) = self.sendTCPQuery(doquery, response)
+ self.assertEquals(receivedResponse, None)
+
+class TestAdvancedQClass(DNSDistTest):
+
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ addAction(QClassRule(3), DropAction())
+ """
+ def testAdvancedQClassChaosDrop(self):
+ """
+ Advanced: Drop QClass CHAOS
+
+ """
+ name = 'qclasschaos.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'TXT', 'CHAOS')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.CH,
+ dns.rdatatype.TXT,
+ 'hop')
+ response.answer.append(rrset)
+
+ (_, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertEquals(receivedResponse, None)
+ (_, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertEquals(receivedResponse, None)
+
+ def testAdvancedQClassINAllow(self):
+ """
+ Advanced: Allow QClass IN
+
+ """
+ name = 'qclassin.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ receivedResponse.id = response.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ receivedResponse.id = response.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)