From: Michael Tremer Date: Wed, 7 Jan 2026 15:07:36 +0000 (+0000) Subject: API: Add a simple endpoint to check where a domain is being listed X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6eca58e3a36ec5cdc61836f12bf1a98cb53898e8;p=dbl.git API: Add a simple endpoint to check where a domain is being listed Signed-off-by: Michael Tremer --- diff --git a/Makefile.am b/Makefile.am index ca37dd9..98adb9d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -67,6 +67,7 @@ dist_pkgpython_PYTHON = \ dist_pkgpython_api_PYTHON = \ src/dnsbl/api/__init__.py \ + src/dnsbl/api/domains.py \ src/dnsbl/api/lists.py \ src/dnsbl/api/reports.py diff --git a/src/dnsbl/__init__.py b/src/dnsbl/__init__.py index a378b08..161b474 100644 --- a/src/dnsbl/__init__.py +++ b/src/dnsbl/__init__.py @@ -97,6 +97,10 @@ class Backend(object): def auth(self): return auth.Auth(self) + @functools.cached_property + def domains(self): + return domains.Domains(self) + @functools.cached_property def lists(self): return lists.Lists(self) diff --git a/src/dnsbl/api/__init__.py b/src/dnsbl/api/__init__.py index 54479e6..b35297f 100644 --- a/src/dnsbl/api/__init__.py +++ b/src/dnsbl/api/__init__.py @@ -48,6 +48,7 @@ def require_api_key(api_key: str = fastapi.Depends(api_key_header)): raise fastapi.HTTPException(401, "Invalid API key") # Import any endpoints +from . import domains from . import lists from . import reports diff --git a/src/dnsbl/api/domains.py b/src/dnsbl/api/domains.py new file mode 100644 index 0000000..d795a59 --- /dev/null +++ b/src/dnsbl/api/domains.py @@ -0,0 +1,46 @@ +############################################################################### +# # +# dnsbl - A DNS Blocklist Compositor For IPFire # +# Copyright (C) 2026 IPFire Development Team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +############################################################################### + +import fastapi +import typing +import uuid + +from .. import domains + +# Import the main app +from . import app +from . import backend + +# Create a router +router = fastapi.APIRouter( + prefix="/domains", + tags=["Domains"], +) + +@router.get("/{name}") +def get_domain(name: str): + # Fetch all domains that match the name + domains = backend.domains.get_by_name(name) + + # Show where the domain is currently listed + return { domain.list.slug : { "added_at" : domain.added_at } for domain in domains } + +# Include our endpoints +app.include_router(router) diff --git a/src/dnsbl/domains.py b/src/dnsbl/domains.py index 0b939ad..8976d7a 100644 --- a/src/dnsbl/domains.py +++ b/src/dnsbl/domains.py @@ -28,12 +28,40 @@ from . import database # Setup logging log = logging.getLogger(__name__) +class Domains(object): + def __init__(self, backend): + self.backend = backend + + def get_by_name(self, name): + """ + Fetch all domain objects that match the name + """ + stmt = ( + sqlmodel + .select( + Domain, + ) + .where( + # Name must match + Domain.name == name, + + # Ignore domains that have been removed + Domain.removed_at == None, + ) + ) + + return self.backend.db.fetch_as_set(stmt) + + class Domain(sqlmodel.SQLModel, database.BackendMixin, table=True): __tablename__ = "domains" def __str__(self): return self.name + def __hash__(self): + return hash(self.id) + # ID id: int = sqlmodel.Field(primary_key=True) diff --git a/src/dnsbl/lists.py b/src/dnsbl/lists.py index 5601840..60b6e86 100644 --- a/src/dnsbl/lists.py +++ b/src/dnsbl/lists.py @@ -120,6 +120,37 @@ class Lists(object): priority = priority, ) + def get_listing_domain(self, name): + """ + Returns all lists that currently contain the given domain. + """ + stmt = ( + sqlmodel + .select( + List, + ) + .select_from( + domains.Domain, + domains.Domain.list_id == List.id, + ) + .where( + # Fetch the matching domain + domains.Domain.name == name, + + # Ignore any domains that have been removed + domains.Domain.removed_at == None, + + # Ignore deleted lists + List.deleted_at == None, + ) + .order_by( + List.name, + List.slug, + ) + ) + + return self.backend.db.fetch_as_list(stmt) + class List(sqlmodel.SQLModel, database.BackendMixin, table=True): __tablename__ = "lists"