From: Michael Tremer Date: Mon, 5 Jan 2026 17:32:42 +0000 (+0000) Subject: exporter: Attempt to remove code duplication in the exporters X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=44ea9677c5e0c1667e30a94670668c2ec9851f79;p=dbl.git exporter: Attempt to remove code duplication in the exporters This will also export a unified tarball of all Suricata rules. Signed-off-by: Michael Tremer --- diff --git a/src/dnsbl/exporters.py b/src/dnsbl/exporters.py index 618369f..ea88cdc 100644 --- a/src/dnsbl/exporters.py +++ b/src/dnsbl/exporters.py @@ -51,6 +51,49 @@ class Exporter(abc.ABC): """ raise NotImplementedError + def export_to_tarball(self, tarball, name, **kwargs): + """ + Exports the list to the tarball using the given exporter + """ + assert isinstance(tarball, tarfile.TarFile), "Input is not a tarball" + + # Create a temporary buffer + f = io.BytesIO() + + # Export the data + self(f, **kwargs) + + # Expand the filename + name = name % { + "list" : self.list.slug, + } + + # Create a new file + file = tarfile.TarInfo(name=name) + + # Assign the list to be owned by nobody + file.uname = file.gname = "nobody" + + # Set the mtime + file.mtime = self.list.updated_at.timestamp() + + # Set the length + file.size = f.tell() + + # Ensure we read the payload from the beginning + f.seek(0) + + # Write the buffer to the tarball + tarball.addfile(file, fileobj=f) + + +class NullExporter(Exporter): + """ + Exports nothing, i.e. writes an empty file + """ + def export(self, *args, **kwargs): + pass + class TextExporter(Exporter): def __call__(self, f, **kwargs): @@ -333,74 +376,33 @@ class RPZExporter(ZoneExporter): section = "rpz" -class SquidGuardExporter(Exporter): +class TarballExporter(Exporter): """ - Export a list in the format that squidguard can process it + This class contains some helper functions to write data to tarballs """ def export(self, f): - # Create a tar file - with tarfile.open(fileobj=f, mode="w|gz") as tarball: - # Write the list - self._write_list(tarball) - - def _write_list(self, tarball): - # Create a new - file = tarfile.TarInfo(name="blacklists/%s/domains" % self.list.slug) - - # Write the payload to memory - f = io.BytesIO() - - # Create a domain exporter - exporter = DomainsExporter(self.backend, self.list) - - # Write all domains to the buffer - exporter(f) - - # Write the file - self._write_file(tarball, file, f) - - # squidguard is also expecting some more files - for name in ("expressions", "urls"): - file = tarfile.TarInfo(name="blacklists/%s/%s" % (self.list.slug, name)) - - self._write_file(tarball, file) - - def _write_file(self, tarball, file, f=None): - """ - A helper function to set some commonly used metadata - """ - # Assign the list to be owned by nobody - file.uname = file.gname = "nobody" - - # Set the mtime - file.mtime = self.list.updated_at.timestamp() - - # Set the length - if f: - file.size = f.tell() + # Accept a tarball object and in that case, simply don't nest them + if not isinstance(f, tarfile.TarFile): + f = tarfile.open(fileobj=f, mode="w|gz") - # Ensure we read the payload from the beginning - if f: - f.seek(0) + # Create a tar file + for file, exporter in self.files.items(): + # Create the exporter + e = exporter(self.backend, self.list) - # Write the buffer to the tarball - tarball.addfile(file, fileobj=f) + # Export to the tarball + e.export_to_tarball(f, file) -class CombinedSquidGuardExporter(object): +class SquidGuardExporter(TarballExporter): """ - This is a special exporter which combines all lists into a single tarball + Export a list in the format that squidguard can process it """ - def __init__(self, backend, lists): - self.backend = backend - self.lists = lists - - def __call__(self, f): - # Create a tar file - with tarfile.open(fileobj=f, mode="w|gz") as tarball: - for list in self.lists: - exporter = SquidGuardExporter(self.backend, list) - exporter._write_list(tarball) + files = { + "blacklists/%(list)s/domains" : DomainsExporter, + "blacklists/%(list)s/expressions" : NullExporter, + "blacklists/%(list)s/urls" : NullExporter, + } class SuricataExporter(TextExporter): @@ -548,3 +550,52 @@ class SuricataQUICExporter(SuricataExporter): " rev:%(rev)s;" ")\n" ) + + +class MultiExporter(abc.ABC): + """ + This is a base class that can export multiple lists at the same time + """ + files = { + "%(list)s/domains.txt" : DomainsExporter, + } + + def __init__(self, backend, lists): + self.backend = backend + self.lists = lists + + @abc.abstractmethod + def __call__(self, *args, **kwargs): + raise NotImplementedError + + +class CombinedSquidGuardExporter(MultiExporter): + """ + This is a special exporter which combines all lists into a single tarball + """ + def __call__(self, f): + # Create a tar file + with tarfile.open(fileobj=f, mode="w|gz") as tarball: + for list in self.lists: + exporter = SquidGuardExporter(self.backend, list) + exporter(tarball) + + +class CombinedSuricataExporter(MultiExporter): + """ + This is a special exporter which combines all Suricata rulesets into a tarball + """ + files = { + "%(list)s-dns.rules" : SuricataDNSExporter, + "%(list)s-http.rules" : SuricataHTTPExporter, + "%(list)s-tls.rules" : SuricataTLSExporter, + "%(list)s-quic.rules" : SuricataQUICExporter, + } + + def __call__(self, f): + # Create a tar file + with tarfile.open(fileobj=f, mode="w|gz") as tarball: + for list in self.lists: + for file, exporter in self.files.items(): + e = exporter(self.backend, list) + e.export_to_tarball(tarball, file) diff --git a/src/scripts/dnsbl.in b/src/scripts/dnsbl.in index 5c83db2..cd40079 100644 --- a/src/scripts/dnsbl.in +++ b/src/scripts/dnsbl.in @@ -450,6 +450,11 @@ class CLI(object): with open(root / "squidguard.tar.gz", "wb") as f: exporter(f) + # Write all Suricata rules into one tarball + exporter = dnsbl.exporters.CombinedSuricataExporter(backend, backend.lists) + with open(root / "suricata.tar.gz", "wb") as f: + exporter(f) + def __add_source(self, backend, args): """ Adds a new source to a list