def tearDownClass(cls):
cls.tearDownRecursor()
- def checkBlocked(self, name, shouldBeBlocked=True, adQuery=False):
+ def checkBlocked(self, name, shouldBeBlocked=True, adQuery=False, singleCheck=False):
query = dns.message.make_query(name, 'A', want_dnssec=True)
query.flags |= dns.flags.CD
if adQuery:
expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
self.assertRRsetInAnswer(res, expected)
+ if singleCheck:
+ break
- def checkNotBlocked(self, name, adQuery=False):
- self.checkBlocked(name, False, adQuery)
+ def checkNotBlocked(self, name, adQuery=False, singleCheck=False):
+ self.checkBlocked(name, False, adQuery, singleCheck)
def checkCustom(self, qname, qtype, expected):
query = dns.message.make_query(qname, qtype, want_dnssec=True)
"""
_confdir = 'RPZFile'
- _wsPort = 8042
- _wsTimeout = 2
- _wsPassword = 'secretpassword'
- _apiKey = 'secretapikey'
_lua_config_file = """
rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz." })
""" % (_confdir)
_config_template = """
auth-zones=example=configs/%s/example.zone
-webserver=yes
-webserver-port=%d
-webserver-address=127.0.0.1
-webserver-password=%s
-api-key=%s
-""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+""" % (_confdir)
@classmethod
def generateRecursorConfig(cls, confdir):
"""
_confdir = 'RPZFileDefaultPolicy'
- _wsPort = 8042
- _wsTimeout = 2
- _wsPassword = 'secretpassword'
- _apiKey = 'secretapikey'
_lua_config_file = """
rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", defpol=Policy.NoAction })
""" % (_confdir)
_config_template = """
auth-zones=example=configs/%s/example.zone
-webserver=yes
-webserver-port=%d
-webserver-address=127.0.0.1
-webserver-password=%s
-api-key=%s
-""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+""" % (_confdir)
@classmethod
def generateRecursorConfig(cls, confdir):
"""
_confdir = 'RPZFileDefaultPolicyNotOverrideLocal'
- _wsPort = 8042
- _wsTimeout = 2
- _wsPassword = 'secretpassword'
- _apiKey = 'secretapikey'
_lua_config_file = """
rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", defpol=Policy.NoAction, defpolOverrideLocalData=false })
""" % (_confdir)
_config_template = """
auth-zones=example=configs/%s/example.zone
-webserver=yes
-webserver-port=%d
-webserver-address=127.0.0.1
-webserver-password=%s
-api-key=%s
-""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+""" % (_confdir)
@classmethod
def generateRecursorConfig(cls, confdir):
super(RPZFileDefaultPolNotOverrideLocalRecursorTest, cls).generateRecursorConfig(confdir)
def testRPZ(self):
- # local data entries will not be overridden by the default polic
+ # local data entries will not be overridden by the default policy
self.checkCustom('a.example.', 'A', dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.42', '192.0.2.43'))
self.checkCustom('a.example.', 'TXT', dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'TXT', '"some text"'))
# will be blocked because the default policy does not override local data entries
self.checkNXD('tc.example.', 'A')
self.checkNotBlocked('drop.example.')
-class RPZOrderingPrecedenceRecursorTesT(RPZRecursorTest):
+class RPZSimpleAuthServer(object):
+
+ def __init__(self, port):
+ self._serverPort = port
+ listener = threading.Thread(name='RPZ Simple Auth Listener', target=self._listener, args=[])
+ listener.setDaemon(True)
+ listener.start()
+
+ def _getAnswer(self, message):
+
+ response = dns.message.make_response(message)
+ response.flags |= dns.flags.AA
+ records = [
+ dns.rrset.from_text('nsip.delegated.example.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.42')
+ ]
+
+ response.answer = records
+ return response
+
+ def _listener(self):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ sock.bind(("127.0.0.1", self._serverPort))
+ except socket.error as e:
+ print("Error binding in the RPZ simple auth listener: %s" % str(e))
+ sys.exit(1)
+
+ while True:
+ try:
+ data, addr = sock.recvfrom(4096)
+ message = dns.message.from_wire(data)
+ if len(message.question) != 1:
+ print('Invalid query, qdcount is %d' % (len(message.question)))
+ break
+
+ answer = self._getAnswer(message)
+ if not answer:
+ print('Unable to get a response for %s %d' % (message.question[0].name, message.question[0].rdtype))
+ break
+
+ wire = answer.to_wire()
+ sock.sendto(wire, addr)
+
+ except socket.error as e:
+ print('Error in RPZ simple auth socket: %s' % str(e))
+
+rpzAuthServerPort = 4260
+rpzAuthServer = RPZSimpleAuthServer(rpzAuthServerPort)
+
+class RPZOrderingPrecedenceRecursorTest(RPZRecursorTest):
"""
This test makes sure that the recursor respects the RPZ ordering precedence rules
"""
_confdir = 'RPZOrderingPrecedence'
- _wsPort = 8042
- _wsTimeout = 2
- _wsPassword = 'secretpassword'
- _apiKey = 'secretapikey'
_lua_config_file = """
rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz."})
rpzFile('configs/%s/zone2.rpz', { policyName="zone2.rpz."})
""" % (_confdir, _confdir)
_config_template = """
auth-zones=example=configs/%s/example.zone
-webserver=yes
-webserver-port=%d
-webserver-address=127.0.0.1
-webserver-password=%s
-api-key=%s
-""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+forward-zones=delegated.example=127.0.0.1:%d
+""" % (_confdir, rpzAuthServerPort)
@classmethod
def generateRecursorConfig(cls, confdir):
authzone.write("""$ORIGIN example.
@ 3600 IN SOA {soa}
sub.test 3600 IN A 192.0.2.42
+passthru-then-blocked-by-higher 3600 IN A 192.0.2.66
+passthru-then-blocked-by-same 3600 IN A 192.0.2.66
+blocked-then-passhtru-by-higher 3600 IN A 192.0.2.100
""".format(soa=cls._SOA))
rpzFilePath = os.path.join(confdir, 'zone.rpz')
rpzZone.write("""$ORIGIN zone.rpz.
@ 3600 IN SOA {soa}
*.test.example.zone.rpz. 60 IN CNAME rpz-passthru.
+32.66.2.0.192.rpz-ip.zone.rpz. 60 IN A 192.0.2.1
+32.100.2.0.192.rpz-ip.zone.rpz. 60 IN CNAME rpz-passthru.
+passthru-then-blocked-by-same.example.zone.rpz. 60 IN CNAME rpz-passthru.
+32.1.0.0.127.rpz-nsip.zone.rpz. 60 IN CNAME rpz-passthru.
""".format(soa=cls._SOA))
rpzFilePath = os.path.join(confdir, 'zone2.rpz')
rpzZone.write("""$ORIGIN zone2.rpz.
@ 3600 IN SOA {soa}
sub.test.example.com.zone2.rpz. 60 IN CNAME .
+passthru-then-blocked-by-higher.example.zone2.rpz. 60 IN CNAME rpz-passthru.
+blocked-then-passhtru-by-higher.example.zone2.rpz. 60 IN A 192.0.2.1
32.42.2.0.192.rpz-ip 60 IN CNAME .
""".format(soa=cls._SOA))
- super(RPZOrderingPrecedenceRecursorTesT, cls).generateRecursorConfig(confdir)
+ super(RPZOrderingPrecedenceRecursorTest, cls).generateRecursorConfig(confdir)
- def testRPZ(self):
+ def testRPZOrderingForQNameAndWhitelisting(self):
# we should first match on the qname (the wildcard, not on the exact name since
# we respect the order of the RPZ zones), see the pass-thru rule
- # and stop RPZ processing. The subsequent rule on the content of the A
- # should therefore not trigger a NXDOMAIN.
+ # and only process RPZ rules of higher precedence.
+ # The subsequent rule on the content of the A should therefore not trigger a NXDOMAIN.
self.checkNotBlocked('sub.test.example.')
+
+ def testRPZOrderingWhitelistedThenBlockedByHigher(self):
+ # we should first match on the qname from the second RPZ zone,
+ # continue the resolution process, and get blocked by the content of the A record
+ # based on the first RPZ zone, whose priority is higher than the second one.
+ self.checkBlocked('passthru-then-blocked-by-higher.example.')
+
+ def testRPZOrderingWhitelistedThenBlockedBySame(self):
+ # we should first match on the qname from the first RPZ zone,
+ # continue the resolution process, and NOT get blocked by the content of the A record
+ # based on the same RPZ zone, since it's not higher.
+ self.checkCustom('passthru-then-blocked-by-same.example.', 'A', dns.rrset.from_text('passthru-then-blocked-by-same.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.66'))
+
+ def testRPZOrderBlockedThenWhitelisted(self):
+ # The qname is first blocked by the second RPZ zone
+ # Then, should the resolution process go on, the A record would be whitelisted
+ # by the first zone.
+ # This is what the RPZ specification requires, but we currently decided that we
+ # don't want to leak queries to malicious DNS servers and waste time if the qname is blacklisted.
+ # We might change our opinion at some point, though.
+ self.checkBlocked('blocked-then-passhtru-by-higher.example.')
+
+ def testRPZOrderDelegate(self):
+ # The IP of the NS we are going to contact is whitelisted (passthru) in zone 1,
+ # so even though the record (192.0.2.42) returned by the server is blacklisted
+ # by zone 2, it should not be blocked.
+ # We only test once because after that the answer is cached, so the NS is not contacted
+ # and the whitelist is not applied (yes, NSIP and NSDNAME are brittle).
+ self.checkNotBlocked('nsip.delegated.example.', singleCheck=True)
+
+class RPZNSIPCustomTest(RPZRecursorTest):
+ """
+ This test makes sure that the recursor handles custom RPZ rules in a NSIP
+ """
+
+ _confdir = 'RPZNSIPCustom'
+ _lua_config_file = """
+ rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz."})
+ rpzFile('configs/%s/zone2.rpz', { policyName="zone2.rpz."})
+ """ % (_confdir, _confdir)
+ _config_template = """
+auth-zones=example=configs/%s/example.zone
+forward-zones=delegated.example=127.0.0.1:%d
+""" % (_confdir, rpzAuthServerPort)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ authzonepath = os.path.join(confdir, 'example.zone')
+ with open(authzonepath, 'w') as authzone:
+ authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+""".format(soa=cls._SOA))
+
+ rpzFilePath = os.path.join(confdir, 'zone.rpz')
+ with open(rpzFilePath, 'w') as rpzZone:
+ rpzZone.write("""$ORIGIN zone.rpz.
+@ 3600 IN SOA {soa}
+32.1.0.0.127.rpz-nsip.zone.rpz. 60 IN A 192.0.2.1
+""".format(soa=cls._SOA))
+
+ rpzFilePath = os.path.join(confdir, 'zone2.rpz')
+ with open(rpzFilePath, 'w') as rpzZone:
+ rpzZone.write("""$ORIGIN zone2.rpz.
+@ 3600 IN SOA {soa}
+32.1.2.0.192.rpz-ip 60 IN CNAME .
+""".format(soa=cls._SOA))
+
+ super(RPZNSIPCustomTest, cls).generateRecursorConfig(confdir)
+
+ def testRPZDelegate(self):
+ # The IP of the NS we are going to contact should result in a custom record (192.0.2.1) from zone 1,
+ # so even though the record (192.0.2.1) returned by the server is blacklisted
+ # by zone 2, it should not be blocked.
+ # We only test once because after that the answer is cached, so the NS is not contacted
+ # and the whitelist is not applied (yes, NSIP and NSDNAME are brittle).
+ self.checkCustom('nsip.delegated.example.', 'A', dns.rrset.from_text('nsip.delegated.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1'))