dist_pkgpython_PYTHON = \
src/dnsbl/__init__.py \
+ src/dnsbl/auth.py \
src/dnsbl/checker.py \
src/dnsbl/database.py \
src/dnsbl/exporters.py \
-- PostgreSQL database dump
--
-\restrict pZMXIfcg6Z3bqZxjE81zS0hcdaYCb5acZyYejnUVfbZS81irldi5kwkQVbA7Prg
+\restrict b3umosWsIJkpOpBVplcLzTd5va09HE84xdwzTROCiAHoMRKnogpbKgIksBp6ahf
-- Dumped from database version 17.6 (Debian 17.6-0+deb13u1)
-- Dumped by pg_dump version 17.6 (Debian 17.6-0+deb13u1)
SET default_table_access_method = heap;
+--
+-- Name: api_keys; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.api_keys (
+ id integer NOT NULL,
+ prefix text NOT NULL,
+ secret text NOT NULL,
+ created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ created_by text NOT NULL,
+ deleted_at timestamp with time zone,
+ deleted_by text
+);
+
+
+--
+-- Name: api_keys_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+--
+
+CREATE SEQUENCE public.api_keys_id_seq
+ AS integer
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+--
+-- Name: api_keys_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+--
+
+ALTER SEQUENCE public.api_keys_id_seq OWNED BY public.api_keys.id;
+
+
--
-- Name: checker_domains; Type: TABLE; Schema: public; Owner: -
--
ALTER SEQUENCE public.sources_id_seq OWNED BY public.sources.id;
+--
+-- Name: api_keys id; Type: DEFAULT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.api_keys ALTER COLUMN id SET DEFAULT nextval('public.api_keys_id_seq'::regclass);
+
+
--
-- Name: lists id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.sources ALTER COLUMN id SET DEFAULT nextval('public.sources_id_seq'::regclass);
+--
+-- Name: api_keys api_keys_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.api_keys
+ ADD CONSTRAINT api_keys_pkey PRIMARY KEY (id);
+
+
--
-- Name: checker_domains checker_domains_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ADD CONSTRAINT sources_pkey PRIMARY KEY (id);
+--
+-- Name: api_keys_prefix; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX api_keys_prefix ON public.api_keys USING btree (prefix) WHERE (deleted_at IS NULL);
+
+
--
-- Name: lists_unique; Type: INDEX; Schema: public; Owner: -
--
-- PostgreSQL database dump complete
--
-\unrestrict pZMXIfcg6Z3bqZxjE81zS0hcdaYCb5acZyYejnUVfbZS81irldi5kwkQVbA7Prg
+\unrestrict b3umosWsIJkpOpBVplcLzTd5va09HE84xdwzTROCiAHoMRKnogpbKgIksBp6ahf
log = logging.getLogger(__name__)
# Import sub-modules
+from . import auth
from . import database
from . import lists
from . import sources
follow_redirects=True,
)
+ @functools.cached_property
+ def auth(self):
+ return auth.Auth(self)
+
@functools.cached_property
def lists(self):
return lists.Lists(self)
###############################################################################
import fastapi
+import fastapi.security
# Import the backend
from .. import Backend
debug = True,
)
+api_key_header = fastapi.security.APIKeyHeader(name="X-API-Key")
+
+def require_api_key(api_key: str = fastapi.Depends(api_key_header)):
+ """
+ Requires that a client provides a valid API key
+ """
+ if not backend.auth(api_key):
+ raise fastapi.HTTPException(401, "Invalid API key")
+
# Import any endpoints
from . import lists
--- /dev/null
+###############################################################################
+# #
+# dnsbl - A DNS Blocklist Compositor For IPFire #
+# Copyright (C) 2025 IPFire Development Team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+###############################################################################
+
+import datetime
+import logging
+import secrets
+import sqlmodel
+import string
+
+from . import database
+from .i18n import _
+
+# Setup logging
+log = logging.getLogger(__name__)
+
+# The alphabet to generate the prefix from
+ALPHABET = string.ascii_letters + string.digits
+
+class Auth(object):
+ """
+ The authentication backend.
+
+ This implements some very simply API token authentication.
+ """
+ def __init__(self, backend):
+ self.backend = backend
+
+ def __call__(self, key):
+ """
+ The main authentication function which takes an API key
+ and checks it against the database.
+ """
+ key = self.get_key(key)
+
+ # If we have found a key, we have successfully authenticated
+ if key:
+ log.debug("Successfully authenticated %s" % key)
+ return True
+
+ return False
+
+ def get_key(self, key):
+ """
+ Fetches a specific key
+ """
+ # Partition the key
+ prefix, _, secret = key.partition(":")
+
+ # Check if we have both parts
+ if not prefix or not secret:
+ log.error("The provided API key does not contain a secret")
+ return
+
+ # Fetch all keys by prefix
+ keys = self.get_keys(prefix)
+
+ # Check all keys
+ for key in keys:
+ # Return True if the key matches the secret
+ if key.check(secret):
+ return key
+
+ def get_keys(self, prefix):
+ """
+ Fetches all keys with the matching prefix
+ """
+ stmt = (
+ sqlmodel
+ .select(
+ APIKey,
+ )
+ .where(
+ APIKey.deleted_at == None,
+ APIKey.prefix == prefix,
+ )
+ )
+
+ return self.backend.db.fetch_as_list(stmt)
+
+ def create(self, created_by):
+ """
+ Creates a new API key
+ """
+ # Generate a new prefix
+ prefix = "".join(secrets.choice(ALPHABET) for _ in range(6))
+
+ # Generate a new secret
+ secret = secrets.token_urlsafe(32)
+
+ # Insert the new token into the database
+ key = self.backend.db.insert(
+ APIKey,
+ prefix = prefix,
+ secret = secret,
+ created_by = created_by,
+ )
+
+ # Log action
+ log.info(_("A new API Key has been created by %s") % key.created_by)
+
+ return key
+
+
+class APIKey(sqlmodel.SQLModel, database.BackendMixin, table=True):
+ __tablename__ = "api_keys"
+
+ def __str__(self):
+ return "%s:%s" % (self.prefix, self.secret)
+
+ # ID
+ id : int = sqlmodel.Field(primary_key=True)
+
+ # Prefix
+ prefix : str
+
+ # Secret
+ secret : str
+
+ # Created At
+ created_at : datetime.datetime = sqlmodel.Field(
+ sa_column_kwargs = {"server_default" : sqlmodel.text("CURRENT_TIMESTAMP")}
+ )
+
+ # Created By
+ created_by : str
+
+ # Deleted At
+ deleted_at : datetime.datetime | None
+
+ # Deleted By
+ deleted_by : str | None
+
+ def check(self, secret):
+ """
+ Checks if the provided secret matches
+ """
+ return secrets.compare_digest(self.secret, secret)
+
+ def delete(self, deleted_by):
+ """
+ Deletes the key
+ """
+ self.deleted_at = sqlmodel.func.current_timestamp()
+ self.deleted_by = deleted_by
+
+ # Log action
+ log.info(_("API key %s has been deleted by %s") % (self.id, self.deleted_by))
help=_("Checks if domains are alive"))
check_domains.set_defaults(func=self.__check_domains)
+ # Authentication: create-api-key
+ create_api_key = subparsers.add_parser("create-api-key",
+ help=_("Creates a new API key"))
+ create_api_key.add_argument("--created-by", required=True,
+ default=os.environ.get("USER"), help=_("The creator of the key"))
+ create_api_key.set_defaults(func=self.__create_api_key)
+
+ # Authentication: delete-api-key
+ delete_api_key = subparsers.add_parser("delete-api-key",
+ help=_("Deletes an API key"))
+ delete_api_key.add_argument("key", help=_("The key to be deleted"))
+ delete_api_key.add_argument("--deleted-by", required=True,
+ default=os.environ.get("USER"), help=_("The person deleting the key"))
+ delete_api_key.set_defaults(func=self.__delete_api_key)
+
# Parse all arguments
args = parser.parse_args()
checker = dnsbl.checker.Checker(backend)
checker.check()
+ # Authentication
+
+ def __create_api_key(self, backend, args):
+ """
+ Creates a new API key
+ """
+ key = backend.auth.create(created_by=args.created_by)
+
+ # Show the new key
+ print(_("Your new API key has been created: %s") % key)
+
+ def __delete_api_key(self, backend, args):
+ """
+ Creates a new API key
+ """
+ key = backend.auth.get_key(args.key)
+
+ # If we could not find a key, we cannot delete it
+ if not key:
+ print("Could not find key %s" % args.key)
+ return 2
+
+ # Delete the key
+ key.delete(deleted_by=args.deleted_by)
+
def main():
c = CLI()