From: Michael Tremer Date: Mon, 5 Jan 2026 18:21:38 +0000 (+0000) Subject: exporters: Create a nested directory exporter X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f99ab9aba372118e0ecc4d6dfcab292dcbc18f72;p=dbl.git exporters: Create a nested directory exporter Signed-off-by: Michael Tremer --- diff --git a/src/dnsbl/exporters.py b/src/dnsbl/exporters.py index ea88cdc..21b99bf 100644 --- a/src/dnsbl/exporters.py +++ b/src/dnsbl/exporters.py @@ -21,7 +21,10 @@ import abc import datetime import io +import os +import pathlib import tarfile +import tempfile import zlib from . import util @@ -75,7 +78,7 @@ class Exporter(abc.ABC): file.uname = file.gname = "nobody" # Set the mtime - file.mtime = self.list.updated_at.timestamp() + file.mtime = self.exported_at.timestamp() # Set the length file.size = f.tell() @@ -86,6 +89,13 @@ class Exporter(abc.ABC): # Write the buffer to the tarball tarball.addfile(file, fileobj=f) + @property + def exported_at(self): + """ + The timestamp of the export + """ + return self.list.updated_at + class NullExporter(Exporter): """ @@ -556,18 +566,26 @@ class MultiExporter(abc.ABC): """ This is a base class that can export multiple lists at the same time """ - files = { - "%(list)s/domains.txt" : DomainsExporter, - } - def __init__(self, backend, lists): + def __init__(self, backend, lists=None): self.backend = backend + + if lists is None: + lists = backend.lists + self.lists = lists @abc.abstractmethod def __call__(self, *args, **kwargs): raise NotImplementedError + @property + def exported_at(self): + """ + The timestamp of the export + """ + return max(l.updated_at for l in self.lists) + class CombinedSquidGuardExporter(MultiExporter): """ @@ -599,3 +617,104 @@ class CombinedSuricataExporter(MultiExporter): for file, exporter in self.files.items(): e = exporter(self.backend, list) e.export_to_tarball(tarball, file) + + +class DirectoryExporter(MultiExporter): + """ + This is a simple nested exporter that will create the directory structure + as it is available on https://dnsbl.ipfire.org/lists. + """ + files = { + # Simple formats + "%(list)s/domains.txt" : DomainsExporter, + "%(list)s/hosts.txt" : HostsExporter, + + # DNS Zones + "%(list)s/dnsbl.zone" : BlocklistExporter, + "%(list)s/rpz.zone" : RPZExporter, + + # Adblock Plus + "%(list)s/abp.txt" : AdBlockPlusExporter, + + # Export squidGuard & Suricata rules only as a tarball + "squidguard.tar.gz" : CombinedSquidGuardExporter, + "suricata.tar.gz" : CombinedSuricataExporter, + } + + def __init__(self, backend, root, lists=None): + super().__init__(backend, lists) + + # Store the root + self.root = pathlib.Path(root) + + def __call__(self): + # Ensure the root directory exists + try: + self.root.mkdir() + except FileExistsError: + pass + + # Export everything + for name, exporter in self.files.items(): + # For MultiExporters, we will have to export everything at once + if issubclass(exporter, MultiExporter): + e = exporter(self.backend, self.lists) + self.export(e, name) + + # For regular exporters, we will have to export each list at a time + else: + for list in self.lists: + e = exporter(self.backend, list) + self.export(e, name, list=list) + + def export(self, exporter, name, **kwargs): + """ + This function takes an exporter instance and runs it + """ + # Make the path + path = self._make_path(name, **kwargs) + + # Ensure the parent directory exists + try: + path.parent.mkdir() + except FileExistsError: + pass + + # Create a new temporary file + with tempfile.NamedTemporaryFile(dir=path.parent) as f: + # Export everthing to the file + exporter(f) + + # Set the modification time (so that clients won't download again + # just because we have done a re-export) + if self.exported_at: + os.utime(f.name, ( + self.exported_at.timestamp(), + self.exported_at.timestamp(), + )) + + # Fix permissions + os.chmod(f.name, 0o644) + + # Remove the previous file (if it exists) + try: + os.unlink(path) + except FileNotFoundError: + pass + + # Once the output has been written in full, we will rename the file + os.link(f.name, path) + + def _make_path(self, name, list=None): + """ + A helper function to expand any variables in the paths + """ + args = {} + + # Substitute the list if present + if list: + args |= { + "list" : list.slug, + } + + return self.root / (name % args) diff --git a/src/scripts/dnsbl.in b/src/scripts/dnsbl.in index 9a438c2..94d9a66 100644 --- a/src/scripts/dnsbl.in +++ b/src/scripts/dnsbl.in @@ -27,12 +27,10 @@ import dnsbl.checker import dnsbl.exporters import logging import os -import pathlib import rich.console import rich.table import rich.text import sys -import tempfile import uuid # i18n @@ -382,78 +380,9 @@ class CLI(object): """ Exports all lists """ - formats = { - "abp" : "abp.txt", - "domains" : "domains.txt", - "dnsbl" : "dnsbl.zone", - "hosts" : "hosts.txt", - "rpz" : "rpz.zone", - "squidguard" : "squidguard.tar.gz", - - # Suricata - "suricata-dns" : "suricata-dns.rules", - "suricata-http" : "suricata-http.rules", - "suricata-tls" : "suricata-tls.rules", - "suricata-quic" : "suricata-quic.rules", - } - - # Ensure the output directory exists - try: - os.makedirs(args.directory) - except FileExistsError: - pass - - # Open the root - root = pathlib.Path(args.directory) - - # Export all lists - for list in backend.lists: - for format, filename in formats.items(): - # Compose the directory for the list - dir = root / list.slug - - # Compose the output filename - name = dir / filename - - # Create a directory for the list - try: - dir.mkdir() - except FileExistsError: - pass - - # Create a new temporary file - with tempfile.NamedTemporaryFile(dir=dir) as f: - list.export(f, format=format) - - # Remove the previous file (if it exists) - try: - os.unlink(name) - except FileNotFoundError: - pass - - # Set the modification time (so that clients won't download again - # just because we have done a re-export) - if list.updated_at: - os.utime(f.name, ( - list.updated_at.timestamp(), - list.updated_at.timestamp(), - )) - - # Fix permissions - os.chmod(f.name, 0o644) - - # Once the output has been written in full, we will rename the file - os.link(f.name, name) - - # Write all lists as one tarball for squidGuard - exporter = dnsbl.exporters.CombinedSquidGuardExporter(backend, backend.lists) - with open(root / "squidguard.tar.gz", "wb") as f: - exporter(f) - - # Write all Suricata rules into one tarball - exporter = dnsbl.exporters.CombinedSuricataExporter(backend, backend.lists) - with open(root / "suricata.tar.gz", "wb") as f: - exporter(f) + # Launch the DirectoryExporter + exporter = dnsbl.exporters.DirectoryExporter(backend, root=args.directory) + exporter() def __add_source(self, backend, args): """