From 1435fd47329c06c6bd3c47f30ca5f90b0dc8ec5c Mon Sep 17 00:00:00 2001 From: Tomas Krizek Date: Sat, 18 Dec 2021 18:12:47 +0100 Subject: [PATCH] edns: implement Extended DNS Error Option support This is quite minimalistic implementation of the Extended DNS Errors (RFC 8914). It just allows access to code and text fields. --- dns/edns.py | 58 +++++++++++++++++++++++++++++++++++++++++++++- tests/test_edns.py | 36 ++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/dns/edns.py b/dns/edns.py index 237178f2..5d85da9b 100644 --- a/dns/edns.py +++ b/dns/edns.py @@ -47,6 +47,8 @@ class OptionType(dns.enum.IntEnum): PADDING = 12 #: CHAIN CHAIN = 13 + #: EDE (extended-dns-error) + EDE = 15 @classmethod def _maximum(cls): @@ -300,10 +302,63 @@ class ECSOption(Option): return cls(addr, src, scope) +class EDEOption(Option): + """Extended DNS Error (EDE, RFC8914)""" + + def __init__(self, code, text=None): + """*code*, an ``int``, the info code of the extended error. + + *text*, a ``str``, optional field containing additional textual + information. + """ + + super().__init__(OptionType.EDE) + + if code < 0 or code > 65535: + raise ValueError('code must be uint16') + if text is not None and not isinstance(text, str): + raise ValueError('text must be string or None') + + self.code = code + self.text = text + + def to_text(self): + output = "EDE {}".format(self.code) + if self.text is not None: + output += ': {}'.format(self.text) + return output + + def to_wire(self, file=None): + value = struct.pack('!H', self.code) + if self.text is not None: + value += self.text.encode('utf8') + + if file: + file.write(value) + else: + return value + + @classmethod + def from_wire_parser(cls, otype, parser): + code = parser.get_uint16() + text = parser.get_remaining() + + if text: + if text[-1] == 0: # text MAY be null-terminated + text = text[:-1] + text = text.decode('utf8') + else: + text = None + + return cls(code, text) + + _type_to_class = { - OptionType.ECS: ECSOption + OptionType.ECS: ECSOption, + OptionType.EDE: EDEOption, } + def get_option_class(otype): """Return the class for the specified option type. @@ -372,5 +427,6 @@ COOKIE = OptionType.COOKIE KEEPALIVE = OptionType.KEEPALIVE PADDING = OptionType.PADDING CHAIN = OptionType.CHAIN +EDE = OptionType.EDE ### END generated OptionType constants diff --git a/tests/test_edns.py b/tests/test_edns.py index 6ba0c995..427eb29c 100644 --- a/tests/test_edns.py +++ b/tests/test_edns.py @@ -134,6 +134,42 @@ class OptionTestCase(unittest.TestCase): opt = dns.edns.option_from_wire(dns.edns.ECS, b'\x00\xff\x18\x00\x01\x02\x03', 0, 7) + def testEDEOption(self): + opt = dns.edns.EDEOption(3) + io = BytesIO() + opt.to_wire(io) + data = io.getvalue() + self.assertEqual(data, b'\x00\x03') + self.assertEqual(str(opt), 'EDE 3') + # with text + opt = dns.edns.EDEOption(16, 'test') + io = BytesIO() + opt.to_wire(io) + data = io.getvalue() + self.assertEqual(data, b'\x00\x10test') + + def testEDEOption_invalid(self): + with self.assertRaises(ValueError): + opt = dns.edns.EDEOption(-1) + with self.assertRaises(ValueError): + opt = dns.edns.EDEOption(65536) + with self.assertRaises(ValueError): + opt = dns.edns.EDEOption(0, 0) + + def testEDEOption_from_wire(self): + data = b'\x00\01' + self.assertEqual( + dns.edns.option_from_wire(dns.edns.EDE, data, 0, 2), + dns.edns.EDEOption(1)) + data = b'\x00\01test' + self.assertEqual( + dns.edns.option_from_wire(dns.edns.EDE, data, 0, 6), + dns.edns.EDEOption(1, 'test')) + # utf-8 text MAY be null-terminated + data = b'\x00\01test\x00' + self.assertEqual( + dns.edns.option_from_wire(dns.edns.EDE, data, 0, 7), + dns.edns.EDEOption(1, 'test')) def test_basic_relations(self): o1 = dns.edns.ECSOption.from_text('1.2.3.0/24/0') -- 2.47.3