From 87765ce2e9a02d2a3be3c9585afab7b7627e1770 Mon Sep 17 00:00:00 2001
From: Bob Halley
Date: Fri, 19 Apr 2024 14:10:18 -0700
Subject: [PATCH] Basic public suffix list support.

---
 .gitignore         |   4 +-
 dns/psl.py         | 195 ++++++++++++++++++++++++++++++++++++++++++++++
 doc/utilities.rst  |   3 +
 tests/psl_test.txt |  11 +++
 tests/test_psl.py  | 119 ++++++++++++++++++++++++++++
 util/download-psl  |  19 +++++
 6 files changed, 350 insertions(+), 1 deletion(-)
 create mode 100644 dns/psl.py
 create mode 100644 tests/psl_test.txt
 create mode 100644 tests/test_psl.py
 create mode 100755 util/download-psl

diff --git a/.gitignore b/.gitignore
index f4399571..d14c3efb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,8 @@ html
 html.zip
 html.tar.gz
 tests/*.out
+tests/real_psl.txt
+tests/real_psl_tests.txt
 *.pyc
 .coverage
 .tox
@@ -17,4 +19,4 @@ htmlcov
 coverage.xml
 .dir-locals.el
 .vscode/
-doc/_build
\ No newline at end of file
+doc/_build
diff --git a/dns/psl.py b/dns/psl.py
new file mode 100644
index 00000000..9e82b00c
--- /dev/null
+++ b/dns/psl.py
@@ -0,0 +1,195 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license.
+
+"""Public Suffix List Support"""
+
+import os.path
+import re
+from typing import Optional, Set
+
+import dns._features
+import dns.name
+import dns.namedict
+
+_have_httpx = dns._features.have("doh")
+if _have_httpx:
+    import httpx
+
+PSL_URL = "https://publicsuffix.org/list/public_suffix_list.dat"
+
+
+class _Node:
+    """If the most-enclosing match is a _Node, then there is no public suffix."""
+
+    def __init__(self, name: dns.name.Name):
+        self.name = name
+
+    def public_suffix_depth(self, name: dns.name.Name) -> Optional[int]:
+        return None
+
+
+class _ExactNode(_Node):
+    """If the most-enclosing match is an _ExactNode, then the public suffix is
+    at the depth of the _ExactNode, i.e. is equal to the node's name."""
+
+    def public_suffix_depth(self, name: dns.name.Name) -> Optional[int]:
+        return len(self.name)
+
+
+class _WildNode(_Node):
+    """If the most-enclosing match is a _WildNode, then the public suffix is
+    at the depth of the _WildNode plus one, or ``None`` if the queried name
+    is not deeper than the _WildNode name."""
+
+    def public_suffix_depth(self, name: dns.name.Name) -> Optional[int]:
+        if len(name) <= len(self.name):
+            return None
+        return len(self.name) + 1
+
+
+class _ExceptionNode(_Node):
+    """If the most-enclosing match is an _ExceptionNode, then the public suffix is
+    the parent name.  _ExceptionNodes only make sense when their parent is a
+    _WildNode."""
+
+    def public_suffix_depth(self, name: dns.name.Name) -> Optional[int]:
+        return len(self.name) - 1
+
+
+class PublicSuffixList:
+    """Public suffix list database.
+
+    A *public suffix* or *effective top-level domain* (*eTLD*) is a domain under which a
+    user can register names.
+
+    The *base domain*, also known as the *registerable domain* or the *eTLD + 1*, is one
+    level deeper than the public suffix.  For example, for `www.dnspython.org` the
+    public suffix is `org` and the base domain is `dnspython.org`.  Names which are
+    public suffixes do not have a base domain.
+
+    The *reduced domain* of a name is the base domain of that name if it is defined, or
+    the name itself otherwise.  Reduced domains are useful for statistical aggregations
+    where you are principally trying to aggregate by base domain but don't want to lose
+    track of queries without base names (e.g. queries to com, or the root).
+    """
+
+    def __init__(
+        self,
+        filename: str,
+        categories: Optional[Set[str]] = None,
+        allow_unlisted_gtlds: bool = True,
+        download_if_needed: bool = False,
+        url: str = PSL_URL,
+    ):
+        """Initialize a public suffix list.
+
+        *filename*, a ``str``, is the filename of the public suffix list, in the
+        `standard format`.  If the
+        file does not exist and *download_if_needed* has been specified, then the file
+        will be downloaded from the specified *url*.
+
+        *categories*, a set of ``str`` or ``None``, the PSL categories to include when
+        searching.  If ``None``, the default set ``{"ICANN", "PRIVATE"}`` is used.  The
+        ``"ICANN"`` category is the public suffixes administered by global and national
+        registries, and the ``"PRIVATE"`` category is public suffixes administered by
+        private entities as part of their namespace.
+
+        *allow_unlisted_gtlds*, a ``bool``, with a default of ``True``.  If ``True``,
+        then the root node is a wildcard node, and gTLDs not listed in the
+        public suffix database will still be considered as public suffixes.  For
+        example, a query of "www.example.bogus-gtld." would have a public suffix
+        of "bogus-gtld." and a base domain of "example.bogus-gtld.".  If ``False``,
+        then ``None`` will be returned for gTLDs which are not listed.
+
+        *download_if_needed*: a ``bool``, defaulting to ``False``.  If ``True``, then
+        download the list from the *url* if *filename* does not exist.  If ``False``,
+        then *filename* must exist.
+
+        *url*: a ``str``.  The URL to use if downloading the public suffix list is
+        required; the default is the standard URL recommended by publicsuffix.org.
+        """
+
+        if download_if_needed:
+            if not _have_httpx:
+                raise ValueError(
+                    "download_if_needed is True but httpx is not available"
+                )
+            if not os.path.isfile(filename):
+                response = httpx.request("GET", url)
+                if response.status_code == 200:
+                    with open(filename, "w") as f:
+                        f.write(response.text)
+        self.suffixes = dns.namedict.NameDict()
+        if allow_unlisted_gtlds:
+            root_node: _Node = _WildNode(dns.name.root)
+        else:
+            root_node = _Node(dns.name.root)
+        self.suffixes[dns.name.root] = root_node
+        if categories is None:
+            categories = {"ICANN", "PRIVATE"}
+        assert categories is not None  # for mypy
+        pattern = re.compile("// ===(BEGIN|END) ([A-Z]+) DOMAINS===")
+        skipping = True
+        with open(filename, "r") as f:
+            self.mtime = os.fstat(f.fileno()).st_mtime
+            for l in f.readlines():
+                l = l.rstrip()
+                if l.startswith("//"):
+                    match = pattern.match(l)
+                    if match:
+                        op = match.group(1)
+                        category = match.group(2)
+                        skipping = not (category in categories and op == "BEGIN")
+                    continue
+                if l == "" or skipping:
+                    continue
+                if l.startswith("!"):
+                    exception = True
+                    l = l[1:]
+                else:
+                    exception = False
+                n = dns.name.from_text(l)
+                if n.is_wild():
+                    n = n.parent()  # remove leading "*" label
+                    node: _Node = _WildNode(n)
+                elif exception:
+                    node = _ExceptionNode(n)
+                else:
+                    node = _ExactNode(n)
+                if self.suffixes.has_key(n):
+                    raise ValueError(f"redefinition of {n}")
+                self.suffixes[n] = node
+
+    def public_suffix(self, name: dns.name.Name) -> Optional[dns.name.Name]:
+        """Return the public suffix for *name*, or ``None`` if it is not defined.
+
+        *name*, a ``dns.name.Name``
+        """
+        _, node = self.suffixes.get_deepest_match(name)
+        depth = node.public_suffix_depth(name)
+        if depth is None or depth > len(name):
+            return None
+        _, suffix = name.split(depth)
+        return suffix
+
+    def base_domain(self, name: dns.name.Name) -> Optional[dns.name.Name]:
+        """Return the base domain for *name*, or ``None`` if it is not defined.
+
+        *name*, a ``dns.name.Name``
+        """
+        _, node = self.suffixes.get_deepest_match(name)
+        depth = node.public_suffix_depth(name)
+        if depth is None or depth >= len(name):
+            return None
+        _, suffix = name.split(depth + 1)
+        return suffix
+
+    def reduced_domain(self, name: dns.name.Name) -> dns.name.Name:
+        """Return the reduced domain for *name*.
+
+        *name*, a ``dns.name.Name``
+        """
+        reduced_name = self.base_domain(name)
+        if reduced_name is not None:
+            return reduced_name
+        else:
+            return name
diff --git a/doc/utilities.rst b/doc/utilities.rst
index 88c1f72e..6b84799c 100644
--- a/doc/utilities.rst
+++ b/doc/utilities.rst
@@ -12,6 +12,9 @@ Miscellaneous Utilities
 .. automodule:: dns.ipv6
    :members:
 
+.. automodule:: dns.psl
+   :members:
+
 .. autofunction:: dns.ttl.from_text
 
 .. automodule:: dns.set
diff --git a/tests/psl_test.txt b/tests/psl_test.txt
new file mode 100644
index 00000000..854eb859
--- /dev/null
+++ b/tests/psl_test.txt
@@ -0,0 +1,11 @@
+// This is not the real PSL, it is just test data
+
+// ===BEGIN ICANN DOMAINS===
+com
+org
+net
+uk
+co.uk
+*.sch.uk
+!exc.sch.uk
+// ===END ICANN DOMAINS===
diff --git a/tests/test_psl.py b/tests/test_psl.py
new file mode 100644
index 00000000..338086b7
--- /dev/null
+++ b/tests/test_psl.py
@@ -0,0 +1,119 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+import os.path
+
+import pytest
+
+import dns.name
+from dns.psl import PublicSuffixList
+from tests.util import here, is_internet_reachable
+
+
+def check_expectations(psl, tests, reduce=False):
+    for input, expected in tests.items():
+        is_idna = not dns.name.is_all_ascii(input)
+        try:
+            input_name = dns.name.from_text(input).canonicalize()
+        except dns.name.EmptyLabel:
+            # Some exceptions are expected as the test data has some
+            # lines with domains starting with ".".  These will make us
+            # throw an empty label exception, which is fine.
+            assert expected == "exception"
+            continue
+        if reduce:
+            output_name = psl.reduced_domain(input_name)
+        else:
+            output_name = psl.base_domain(input_name)
+        if output_name is None:
+            result = "none"
+        else:
+            if is_idna:
+                result = output_name.to_unicode(True)
+            else:
+                result = output_name.to_text(True)
+        assert result == expected
+        if not reduce and output_name is not None:
+            assert output_name.parent() == psl.public_suffix(input_name)
+
+
+@pytest.mark.skipif(
+    not (
+        os.path.exists(here("real_psl.txt"))
+        and os.path.exists(here("real_psl_tests.txt"))
+    ),
+    reason="Real PSL and/or tests not available",
+)
+def test_public_suffix_tests_with_real_psl():
+    psl = PublicSuffixList(here("real_psl.txt"), download_if_needed=True)
+    tests = {}
+    with open(here("real_psl_tests.txt"), "r") as f:
+        for l in f.readlines():
+            if l == "":
+                break
+            l = l.rstrip()
+            if l == "" or l.startswith("//"):
+                continue
+            parts = l.split()
+            if len(parts) != 2:
+                raise ValueError("split didn't result in two things")
+            input = parts[0]
+            expected = parts[1]
+            if input.startswith("."):
+                expected = "exception"
+            if expected == "null":
+                expected = "none"
+            tests[input] = expected
+    check_expectations(psl, tests)
+
+
+def test_base_domain():
+    psl = PublicSuffixList(here("psl_test.txt"))
+    tests = {
+        ".": "none",
+        "com": "none",
+        "bogus-tld": "none",
+        "sub.bogus-tld": "sub.bogus-tld",
+        "www.sub.bogus-tld": "sub.bogus-tld",
+        "org": "none",
+        "dnspython.org": "dnspython.org",
+        "www.dnspython.org": "dnspython.org",
+        "uk": "none",
+        "sub.uk": "sub.uk",
+        "www.sub.uk": "sub.uk",
+        "co.uk": "none",
+        "sub.co.uk": "sub.co.uk",
+        "www.sub.co.uk": "sub.co.uk",
+        "sub.sch.uk": "none",
+        "sub.sub.sch.uk": "sub.sub.sch.uk",
+        "www.sub.sub.sch.uk": "sub.sub.sch.uk",
+        "exc.sch.uk": "exc.sch.uk",
+    }
+    check_expectations(psl, tests)
+
+
+def test_strict_base_domain():
+    psl = PublicSuffixList(here("psl_test.txt"), allow_unlisted_gtlds=False)
+    tests = {
+        ".": "none",
+        "bogus-tld": "none",
+        "sub.bogus-tld": "none",
+        "www.sub.bogus-tld": "none",
+        "org": "none",
+        "dnspython.org": "dnspython.org",
+        "www.dnspython.org": "dnspython.org",
+    }
+    check_expectations(psl, tests)
+
+
+def test_reduced_name():
+    psl = PublicSuffixList(here("psl_test.txt"))
+    tests = {
+        ".": ".",
+        "bogus-tld": "bogus-tld",
+        "sub.bogus-tld": "sub.bogus-tld",
+        "www.sub.bogus-tld": "sub.bogus-tld",
+        "org": "org",
+        "dnspython.org": "dnspython.org",
+        "www.dnspython.org": "dnspython.org",
+    }
+    check_expectations(psl, tests, True)
diff --git a/util/download-psl b/util/download-psl
new file mode 100755
index 00000000..6895febb
--- /dev/null
+++ b/util/download-psl
@@ -0,0 +1,19 @@
+#!/usr/bin/env python3
+
+import os.path
+
+import httpx
+
+if not os.path.isdir("tests"):
+    raise ValueError("should be run from the dnspython top-level directory")
+
+DOWNLOADS = {
+    "https://publicsuffix.org/list/public_suffix_list.dat": "tests/real_psl.txt",
+    "https://raw.githubusercontent.com/publicsuffix/list/master/tests/tests.txt": "tests/real_psl_tests.txt",
+}
+
+for url, filename in DOWNLOADS.items():
+    response = httpx.request("GET", url)
+    if response.status_code == 200:
+        with open(filename, "w") as f:
+            f.write(response.text)
-- 
2.47.3
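
Example usage of the new dns.psl module (a minimal sketch; the
"public_suffix_list.dat" filename is illustrative and assumes either a locally
saved copy of the list, or that httpx is installed so the list can be fetched
on first use):

    import dns.name
    import dns.psl

    # Load the public suffix list, downloading it if the file is missing.
    psl = dns.psl.PublicSuffixList(
        "public_suffix_list.dat", download_if_needed=True
    )

    name = dns.name.from_text("www.dnspython.org")
    print(psl.public_suffix(name))   # org.
    print(psl.base_domain(name))     # dnspython.org.
    print(psl.reduced_domain(name))  # dnspython.org.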