]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
Add new IANA-allocated options from draft-muks-dns-filtering (#1269)
authorMukund Sivaraman <muks@mukund.org>
Mon, 25 May 2026 19:28:36 +0000 (19:28 +0000)
committerGitHub <noreply@github.com>
Mon, 25 May 2026 19:28:36 +0000 (12:28 -0700)
dns/edns.py
doc/message-edns.rst
tests/test_edns.py

index f9e9e13f2f7d4608b6efbfc2e9db940b723d1b2c..a60f89cb1636576e4f940f1fd63470fb8c4f9c96 100644 (file)
@@ -59,6 +59,14 @@ class OptionType(dns.enum.IntEnum):
     EDE = 15
     #: REPORTCHANNEL
     REPORTCHANNEL = 18
+    #: EDE-EXTRA-TEXT-LANGUAGE (Extended DNS Error EXTRA-TEXT language)
+    EDE_EXTRA_TEXT_LANGUAGE = 22
+    #: FILTERING-CONTACT
+    FILTERING_CONTACT = 23
+    #: FILTERING-ORGANIZATION
+    FILTERING_ORGANIZATION = 24
+    #: FILTERING-DB
+    FILTERING_DB = 25
 
     @classmethod
     def _maximum(cls):
@@ -513,12 +521,140 @@ class ReportChannelOption(Option):
         return cls(parser.get_name())
 
 
+class EDEExtraTextLanguageOption(Option):
+    """Extended DNS Error EXTRA-TEXT language (EDE-EXTRA-TEXT-LANGUAGE)"""
+
+    def __init__(self, language: str):
+        """Initialize an EDEExtraTextLanguageOption.
+
+        :param language: The language of EXTRA-TEXT in the EDE option.
+        :type language: str
+        """
+
+        super().__init__(OptionType.EDE_EXTRA_TEXT_LANGUAGE)
+        self.language = language
+
+    def to_wire(self, file: Any = None) -> bytes | None:
+        if file:
+            file.write(self.language.encode("utf8"))
+            return None
+        else:
+            return self.language
+
+    def to_text(self) -> str:
+        return f"EDE-EXTRA-TEXT-LANGUAGE {self.language}"
+
+    @classmethod
+    def from_wire_parser(
+        cls, otype: OptionType | str, parser: dns.wire.Parser
+    ) -> Option:
+        return cls(parser.get_remaining().decode("utf8"))
+
+
+class FilteringContactOption(Option):
+    """Filtering contact (FILTERING-CONTACT)"""
+
+    def __init__(self, contact: str):
+        """Initialize a FilteringContactOption.
+
+        :param contact: A filtering contact URI as a string.
+        :type contact: str
+        """
+
+        super().__init__(OptionType.FILTERING_CONTACT)
+        self.contact = contact
+
+    def to_wire(self, file: Any = None) -> bytes | None:
+        if file:
+            file.write(self.contact.encode("utf8"))
+            return None
+        else:
+            return self.contact
+
+    def to_text(self) -> str:
+        return f"FILTERING-CONTACT {self.contact}"
+
+    @classmethod
+    def from_wire_parser(
+        cls, otype: OptionType | str, parser: dns.wire.Parser
+    ) -> Option:
+        return cls(parser.get_remaining().decode("utf8"))
+
+
+class FilteringOrganizationOption(Option):
+    """Filtering organization (FILTERING-ORGANIZATION)"""
+
+    def __init__(self, organization: str):
+        """Initialize a FilteringOrganizationOption.
+
+        :param organization: The filtering organization.
+        :type organization: str
+        """
+
+        super().__init__(OptionType.FILTERING_ORGANIZATION)
+        self.organization = organization
+
+    def to_wire(self, file: Any = None) -> bytes | None:
+        if file:
+            file.write(self.organization.encode("utf8"))
+            return None
+        else:
+            return self.organization
+
+    def to_text(self) -> str:
+        return f"FILTERING-ORGANIZATION {self.organization}"
+
+    @classmethod
+    def from_wire_parser(
+        cls, otype: OptionType | str, parser: dns.wire.Parser
+    ) -> Option:
+        return cls(parser.get_remaining().decode("utf8"))
+
+
+class FilteringDBOption(Option):
+    """Filtering DB (FILTERING-DB)"""
+
+    def __init__(self, db: str):
+        """Initialize a FilteringDBOption.
+
+        :param db: The filtering database containing the identifier,
+                   name, or description of the filtering database
+                   against which a matched query caused the filtering to
+                   occur.
+        :type db: str
+
+        """
+
+        super().__init__(OptionType.FILTERING_DB)
+        self.db = db
+
+    def to_wire(self, file: Any = None) -> bytes | None:
+        if file:
+            file.write(self.db.encode("utf8"))
+            return None
+        else:
+            return self.db
+
+    def to_text(self) -> str:
+        return f"FILTERING-DB {self.db}"
+
+    @classmethod
+    def from_wire_parser(
+        cls, otype: OptionType | str, parser: dns.wire.Parser
+    ) -> Option:
+        return cls(parser.get_remaining().decode("utf8"))
+
+
 _type_to_class: dict[OptionType, Any] = {
     OptionType.ECS: ECSOption,
     OptionType.EDE: EDEOption,
     OptionType.NSID: NSIDOption,
     OptionType.COOKIE: CookieOption,
     OptionType.REPORTCHANNEL: ReportChannelOption,
+    OptionType.EDE_EXTRA_TEXT_LANGUAGE: EDEExtraTextLanguageOption,
+    OptionType.FILTERING_CONTACT: FilteringContactOption,
+    OptionType.FILTERING_ORGANIZATION: FilteringOrganizationOption,
+    OptionType.FILTERING_DB: FilteringDBOption,
 }
 
 
@@ -596,5 +732,9 @@ PADDING = OptionType.PADDING
 CHAIN = OptionType.CHAIN
 EDE = OptionType.EDE
 REPORTCHANNEL = OptionType.REPORTCHANNEL
+EDE_EXTRA_TEXT_LANGUAGE = OptionType.EDE_EXTRA_TEXT_LANGUAGE
+FILTERING_CONTACT = OptionType.FILTERING_CONTACT
+FILTERING_ORGANIZATION = OptionType.FILTERING_ORGANIZATION
+FILTERING_DB = OptionType.FILTERING_DB
 
 ### END generated OptionType constants
index a21d0e836ff70a81935fbf5b26b741b74a453511..de003af4b9961103efdc6995dab30c5368927f12 100644 (file)
@@ -45,6 +45,18 @@ will create a :py:class:`dns.edns.ECSOption` object to represent it.
 .. autoclass:: dns.edns.ReportChannelOption
    :members:
 
+.. autoclass:: dns.edns.EDEExtraTextLanguageOption
+   :members:
+
+.. autoclass:: dns.edns.FilteringContactOption
+   :members:
+
+.. autoclass:: dns.edns.FilteringOrganizationOption
+   :members:
+
+.. autoclass:: dns.edns.FilteringDBOption
+   :members:
+
 .. autofunction:: dns.edns.get_option_class
 .. autofunction:: dns.edns.option_from_wire_parser
 .. autofunction:: dns.edns.option_from_wire
index 1229c1e9c7f0beb96dd8c09b8ea89c996ca91191..6bf044eca24874d831f6510e8924f328477b485a 100644 (file)
@@ -266,6 +266,66 @@ class OptionTestCase(unittest.TestCase):
         )
         self.assertEqual(opt2.agent_domain, agent_domain)
 
