From 98e0c0a0635e1b6cb314e64d99971789bbbcf0ac Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Mon, 9 Mar 2020 18:43:37 +0100 Subject: [PATCH] rec: Add tests for EDNS Padding --- .../paddingoption.py | 50 +++++ .../test_EDNSPadding.py | 181 ++++++++++++++++++ 2 files changed, 231 insertions(+) create mode 100644 regression-tests.recursor-dnssec/paddingoption.py create mode 100644 regression-tests.recursor-dnssec/test_EDNSPadding.py diff --git a/regression-tests.recursor-dnssec/paddingoption.py b/regression-tests.recursor-dnssec/paddingoption.py new file mode 100644 index 0000000000..42c2f3f663 --- /dev/null +++ b/regression-tests.recursor-dnssec/paddingoption.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +import dns +import dns.edns +import dns.flags +import dns.message +import dns.query + +class PaddingOption(dns.edns.Option): + """Implementation of rfc7830. + """ + + def __init__(self, numberOfBytes): + super(PaddingOption, self).__init__(12) + self.numberOfBytes = numberOfBytes + + def to_wire(self, file): + """Create EDNS packet as defined in rfc7830.""" + + file.write(bytes(self.numberOfBytes)) + + def from_wire(cls, otype, wire, current, olen): + """Read EDNS packet as defined in rfc7830. + + Returns: + An instance of PaddingOption based on the EDNS packet + """ + + numberOfBytes = olen + + return cls(numberOfBytes) + + from_wire = classmethod(from_wire) + + def __repr__(self): + return '%s(%d)' % ( + self.__class__.__name__, + self.numberOfBytes + ) + + def __eq__(self, other): + if not isinstance(other, PaddingOption): + return False + return self.numberOfBytes == numberOfBytes + + def __ne__(self, other): + return not self.__eq__(other) + + +dns.edns._type_to_class[0x000C] = PaddingOption diff --git a/regression-tests.recursor-dnssec/test_EDNSPadding.py b/regression-tests.recursor-dnssec/test_EDNSPadding.py new file mode 100644 index 0000000000..f2a40c4de7 --- /dev/null +++ b/regression-tests.recursor-dnssec/test_EDNSPadding.py @@ -0,0 +1,181 @@ +import dns +import os +import socket + +import paddingoption + +from recursortests import RecursorTest + +class TestRecursorEDNSPadding(RecursorTest): + + @classmethod + def setUpClass(cls): + cls.setUpSockets() + + cls.startResponders() + + confdir = os.path.join('configs', cls._confdir) + cls.createConfigDir(confdir) + cls.generateAllAuthConfig(confdir) + + # we only need these auths and this cuts the needed time in half + if cls._auth_zones: + for auth_suffix in ['8', '9', '10']: + authconfdir = os.path.join(confdir, 'auth-%s' % auth_suffix) + ipaddress = cls._PREFIX + '.' + auth_suffix + cls.startAuth(authconfdir, ipaddress) + + cls.generateRecursorConfig(confdir) + cls.startRecursor(confdir, cls._recursorPort) + + print("Launching tests..") + + def checkPadding(self, message, numberOfBytes=None): + self.assertEqual(message.edns, 0) + self.assertEquals(len(message.options), 1) + for option in message.options: + self.assertEquals(option.otype, 12) + if numberOfBytes: + self.assertEquals(option.olen, numberOfBytes) + + def checkNoPadding(self, message): + self.assertEqual(message.edns, 0) + self.assertEquals(len(message.options), 0) + + def sendUDPQueryOverIPv6(self, query, timeout=2.0): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(2.0) + sock.connect(("127.0.0.1", self._recursorPort)) + + if timeout: + sock.settimeout(timeout) + + try: + sock.send(query.to_wire()) + data = sock.recv(4096) + except socket.timeout: + data = None + finally: + if timeout: + sock.settimeout(None) + + message = None + if data: + message = dns.message.from_wire(data) + return message + +class PaddingDefaultTest(TestRecursorEDNSPadding): + + _confdir = 'PaddingDefault' + + def testQueryWithPadding(self): + name = 'secure.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.17') + po = paddingoption.PaddingOption(64) + query = dns.message.make_query(name, 'A', want_dnssec=True, options=[po]) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.checkNoPadding(res) + self.assertRRsetInAnswer(res, expected) + + def testqueryWithoutPadding(self): + name = 'secure.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.17') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.checkNoPadding(res) + self.assertRRsetInAnswer(res, expected) + +class PaddingAllowedAlwaysTest(TestRecursorEDNSPadding): + + _confdir = 'PaddingAlways' + _config_template = """edns-padding-from=127.0.0.1 +edns-padding-mode=always +edns-padding-tag=7830 + """ + + def testQueryWithPadding(self): + name = 'secure.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.17') + po = paddingoption.PaddingOption(64) + query = dns.message.make_query(name, 'A', want_dnssec=True, options=[po]) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.checkPadding(res) + self.assertRRsetInAnswer(res, expected) + + def testqueryWithoutPadding(self): + name = 'secure.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.17') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.checkPadding(res) + self.assertRRsetInAnswer(res, expected) + +class PaddingAllowedWhenPaddedTest(TestRecursorEDNSPadding): + + _confdir = 'PaddingWhenPadded' + _config_template = """edns-padding-from=127.0.0.1 +edns-padding-mode=padded-queries-only +edns-padding-tag=7830 + """ + + def testQueryWithPadding(self): + name = 'secure.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.17') + po = paddingoption.PaddingOption(64) + query = dns.message.make_query(name, 'A', want_dnssec=True, options=[po]) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.checkPadding(res) + self.assertRRsetInAnswer(res, expected) + + def testqueryWithoutPadding(self): + name = 'secure.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.17') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.checkNoPadding(res) + self.assertRRsetInAnswer(res, expected) + +class PaddingAllowedAlwaysSameTagTest(TestRecursorEDNSPadding): + + # we use the default tag (0) for padded responses, which will cause + # the same packet cache entry (with padding ) to be returned to a client + # not allowed by the edns-padding-from list + _confdir = 'PaddingAlwaysSameTag' + _config_template = """edns-padding-from=127.0.0.1 +edns-padding-mode=always +edns-padding-tag=0 +local-address=127.0.0.1, ::1 + """ + + def testQueryWithPadding(self): + name = 'secure.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.17') + po = paddingoption.PaddingOption(64) + query = dns.message.make_query(name, 'A', want_dnssec=True, options=[po]) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.checkPadding(res) + self.assertRRsetInAnswer(res, expected) + + res = self.sendUDPQueryOverIPv6(query) + self.checkPadding(res) + self.assertRRsetInAnswer(res, expected) + + def testqueryWithoutPadding(self): + name = 'secure.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.17') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.checkPadding(res) + self.assertRRsetInAnswer(res, expected) + + res = self.sendUDPQueryOverIPv6(query) + self.checkPadding(res) + self.assertRRsetInAnswer(res, expected) -- 2.39.2