From f75cfc629dabbbf85c0ca1bd4c0bd400fb5267d3 Mon Sep 17 00:00:00 2001 From: Mukund Sivaraman Date: Mon, 25 May 2026 19:28:36 +0000 Subject: [PATCH] Add new IANA-allocated options from draft-muks-dns-filtering (#1269) --- dns/edns.py | 140 +++++++++++++++++++++++++++++++++++++++++++ doc/message-edns.rst | 12 ++++ tests/test_edns.py | 64 ++++++++++++++++++++ 3 files changed, 216 insertions(+) diff --git a/dns/edns.py b/dns/edns.py index f9e9e13f..a60f89cb 100644 --- a/dns/edns.py +++ b/dns/edns.py @@ -59,6 +59,14 @@ class OptionType(dns.enum.IntEnum): EDE = 15 #: REPORTCHANNEL REPORTCHANNEL = 18 + #: EDE-EXTRA-TEXT-LANGUAGE (Extended DNS Error EXTRA-TEXT language) + EDE_EXTRA_TEXT_LANGUAGE = 22 + #: FILTERING-CONTACT + FILTERING_CONTACT = 23 + #: FILTERING-ORGANIZATION + FILTERING_ORGANIZATION = 24 + #: FILTERING-DB + FILTERING_DB = 25 @classmethod def _maximum(cls): @@ -513,12 +521,140 @@ class ReportChannelOption(Option): return cls(parser.get_name()) +class EDEExtraTextLanguageOption(Option): + """Extended DNS Error EXTRA-TEXT language (EDE-EXTRA-TEXT-LANGUAGE)""" + + def __init__(self, language: str): + """Initialize an EDEExtraTextLanguageOption. + + :param language: The language of EXTRA-TEXT in the EDE option. + :type language: str + """ + + super().__init__(OptionType.EDE_EXTRA_TEXT_LANGUAGE) + self.language = language + + def to_wire(self, file: Any = None) -> bytes | None: + if file: + file.write(self.language.encode("utf8")) + return None + else: + return self.language + + def to_text(self) -> str: + return f"EDE-EXTRA-TEXT-LANGUAGE {self.language}" + + @classmethod + def from_wire_parser( + cls, otype: OptionType | str, parser: dns.wire.Parser + ) -> Option: + return cls(parser.get_remaining().decode("utf8")) + + +class FilteringContactOption(Option): + """Filtering contact (FILTERING-CONTACT)""" + + def __init__(self, contact: str): + """Initialize a FilteringContactOption. + + :param contact: A filtering contact URI as a string. + :type contact: str + """ + + super().__init__(OptionType.FILTERING_CONTACT) + self.contact = contact + + def to_wire(self, file: Any = None) -> bytes | None: + if file: + file.write(self.contact.encode("utf8")) + return None + else: + return self.contact + + def to_text(self) -> str: + return f"FILTERING-CONTACT {self.contact}" + + @classmethod + def from_wire_parser( + cls, otype: OptionType | str, parser: dns.wire.Parser + ) -> Option: + return cls(parser.get_remaining().decode("utf8")) + + +class FilteringOrganizationOption(Option): + """Filtering organization (FILTERING-ORGANIZATION)""" + + def __init__(self, organization: str): + """Initialize a FilteringOrganizationOption. + + :param organization: The filtering organization. + :type organization: str + """ + + super().__init__(OptionType.FILTERING_ORGANIZATION) + self.organization = organization + + def to_wire(self, file: Any = None) -> bytes | None: + if file: + file.write(self.organization.encode("utf8")) + return None + else: + return self.organization + + def to_text(self) -> str: + return f"FILTERING-ORGANIZATION {self.organization}" + + @classmethod + def from_wire_parser( + cls, otype: OptionType | str, parser: dns.wire.Parser + ) -> Option: + return cls(parser.get_remaining().decode("utf8")) + + +class FilteringDBOption(Option): + """Filtering DB (FILTERING-DB)""" + + def __init__(self, db: str): + """Initialize a FilteringDBOption. + + :param db: The filtering database containing the identifier, + name, or description of the filtering database + against which a matched query caused the filtering to + occur. + :type db: str + + """ + + super().__init__(OptionType.FILTERING_DB) + self.db = db + + def to_wire(self, file: Any = None) -> bytes | None: + if file: + file.write(self.db.encode("utf8")) + return None + else: + return self.db + + def to_text(self) -> str: + return f"FILTERING-DB {self.db}" + + @classmethod + def from_wire_parser( + cls, otype: OptionType | str, parser: dns.wire.Parser + ) -> Option: + return cls(parser.get_remaining().decode("utf8")) + + _type_to_class: dict[OptionType, Any] = { OptionType.ECS: ECSOption, OptionType.EDE: EDEOption, OptionType.NSID: NSIDOption, OptionType.COOKIE: CookieOption, OptionType.REPORTCHANNEL: ReportChannelOption, + OptionType.EDE_EXTRA_TEXT_LANGUAGE: EDEExtraTextLanguageOption, + OptionType.FILTERING_CONTACT: FilteringContactOption, + OptionType.FILTERING_ORGANIZATION: FilteringOrganizationOption, + OptionType.FILTERING_DB: FilteringDBOption, } @@ -596,5 +732,9 @@ PADDING = OptionType.PADDING CHAIN = OptionType.CHAIN EDE = OptionType.EDE REPORTCHANNEL = OptionType.REPORTCHANNEL +EDE_EXTRA_TEXT_LANGUAGE = OptionType.EDE_EXTRA_TEXT_LANGUAGE +FILTERING_CONTACT = OptionType.FILTERING_CONTACT +FILTERING_ORGANIZATION = OptionType.FILTERING_ORGANIZATION +FILTERING_DB = OptionType.FILTERING_DB ### END generated OptionType constants diff --git a/doc/message-edns.rst b/doc/message-edns.rst index a21d0e83..de003af4 100644 --- a/doc/message-edns.rst +++ b/doc/message-edns.rst @@ -45,6 +45,18 @@ will create a :py:class:`dns.edns.ECSOption` object to represent it. .. autoclass:: dns.edns.ReportChannelOption :members: +.. autoclass:: dns.edns.EDEExtraTextLanguageOption + :members: + +.. autoclass:: dns.edns.FilteringContactOption + :members: + +.. autoclass:: dns.edns.FilteringOrganizationOption + :members: + +.. autoclass:: dns.edns.FilteringDBOption + :members: + .. autofunction:: dns.edns.get_option_class .. autofunction:: dns.edns.option_from_wire_parser .. autofunction:: dns.edns.option_from_wire diff --git a/tests/test_edns.py b/tests/test_edns.py index 1229c1e9..6bf044ec 100644 --- a/tests/test_edns.py +++ b/tests/test_edns.py @@ -266,6 +266,66 @@ class OptionTestCase(unittest.TestCase): ) self.assertEqual(opt2.agent_domain, agent_domain) + def testEDEExtraTextLanguageOption(self): + language = "en" + expected_wire = bytes(language, "utf8") + opt = dns.edns.EDEExtraTextLanguageOption(language) + io = BytesIO() + opt.to_wire(io) + data = io.getvalue() + self.assertEqual(data, expected_wire) + self.assertEqual(str(opt), "EDE-EXTRA-TEXT-LANGUAGE en") + opt2 = dns.edns.option_from_wire( + dns.edns.OptionType.EDE_EXTRA_TEXT_LANGUAGE, + expected_wire, 0, len(expected_wire) + ) + self.assertEqual(opt2.language, language) + + def testFilteringContactOption(self): + contact = "mailto:support@example.com" + expected_wire = bytes(contact, "utf8") + opt = dns.edns.FilteringContactOption(contact) + io = BytesIO() + opt.to_wire(io) + data = io.getvalue() + self.assertEqual(data, expected_wire) + self.assertEqual(str(opt), "FILTERING-CONTACT mailto:support@example.com") + opt2 = dns.edns.option_from_wire( + dns.edns.OptionType.FILTERING_CONTACT, + expected_wire, 0, len(expected_wire) + ) + self.assertEqual(opt2.contact, contact) + + def testFilteringOrganizationOption(self): + organization = "The Example Organization" + expected_wire = bytes(organization, "utf8") + opt = dns.edns.FilteringOrganizationOption(organization) + io = BytesIO() + opt.to_wire(io) + data = io.getvalue() + self.assertEqual(data, expected_wire) + self.assertEqual(str(opt), "FILTERING-ORGANIZATION The Example Organization") + opt2 = dns.edns.option_from_wire( + dns.edns.OptionType.FILTERING_ORGANIZATION, + expected_wire, 0, len(expected_wire) + ) + self.assertEqual(opt2.organization, organization) + + def testFilteringDBOption(self): + db = "Government Anti-Piracy Policies #1" + expected_wire = b"Government Anti-Piracy Policies #1" + opt = dns.edns.FilteringDBOption(db) + io = BytesIO() + opt.to_wire(io) + data = io.getvalue() + self.assertEqual(data, expected_wire) + self.assertEqual(str(opt), "FILTERING-DB Government Anti-Piracy Policies #1") + opt2 = dns.edns.option_from_wire( + dns.edns.OptionType.FILTERING_DB, + expected_wire, 0, len(expected_wire) + ) + self.assertEqual(opt2.db, db) + def test_option_registration(self): U32OptionType = 9999 @@ -312,3 +372,7 @@ class OptionTestCase(unittest.TestCase): generic = dns.edns.GenericOption(12345, "foo") assert generic.to_generic() is generic + + +if __name__ == "__main__": + unittest.main() -- 2.47.3