From: Michael Tremer Date: Mon, 8 Dec 2025 15:48:51 +0000 (+0000) Subject: dnsbl: Move the exporters into a separate file X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=665bef3aa08ab3c8915308c873982ebf51fd4c3a;p=dbl.git dnsbl: Move the exporters into a separate file Signed-off-by: Michael Tremer --- diff --git a/Makefile.am b/Makefile.am index eafc682..d2e0c30 100644 --- a/Makefile.am +++ b/Makefile.am @@ -52,6 +52,7 @@ SED_PROCESS = \ dist_pkgpython_PYTHON = \ src/dnsbl/__init__.py \ src/dnsbl/database.py \ + src/dnsbl/exporters.py \ src/dnsbl/i18n.py \ src/dnsbl/lists.py \ src/dnsbl/logger.py \ diff --git a/src/dnsbl/exporters.py b/src/dnsbl/exporters.py new file mode 100644 index 0000000..731feb8 --- /dev/null +++ b/src/dnsbl/exporters.py @@ -0,0 +1,153 @@ +############################################################################### +# # +# dnsbl - A DNS Blocklist Compositor For IPFire # +# Copyright (C) 2025 IPFire Development Team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +############################################################################### + +import abc +import dns.name +import dns.rdataclass +import dns.zone + +class Exporter(abc.ABC): + """ + Abstract class to define an exporter + """ + def __init__(self, backend, list): + self.backend = backend + self.list = list + + @abc.abstractmethod + def export(self, f, **kwargs): + """ + Runs the export + """ + raise NotImplementedError + + def write_header(self, f, delim="#"): + """ + Writes a header + """ + lines = [ + "##############################################################################", + " DNS Blocklist for IPFire", + "##############################################################################", + "", + " This file contains domains that are part of the official IPFire DNS", + " Blocklist Project. It is intended for use with DNS servers and firewalls", + " that support blocklisting of known malicious or unwanted domains.", + "", + " List : %s" % self.list.name, + " License : %s" % self.list.license, + " Updated : %s" % self.list.updated_at.isoformat(), + "", + " Copyright (C) %s - IPFire Team" % self.list.updated_at.strftime("%Y"), + "", + " For more information or to contribute:", + " https://dnsbl.ipfire.org/", + "", + "##############################################################################", + "", + " This list contains data from these sources:", + "", + ] + + # Append all sources + for source in self.list.sources: + lines.append(" * %s (%s)" % (source.name, source.license)) + lines.append(" %s" % source.url) + if source.updated_at: + lines.append(" Updated: %s" % source.updated_at.isoformat()) + + # Newline + lines.append("") + + # Write everything to the output file + for line in lines: + f.write("%s%s\n" % (delim, line)) + + +class DomainsExporter(Exporter): + """ + Exports the plain domains + """ + def export(self, f): + # Write the header + self.write_header(f) + + # Write all domains + for domain in self.list.domains: + f.write("%s\n" % domain) + + +class HostsExporter(Exporter): + """ + Exports a file like /etc/hosts + """ + def export(self, f): + # Write the header + self.write_header(f) + + # Write all domains + for domain in self.list.domains: + f.write("0.0.0.0 %s\n" % domain) + + +class RPZExporter(Exporter): + """ + Exports the list as a RPZ zone file + """ + def export(self, f, ttl=60, rpz_action="."): + # Write the header + self.write_header(f, ";") + + # Create the origin + origin = dns.name.from_text(self.list.zone) + + # Create a new zone + zone = dns.zone.Zone(origin) + + # Create the SOA + soa = dns.rdataset.from_text( + dns.rdataclass.IN, + dns.rdatatype.SOA, + ttl, + " ".join(( + "master.lwldns.net.", + "hostmaster.ipfire.org.", + self.list.updated_at.strftime("%s"), + "3600", + "600", + "3600000", + "%s" % ttl, + )), + ) + zone.replace_rdataset(origin, soa) + + # XXX Add NS + + # Write all domains + for domain in self.list.domains: + zone.replace_rdataset( + dns.name.from_text("%s.%s" % (domain, self.list.zone)), + dns.rdataset.from_text( + dns.rdataclass.IN, dns.rdatatype.CNAME, ttl, rpz_action, + ), + ) + + # Write the zone to file + zone.to_file(f) diff --git a/src/dnsbl/lists.py b/src/dnsbl/lists.py index e0bcddc..104be82 100644 --- a/src/dnsbl/lists.py +++ b/src/dnsbl/lists.py @@ -19,9 +19,6 @@ ############################################################################### import datetime -import dns.name -import dns.rdataclass -import dns.zone import functools import io import logging @@ -29,6 +26,7 @@ import sqlmodel import typing from . import database +from . import exporters from . import sources from . import util from .i18n import _ @@ -244,7 +242,7 @@ class List(sqlmodel.SQLModel, database.BackendMixin, table=True): # Export! - def export(self, f, format, ttl=60, rpz_action="."): + def export(self, f, format, **kwargs): """ Exports the list """ @@ -255,112 +253,24 @@ class List(sqlmodel.SQLModel, database.BackendMixin, table=True): f = io.TextIOWrapper(f, encoding="utf-8") detach = True - # Write the domains as they are - if format == "domains": - # Write the header - self._write_header(f) - - # Write all domains - for domain in self.domains: - f.write("%s\n" % domain) - - # hosts format - elif format == "hosts": - # Write the header - self._write_header(f) - - # Write all domains - for domain in self.domains: - f.write("0.0.0.0 %s\n" % domain) - - # Write a RPZ zone file - elif format == "rpz": - # Write the header - self._write_header(f, ";") - - # Create the origin - origin = dns.name.from_text(self.zone) - - # Create a new zone - zone = dns.zone.Zone(origin) - - # Create the SOA - soa = dns.rdataset.from_text( - dns.rdataclass.IN, - dns.rdatatype.SOA, - ttl, - " ".join(( - "master.lwldns.net.", - "hostmaster.ipfire.org.", - self.updated_at.strftime("%s"), - "3600", - "600", - "3600000", - "%s" % ttl, - )), - ) - zone.replace_rdataset(origin, soa) - - # XXX Add NS + formats = { + "domains" : exporters.DomainsExporter, + "hosts" : exporters.HostsExporter, + "rpz" : exporters.RPZExporter, + } - # Write all domains - for domain in self.domains: - zone.replace_rdataset( - dns.name.from_text("%s.%s" % (domain, self.zone)), - dns.rdataset.from_text( - dns.rdataclass.IN, dns.rdatatype.CNAME, ttl, rpz_action, - ), - ) + # Fetch the exporter + try: + exporter = formats[format](self.backend, list=self) - # Write the zone to file - zone.to_file(f) + # Raise an error if we could not find an exporter + except KeyError as e: + raise ValueError("Unknown output format: %s" % format) from e - else: - raise ValueError("Unknown output format: %s" % format) + # Run the export + exporter.export(f, **kwargs) # Detach the underlying stream. That way, the wrapper won't close # the underlying file handle. if detach: f.detach() - - def _write_header(self, f, delim="#"): - """ - Writes a header - """ - lines = [ - "##############################################################################", - " DNS Blocklist for IPFire", - "##############################################################################", - "", - " This file contains domains that are part of the official IPFire DNS", - " Blocklist Project. It is intended for use with DNS servers and firewalls", - " that support blocklisting of known malicious or unwanted domains.", - "", - " List : %s" % self.name, - " License : %s" % self.license, - " Updated : %s" % self.updated_at.isoformat(), - "", - " Copyright (C) %s - IPFire Team" % self.updated_at.strftime("%Y"), - "", - " For more information or to contribute:", - " https://dnsbl.ipfire.org/", - "", - "##############################################################################", - "", - " This list contains data from these sources:", - "", - ] - - # Append all sources - for source in self.sources: - lines.append(" * %s (%s)" % (source.name, source.license)) - lines.append(" %s" % source.url) - if source.updated_at: - lines.append(" Updated: %s" % source.updated_at.isoformat()) - - # Newline - lines.append("") - - # Write everything to the output file - for line in lines: - f.write("%s%s\n" % (delim, line))