This will trigger changes to the lists.
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
###############################################################################
import datetime
+import logging
import sqlmodel
import uuid
from . import database
+# Setup logging
+log = logging.getLogger(__name__)
+
class Domain(sqlmodel.SQLModel, database.BackendMixin, table=True):
__tablename__ = "domains"
report_remove: "Report" = sqlmodel.Relationship(sa_relationship_kwargs={
"foreign_keys" : "[Domain.report_remove_id]",
})
+
+ # Remove!
+
+ def remove(self, removed_by, report=None):
+ """
+ Removes the domain from the list
+ """
+ self.removed_at = sqlmodel.func.current_timestamp()
+ self.removed_by = removed_by
+
+ # Store the report
+ self.report_remove = report
+
+ # Log action
+ log.info("%s (block = %s) has been removed from %s" % (self.name, self.block, self.list))
return self.backend.db.fetch(stmt)
+ def add_domain(self, name, added_by, report=None, block=True):
+ """
+ Adds a new domain to the list
+ """
+ # Check if the domain is already listed
+ domain = self.get_domain(name)
+ if domain:
+ print(domain.block, block)
+
+ # Silently ignore if the domain is already listed
+ if domain.block == block:
+ return
+
+ # If the block status of the domain has changed,
+ # we remove the old entry first and then add the domain again.
+ domain.remove(
+ removed_by = added_by,
+ report = report,
+ )
+
+ # Add the domain to the database
+ domain = self.backend.db.insert(
+ domains.Domain,
+ list = self,
+ name = name,
+ added_by = added_by,
+ report_add = report,
+ block = block,
+ )
+
+ # Log action
+ log.info("Domain %s has been added to %s" % (domain, self))
+
+ # The list has now been updated
+ self.updated_at = sqlmodel.func.current_timestamp()
+
+ return domain
+
+ def get_domain(self, name):
+ """
+ Fetches a domain (not including sources)
+ """
+ stmt = (
+ sqlmodel
+ .select(
+ domains.Domain,
+ )
+ .where(
+ # The domain must be on this list
+ domains.Domain.list == self,
+
+ # The name must match
+ domains.Domain.name == name,
+
+ # The domain cannot be removed
+ domains.Domain.removed_at == None,
+
+ # The domain cannot come from a source
+ domains.Domain.source == None,
+ )
+ )
+
+ return self.backend.db.fetch_one(stmt)
+
# Total Domains
total_domains : int
###############################################################################
import datetime
+import logging
import pydantic
import sqlmodel
import uuid
from . import database
+# Setup logging
+log = logging.getLogger(__name__)
+
class Reports(object):
def __init__(self, backend):
self.backend = backend
class Report(sqlmodel.SQLModel, database.BackendMixin, table=True):
__tablename__ = "reports"
+ def __str__(self):
+ return "%s" % self.id
+
# ID
- id : uuid.UUID = sqlmodel.Field(primary_key=True)
+ id : uuid.UUID = sqlmodel.Field(primary_key=True, default=sqlmodel.func.gen_random_uuid())
# List ID
list_id : int = sqlmodel.Field(foreign_key="lists.id", exclude=True)
# Close!
- def close(self):
+ def close(self, closed_by=None, accept=True):
+ """
+ Called when a moderator has made a decision
+ """
+ # Mark this report as closed
+ self.closed_at = sqlmodel.func.current_timestamp()
+ self.closed_by = closed_by
+
+ # Has this report been accepted?
+ self.accepted = accept
+
# Decrement the counter for pending reports
- report.list.pending_reports -= 1
+ self.list.pending_reports -= 1
+
+ # XXX Send a message to the reporter?
+
+ # We are done if the report has not been accepted
+ if not self.accepted:
+ return
+
+ # Add the domain to the list (the add function will do the rest)
+ self.list.add_domain(
+ name = self.name,
+ added_by = self.reported_by,
+ report = self,
+ block = self.block,
+ )
- # XXX TODO
+ # Log action
+ log.info("Report %s has been closed by %s" % (self, self.closed_by))
import rich.text
import sys
import tempfile
+import uuid
# i18n
from dnsbl.i18n import _
default=os.environ.get("USER"), help=_("The person deleting the key"))
delete_api_key.set_defaults(func=self.__delete_api_key)
+ # Reports
+
+ close_report = subparsers.add_parser("close-report",
+ help=_("Closes a report"))
+ close_report.add_argument("id", type=uuid.UUID, help=_("The ID of the report"))
+ close_report.add_argument("--closed-by", required=True,
+ default=os.environ.get("USER"), help=_("The person closing the report"))
+ close_report_accept_reject = close_report.add_mutually_exclusive_group(required=True)
+ close_report_accept_reject.add_argument("--accept", action="store_true",
+ help=_("Accept the report"))
+ close_report_accept_reject.add_argument("--reject", action="store_true",
+ help=_("Reject the report"))
+ close_report.set_defaults(func=self.__close_report)
+
# Parse all arguments
args = parser.parse_args()
# Delete the key
key.delete(deleted_by=args.deleted_by)
+ # Reports
+
+ def __close_report(self, backend, args):
+ """
+ Closes a report
+ """
+ report = backend.reports.get_by_id(args.id)
+
+ # Fail if we cannot find the report
+ if not report:
+ print("Could not find report %s" % args.id)
+ return 2
+
+ # Close the report
+ report.close(
+ closed_by = args.closed_by,
+ accept = args.accept and not args.reject,
+ )
+
def main():
c = CLI()