From: Michael Tremer Date: Tue, 6 Jan 2026 18:27:04 +0000 (+0000) Subject: exporters: Refactor Suricata rules X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1fddeeb028b9ee9e1d6a1108a43556e44b182e84;p=dbl.git exporters: Refactor Suricata rules Exporting everything as a single rule is completely blowing up Suricata. It was always going to be a tight game, but a lot of the tooling even breaks down due to the large size of the rule files. This is my less preferred option because it isn't easily possible for users to enable/disable any individual domains, but at least this performs well. There are now only single rules for each list that enable/disable filtering for the different protocols and there is one large list of domains for each list. This is being parsed as a dataset which should be the most efficient approach in terms of performance and memory usage. Signed-off-by: Michael Tremer --- diff --git a/src/dnsbl/exporters.py b/src/dnsbl/exporters.py index 5de5722..04b1895 100644 --- a/src/dnsbl/exporters.py +++ b/src/dnsbl/exporters.py @@ -19,13 +19,13 @@ ############################################################################### import abc +import base64 import datetime import io import os import pathlib import tarfile import tempfile -import zlib from . import util from .i18n import _ @@ -419,158 +419,115 @@ class SuricataRulesExporter(TextExporter): """ Export domains as a set of rules for Suricata """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # Cache any used SIDs - self._used_sids = set() - - @abc.abstractproperty - def sid_prefix(self): - """ - The value that is being added to any SIDs - """ - raise NotImplementedError - - @abc.abstractproperty - def rule(self): - """ - The rule string - """ - raise NotImplementedError - - # Default rule revision - rev = 1 - def export(self, f): # Write the header self.write_header(f) - # Write all domains - for domain in self.list.domains: - args = { - "list" : "%s" % self.list, - "domain" : domain, - "sid" : self.compute_sid(domain), - "rev" : self.rev, - } - - # Write the rule - f.write(self.rule % args) - - def compute_sid(self, domain): - """ - Implements a simple hash function for a domain. - - The hash can only use up to - """ - # Convert the domain into bytes() - domain = domain.encode() - - # Compute the CRC32 checksum - h = zlib.crc32(domain) - - # Truncate to make space for the prefix - h %= 0x0fffffff - - # Add the prefix - h |= self.sid_prefix - - # Check if we have a collision, if so, increment the hash by one - while h in self._used_sids: - h += 1 + args = { + "name" : self.list, + "list" : self.list.slug, + } - # Store the hash for the next collision check - self._used_sids.add(h) + # XXX Maybe we should look into having different priority for different lists. + # For example, blocking some advertising has a lower priority than accessing + # a malware/phishing domain. + + rules = ( + # DNS + ( + "alert dns any any -> any any (" + " msg:\"IPFire DNSBL [%(name)s] Blocked DNS Query\";" + " dns.query; " + " domain; " + " dataset:isset,%(list)s,type string,load datasets/%(list)s.txt;" + " classtype:policy-violation;" + " priority:3;" + " sid:1;" + " rev:1;" + " reference:url,https://www.ipfire.org/dnsbl/%(list)s;" + " metadata:dnsbl %(list)s.dnsbl.ipfire.org;" + ")\n" + ), + + # HTTP + ( + "alert http any any -> any any (" + " msg:\"IPFire DNSBL [%(name)s] Blocked HTTP Request\";" + " http.host;" + " domain;" + " dataset:isset,%(list)s,type string,load datasets/%(list)s.txt;" + " classtype:policy-violation;" + " priority:3;" + " sid:2;" + " rev:1;" + " reference:url,https://www.ipfire.org/dnsbl/%(list)s;" + " metadata:dnsbl %(list)s.dnsbl.ipfire.org;" + ")\n" + ), + + # TLS + ( + "alert tls any any -> any any (" + " msg:\"IPFire DNSBL [%(name)s] Blocked TLS SNI\";" + " tls.sni;" + " domain; " + " dataset:isset,%(list)s,type string,load datasets/%(list)s.txt;" + " classtype:policy-violation;" + " priority:3;" + " sid:3;" + " rev:1;" + " reference:url,https://www.ipfire.org/dnsbl/%(list)s;" + " metadata:dnsbl %(list)s.dnsbl.ipfire.org;" + ")\n" + ), + + # QUIC + ( + "alert quic any any -> any any (" + " msg:\"IPFire DNSBL [%(name)s] Blocked QUIC SNI\";" + " quic.sni;" + " domain; " + " dataset:isset,%(list)s,type string,load datasets/%(list)s.txt;" + " classtype:policy-violation;" + " priority:3;" + " sid:4;" + " rev:1;" + " reference:url,https://www.ipfire.org/dnsbl/%(list)s;" + " metadata:dnsbl %(list)s.dnsbl.ipfire.org;" + ")\n" + ), + ) + + # Write all rules + for rule in rules: + f.write(rule % args) + + +class SuricataDatasetExporter(TextExporter): + """ + Exports the domains encoded as base64 + """ + def export(self, f): + # This file cannot have a header because Suricata will try to base64-decode it, too - return h + # Write all domains + for domain in self.list.domains: + # Convert the domain to bytes + domain = domain.encode() + # Convert into base64 + domain = base64.b64encode(domain) -class SuricataDNSExporter(SuricataRulesExporter): - """ - Exports the lists as a Suricata ruleset that filters DNS queries. - """ - sid_prefix = 0x10000000 - - rule = ( - "alert dns any any -> any any (" - " msg:\"IPFire DNSBL [%(list)s] Blocked DNS Query for *.%(domain)s\";" - " dns.query; " - " content:\"%(domain)s\";" - " nocase;" - " endswith;" - " sid:%(sid)s;" - " rev:%(rev)s;" - ")\n" - ) - - -class SuricataHTTPExporter(SuricataRulesExporter): - """ - Exports the lists as a Suricata ruleset that filters HTTP requests. - """ - sid_prefix = 0x20000000 - - rule = ( - "alert dns any any -> any any (" - " msg:\"IPFire DNSBL [%(list)s] Blocked HTTP Request to *.%(domain)s\";" - " http.host;" - " content:\"%(domain)s\";" - " nocase;" - " endswith;" - " sid:%(sid)s;" - " rev:%(rev)s;" - ")\n" - ) - - -class SuricataTLSExporter(SuricataRulesExporter): - """ - Exports the lists as a Suricata ruleset that filters TLS connections. - """ - sid_prefix = 0x30000000 - - rule = ( - "alert dns any any -> any any (" - " msg:\"IPFire DNSBL [%(list)s] Blocked TLS SNI to *.%(domain)s\";" - " tls.sni;" - " content:\"%(domain)s\";" - " nocase;" - " endswith;" - " sid:%(sid)s;" - " rev:%(rev)s;" - ")\n" - ) - - -class SuricataQUICExporter(SuricataRulesExporter): - """ - Exports the lists as a Suricata ruleset that filters QUIC connections. - """ - sid_prefix = 0x40000000 + # Decode back to string + domain = domain.decode() - rule = ( - "alert dns any any -> any any (" - " msg:\"IPFire DNSBL [%(list)s] Blocked QUIC SNI to *.%(domain)s\";" - " quic.sni;" - " content:\"%(domain)s\";" - " nocase;" - " endswith;" - " sid:%(sid)s;" - " rev:%(rev)s;" - ")\n" - ) + f.write("%s\n" % domain) class SuricataExporter(TarballExporter): - """ - Export a list in the format that squidguard can process it - """ files = { - "%(list)s-dns.rules" : SuricataDNSExporter, - "%(list)s-http.rules" : SuricataHTTPExporter, - "%(list)s-tls.rules" : SuricataTLSExporter, - "%(list)s-quic.rules" : SuricataQUICExporter, + "%(list)s.rules" : SuricataRulesExporter, + "datasets/%(list)s.txt" : SuricataDatasetExporter, } @@ -615,13 +572,6 @@ class CombinedSuricataExporter(MultiExporter): """ This is a special exporter which combines all Suricata rulesets into a tarball """ - files = { - "%(list)s-dns.rules" : SuricataDNSExporter, - "%(list)s-http.rules" : SuricataHTTPExporter, - "%(list)s-tls.rules" : SuricataTLSExporter, - "%(list)s-quic.rules" : SuricataQUICExporter, - } - def __call__(self, f): # Create a tar file with tarfile.open(fileobj=f, mode="w|gz") as tarball: