import functools
import httpx
import logging
+import sqlmodel
# Initialize logging as early as possible
from . import logger
@functools.cached_property
def sources(self):
return sources.Sources(self)
+
+ def search(self, name):
+ """
+ Searches for a domain
+ """
+ stmt = (
+ sqlmodel
+ .select(
+ sources.SourceDomain,
+ )
+ .join(
+ sources.Source, sources.SourceDomain.source_id == sources.Source.id,
+ )
+ .join(
+ lists.List, sources.Source.list_id == lists.List.id,
+ )
+ .where(
+ sources.SourceDomain.name == name,
+ sources.SourceDomain.removed_at == None,
+ sources.Source.deleted_at == None,
+ lists.List.deleted_at == None,
+ )
+ )
+
+ res = {}
+
+ # Group all matches by list
+ for domain in self.db.fetch(stmt):
+ try:
+ res[domain.source.list].append(domain)
+ except KeyError:
+ res[domain.source.list] = [domain]
+
+ return res
default=os.environ.get("USER"), help=_("The person deleting the list"))
delete_source.set_defaults(func=self.__delete_source)
+ # search
+ search = subparsers.add_parser("search", help=_("Searches for a domain"))
+ search.add_argument("domain", help=_("The domain name"))
+ search.set_defaults(func=self.__search)
+
# Parse all arguments
args = parser.parse_args()
# Once the output has been written in full, we will rename the file
os.link(f.name, name)
-
def __add_source(self, backend, args):
"""
Adds a new source to a list
deleted_by = args.deleted_by,
)
+ def __search(self, backend, args):
+ """
+ Searches for a domain name
+ """
+ # Search!
+ lists = backend.search(args.domain)
+
+ # Do nothing if nothing was found
+ if not lists:
+ return
+
+ table = rich.table.Table(title=_("Results for '%s'") % args.domain)
+ table.add_column(_("List"))
+ table.add_column(_("Source"))
+ table.add_column(_("Added At"))
+
+ for list, domains in lists.items():
+ for domain in domains:
+ table.add_row(
+ list.name,
+ domain.source.name,
+ domain.added_at.isoformat(),
+ )
+
+ # Print the table
+ self.console.print(table)
+
def main():
c = CLI()