]> git.ipfire.org Git - dbl.git/commitdiff
exporter: Attempt to remove code duplication in the exporters
authorMichael Tremer <michael.tremer@ipfire.org>
Mon, 5 Jan 2026 17:32:42 +0000 (17:32 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Mon, 5 Jan 2026 17:32:42 +0000 (17:32 +0000)
This will also export a unified tarball of all Suricata rules.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/dnsbl/exporters.py
src/scripts/dnsbl.in

index 618369f8944c5b251ae8013a4555e44103d96f1f..ea88cdc759c0096fb5eef3c1297fa97861a32132 100644 (file)
@@ -51,6 +51,49 @@ class Exporter(abc.ABC):
                """
                raise NotImplementedError
 
+       def export_to_tarball(self, tarball, name, **kwargs):
+               """
+                       Exports the list to the tarball using the given exporter
+               """
+               assert isinstance(tarball, tarfile.TarFile), "Input is not a tarball"
+
+               # Create a temporary buffer
+               f = io.BytesIO()
+
+               # Export the data
+               self(f, **kwargs)
+
+               # Expand the filename
+               name = name % {
+                       "list" : self.list.slug,
+               }
+
+               # Create a new file
+               file = tarfile.TarInfo(name=name)
+
+               # Assign the list to be owned by nobody
+               file.uname = file.gname = "nobody"
+
+               # Set the mtime
+               file.mtime = self.list.updated_at.timestamp()
+
+               # Set the length
+               file.size = f.tell()
+
+               # Ensure we read the payload from the beginning
+               f.seek(0)
+
+               # Write the buffer to the tarball
+               tarball.addfile(file, fileobj=f)
+
+
+class NullExporter(Exporter):
+       """
+               Exports nothing, i.e. writes an empty file
+       """
+       def export(self, *args, **kwargs):
+               pass
+
 
 class TextExporter(Exporter):
        def __call__(self, f, **kwargs):
@@ -333,74 +376,33 @@ class RPZExporter(ZoneExporter):
        section = "rpz"
 
 
-class SquidGuardExporter(Exporter):
+class TarballExporter(Exporter):
        """
-               Export a list in the format that squidguard can process it
+               This class contains some helper functions to write data to tarballs
        """
        def export(self, f):
-               # Create a tar file
-               with tarfile.open(fileobj=f, mode="w|gz") as tarball:
-                       # Write the list
-                       self._write_list(tarball)
-
-       def _write_list(self, tarball):
-               # Create a new
-               file = tarfile.TarInfo(name="blacklists/%s/domains" % self.list.slug)
-
-               # Write the payload to memory
-               f = io.BytesIO()
-
-               # Create a domain exporter
-               exporter = DomainsExporter(self.backend, self.list)
-
-               # Write all domains to the buffer
-               exporter(f)
-
-               # Write the file
-               self._write_file(tarball, file, f)
-
-               # squidguard is also expecting some more files
-               for name in ("expressions", "urls"):
-                       file = tarfile.TarInfo(name="blacklists/%s/%s" % (self.list.slug, name))
-
-                       self._write_file(tarball, file)
-
-       def _write_file(self, tarball, file, f=None):
-               """
-                       A helper function to set some commonly used metadata
-               """
-               # Assign the list to be owned by nobody
-               file.uname = file.gname = "nobody"
-
-               # Set the mtime
-               file.mtime = self.list.updated_at.timestamp()
-
-               # Set the length
-               if f:
-                       file.size = f.tell()
+               # Accept a tarball object and in that case, simply don't nest them
+               if not isinstance(f, tarfile.TarFile):
+                       f = tarfile.open(fileobj=f, mode="w|gz")
 
-               # Ensure we read the payload from the beginning
-               if f:
-                       f.seek(0)
+               # Create a tar file
+               for file, exporter in self.files.items():
+                       # Create the exporter
+                       e = exporter(self.backend, self.list)
 
-               # Write the buffer to the tarball
-               tarball.addfile(file, fileobj=f)
+                       # Export to the tarball
+                       e.export_to_tarball(f, file)
 
 
-class CombinedSquidGuardExporter(object):
+class SquidGuardExporter(TarballExporter):
        """
-               This is a special exporter which combines all lists into a single tarball
+               Export a list in the format that squidguard can process it
        """
-       def __init__(self, backend, lists):
-               self.backend = backend
-               self.lists = lists
-
-       def __call__(self, f):
-               # Create a tar file
-               with tarfile.open(fileobj=f, mode="w|gz") as tarball:
-                       for list in self.lists:
-                               exporter = SquidGuardExporter(self.backend, list)
-                               exporter._write_list(tarball)
+       files = {
+               "blacklists/%(list)s/domains"     : DomainsExporter,
+               "blacklists/%(list)s/expressions" : NullExporter,
+               "blacklists/%(list)s/urls"        : NullExporter,
+       }
 
 
 class SuricataExporter(TextExporter):
@@ -548,3 +550,52 @@ class SuricataQUICExporter(SuricataExporter):
                " rev:%(rev)s;"
                ")\n"
        )
+
+
+class MultiExporter(abc.ABC):
+       """
+               This is a base class that can export multiple lists at the same time
+       """
+       files = {
+               "%(list)s/domains.txt" : DomainsExporter,
+       }
+
+       def __init__(self, backend, lists):
+               self.backend = backend
+               self.lists = lists
+
+       @abc.abstractmethod
+       def __call__(self, *args, **kwargs):
+               raise NotImplementedError
+
+
+class CombinedSquidGuardExporter(MultiExporter):
+       """
+               This is a special exporter which combines all lists into a single tarball
+       """
+       def __call__(self, f):
+               # Create a tar file
+               with tarfile.open(fileobj=f, mode="w|gz") as tarball:
+                       for list in self.lists:
+                               exporter = SquidGuardExporter(self.backend, list)
+                               exporter(tarball)
+
+
+class CombinedSuricataExporter(MultiExporter):
+       """
+               This is a special exporter which combines all Suricata rulesets into a tarball
+       """
+       files = {
+               "%(list)s-dns.rules"  : SuricataDNSExporter,
+               "%(list)s-http.rules" : SuricataHTTPExporter,
+               "%(list)s-tls.rules"  : SuricataTLSExporter,
+               "%(list)s-quic.rules" : SuricataQUICExporter,
+       }
+
+       def __call__(self, f):
+               # Create a tar file
+               with tarfile.open(fileobj=f, mode="w|gz") as tarball:
+                       for list in self.lists:
+                               for file, exporter in self.files.items():
+                                       e = exporter(self.backend, list)
+                                       e.export_to_tarball(tarball, file)
index 5c83db29dd1c539ef5dba6d03e71c45ef5636a83..cd4007984a3010b12cb500451e1cd24d016a63b7 100644 (file)
@@ -450,6 +450,11 @@ class CLI(object):
                with open(root / "squidguard.tar.gz", "wb") as f:
                        exporter(f)
 
+               # Write all Suricata rules into one tarball
+               exporter = dnsbl.exporters.CombinedSuricataExporter(backend, backend.lists)
+               with open(root / "suricata.tar.gz", "wb") as f:
+                       exporter(f)
+
        def __add_source(self, backend, args):
                """
                        Adds a new source to a list