From: Douglas Bagnall Date: Wed, 3 Jun 2020 02:42:41 +0000 (+1200) Subject: pytests: dns_packet tests check rcodes match Windows X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cf44e414bde6ed85a1e19e186630759af904c2bf;p=thirdparty%2Fsamba.git pytests: dns_packet tests check rcodes match Windows the dns_packet tests originally checked only for a particular DoS situation (CVE-2020-10745) but now we widen them to ensure Samba's replies to invalid packets resembles those of Windows (in particular, Windows 2012r2). We want Samba to reply only when Windows replies, and with the same rcode. At present we fail a lot of these tests. The original CVE-2020-10745 test is retained and widened indirectly -- any test that leaves the server unable to respond within 0.5 seconds will count as a failure. BUG: https://bugzilla.samba.org/show_bug.cgi?id=14378 Signed-off-by: Douglas Bagnall Reviewed-by: Andreas Schneider --- diff --git a/python/samba/tests/dns_packet.py b/python/samba/tests/dns_packet.py index 61d5aabcc9d..c1fcb97776f 100644 --- a/python/samba/tests/dns_packet.py +++ b/python/samba/tests/dns_packet.py @@ -22,11 +22,12 @@ We don't use a proper client library so we can make improper packets. """ import os +import sys import struct import socket import select +import time from samba.dcerpc import dns, nbt - from samba.tests import TestCase @@ -37,12 +38,19 @@ def _msg_id(): SERVER = os.environ['SERVER_IP'] -SERVER_NAME = f"{os.environ['SERVER']}.{os.environ['REALM']}" +SERVER_NAME = "%s.%s" % (os.environ['SERVER'], os.environ['REALM']) +NBT_NAME = 'EOGFGLGPCACACACACACACACACACACACA' # "neko" TIMEOUT = 0.5 +VERBOSE = '-v' in sys.argv + +# We use OK for rcode assertions when we don't known whether the query +# is DNS or NBT (not exactly coincidentally, OK is 0 in both cases). +OK = dns.DNS_RCODE_OK + def encode_netbios_bytes(chars): - """Even RFC 1002 uses distancing quotes when calling this "compression".""" + """RFC 1002 calls this "first-level encoding".""" out = [] chars = (chars + b' ')[:16] for c in chars: @@ -56,12 +64,11 @@ class TestDnsPacketBase(TestCase): def tearDown(self): # we need to ensure the DNS server is responsive before - # continuing. - for i in range(40): - ok = self._known_good_query() - if ok: - return - print(f"the server is STILL unresponsive after {40 * TIMEOUT} seconds") + # continuing. This will catch the return of any DoS problems + # like CVE-2020-10745. + ok = self._known_good_query() + if not ok: + self.fail("the server is STILL unresponsive") def decode_reply(self, data): header = data[:12] @@ -119,21 +126,74 @@ class TestDnsPacketBase(TestCase): return header + b''.join(encoded_bits) def _test_query(self, names=(), expected_rcode=None): - if isinstance(names, str): names = [names] packet = self.construct_query(names) + start = time.time() s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.sendto(packet, self.server) r, _, _ = select.select([s], [], [], TIMEOUT) - s.close() - # It is reasonable to not reply to these packets (Windows - # doesn't), but it is not reasonable to render the server - # unresponsive. - if r != [s]: - ok = self._known_good_query() - self.assertTrue(ok, "the server is unresponsive") + + # For some queries Windows varies its response. In these + # cases, we accept any of the known replies, or noreply if it + # is an option. + if (isinstance(expected_rcode, set) and + None in expected_rcode and + r == []): + expected_rcode = None + + if expected_rcode is None: + # we don't think the server should answer (the packet is + # rubbish), but we also want to be sure that the reason + # the server is not answering is not that it is stuck + # doing work on the packet (c.f. CVE-2020-10745). So we + # wait for a while, then immediately send another, easy + # packet. If the reply to the easy packet comes back + # quickly, we say the server is good. + + elapsed = time.time() - start + if r: + data, addr = s.recvfrom(16 * 1024) + s.close() + + rcode = self.decode_reply(data)['rcode'] + self.fail( + "an answer was not expected " + "(timeout {}, elapsed {:.2f}, rcode {})".format(TIMEOUT, + elapsed, + rcode)) + + s.close() + + self.assertTrue(self._known_good_query(), + "The query timed out (good), but the server is " + "still unresponsive.") + + else: + if r != [s]: + s.close() + self.fail("an answer was expected within %s seconds)" % TIMEOUT) + + data, addr = s.recvfrom(16 * 1024) + s.close() + rcode = self.decode_reply(data)['rcode'] + + if isinstance(expected_rcode, set): + self.assertIn(rcode, + expected_rcode, + "expected RCODE %s, got %s" % (expected_rcode, + rcode)) + else: + self.assertEqual(rcode, + expected_rcode, + "expected RCODE %s, got %s" % (expected_rcode, + rcode)) + + if VERBOSE: + print(data, len(data)) + if VERBOSE: + print("succeeded in %f seconds" % (time.time() - start)) def _known_good_query(self): if self.server[1] == 53: @@ -169,6 +229,17 @@ class TestDnsPacketBase(TestCase): ok = self._known_good_query() self.assertTrue(ok, "the server is unresponsive") + def _make_long_name(self, length, first_component=None): + name = [] + if first_component is not None: + name.append(first_component) + length -= len(first_component) + 1 + while length > 33: + name.append("x" * 30) + length -= 31 + name.append("x" * length) + return '.'.join(name) + class TestDnsPackets(TestDnsPacketBase): server = (SERVER, 53) @@ -181,11 +252,27 @@ class TestDnsPackets(TestDnsPacketBase): def test_127_very_dotty_components(self): label = b'.' * 63 - self._test_many_repeated_components(label, 127) + # Windows will refuse to reply, but we also accept format error + self._test_many_repeated_components(label, 127, + expected_rcode={None, + nbt.NBT_RCODE_FMT}) def test_127_half_dotty_components(self): label = b'x.' * 31 + b'x' - self._test_many_repeated_components(label, 127) + # Windows will refuse to reply + self._test_many_repeated_components(label, 127, + expected_rcode={None, + nbt.NBT_RCODE_FMT}) + + def test_253_bytes(self): + name = self._make_long_name(253) + self._test_query(name, + expected_rcode=dns.DNS_RCODE_NXDOMAIN) + + def test_254_bytes(self): + name = self._make_long_name(254) + self._test_query(name, + expected_rcode=dns.DNS_RCODE_FORMERR) def test_empty_packet(self): self._test_empty_packet() @@ -195,6 +282,13 @@ class TestNbtPackets(TestDnsPacketBase): server = (SERVER, 137) qtype = 0x20 # NBT_QTYPE_NETBIOS + def _test_many_repeated_components(self, label, n, + expected_rcode=None): + name = [label] * n + name[0] = encode_netbios_bytes(label) + self._test_query([name], + expected_rcode=expected_rcode) + def _test_nbt_encode_query(self, names, *args, **kwargs): if isinstance(names, str): names = [names] @@ -212,19 +306,46 @@ class TestNbtPackets(TestDnsPacketBase): self._test_query(nbt_names, *args, **kwargs) - def _test_many_repeated_components(self, label, n, expected_rcode=None): - name = [label] * n - name[0] = encode_netbios_bytes(label) - self._test_query([name], - expected_rcode=expected_rcode) - def test_127_very_dotty_components(self): label = b'.' * 63 - self._test_many_repeated_components(label, 127) + # Windows will refuse to reply + self._test_many_repeated_components(label, 127, + expected_rcode={None, + nbt.NBT_RCODE_FMT}) def test_127_half_dotty_components(self): label = b'x.' * 31 + b'x' - self._test_many_repeated_components(label, 127) + # Windows will refuse to reply + self._test_many_repeated_components(label, 127, + expected_rcode={None, + nbt.NBT_RCODE_FMT}) def test_empty_packet(self): self._test_empty_packet() + + def test_253_bytes(self): + name = self._make_long_name(253, NBT_NAME) + self._test_query(name, + expected_rcode=nbt.NBT_RCODE_NAM) + + def test_254_bytes(self): + # This works because we follow Windows, not RFC 1001/1002. + # (see next test for a longer explanation). + name = self._make_long_name(254, NBT_NAME) + self._test_query(name, + expected_rcode=nbt.NBT_RCODE_NAM) + + def test_272_bytes(self): + # This works because we (contra RFC, following Windows) treat + # the 32 byte encoded Netbios name as if it were the 16 byte + # un-encoded form (used in MS-WINSRA), AND add three bytes + # because what kind of limit is 253 anyway? (matches 2012r2) + name = self._make_long_name(272, NBT_NAME) + self._test_query(name, + expected_rcode=nbt.NBT_RCODE_NAM) + + def test_273_bytes(self): + # Finally we exhaust Windows' generosity toward long names. + name = self._make_long_name(273, NBT_NAME) + self._test_query(name, + expected_rcode={None, nbt.NBT_RCODE_FMT}) diff --git a/selftest/knownfail.d/dns_packet b/selftest/knownfail.d/dns_packet new file mode 100644 index 00000000000..fa09f11adf4 --- /dev/null +++ b/selftest/knownfail.d/dns_packet @@ -0,0 +1,4 @@ +samba.tests.dns_packet.samba.tests.dns_packet.TestDnsPackets.test_127_half_dotty_components +samba.tests.dns_packet.samba.tests.dns_packet.TestDnsPackets.test_127_very_dotty_components +samba.tests.dns_packet.samba.tests.dns_packet.TestDnsPackets.test_254_bytes +samba.tests.dns_packet.samba.tests.dns_packet.TestNbtPackets.test_273_bytes