]> git.ipfire.org Git - dbl.git/commitdiff
exporters: Create a nested directory exporter
authorMichael Tremer <michael.tremer@ipfire.org>
Mon, 5 Jan 2026 18:21:38 +0000 (18:21 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Mon, 5 Jan 2026 18:21:38 +0000 (18:21 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/dnsbl/exporters.py
src/scripts/dnsbl.in

index ea88cdc759c0096fb5eef3c1297fa97861a32132..21b99bf9cb911fce24518d44e867234f40c344ab 100644 (file)
 import abc
 import datetime
 import io
+import os
+import pathlib
 import tarfile
+import tempfile
 import zlib
 
 from . import util
@@ -75,7 +78,7 @@ class Exporter(abc.ABC):
                file.uname = file.gname = "nobody"
 
                # Set the mtime
-               file.mtime = self.list.updated_at.timestamp()
+               file.mtime = self.exported_at.timestamp()
 
                # Set the length
                file.size = f.tell()
@@ -86,6 +89,13 @@ class Exporter(abc.ABC):
                # Write the buffer to the tarball
                tarball.addfile(file, fileobj=f)
 
+       @property
+       def exported_at(self):
+               """
+                       The timestamp of the export
+               """
+               return self.list.updated_at
+
 
 class NullExporter(Exporter):
        """
@@ -556,18 +566,26 @@ class MultiExporter(abc.ABC):
        """
                This is a base class that can export multiple lists at the same time
        """
-       files = {
-               "%(list)s/domains.txt" : DomainsExporter,
-       }
 
-       def __init__(self, backend, lists):
+       def __init__(self, backend, lists=None):
                self.backend = backend
+
+               if lists is None:
+                       lists = backend.lists
+
                self.lists = lists
 
        @abc.abstractmethod
        def __call__(self, *args, **kwargs):
                raise NotImplementedError
 
+       @property
+       def exported_at(self):
+               """
+                       The timestamp of the export
+               """
+               return max(l.updated_at for l in self.lists)
+
 
 class CombinedSquidGuardExporter(MultiExporter):
        """
@@ -599,3 +617,104 @@ class CombinedSuricataExporter(MultiExporter):
                                for file, exporter in self.files.items():
                                        e = exporter(self.backend, list)
                                        e.export_to_tarball(tarball, file)
+
+
+class DirectoryExporter(MultiExporter):
+       """
+               This is a simple nested exporter that will create the directory structure
+               as it is available on https://dnsbl.ipfire.org/lists.
+       """
+       files = {
+               # Simple formats
+               "%(list)s/domains.txt" : DomainsExporter,
+               "%(list)s/hosts.txt"   : HostsExporter,
+
+               # DNS Zones
+               "%(list)s/dnsbl.zone"  : BlocklistExporter,
+               "%(list)s/rpz.zone"    : RPZExporter,
+
+               # Adblock Plus
+               "%(list)s/abp.txt"     : AdBlockPlusExporter,
+
+               # Export squidGuard & Suricata rules only as a tarball
+               "squidguard.tar.gz"    : CombinedSquidGuardExporter,
+               "suricata.tar.gz"      : CombinedSuricataExporter,
+       }
+
+       def __init__(self, backend, root, lists=None):
+               super().__init__(backend, lists)
+
+               # Store the root
+               self.root = pathlib.Path(root)
+
+       def __call__(self):
+               # Ensure the root directory exists
+               try:
+                       self.root.mkdir()
+               except FileExistsError:
+                       pass
+
+               # Export everything
+               for name, exporter in self.files.items():
+                       # For MultiExporters, we will have to export everything at once
+                       if issubclass(exporter, MultiExporter):
+                               e = exporter(self.backend, self.lists)
+                               self.export(e, name)
+
+                       # For regular exporters, we will have to export each list at a time
+                       else:
+                               for list in self.lists:
+                                       e = exporter(self.backend, list)
+                                       self.export(e, name, list=list)
+
+       def export(self, exporter, name, **kwargs):
+               """
+                       This function takes an exporter instance and runs it
+               """
+               # Make the path
+               path = self._make_path(name, **kwargs)
+
+               # Ensure the parent directory exists
+               try:
+                       path.parent.mkdir()
+               except FileExistsError:
+                       pass
+
+               # Create a new temporary file
+               with tempfile.NamedTemporaryFile(dir=path.parent) as f:
+                       # Export everthing to the file
+                       exporter(f)
+
+                       # Set the modification time (so that clients won't download again
+                       # just because we have done a re-export)
+                       if self.exported_at:
+                               os.utime(f.name, (
+                                       self.exported_at.timestamp(),
+                                       self.exported_at.timestamp(),
+                               ))
+
+                       # Fix permissions
+                       os.chmod(f.name, 0o644)
+
+                       # Remove the previous file (if it exists)
+                       try:
+                               os.unlink(path)
+                       except FileNotFoundError:
+                               pass
+
+                       # Once the output has been written in full, we will rename the file
+                       os.link(f.name, path)
+
+       def _make_path(self, name, list=None):
+               """
+                       A helper function to expand any variables in the paths
+               """
+               args = {}
+
+               # Substitute the list if present
+               if list:
+                       args |= {
+                               "list" : list.slug,
+                       }
+
+               return self.root / (name % args)
index 9a438c2150666beb2b75dc3292e6c64eba70aa92..94d9a660306ec2875dd814227085b439e6ba233a 100644 (file)
@@ -27,12 +27,10 @@ import dnsbl.checker
 import dnsbl.exporters
 import logging
 import os
-import pathlib
 import rich.console
 import rich.table
 import rich.text
 import sys
-import tempfile
 import uuid
 
 # i18n
@@ -382,78 +380,9 @@ class CLI(object):
                """
                        Exports all lists
                """
-               formats = {
-                       "abp"           : "abp.txt",
-                       "domains"       : "domains.txt",
-                       "dnsbl"         : "dnsbl.zone",
-                       "hosts"         : "hosts.txt",
-                       "rpz"           : "rpz.zone",
-                       "squidguard"    : "squidguard.tar.gz",
-
-                       # Suricata
-                       "suricata-dns"  : "suricata-dns.rules",
-                       "suricata-http" : "suricata-http.rules",
-                       "suricata-tls"  : "suricata-tls.rules",
-                       "suricata-quic" : "suricata-quic.rules",
-               }
-
-               # Ensure the output directory exists
-               try:
-                       os.makedirs(args.directory)
-               except FileExistsError:
-                       pass
-
-               # Open the root
-               root = pathlib.Path(args.directory)
-
-               # Export all lists
-               for list in backend.lists:
-                       for format, filename in formats.items():
-                               # Compose the directory for the list
-                               dir = root / list.slug
-
-                               # Compose the output filename
-                               name = dir / filename
-
-                               # Create a directory for the list
-                               try:
-                                       dir.mkdir()
-                               except FileExistsError:
-                                       pass
-
-                               # Create a new temporary file
-                               with tempfile.NamedTemporaryFile(dir=dir) as f:
-                                       list.export(f, format=format)
-
-                                       # Remove the previous file (if it exists)
-                                       try:
-                                               os.unlink(name)
-                                       except FileNotFoundError:
-                                               pass
-
-                                       # Set the modification time (so that clients won't download again
-                                       # just because we have done a re-export)
-                                       if list.updated_at:
-                                               os.utime(f.name, (
-                                                       list.updated_at.timestamp(),
-                                                       list.updated_at.timestamp(),
-                                               ))
-
-                                       # Fix permissions
-                                       os.chmod(f.name, 0o644)
-
-                                       # Once the output has been written in full, we will rename the file
-                                       os.link(f.name, name)
-
-               # Write all lists as one tarball for squidGuard
-               exporter = dnsbl.exporters.CombinedSquidGuardExporter(backend, backend.lists)
-               with open(root / "squidguard.tar.gz", "wb") as f:
-                       exporter(f)
-
-               # Write all Suricata rules into one tarball
-               exporter = dnsbl.exporters.CombinedSuricataExporter(backend, backend.lists)
-               with open(root / "suricata.tar.gz", "wb") as f:
-                       exporter(f)
+               # Launch the DirectoryExporter
+               exporter = dnsbl.exporters.DirectoryExporter(backend, root=args.directory)
+               exporter()
 
        def __add_source(self, backend, args):
                """