import datetime
import email.utils
+import enum
import logging
import sqlalchemy.dialects.postgresql
import sqlmodel
# Set up logging
log = logging.getLogger(__name__)
class Format(enum.Enum):
    """
    The file formats that a source list can be detected as
    """

    # Lines are taken as domain names as they are
    PLAIN = 1

    # Announced by an "[Adblock Plus]" header; domains come from "||" rules
    ADBLOCKPLUS = 2
+
+
class Sources(object):
def __init__(self, backend):
self.backend = backend
"""
log.debug("%s: Updating source %s" % (self.list, self))
+ # Initialize the format
+ format = None
+
with self.db.transaction():
with self.backend.client() as client:
# Compose some request headers
# Add all domains
for line in response.iter_lines():
- try:
- self.add_domain(line)
+ # Detect the format if still unknown
+ if format is None:
+ format = self._detect_format(line)
+
+ # Process the line according to its format
+ match format:
+ case Format.ADBLOCKPLUS:
+ domain = self._process_adblockplus(line)
+ case Format.PLAIN:
+ domain = line
+
+ # Skip the line if could not find the right format
+ case _:
+ continue
+
+ # Skip the line if no domain could be extracted
+ if not domain:
+ continue
+
+ # Add the domain to the database
+ try:
+ self.add_domain(domain)
except ValueError as e:
- log.warning("Failed to add '%s' to the database: %s" % (line, e))
+ log.warning("Failed to add '%s' to the database: %s" % (domain, e))
# Mark all domains that have not been updated as removed
self.__prune()
# Store the ETag
self.etag = headers.get("ETag")
def _detect_format(self, line):
    """
    Called very early when we are detecting the format

    Inspects a single line and returns Format.ADBLOCKPLUS if it is an
    Adblock Plus list header ("[Adblock Plus]", optionally carrying a
    version such as "[Adblock Plus 2.0]"), Format.PLAIN if util.is_fqdn()
    accepts the line, or None if this line does not settle the format.
    """
    # Check for the Adblock Plus header. Published lists usually carry a
    # version in the header, so match the prefix instead of the exact string
    # and ignore surrounding whitespace.
    header = line.strip()
    if header.startswith("[Adblock Plus") and header.endswith("]"):
        return Format.ADBLOCKPLUS

    # Check for a plain FQDN
    if util.is_fqdn(line):
        return Format.PLAIN

    # The format is (still?) unknown
    return None
+
def _process_adblockplus(self, line):
    """
    Parse the domain from the AdBlockPlus format

    Only rules that start with "||" are considered. The domain is whatever
    follows the "||" up to (but excluding) the first "^"; if there is no
    "^", the whole remainder is used.

    Returns the extracted domain, or None if the line is not such a rule
    (e.g. the "[Adblock Plus]" header line) or nothing is left after
    stripping the markers.
    """
    # Anything that is not a "||" rule carries no domain for us
    if not line.startswith("||"):
        return None

    # Remove the leading || and cut off everything after ^
    domain, _sep, _rest = line.removeprefix("||").partition("^")

    # Never hand an empty string back to the caller
    return domain or None
+
def add_domain(self, name):
"""
Adds or updates a domain.