"""
import os
+import sys
import struct
import socket
import select
+import time
from samba.dcerpc import dns, nbt
-
from samba.tests import TestCase
SERVER = os.environ['SERVER_IP']
-SERVER_NAME = f"{os.environ['SERVER']}.{os.environ['REALM']}"
+SERVER_NAME = "%s.%s" % (os.environ['SERVER'], os.environ['REALM'])
+NBT_NAME = 'EOGFGLGPCACACACACACACACACACACACA' # "neko"
TIMEOUT = 0.5
+VERBOSE = '-v' in sys.argv
+
+# We use OK for rcode assertions when we don't known whether the query
+# is DNS or NBT (not exactly coincidentally, OK is 0 in both cases).
+OK = dns.DNS_RCODE_OK
+
def encode_netbios_bytes(chars):
- """Even RFC 1002 uses distancing quotes when calling this "compression"."""
+ """RFC 1002 calls this "first-level encoding"."""
out = []
chars = (chars + b' ')[:16]
for c in chars:
def tearDown(self):
# we need to ensure the DNS server is responsive before
- # continuing.
- for i in range(40):
- ok = self._known_good_query()
- if ok:
- return
- print(f"the server is STILL unresponsive after {40 * TIMEOUT} seconds")
+ # continuing. This will catch the return of any DoS problems
+ # like CVE-2020-10745.
+ ok = self._known_good_query()
+ if not ok:
+ self.fail("the server is STILL unresponsive")
def decode_reply(self, data):
header = data[:12]
return header + b''.join(encoded_bits)
def _test_query(self, names=(), expected_rcode=None):
-
if isinstance(names, str):
names = [names]
packet = self.construct_query(names)
+ start = time.time()
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.sendto(packet, self.server)
r, _, _ = select.select([s], [], [], TIMEOUT)
- s.close()
- # It is reasonable to not reply to these packets (Windows
- # doesn't), but it is not reasonable to render the server
- # unresponsive.
- if r != [s]:
- ok = self._known_good_query()
- self.assertTrue(ok, "the server is unresponsive")
+
+ # For some queries Windows varies its response. In these
+ # cases, we accept any of the known replies, or noreply if it
+ # is an option.
+ if (isinstance(expected_rcode, set) and
+ None in expected_rcode and
+ r == []):
+ expected_rcode = None
+
+ if expected_rcode is None:
+ # we don't think the server should answer (the packet is
+ # rubbish), but we also want to be sure that the reason
+ # the server is not answering is not that it is stuck
+ # doing work on the packet (c.f. CVE-2020-10745). So we
+ # wait for a while, then immediately send another, easy
+ # packet. If the reply to the easy packet comes back
+ # quickly, we say the server is good.
+
+ elapsed = time.time() - start
+ if r:
+ data, addr = s.recvfrom(16 * 1024)
+ s.close()
+
+ rcode = self.decode_reply(data)['rcode']
+ self.fail(
+ "an answer was not expected "
+ "(timeout {}, elapsed {:.2f}, rcode {})".format(TIMEOUT,
+ elapsed,
+ rcode))
+
+ s.close()
+
+ self.assertTrue(self._known_good_query(),
+ "The query timed out (good), but the server is "
+ "still unresponsive.")
+
+ else:
+ if r != [s]:
+ s.close()
+ self.fail("an answer was expected within %s seconds)" % TIMEOUT)
+
+ data, addr = s.recvfrom(16 * 1024)
+ s.close()
+ rcode = self.decode_reply(data)['rcode']
+
+ if isinstance(expected_rcode, set):
+ self.assertIn(rcode,
+ expected_rcode,
+ "expected RCODE %s, got %s" % (expected_rcode,
+ rcode))
+ else:
+ self.assertEqual(rcode,
+ expected_rcode,
+ "expected RCODE %s, got %s" % (expected_rcode,
+ rcode))
+
+ if VERBOSE:
+ print(data, len(data))
+ if VERBOSE:
+ print("succeeded in %f seconds" % (time.time() - start))
def _known_good_query(self):
if self.server[1] == 53:
ok = self._known_good_query()
self.assertTrue(ok, "the server is unresponsive")
+ def _make_long_name(self, length, first_component=None):
+ name = []
+ if first_component is not None:
+ name.append(first_component)
+ length -= len(first_component) + 1
+ while length > 33:
+ name.append("x" * 30)
+ length -= 31
+ name.append("x" * length)
+ return '.'.join(name)
+
class TestDnsPackets(TestDnsPacketBase):
server = (SERVER, 53)
def test_127_very_dotty_components(self):
label = b'.' * 63
- self._test_many_repeated_components(label, 127)
+ # Windows will refuse to reply, but we also accept format error
+ self._test_many_repeated_components(label, 127,
+ expected_rcode={None,
+ nbt.NBT_RCODE_FMT})
def test_127_half_dotty_components(self):
label = b'x.' * 31 + b'x'
- self._test_many_repeated_components(label, 127)
+ # Windows will refuse to reply
+ self._test_many_repeated_components(label, 127,
+ expected_rcode={None,
+ nbt.NBT_RCODE_FMT})
+
+ def test_253_bytes(self):
+ name = self._make_long_name(253)
+ self._test_query(name,
+ expected_rcode=dns.DNS_RCODE_NXDOMAIN)
+
+ def test_254_bytes(self):
+ name = self._make_long_name(254)
+ self._test_query(name,
+ expected_rcode=dns.DNS_RCODE_FORMERR)
def test_empty_packet(self):
self._test_empty_packet()
server = (SERVER, 137)
qtype = 0x20 # NBT_QTYPE_NETBIOS
+ def _test_many_repeated_components(self, label, n,
+ expected_rcode=None):
+ name = [label] * n
+ name[0] = encode_netbios_bytes(label)
+ self._test_query([name],
+ expected_rcode=expected_rcode)
+
def _test_nbt_encode_query(self, names, *args, **kwargs):
if isinstance(names, str):
names = [names]
self._test_query(nbt_names, *args, **kwargs)
- def _test_many_repeated_components(self, label, n, expected_rcode=None):
- name = [label] * n
- name[0] = encode_netbios_bytes(label)
- self._test_query([name],
- expected_rcode=expected_rcode)
-
def test_127_very_dotty_components(self):
label = b'.' * 63
- self._test_many_repeated_components(label, 127)
+ # Windows will refuse to reply
+ self._test_many_repeated_components(label, 127,
+ expected_rcode={None,
+ nbt.NBT_RCODE_FMT})
def test_127_half_dotty_components(self):
label = b'x.' * 31 + b'x'
- self._test_many_repeated_components(label, 127)
+ # Windows will refuse to reply
+ self._test_many_repeated_components(label, 127,
+ expected_rcode={None,
+ nbt.NBT_RCODE_FMT})
def test_empty_packet(self):
self._test_empty_packet()
+
+ def test_253_bytes(self):
+ name = self._make_long_name(253, NBT_NAME)
+ self._test_query(name,
+ expected_rcode=nbt.NBT_RCODE_NAM)
+
+ def test_254_bytes(self):
+ # This works because we follow Windows, not RFC 1001/1002.
+ # (see next test for a longer explanation).
+ name = self._make_long_name(254, NBT_NAME)
+ self._test_query(name,
+ expected_rcode=nbt.NBT_RCODE_NAM)
+
+ def test_272_bytes(self):
+ # This works because we (contra RFC, following Windows) treat
+ # the 32 byte encoded Netbios name as if it were the 16 byte
+ # un-encoded form (used in MS-WINSRA), AND add three bytes
+ # because what kind of limit is 253 anyway? (matches 2012r2)
+ name = self._make_long_name(272, NBT_NAME)
+ self._test_query(name,
+ expected_rcode=nbt.NBT_RCODE_NAM)
+
+ def test_273_bytes(self):
+ # Finally we exhaust Windows' generosity toward long names.
+ name = self._make_long_name(273, NBT_NAME)
+ self._test_query(name,
+ expected_rcode={None, nbt.NBT_RCODE_FMT})