]> git.ipfire.org Git - dnsbl.git/commitdiff
dnsbl: Add a search command
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 6 Dec 2025 20:38:44 +0000 (20:38 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 6 Dec 2025 20:38:44 +0000 (20:38 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/dnsbl/__init__.py
src/dnsbl/lists.py
src/scripts/dnsbl.in

index 6571c64bf375324edb795c3357a92082b30eee03..27c8a48e28bbc72203347aae658a070f903eda81 100644 (file)
@@ -22,6 +22,7 @@ import configparser
 import functools
 import httpx
 import logging
+import sqlmodel
 
 # Initialize logging as early as possible
 from . import logger
@@ -84,3 +85,37 @@ class Backend(object):
        @functools.cached_property
        def sources(self):
                return sources.Sources(self)
+
+       def search(self, name):
+               """
+                       Searches for a domain
+               """
+               stmt = (
+                       sqlmodel
+                       .select(
+                               sources.SourceDomain,
+                       )
+                       .join(
+                               sources.Source, sources.SourceDomain.source_id == sources.Source.id,
+                       )
+                       .join(
+                               lists.List, sources.Source.list_id == lists.List.id,
+                       )
+                       .where(
+                               sources.SourceDomain.name == name,
+                               sources.SourceDomain.removed_at == None,
+                               sources.Source.deleted_at == None,
+                               lists.List.deleted_at == None,
+                       )
+               )
+
+               res = {}
+
+               # Group all matches by list
+               for domain in self.db.fetch(stmt):
+                       try:
+                               res[domain.source.list].append(domain)
+                       except KeyError:
+                               res[domain.source.list] = [domain]
+
+               return res
index 47b8a78204cd7bce5aa9bac8e749ee5931e091f4..2353c8abc26e37cf97da6b37b29a750543b8ea22 100644 (file)
@@ -106,6 +106,13 @@ class List(sqlmodel.SQLModel, database.BackendMixin, table=True):
        def __str__(self):
                return self.name
 
+       def __hash__(self):
+               # Only hashable once the object has an ID
+               if self.id is None:
+                       raise TypeError("Cannot hash Domain objects before they are persisted and have an ID")
+
+               return hash(self.id)
+
        # ID
        id : int = sqlmodel.Field(primary_key=True)
 
index 03254143c6ce8ebbb6120462a4badcee0794eea8..3af98857951b2773e398d27783da5e9c682b292f 100644 (file)
@@ -132,6 +132,11 @@ class CLI(object):
                                default=os.environ.get("USER"), help=_("The person deleting the list"))
                delete_source.set_defaults(func=self.__delete_source)
 
+               # search
+               search = subparsers.add_parser("search", help=_("Searches for a domain"))
+               search.add_argument("domain", help=_("The domain name"))
+               search.set_defaults(func=self.__search)
+
                # Parse all arguments
                args = parser.parse_args()
 
@@ -317,7 +322,6 @@ class CLI(object):
                                        # Once the output has been written in full, we will rename the file
                                        os.link(f.name, name)
 
-
        def __add_source(self, backend, args):
                """
                        Adds a new source to a list
@@ -347,6 +351,33 @@ class CLI(object):
                        deleted_by = args.deleted_by,
                )
 
+       def __search(self, backend, args):
+               """
+                       Searches for a domain name
+               """
+               # Search!
+               lists = backend.search(args.domain)
+
+               # Do nothing if nothing was found
+               if not lists:
+                       return
+
+               table = rich.table.Table(title=_("Results for '%s'") % args.domain)
+               table.add_column(_("List"))
+               table.add_column(_("Source"))
+               table.add_column(_("Added At"))
+
+               for list, domains in lists.items():
+                       for domain in domains:
+                               table.add_row(
+                                       list.name,
+                                       domain.source.name,
+                                       domain.added_at.isoformat(),
+                               )
+
+               # Print the table
+               self.console.print(table)
+
 
 def main():
        c = CLI()