dist_pkgpython_PYTHON = \
src/dnsbl/__init__.py \
src/dnsbl/i18n.py \
- src/dnsbl/logger.py
+ src/dnsbl/lists.py \
+ src/dnsbl/logger.py \
+ src/dnsbl/util.py
# ------------------------------------------------------------------------------
###############################################################################
import configparser
+import functools
import logging
import sqlmodel
# Setup logging
log = logging.getLogger(__name__)
+# Import sub-modules
+from . import lists
+
class Backend(object):
def __init__(self, config=None, debug=False):
self.debug = debug
logging_name=log.name,
)
+ def session(self):
+ """
+ Returns a new database session
+ """
+ return sqlmodel.Session(self.db)
+
+ @functools.cached_property
+ def lists(self):
+ return lists.Lists(self)
+
def update_sources(self):
"""
Updates all sources
--- /dev/null
+###############################################################################
+# #
+# dnsbl - A DNS Blacklist Compositor For IPFire #
+# Copyright (C) 2025 IPFire Development Team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+###############################################################################
+
+import datetime
+import sqlmodel
+
+from . import util
+
+class Lists(object):
+ def __init__(self, backend):
+ self.backend = backend
+
+ def get_by_slug(self, slug):
+ stmt = (
+ sqlmodel
+ .select(
+ List,
+ )
+ .where(
+ List.slug == slug,
+ List.deleted_at == None,
+ )
+ )
+
+ with self.backend.session() as session:
+ result = session.execute(stmt)
+
+ return result.scalar_one_or_none()
+
+ def _make_slug(self, name):
+ i = 0
+
+ while True:
+ slug = util.slugify(name, i)
+
+ # Skip if the list already exists
+ if self.get_by_slug(slug):
+ i += 1
+ continue
+
+ return slug
+
+ def create(self, name, created_by, license):
+ """
+ Creates a new list
+ """
+ slug = self._make_slug(name)
+
+ # Create a new list
+ with self.backend.session() as session:
+ list = List(
+ name = name,
+ slug = slug,
+ created_by = created_by,
+ license = license,
+ )
+ session.add(list)
+ session.commit()
+
+ return list
+
+
+class List(sqlmodel.SQLModel, table=True):
+ __tablename__ = "lists"
+
+ # ID
+ id : int = sqlmodel.Field(primary_key=True)
+
+ # Name
+ name : str
+
+ # Slug
+ slug : str
+
+ # Created At
+ created_at : datetime.datetime = sqlmodel.Field(
+ sa_column_kwargs = {"server_default" : sqlmodel.text("CURRENT_TIMESTAMP")}
+ )
+
+ # Created By
+ created_by : str
+
+ # Deleted At
+ deleted_at : datetime.datetime | None
+
+ # Deleted By
+ deleted_by : str | None
+
+ # License
+ license : str
--- /dev/null
+###############################################################################
+# #
+# dnsbl - A DNS Blacklist Compositor For IPFire #
+# Copyright (C) 2025 IPFire Development Team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+###############################################################################
+
+import re
+import unicodedata
+
+def slugify(text, iteration=None):
+ # Append the iteration
+ if iteration:
+ text = "%s-%s" % (text, iteration)
+
+ # Normalize Unicode characters (e.g., é → e)
+ text = unicodedata.normalize("NFKD", text)
+ text = text.encode("ascii", "ignore").decode("ascii")
+
+ # Lowercase
+ text = text.lower()
+
+ # Replace non-alphanumeric characters with hyphens
+ text = re.sub(r"[^a-z0-9]+", "-", text)
+
+ # Remove leading/trailing hyphens
+ text = text.strip("-")
+
+ # Collapse multiple hyphens
+ text = re.sub(r"-+", "-", text)
+
+ return text
import argparse
import dnsbl
import logging
+import os
import sys
# i18n
# Show Version
parser.add_argument("--version", action="version", version="%(prog)s @VERSION@")
+ # create-list
+ create_list = subparsers.add_parser("create-list", help=_("Create a new list"))
+ create_list.add_argument("--name", required=True,
+ help=_("The name of the list"))
+ create_list.add_argument("--created-by", required=True,
+ default=os.environ.get("USER"), help=_("The creator of the list"))
+ create_list.add_argument("--license", required=True,
+ help=_("The license of the list"))
+ create_list.set_defaults(func=self.__create_list)
+
# update
update = subparsers.add_parser("update", help=_("Update sources"))
update.set_defaults(func=self.__update)
# Otherwise just exit
sys.exit(0)
+ def __create_list(self, backend, args):
+ """
+ Creates a new list
+ """
+ backend.lists.create(
+ name = args.name,
+ created_by = args.created_by,
+ license = args.license,
+ )
+
def __update(self, backend, args):
"""
Updates all sources