import abc
import datetime
import io
+import os
+import pathlib
import tarfile
+import tempfile
import zlib
from . import util
file.uname = file.gname = "nobody"
# Set the mtime
- file.mtime = self.list.updated_at.timestamp()
+ file.mtime = self.exported_at.timestamp()
# Set the length
file.size = f.tell()
# Write the buffer to the tarball
tarball.addfile(file, fileobj=f)
    @property
    def exported_at(self):
        """
        The timestamp of the export.

        For a single-list exporter this is simply the time the list was
        last updated (may be None if the list has never been updated —
        callers check truthiness before using it).
        """
        return self.list.updated_at
+
class NullExporter(Exporter):
"""
"""
This is a base class that can export multiple lists at the same time
"""
- files = {
- "%(list)s/domains.txt" : DomainsExporter,
- }
- def __init__(self, backend, lists):
+ def __init__(self, backend, lists=None):
self.backend = backend
+
+ if lists is None:
+ lists = backend.lists
+
self.lists = lists
    @abc.abstractmethod
    def __call__(self, *args, **kwargs):
        """
        Run the export. Subclasses must implement this.
        """
        raise NotImplementedError
+ @property
+ def exported_at(self):
+ """
+ The timestamp of the export
+ """
+ return max(l.updated_at for l in self.lists)
+
class CombinedSquidGuardExporter(MultiExporter):
"""
for file, exporter in self.files.items():
e = exporter(self.backend, list)
e.export_to_tarball(tarball, file)
+
+
class DirectoryExporter(MultiExporter):
    """
    This is a simple nested exporter that will create the directory structure
    as it is available on https://dnsbl.ipfire.org/lists.
    """
    # Maps output paths (with %(list)s placeholders) to exporter classes.
    # MultiExporter subclasses run once over all lists; regular exporters
    # run once per list.
    files = {
        # Simple formats
        "%(list)s/domains.txt" : DomainsExporter,
        "%(list)s/hosts.txt" : HostsExporter,

        # DNS Zones
        "%(list)s/dnsbl.zone" : BlocklistExporter,
        "%(list)s/rpz.zone" : RPZExporter,

        # Adblock Plus
        "%(list)s/abp.txt" : AdBlockPlusExporter,

        # Export squidGuard & Suricata rules only as a tarball
        "squidguard.tar.gz" : CombinedSquidGuardExporter,
        "suricata.tar.gz" : CombinedSuricataExporter,
    }

    def __init__(self, backend, root, lists=None):
        """
        backend -- the backend to read lists from
        root    -- the directory to write the exported files into
        lists   -- the lists to export (defaults to all lists)
        """
        super().__init__(backend, lists)

        # Store the root
        self.root = pathlib.Path(root)

    def __call__(self):
        """
        Export all configured formats for all lists below the root directory.
        """
        # Ensure the root directory exists (including any missing parents)
        self.root.mkdir(parents=True, exist_ok=True)

        # Export everything
        for name, exporter in self.files.items():
            # For MultiExporters, we will have to export everything at once
            if issubclass(exporter, MultiExporter):
                e = exporter(self.backend, self.lists)
                self.export(e, name)

            # For regular exporters, we will have to export each list at a time
            else:
                for blocklist in self.lists:
                    e = exporter(self.backend, blocklist)
                    self.export(e, name, list=blocklist)

    def export(self, exporter, name, **kwargs):
        """
        This function takes an exporter instance and runs it, writing its
        output atomically to the path derived from name and kwargs.
        """
        # Make the path
        path = self._make_path(name, **kwargs)

        # Ensure the parent directory exists
        path.parent.mkdir(parents=True, exist_ok=True)

        # Create a new temporary file in the target directory so that the
        # final link() below stays on the same filesystem
        with tempfile.NamedTemporaryFile(dir=path.parent) as f:
            # Export everything to the file
            exporter(f)

            # Flush any buffered data to disk now; otherwise the implicit
            # flush when the file is closed would update the inode's mtime
            # again and clobber the timestamp we set below
            f.flush()

            # Set the modification time (so that clients won't download again
            # just because we have done a re-export)
            exported_at = self.exported_at
            if exported_at:
                timestamp = exported_at.timestamp()
                os.utime(f.name, (timestamp, timestamp))

            # Fix permissions
            os.chmod(f.name, 0o644)

            # Remove the previous file (if it exists)
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass

            # Once the output has been written in full, we will link the file
            # into place; the temporary name is cleaned up on context exit
            os.link(f.name, path)

    def _make_path(self, name, list=None):
        """
        A helper function to expand any variables in the paths
        """
        args = {}

        # Substitute the list if present
        if list:
            args |= {
                "list" : list.slug,
            }

        return self.root / (name % args)
import dnsbl.exporters
import logging
import os
-import pathlib
import rich.console
import rich.table
import rich.text
import sys
-import tempfile
import uuid
# i18n
"""
Exports all lists
"""
- formats = {
- "abp" : "abp.txt",
- "domains" : "domains.txt",
- "dnsbl" : "dnsbl.zone",
- "hosts" : "hosts.txt",
- "rpz" : "rpz.zone",
- "squidguard" : "squidguard.tar.gz",
-
- # Suricata
- "suricata-dns" : "suricata-dns.rules",
- "suricata-http" : "suricata-http.rules",
- "suricata-tls" : "suricata-tls.rules",
- "suricata-quic" : "suricata-quic.rules",
- }
-
- # Ensure the output directory exists
- try:
- os.makedirs(args.directory)
- except FileExistsError:
- pass
-
- # Open the root
- root = pathlib.Path(args.directory)
-
- # Export all lists
- for list in backend.lists:
- for format, filename in formats.items():
- # Compose the directory for the list
- dir = root / list.slug
-
- # Compose the output filename
- name = dir / filename
-
- # Create a directory for the list
- try:
- dir.mkdir()
- except FileExistsError:
- pass
-
- # Create a new temporary file
- with tempfile.NamedTemporaryFile(dir=dir) as f:
- list.export(f, format=format)
-
- # Remove the previous file (if it exists)
- try:
- os.unlink(name)
- except FileNotFoundError:
- pass
-
- # Set the modification time (so that clients won't download again
- # just because we have done a re-export)
- if list.updated_at:
- os.utime(f.name, (
- list.updated_at.timestamp(),
- list.updated_at.timestamp(),
- ))
-
- # Fix permissions
- os.chmod(f.name, 0o644)
-
- # Once the output has been written in full, we will rename the file
- os.link(f.name, name)
-
- # Write all lists as one tarball for squidGuard
- exporter = dnsbl.exporters.CombinedSquidGuardExporter(backend, backend.lists)
- with open(root / "squidguard.tar.gz", "wb") as f:
- exporter(f)
-
- # Write all Suricata rules into one tarball
- exporter = dnsbl.exporters.CombinedSuricataExporter(backend, backend.lists)
- with open(root / "suricata.tar.gz", "wb") as f:
- exporter(f)
+ # Launch the DirectoryExporter
+ exporter = dnsbl.exporters.DirectoryExporter(backend, root=args.directory)
+ exporter()
def __add_source(self, backend, args):
"""