]> git.ipfire.org Git - dbl.git/commitdiff
dnsbl: Move the exporters into a separate file
authorMichael Tremer <michael.tremer@ipfire.org>
Mon, 8 Dec 2025 15:48:51 +0000 (15:48 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Mon, 8 Dec 2025 15:48:51 +0000 (15:48 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
Makefile.am
src/dnsbl/exporters.py [new file with mode: 0644]
src/dnsbl/lists.py

index eafc6829f23a90b9f7fc905c5f1cfb5cf0285140..d2e0c309a907b0b7be0c897c804b6b37e0fc96f8 100644 (file)
@@ -52,6 +52,7 @@ SED_PROCESS = \
 dist_pkgpython_PYTHON = \
        src/dnsbl/__init__.py \
        src/dnsbl/database.py \
+       src/dnsbl/exporters.py \
        src/dnsbl/i18n.py \
        src/dnsbl/lists.py \
        src/dnsbl/logger.py \
diff --git a/src/dnsbl/exporters.py b/src/dnsbl/exporters.py
new file mode 100644 (file)
index 0000000..731feb8
--- /dev/null
@@ -0,0 +1,153 @@
+###############################################################################
+#                                                                             #
+# dnsbl - A DNS Blocklist Compositor For IPFire                               #
+# Copyright (C) 2025 IPFire Development Team                                  #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+###############################################################################
+
+import abc
+import dns.name
+import dns.rdataclass
+import dns.zone
+
+class Exporter(abc.ABC):
+       """
+               Abstract class to define an exporter
+       """
+       def __init__(self, backend, list):
+               self.backend = backend
+               self.list = list
+
+       @abc.abstractmethod
+       def export(self, f, **kwargs):
+               """
+                       Runs the export
+               """
+               raise NotImplementedError
+
+       def write_header(self, f, delim="#"):
+               """
+                       Writes a header
+               """
+               lines = [
+                       "##############################################################################",
+                       " DNS Blocklist for IPFire",
+                       "##############################################################################",
+                       "",
+                       " This file contains domains that are part of the official IPFire DNS",
+                       " Blocklist Project. It is intended for use with DNS servers and firewalls",
+                       " that support blocklisting of known malicious or unwanted domains.",
+                       "",
+                       " List    : %s" % self.list.name,
+                       " License : %s" % self.list.license,
+                       " Updated : %s" % self.list.updated_at.isoformat(),
+                       "",
+                       " Copyright (C) %s - IPFire Team" % self.list.updated_at.strftime("%Y"),
+                       "",
+                       " For more information or to contribute:",
+                       "   https://dnsbl.ipfire.org/",
+                       "",
+                       "##############################################################################",
+                       "",
+                       " This list contains data from these sources:",
+                       "",
+               ]
+
+               # Append all sources
+               for source in self.list.sources:
+                       lines.append("   * %s (%s)" % (source.name, source.license))
+                       lines.append("     %s" % source.url)
+                       if source.updated_at:
+                               lines.append("     Updated: %s" % source.updated_at.isoformat())
+
+               # Newline
+               lines.append("")
+
+               # Write everything to the output file
+               for line in lines:
+                       f.write("%s%s\n" % (delim, line))
+
+
+class DomainsExporter(Exporter):
+       """
+               Exports the plain domains
+       """
+       def export(self, f):
+               # Write the header
+               self.write_header(f)
+
+               # Write all domains
+               for domain in self.list.domains:
+                       f.write("%s\n" % domain)
+
+
+class HostsExporter(Exporter):
+       """
+               Exports a file like /etc/hosts
+       """
+       def export(self, f):
+               # Write the header
+               self.write_header(f)
+
+               # Write all domains
+               for domain in self.list.domains:
+                       f.write("0.0.0.0 %s\n" % domain)
+
+
+class RPZExporter(Exporter):
+       """
+               Exports the list as a RPZ zone file
+       """
+       def export(self, f, ttl=60, rpz_action="."):
+               # Write the header
+               self.write_header(f, ";")
+
+               # Create the origin
+               origin = dns.name.from_text(self.list.zone)
+
+               # Create a new zone
+               zone = dns.zone.Zone(origin)
+
+               # Create the SOA
+               soa = dns.rdataset.from_text(
+                       dns.rdataclass.IN,
+                       dns.rdatatype.SOA,
+                       ttl,
+                       " ".join((
+                               "master.lwldns.net.",
+                               "hostmaster.ipfire.org.",
+                               self.list.updated_at.strftime("%s"),
+                               "3600",
+                               "600",
+                               "3600000",
+                               "%s" % ttl,
+                       )),
+               )
+               zone.replace_rdataset(origin, soa)
+
+               # XXX Add NS
+
+               # Write all domains
+               for domain in self.list.domains:
+                       zone.replace_rdataset(
+                               dns.name.from_text("%s.%s" % (domain, self.list.zone)),
+                               dns.rdataset.from_text(
+                                       dns.rdataclass.IN, dns.rdatatype.CNAME, ttl, rpz_action,
+                               ),
+                       )
+
+               # Write the zone to file
+               zone.to_file(f)
index e0bcddc73f973146c9cec63fa7e924cb7720c404..104be828b2ed6131ce7dbc1878f70a60d2d865b9 100644 (file)
@@ -19,9 +19,6 @@
 ###############################################################################
 
 import datetime
-import dns.name
-import dns.rdataclass
-import dns.zone
 import functools
 import io
 import logging
@@ -29,6 +26,7 @@ import sqlmodel
 import typing
 
 from . import database
+from . import exporters
 from . import sources
 from . import util
 from .i18n import _
@@ -244,7 +242,7 @@ class List(sqlmodel.SQLModel, database.BackendMixin, table=True):
 
        # Export!
 
-       def export(self, f, format, ttl=60, rpz_action="."):
+       def export(self, f, format, **kwargs):
                """
                        Exports the list
                """
@@ -255,112 +253,24 @@ class List(sqlmodel.SQLModel, database.BackendMixin, table=True):
                        f = io.TextIOWrapper(f, encoding="utf-8")
                        detach = True
 
-               # Write the domains as they are
-               if format == "domains":
-                       # Write the header
-                       self._write_header(f)
-
-                       # Write all domains
-                       for domain in self.domains:
-                               f.write("%s\n" % domain)
-
-               # hosts format
-               elif format == "hosts":
-                       # Write the header
-                       self._write_header(f)
-
-                       # Write all domains
-                       for domain in self.domains:
-                               f.write("0.0.0.0 %s\n" % domain)
-
-               # Write a RPZ zone file
-               elif format == "rpz":
-                       # Write the header
-                       self._write_header(f, ";")
-
-                       # Create the origin
-                       origin = dns.name.from_text(self.zone)
-
-                       # Create a new zone
-                       zone = dns.zone.Zone(origin)
-
-                       # Create the SOA
-                       soa = dns.rdataset.from_text(
-                               dns.rdataclass.IN,
-                               dns.rdatatype.SOA,
-                               ttl,
-                               " ".join((
-                                       "master.lwldns.net.",
-                                       "hostmaster.ipfire.org.",
-                                       self.updated_at.strftime("%s"),
-                                       "3600",
-                                       "600",
-                                       "3600000",
-                                       "%s" % ttl,
-                               )),
-                       )
-                       zone.replace_rdataset(origin, soa)
-
-                       # XXX Add NS
+               formats = {
+                       "domains" : exporters.DomainsExporter,
+                       "hosts"   : exporters.HostsExporter,
+                       "rpz"     : exporters.RPZExporter,
+               }
 
-                       # Write all domains
-                       for domain in self.domains:
-                               zone.replace_rdataset(
-                                       dns.name.from_text("%s.%s" % (domain, self.zone)),
-                                       dns.rdataset.from_text(
-                                               dns.rdataclass.IN, dns.rdatatype.CNAME, ttl, rpz_action,
-                                       ),
-                               )
+               # Fetch the exporter
+               try:
+                       exporter = formats[format](self.backend, list=self)
 
-                       # Write the zone to file
-                       zone.to_file(f)
+               # Raise an error if we could not find an exporter
+               except KeyError as e:
+                       raise ValueError("Unknown output format: %s" % format) from e
 
-               else:
-                       raise ValueError("Unknown output format: %s" % format)
+               # Run the export
+               exporter.export(f, **kwargs)
 
                # Detach the underlying stream. That way, the wrapper won't close
                # the underlying file handle.
                if detach:
                        f.detach()
-
-       def _write_header(self, f, delim="#"):
-               """
-                       Writes a header
-               """
-               lines = [
-                       "##############################################################################",
-                       " DNS Blocklist for IPFire",
-                       "##############################################################################",
-                       "",
-                       " This file contains domains that are part of the official IPFire DNS",
-                       " Blocklist Project. It is intended for use with DNS servers and firewalls",
-                       " that support blocklisting of known malicious or unwanted domains.",
-                       "",
-                       " List    : %s" % self.name,
-                       " License : %s" % self.license,
-                       " Updated : %s" % self.updated_at.isoformat(),
-                       "",
-                       " Copyright (C) %s - IPFire Team" % self.updated_at.strftime("%Y"),
-                       "",
-                       " For more information or to contribute:",
-                       "   https://dnsbl.ipfire.org/",
-                       "",
-                       "##############################################################################",
-                       "",
-                       " This list contains data from these sources:",
-                       "",
-               ]
-
-               # Append all sources
-               for source in self.sources:
-                       lines.append("   * %s (%s)" % (source.name, source.license))
-                       lines.append("     %s" % source.url)
-                       if source.updated_at:
-                               lines.append("     Updated: %s" % source.updated_at.isoformat())
-
-               # Newline
-               lines.append("")
-
-               # Write everything to the output file
-               for line in lines:
-                       f.write("%s%s\n" % (delim, line))