git.ipfire.org Git - dbl.git/commitdiff
sources: Add format detecting and parse the Adblock Plus format
author Michael Tremer <michael.tremer@ipfire.org>
Sat, 6 Dec 2025 20:15:36 +0000 (20:15 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Sat, 6 Dec 2025 20:15:36 +0000 (20:15 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/dnsbl/sources.py

index 4ed5b04106ffd6505d06aa1a8d3d407e25a51c46..e2b7424c8cae95f14cf3e42ff5245c210b4b40df 100644 (file)
@@ -20,6 +20,7 @@
 
 import datetime
 import email.utils
+import enum
 import logging
 import sqlalchemy.dialects.postgresql
 import sqlmodel
@@ -31,6 +32,11 @@ from .i18n import _
 # Setup logging
 log = logging.getLogger(__name__)
 
+class Format(enum.Enum):
+       PLAIN = 1
+       ADBLOCKPLUS = 2
+
+
 class Sources(object):
        def __init__(self, backend):
                self.backend = backend
@@ -127,6 +133,9 @@ class Source(sqlmodel.SQLModel, database.BackendMixin, table=True):
                """
                log.debug("%s: Updating source %s" % (self.list, self))
 
+               # Initialize the format
+               format = None
+
                with self.db.transaction():
                        with self.backend.client() as client:
                                # Compose some request headers
@@ -143,11 +152,31 @@ class Source(sqlmodel.SQLModel, database.BackendMixin, table=True):
 
                                        # Add all domains
                                        for line in response.iter_lines():
-                                               try:
-                                                       self.add_domain(line)
+                                               # Detect the format if still unknown
+                                               if format is None:
+                                                       format = self._detect_format(line)
+
+                                               # Process the line according to its format
+                                               match format:
+                                                       case Format.ADBLOCKPLUS:
+                                                               domain = self._process_adblockplus(line)
 
+                                                       case Format.PLAIN:
+                                                               domain = line
+
+                                                       # Skip the line if the format could not be detected yet
+                                                       case _:
+                                                               continue
+
+                                               # Skip the line if no domain could be extracted
+                                               if not domain:
+                                                       continue
+
+                                               # Add the domain to the database
+                                               try:
+                                                       self.add_domain(domain)
                                                except ValueError as e:
-                                                       log.warning("Failed to add '%s' to the database: %s" % (line, e))
+                                                       log.warning("Failed to add '%s' to the database: %s" % (domain, e))
 
                        # Mark all domains that have not been updated as removed
                        self.__prune()
@@ -184,6 +213,34 @@ class Source(sqlmodel.SQLModel, database.BackendMixin, table=True):
                # Store the ETag
                self.etag = headers.get("ETag")
 
+       def _detect_format(self, line):
+               """
+                       Tries to detect the format from a single line; returns None if still unknown
+               """
+               # Check for the Adblock Plus header
+               if line == "[Adblock Plus]":
+                       return Format.ADBLOCKPLUS
+
+               # Check for a plain FQDN
+               elif util.is_fqdn(line):
+                       return Format.PLAIN
+
+               # The format is (still?) unknown
+               return None
+
+       def _process_adblockplus(self, line):
+               """
+                       Parses the domain from a line in the Adblock Plus format
+               """
+               if line.startswith("||"):
+                       # Remove the leading ||
+                       line = line.removeprefix("||")
+
+                       # Cut off everything after ^
+                       domain, _, rest = line.partition("^")
+
+                       return domain
+
        def add_domain(self, name):
                """
                        Adds or updates a domain.