import functools
import io
import logging
+import sqlalchemy.dialects.postgresql
+import sqlalchemy.orm
import sqlmodel
import typing
# Pending Reports
pending_reports : int = 0
+
+ # History
+
+ def get_history(self, limit=None):
+ """
+ Fetches the history of the list
+ """
+ # Collect all domains that have been added
+ domains_added = (
+ sqlmodel
+ .select(
+ domains.Domain.added_at.label("ts"),
+
+ # Aggregate all domains
+ sqlmodel.func.array_agg(
+ sqlalchemy.dialects.postgresql.aggregate_order_by(
+ domains.Domain.name,
+ domains.Domain.name.asc(),
+ ),
+ ).label("names"),
+ )
+ .where(
+ domains.Domain.list == self,
+ )
+ .order_by(
+ domains.Domain.added_at.desc(),
+ )
+ .group_by(
+ domains.Domain.added_at,
+ )
+ .limit(limit)
+ .cte("domains_added")
+ )
+
+ # Collect all domains that have been removed
+ domains_removed = (
+ sqlmodel
+ .select(
+ domains.Domain.removed_at.label("ts"),
+
+ # Aggregate all domains
+ sqlmodel.func.array_agg(
+ sqlalchemy.dialects.postgresql.aggregate_order_by(
+ domains.Domain.name,
+ domains.Domain.name.asc(),
+ ),
+ ).label("names"),
+ )
+ .where(
+ domains.Domain.list == self,
+ domains.Domain.removed_at != None,
+ )
+ .order_by(
+ domains.Domain.removed_at.desc(),
+ )
+ .group_by(
+ domains.Domain.removed_at,
+ )
+ .limit(limit)
+ .cte("domains_removed")
+ )
+
+ # Union timestamps
+ timestamps = (
+ sqlmodel
+ .union(
+ sqlmodel.select(
+ domains_added.c.ts,
+ ),
+ sqlmodel.select(
+ domains_removed.c.ts,
+ ),
+ )
+ .cte("timestamps")
+ )
+
+ stmt = (
+ sqlmodel
+ .select(
+ timestamps.c.ts,
+
+ # Fetch all added domains
+ sqlmodel.func.coalesce(
+ domains_added.c.names,
+ sqlmodel.text("'{}'"),
+ ).label("domains_added"),
+
+ # Fetch all removed domains
+ sqlmodel.func.coalesce(
+ domains_removed.c.names,
+ sqlmodel.text("'{}'"),
+ ).label("domains_removed"),
+ )
+ .outerjoin(
+ domains_added,
+ domains_added.c.ts == timestamps.c.ts,
+ )
+ .outerjoin(
+ domains_removed,
+ domains_removed.c.ts == timestamps.c.ts,
+ )
+ .order_by(
+ timestamps.c.ts.asc(),
+ )
+ )
+
+ return self.backend.db.select(stmt)
###############################################################################
import argparse
+import babel.dates
import babel.numbers
import dnsbl
import dnsbl.checker
analyze.add_argument("list", help=_("The name of the list"))
analyze.set_defaults(func=self.__analyze)
+ # history
+ history = subparsers.add_parser("history",
+ help=_("Shows the latest changes of a list"))
+ history.add_argument("list",
+ help=_("The name of the list"))
+ history.add_argument("--limit", type=int, default=10,
+ help=_("Only show a few changesets"))
+ history.set_defaults(func=self.__history)
+
# check-domains
check_domains = subparsers.add_parser("check-domains",
help=_("Checks if domains are alive"))
# Print the table
self.console.print(table)
+ def __history(self, backend, args):
+ """
+ Shows the history of a list
+ """
+ # Fetch the list
+ list = backend.lists.get_by_slug(args.list)
+
+ # Fetch the history
+ history = list.get_history(limit=args.limit)
+
+ # Create a new table
+ table = rich.table.Table(title=_("History of %s") % list)
+
+ # Add the header
+ table.add_column(_("Timestamp"))
+ table.add_column(_("Domains Added"))
+ table.add_column(_("Domains Removed"))
+
+ # Add the history
+ for events in history:
+ table.add_row(
+ babel.dates.format_datetime(events.ts),
+ "\n".join(events.domains_added),
+ "\n".join(events.domains_removed),
+ )
+
+ # Print the table
+ self.console.print(table)
+
def __check_domains(self, backend, args):
"""
Runs the checker over all domains