EDE = 15
#: REPORTCHANNEL
REPORTCHANNEL = 18
+ #: EDE-EXTRA-TEXT-LANGUAGE (Extended DNS Error EXTRA-TEXT language)
+ EDE_EXTRA_TEXT_LANGUAGE = 22
+ #: FILTERING-CONTACT
+ FILTERING_CONTACT = 23
+ #: FILTERING-ORGANIZATION
+ FILTERING_ORGANIZATION = 24
+ #: FILTERING-DB
+ FILTERING_DB = 25
@classmethod
def _maximum(cls):
return cls(parser.get_name())
+class EDEExtraTextLanguageOption(Option):
+ """Extended DNS Error EXTRA-TEXT language (EDE-EXTRA-TEXT-LANGUAGE)"""
+
+ def __init__(self, language: str):
+ """Initialize an EDEExtraTextLanguageOption.
+
+ :param language: The language of EXTRA-TEXT in the EDE option.
+ :type language: str
+ """
+
+ super().__init__(OptionType.EDE_EXTRA_TEXT_LANGUAGE)
+ self.language = language
+
+ def to_wire(self, file: Any = None) -> bytes | None:
+ if file:
+ file.write(self.language.encode("utf8"))
+ return None
+ else:
+ return self.language
+
+ def to_text(self) -> str:
+ return f"EDE-EXTRA-TEXT-LANGUAGE {self.language}"
+
+ @classmethod
+ def from_wire_parser(
+ cls, otype: OptionType | str, parser: dns.wire.Parser
+ ) -> Option:
+ return cls(parser.get_remaining().decode("utf8"))
+
+
+class FilteringContactOption(Option):
+ """Filtering contact (FILTERING-CONTACT)"""
+
+ def __init__(self, contact: str):
+ """Initialize a FilteringContactOption.
+
+ :param contact: A filtering contact URI as a string.
+ :type contact: str
+ """
+
+ super().__init__(OptionType.FILTERING_CONTACT)
+ self.contact = contact
+
+ def to_wire(self, file: Any = None) -> bytes | None:
+ if file:
+ file.write(self.contact.encode("utf8"))
+ return None
+ else:
+ return self.contact
+
+ def to_text(self) -> str:
+ return f"FILTERING-CONTACT {self.contact}"
+
+ @classmethod
+ def from_wire_parser(
+ cls, otype: OptionType | str, parser: dns.wire.Parser
+ ) -> Option:
+ return cls(parser.get_remaining().decode("utf8"))
+
+
+class FilteringOrganizationOption(Option):
+ """Filtering organization (FILTERING-ORGANIZATION)"""
+
+ def __init__(self, organization: str):
+ """Initialize a FilteringOrganizationOption.
+
+ :param organization: The filtering organization.
+ :type organization: str
+ """
+
+ super().__init__(OptionType.FILTERING_ORGANIZATION)
+ self.organization = organization
+
+ def to_wire(self, file: Any = None) -> bytes | None:
+ if file:
+ file.write(self.organization.encode("utf8"))
+ return None
+ else:
+ return self.organization
+
+ def to_text(self) -> str:
+ return f"FILTERING-ORGANIZATION {self.organization}"
+
+ @classmethod
+ def from_wire_parser(
+ cls, otype: OptionType | str, parser: dns.wire.Parser
+ ) -> Option:
+ return cls(parser.get_remaining().decode("utf8"))
+
+
+class FilteringDBOption(Option):
+ """Filtering DB (FILTERING-DB)"""
+
+ def __init__(self, db: str):
+ """Initialize a FilteringDBOption.
+
+ :param db: The filtering database containing the identifier,
+ name, or description of the filtering database
+ against which a matched query caused the filtering to
+ occur.
+ :type db: str
+
+ """
+
+ super().__init__(OptionType.FILTERING_DB)
+ self.db = db
+
+ def to_wire(self, file: Any = None) -> bytes | None:
+ if file:
+ file.write(self.db.encode("utf8"))
+ return None
+ else:
+ return self.db
+
+ def to_text(self) -> str:
+ return f"FILTERING-DB {self.db}"
+
+ @classmethod
+ def from_wire_parser(
+ cls, otype: OptionType | str, parser: dns.wire.Parser
+ ) -> Option:
+ return cls(parser.get_remaining().decode("utf8"))
+
+
_type_to_class: dict[OptionType, Any] = {
OptionType.ECS: ECSOption,
OptionType.EDE: EDEOption,
OptionType.NSID: NSIDOption,
OptionType.COOKIE: CookieOption,
OptionType.REPORTCHANNEL: ReportChannelOption,
+ OptionType.EDE_EXTRA_TEXT_LANGUAGE: EDEExtraTextLanguageOption,
+ OptionType.FILTERING_CONTACT: FilteringContactOption,
+ OptionType.FILTERING_ORGANIZATION: FilteringOrganizationOption,
+ OptionType.FILTERING_DB: FilteringDBOption,
}
CHAIN = OptionType.CHAIN
EDE = OptionType.EDE
REPORTCHANNEL = OptionType.REPORTCHANNEL
+EDE_EXTRA_TEXT_LANGUAGE = OptionType.EDE_EXTRA_TEXT_LANGUAGE
+FILTERING_CONTACT = OptionType.FILTERING_CONTACT
+FILTERING_ORGANIZATION = OptionType.FILTERING_ORGANIZATION
+FILTERING_DB = OptionType.FILTERING_DB
### END generated OptionType constants
)
self.assertEqual(opt2.agent_domain, agent_domain)
+ def testEDEExtraTextLanguageOption(self):
+ language = "en"
+ expected_wire = bytes(language, "utf8")
+ opt = dns.edns.EDEExtraTextLanguageOption(language)
+ io = BytesIO()
+ opt.to_wire(io)
+ data = io.getvalue()
+ self.assertEqual(data, expected_wire)
+ self.assertEqual(str(opt), "EDE-EXTRA-TEXT-LANGUAGE en")
+ opt2 = dns.edns.option_from_wire(
+ dns.edns.OptionType.EDE_EXTRA_TEXT_LANGUAGE,
+ expected_wire, 0, len(expected_wire)
+ )
+ self.assertEqual(opt2.language, language)
+
+ def testFilteringContactOption(self):
+ contact = "mailto:support@example.com"
+ expected_wire = bytes(contact, "utf8")
+ opt = dns.edns.FilteringContactOption(contact)
+ io = BytesIO()
+ opt.to_wire(io)
+ data = io.getvalue()
+ self.assertEqual(data, expected_wire)
+ self.assertEqual(str(opt), "FILTERING-CONTACT mailto:support@example.com")
+ opt2 = dns.edns.option_from_wire(
+ dns.edns.OptionType.FILTERING_CONTACT,
+ expected_wire, 0, len(expected_wire)
+ )
+ self.assertEqual(opt2.contact, contact)
+
+ def testFilteringOrganizationOption(self):
+ organization = "The Example Organization"
+ expected_wire = bytes(organization, "utf8")
+ opt = dns.edns.FilteringOrganizationOption(organization)
+ io = BytesIO()
+ opt.to_wire(io)
+ data = io.getvalue()
+ self.assertEqual(data, expected_wire)
+ self.assertEqual(str(opt), "FILTERING-ORGANIZATION The Example Organization")
+ opt2 = dns.edns.option_from_wire(
+ dns.edns.OptionType.FILTERING_ORGANIZATION,
+ expected_wire, 0, len(expected_wire)
+ )
+ self.assertEqual(opt2.organization, organization)
+
+ def testFilteringDBOption(self):
+ db = "Government Anti-Piracy Policies #1"
+ expected_wire = b"Government Anti-Piracy Policies #1"
+ opt = dns.edns.FilteringDBOption(db)
+ io = BytesIO()
+ opt.to_wire(io)
+ data = io.getvalue()
+ self.assertEqual(data, expected_wire)
+ self.assertEqual(str(opt), "FILTERING-DB Government Anti-Piracy Policies #1")
+ opt2 = dns.edns.option_from_wire(
+ dns.edns.OptionType.FILTERING_DB,
+ expected_wire, 0, len(expected_wire)
+ )
+ self.assertEqual(opt2.db, db)
+
def test_option_registration(self):
U32OptionType = 9999
generic = dns.edns.GenericOption(12345, "foo")
assert generic.to_generic() is generic
+
+
+if __name__ == "__main__":
+ unittest.main()