From: Michael Tremer Date: Sat, 6 Dec 2025 20:38:44 +0000 (+0000) Subject: dnsbl: Add a search command X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ba905eb0f377390251a4c1a8dd0572ba5183ce43;p=dnsbl.git dnsbl: Add a search command Signed-off-by: Michael Tremer --- diff --git a/src/dnsbl/__init__.py b/src/dnsbl/__init__.py index 6571c64..27c8a48 100644 --- a/src/dnsbl/__init__.py +++ b/src/dnsbl/__init__.py @@ -22,6 +22,7 @@ import configparser import functools import httpx import logging +import sqlmodel # Initialize logging as early as possible from . import logger @@ -84,3 +85,37 @@ class Backend(object): @functools.cached_property def sources(self): return sources.Sources(self) + + def search(self, name): + """ + Searches for a domain + """ + stmt = ( + sqlmodel + .select( + sources.SourceDomain, + ) + .join( + sources.Source, sources.SourceDomain.source_id == sources.Source.id, + ) + .join( + lists.List, sources.Source.list_id == lists.List.id, + ) + .where( + sources.SourceDomain.name == name, + sources.SourceDomain.removed_at == None, + sources.Source.deleted_at == None, + lists.List.deleted_at == None, + ) + ) + + res = {} + + # Group all matches by list + for domain in self.db.fetch(stmt): + try: + res[domain.source.list].append(domain) + except KeyError: + res[domain.source.list] = [domain] + + return res diff --git a/src/dnsbl/lists.py b/src/dnsbl/lists.py index 47b8a78..2353c8a 100644 --- a/src/dnsbl/lists.py +++ b/src/dnsbl/lists.py @@ -106,6 +106,13 @@ class List(sqlmodel.SQLModel, database.BackendMixin, table=True): def __str__(self): return self.name + def __hash__(self): + # Only hashable once the object has an ID + if self.id is None: + raise TypeError("Cannot hash Domain objects before they are persisted and have an ID") + + return hash(self.id) + # ID id : int = sqlmodel.Field(primary_key=True) diff --git a/src/scripts/dnsbl.in b/src/scripts/dnsbl.in index 0325414..3af9885 100644 --- a/src/scripts/dnsbl.in +++ b/src/scripts/dnsbl.in @@ -132,6 +132,11 @@ class CLI(object): default=os.environ.get("USER"), help=_("The person deleting the list")) delete_source.set_defaults(func=self.__delete_source) + # search + search = subparsers.add_parser("search", help=_("Searches for a domain")) + search.add_argument("domain", help=_("The domain name")) + search.set_defaults(func=self.__search) + # Parse all arguments args = parser.parse_args() @@ -317,7 +322,6 @@ class CLI(object): # Once the output has been written in full, we will rename the file os.link(f.name, name) - def __add_source(self, backend, args): """ Adds a new source to a list @@ -347,6 +351,33 @@ class CLI(object): deleted_by = args.deleted_by, ) + def __search(self, backend, args): + """ + Searches for a domain name + """ + # Search! + lists = backend.search(args.domain) + + # Do nothing if nothing was found + if not lists: + return + + table = rich.table.Table(title=_("Results for '%s'") % args.domain) + table.add_column(_("List")) + table.add_column(_("Source")) + table.add_column(_("Added At")) + + for list, domains in lists.items(): + for domain in domains: + table.add_row( + list.name, + domain.source.name, + domain.added_at.isoformat(), + ) + + # Print the table + self.console.print(table) + def main(): c = CLI()