-- PostgreSQL database dump
--
-\restrict 8o0t5OTJIfaTvsY8tJmP42PzP48zYSIzdnWkVDzdbzece2fzwS3EaVGiSKAGHZF
+\restrict zZebqk0H6Bn3b2lu7FY4z6lPZCN2XBuuNpQTisoH3Wz8jUXzJwWewQlrEzRq9r7
-- Dumped from database version 17.6 (Debian 17.6-0+deb13u1)
-- Dumped by pg_dump version 17.6 (Debian 17.6-0+deb13u1)
report_add_id uuid,
report_remove_id uuid,
checked_at timestamp with time zone,
- dead boolean DEFAULT false
+ dead boolean DEFAULT false,
+ subsumed boolean DEFAULT false NOT NULL
);
-- PostgreSQL database dump complete
--
-\unrestrict 8o0t5OTJIfaTvsY8tJmP42PzP48zYSIzdnWkVDzdbzece2fzwS3EaVGiSKAGHZF
+\unrestrict zZebqk0H6Bn3b2lu7FY4z6lPZCN2XBuuNpQTisoH3Wz8jUXzJwWewQlrEzRq9r7
def resolve(self, domain):
log.debug("Resolving %s..." % domain)
+ # We want to check if the domain still exists and for that querying
+ # the top domain is enough.
+ domain = self.backend.psl.get_sld(domain)
+
try:
result = self.resolver.resolve(domain, "SOA", search=False, lifetime=60)
# Ignore domains that have been removed
domains.Domain.removed_at == None,
+ # Ignore any domains that are subsumed by another domain
+ domains.Domain.subsumed == False,
+
# Only select domains that are not dead
# or have not been checked, yet.
sqlmodel.or_(
if updated:
self.updated_at = sqlmodel.func.current_timestamp()
+ # Optimize the list
+ self.optimize(update_stats=False)
+
# Update the stats
self.update_stats()
return self.backend.db.fetch(stmt)
+ def optimize(self, update_stats=True):
+ """
+ Optimizes this list
+ """
+ log.info("Optimizing %s..." % self)
+
+ # Fetch all domains on this list
+ names = self.backend.db.fetch_as_set(
+ sqlmodel
+ .select(
+ domains.Domain.name
+ )
+ .distinct()
+ .where(
+ domains.Domain.list == self,
+ domains.Domain.removed_at == None,
+ )
+ )
+
+ # Collect all names that are redundant
+ redundant_names = set()
+
+ # Walk through all domains
+ for name in names:
+ parent = name
+
+ # Check if any parent domain is also listed
+ while "." in parent:
+ *garbage, parent = parent.partition(".")
+
+ # If the domain is already listed, we ignore it
+ if parent in names:
+ redundant_names.add(name)
+ break
+
+ log.info(_("Identified %s redunduant domain(s)") % len(redundant_names))
+
+ # Reset the status for all domains
+ self.backend.db.execute(
+ sqlmodel
+ .update(
+ domains.Domain,
+ )
+ .values(
+ subsumed = False,
+ )
+ .where(
+ domains.Domain.list == self,
+ domains.Domain.removed_at == None,
+ domains.Domain.subsumed == True,
+ )
+ )
+
+ # De-list the redundant domains
+ self.backend.db.execute(
+ sqlmodel
+ .update(
+ domains.Domain,
+ )
+ .values(
+ subsumed = True,
+ )
+ .where(
+ domains.Domain.list == self,
+ domains.Domain.removed_at == None,
+ domains.Domain.name.in_(
+ redundant_names,
+ )
+ )
+ )
+
+ # Update all stats afterwards
+ if update_stats:
+ self.update_stats()
+
class ListStats(sqlmodel.SQLModel, table=True):
__tablename__ = "list_stats"
import dnsbl
import dnsbl.checker
import dnsbl.exporters
+import dnsbl.util
import logging
import os
import rich.console
analyze.add_argument("list", help=_("The name of the list"))
analyze.set_defaults(func=self.__analyze)
+ # optimize
+ optimize = subparsers.add_parser("optimize", help=_("Optimize a list"))
+ optimize.add_argument("list", help=_("The name of the list"))
+ optimize.set_defaults(func=self.__optimize)
+
# history
history = subparsers.add_parser("history",
help=_("Shows the latest changes of a list"))
# Print the table
self.console.print(table)
+ def __optimize(self, backend, args):
+ """
+ Optimizes a list
+ """
+ # Fetch the list
+ list = self.__get_list(backend, args.list)
+
+ with dnsbl.util.Stopwatch(_("Optimizing %s") % list):
+ list.optimize()
+
def __history(self, backend, args):
"""
Shows the history of a list