"""
raise NotImplementedError
+ def export_to_tarball(self, tarball, name, **kwargs):
+ """
+ Exports the list to the tarball using the given exporter
+ """
+ assert isinstance(tarball, tarfile.TarFile), "Input is not a tarball"
+
+ # Create a temporary buffer
+ f = io.BytesIO()
+
+ # Export the data
+ self(f, **kwargs)
+
+ # Expand the filename
+ name = name % {
+ "list" : self.list.slug,
+ }
+
+ # Create a new file
+ file = tarfile.TarInfo(name=name)
+
+ # Assign the list to be owned by nobody
+ file.uname = file.gname = "nobody"
+
+ # Set the mtime
+ file.mtime = self.list.updated_at.timestamp()
+
+ # Set the length
+ file.size = f.tell()
+
+ # Ensure we read the payload from the beginning
+ f.seek(0)
+
+ # Write the buffer to the tarball
+ tarball.addfile(file, fileobj=f)
+
+
+class NullExporter(Exporter):
+ """
+ Exports nothing, i.e. writes an empty file
+ """
+ def export(self, *args, **kwargs):
+ pass
+
class TextExporter(Exporter):
def __call__(self, f, **kwargs):
section = "rpz"
-class SquidGuardExporter(Exporter):
+class TarballExporter(Exporter):
"""
- Export a list in the format that squidguard can process it
+ This class contains some helper functions to write data to tarballs
"""
def export(self, f):
- # Create a tar file
- with tarfile.open(fileobj=f, mode="w|gz") as tarball:
- # Write the list
- self._write_list(tarball)
-
- def _write_list(self, tarball):
- # Create a new
- file = tarfile.TarInfo(name="blacklists/%s/domains" % self.list.slug)
-
- # Write the payload to memory
- f = io.BytesIO()
-
- # Create a domain exporter
- exporter = DomainsExporter(self.backend, self.list)
-
- # Write all domains to the buffer
- exporter(f)
-
- # Write the file
- self._write_file(tarball, file, f)
-
- # squidguard is also expecting some more files
- for name in ("expressions", "urls"):
- file = tarfile.TarInfo(name="blacklists/%s/%s" % (self.list.slug, name))
-
- self._write_file(tarball, file)
-
- def _write_file(self, tarball, file, f=None):
- """
- A helper function to set some commonly used metadata
- """
- # Assign the list to be owned by nobody
- file.uname = file.gname = "nobody"
-
- # Set the mtime
- file.mtime = self.list.updated_at.timestamp()
-
- # Set the length
- if f:
- file.size = f.tell()
+ # Accept a tarball object and in that case, simply don't nest them
+ if not isinstance(f, tarfile.TarFile):
+ f = tarfile.open(fileobj=f, mode="w|gz")
- # Ensure we read the payload from the beginning
- if f:
- f.seek(0)
+ # Create a tar file
+ for file, exporter in self.files.items():
+ # Create the exporter
+ e = exporter(self.backend, self.list)
- # Write the buffer to the tarball
- tarball.addfile(file, fileobj=f)
+ # Export to the tarball
+ e.export_to_tarball(f, file)
-class CombinedSquidGuardExporter(object):
+class SquidGuardExporter(TarballExporter):
"""
- This is a special exporter which combines all lists into a single tarball
+ Export a list in the format that squidguard can process it
"""
- def __init__(self, backend, lists):
- self.backend = backend
- self.lists = lists
-
- def __call__(self, f):
- # Create a tar file
- with tarfile.open(fileobj=f, mode="w|gz") as tarball:
- for list in self.lists:
- exporter = SquidGuardExporter(self.backend, list)
- exporter._write_list(tarball)
+ files = {
+ "blacklists/%(list)s/domains" : DomainsExporter,
+ "blacklists/%(list)s/expressions" : NullExporter,
+ "blacklists/%(list)s/urls" : NullExporter,
+ }
class SuricataExporter(TextExporter):
" rev:%(rev)s;"
")\n"
)
+
+
+class MultiExporter(abc.ABC):
+ """
+ This is a base class that can export multiple lists at the same time
+ """
+ files = {
+ "%(list)s/domains.txt" : DomainsExporter,
+ }
+
+ def __init__(self, backend, lists):
+ self.backend = backend
+ self.lists = lists
+
+ @abc.abstractmethod
+ def __call__(self, *args, **kwargs):
+ raise NotImplementedError
+
+
+class CombinedSquidGuardExporter(MultiExporter):
+ """
+ This is a special exporter which combines all lists into a single tarball
+ """
+ def __call__(self, f):
+ # Create a tar file
+ with tarfile.open(fileobj=f, mode="w|gz") as tarball:
+ for list in self.lists:
+ exporter = SquidGuardExporter(self.backend, list)
+ exporter(tarball)
+
+
+class CombinedSuricataExporter(MultiExporter):
+ """
+ This is a special exporter which combines all Suricata rulesets into a tarball
+ """
+ files = {
+ "%(list)s-dns.rules" : SuricataDNSExporter,
+ "%(list)s-http.rules" : SuricataHTTPExporter,
+ "%(list)s-tls.rules" : SuricataTLSExporter,
+ "%(list)s-quic.rules" : SuricataQUICExporter,
+ }
+
+ def __call__(self, f):
+ # Create a tar file
+ with tarfile.open(fileobj=f, mode="w|gz") as tarball:
+ for list in self.lists:
+ for file, exporter in self.files.items():
+ e = exporter(self.backend, list)
+ e.export_to_tarball(tarball, file)