+    def testEDEExtraTextLanguageOption(self):
+        language = "en"
+        expected_wire = bytes(language, "utf8")
+        opt = dns.edns.EDEExtraTextLanguageOption(language)
+        io = BytesIO()
+        opt.to_wire(io)
+        data = io.getvalue()
+        self.assertEqual(data, expected_wire)
+        self.assertEqual(str(opt), "EDE-EXTRA-TEXT-LANGUAGE en")
+        opt2 = dns.edns.option_from_wire(
+            dns.edns.OptionType.EDE_EXTRA_TEXT_LANGUAGE,
+            expected_wire, 0, len(expected_wire)
+        )
+        self.assertEqual(opt2.language, language)
+
+    def testFilteringContactOption(self):
+        contact = "mailto:support@example.com"
+        expected_wire = bytes(contact, "utf8")
+        opt = dns.edns.FilteringContactOption(contact)
+        io = BytesIO()
+        opt.to_wire(io)
+        data = io.getvalue()
+        self.assertEqual(data, expected_wire)
+        self.assertEqual(str(opt), "FILTERING-CONTACT mailto:support@example.com")
+        opt2 = dns.edns.option_from_wire(
+            dns.edns.OptionType.FILTERING_CONTACT,
+            expected_wire, 0, len(expected_wire)
+        )
+        self.assertEqual(opt2.contact, contact)
+
+    def testFilteringOrganizationOption(self):
+        organization = "The Example Organization"
+        expected_wire = bytes(organization, "utf8")
+        opt = dns.edns.FilteringOrganizationOption(organization)
+        io = BytesIO()
+        opt.to_wire(io)
+        data = io.getvalue()
+        self.assertEqual(data, expected_wire)
+        self.assertEqual(str(opt), "FILTERING-ORGANIZATION The Example Organization")
+        opt2 = dns.edns.option_from_wire(
+            dns.edns.OptionType.FILTERING_ORGANIZATION,
+            expected_wire, 0, len(expected_wire)
+        )
+        self.assertEqual(opt2.organization, organization)
+
+    def testFilteringDBOption(self):
+        db = "Government Anti-Piracy Policies #1"
+        expected_wire = b"Government Anti-Piracy Policies #1"
+        opt = dns.edns.FilteringDBOption(db)
+        io = BytesIO()
+        opt.to_wire(io)
+        data = io.getvalue()
+        self.assertEqual(data, expected_wire)
+        self.assertEqual(str(opt), "FILTERING-DB Government Anti-Piracy Policies #1")
+        opt2 = dns.edns.option_from_wire(
+            dns.edns.OptionType.FILTERING_DB,
+            expected_wire, 0, len(expected_wire)
+        )
+        self.assertEqual(opt2.db, db)
+
     def test_option_registration(self):
         U32OptionType = 9999
 
@@ -312,3 +372,7 @@ class OptionTestCase(unittest.TestCase):
 
         generic = dns.edns.GenericOption(12345, "foo")
         assert generic.to_generic() is generic
+
+
+if __name__ == "__main__":
+    unittest.main()