import email.utils
import enum
import gzip
+import httpx
import io
import itertools
import logging
# Compose some request headers
headers = self._make_headers(force=force)
- with client.stream("GET", self.url, headers=headers) as response:
- # Parse the response headers
- self._parse_headers(response.headers)
+ try:
+ with client.stream("GET", self.url, headers=headers) as response:
+ # Parse the response headers
+ self._parse_headers(response.headers)
- # There is nothing to do if the source has not changed
- if response.status_code == 304:
- log.debug("Source %s has not been changed, skipping processing" % self)
- return False
+ # There is nothing to do if the source has not changed
+ if response.status_code == 304:
+ log.debug("Source %s has not been changed, skipping processing" % self)
+ return False
- # Consume, transparently decompress and decode the payload
- f = self._consume_payload(response)
+ # Consume, transparently decompress and decode the payload
+ f = self._consume_payload(response)
- # Add all domains
- for line in f:
- line = line.rstrip()
+ # Add all domains
+ for line in f:
+ line = line.rstrip()
- # Detect the format if still unknown
- if format is None:
- format = self._detect_format(line)
+ # Detect the format if still unknown
+ if format is None:
+ format = self._detect_format(line)
+
+ # Process the line according to its format
+ match format:
+ case Format.ADBLOCKPLUS:
+ domain = self._process_adblockplus(line)
- # Process the line according to its format
- match format:
- case Format.ADBLOCKPLUS:
- domain = self._process_adblockplus(line)
+ case Format.HOSTS:
+ domain = self._process_hosts(line)
- case Format.HOSTS:
- domain = self._process_hosts(line)
+ case Format.PLAIN:
+ domain = self._process_plain(line)
- case Format.PLAIN:
- domain = self._process_plain(line)
+ # Skip the line if could not find the right format
+ case _:
+ continue
- # Skip the line if could not find the right format
- case _:
+ # Strip any whitespace
+ if domain:
+ domain = domain.strip()
+
+ # Skip the line if no domain could be extracted
+ if not domain:
continue
- # Strip any whitespace
- if domain:
- domain = domain.strip()
+ # Remove any leading "*."
+ domain = domain.removeprefix("*.")
- # Skip the line if no domain could be extracted
- if not domain:
- continue
+ # Skip any invalid domain names
+ if not util.is_fqdn(domain):
+ # Silently skip any IP addresses
+ if util.is_ip_address(domain):
+ continue
- # Remove any leading "*."
- domain = domain.removeprefix("*.")
+ # And also skip any URLs
+ elif util.is_url(domain):
+ continue
- # Skip any invalid domain names
- if not util.is_fqdn(domain):
- # Silently skip any IP addresses
- if util.is_ip_address(domain):
+ log.warning(_("Skipping invalid domain: %s") % domain)
continue
- # And also skip any URLs
- elif util.is_url(domain):
+ # Skip any special domains
+ if domain in IGNORED_DOMAINS:
+ log.debug("Skipping ignored domain: %s" % domain)
continue
- log.warning(_("Skipping invalid domain: %s") % domain)
- continue
+ # Add the domain
+ domains.add(domain)
- # Skip any special domains
- if domain in IGNORED_DOMAINS:
- log.debug("Skipping ignored domain: %s" % domain)
- continue
+ # Log an error if we could not detect the format
+ if format is None:
+ log.error("Format of '%s' (%s) seems to be unkown. No data could be parsed" \
+ % (self, self.url))
- # Add the domain
- domains.add(domain)
+ # Add all domains to the database
+ self.add_domains(domains)
- # Log an error if we could not detect the format
- if format is None:
- log.error("Format of '%s' (%s) seems to be unkown. No data could be parsed" \
- % (self, self.url))
+ # The list has now been updated
+ self.updated_at = sqlmodel.func.current_timestamp()
- # Add all domains to the database
- self.add_domains(domains)
+ # Skip if we could not read from the server
+ except httpx.ReadTimeout as e:
+ log.warning("Failed to read the server's response: %s" % e)
- # The list has now been updated
- self.updated_at = sqlmodel.func.current_timestamp()
+ return False
# Mark all domains that have not been updated as removed
self.__prune()