###############################################################################
import abc
+import base64
import datetime
import io
import os
import pathlib
import tarfile
import tempfile
-import zlib
from . import util
from .i18n import _
"""
Export domains as a set of rules for Suricata
"""
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- # Cache any used SIDs
- self._used_sids = set()
-
- @abc.abstractproperty
- def sid_prefix(self):
- """
- The value that is being added to any SIDs
- """
- raise NotImplementedError
-
- @abc.abstractproperty
- def rule(self):
- """
- The rule string
- """
- raise NotImplementedError
-
- # Default rule revision
- rev = 1
-
def export(self, f):
# Write the header
self.write_header(f)
- # Write all domains
- for domain in self.list.domains:
- args = {
- "list" : "%s" % self.list,
- "domain" : domain,
- "sid" : self.compute_sid(domain),
- "rev" : self.rev,
- }
-
- # Write the rule
- f.write(self.rule % args)
-
- def compute_sid(self, domain):
- """
- Implements a simple hash function for a domain.
-
- The hash can only use up to
- """
- # Convert the domain into bytes()
- domain = domain.encode()
-
- # Compute the CRC32 checksum
- h = zlib.crc32(domain)
-
- # Truncate to make space for the prefix
- h %= 0x0fffffff
-
- # Add the prefix
- h |= self.sid_prefix
-
- # Check if we have a collision, if so, increment the hash by one
- while h in self._used_sids:
- h += 1
+ args = {
+ "name" : self.list,
+ "list" : self.list.slug,
+ }
- # Store the hash for the next collision check
- self._used_sids.add(h)
+ # XXX Maybe we should look into having different priority for different lists.
+ # For example, blocking some advertising has a lower priority than accessing
+ # a malware/phishing domain.
+
+ rules = (
+ # DNS
+ (
+ "alert dns any any -> any any ("
+ " msg:\"IPFire DNSBL [%(name)s] Blocked DNS Query\";"
+ " dns.query; "
+ " domain; "
+ " dataset:isset,%(list)s,type string,load datasets/%(list)s.txt;"
+ " classtype:policy-violation;"
+ " priority:3;"
+ " sid:1;"
+ " rev:1;"
+ " reference:url,https://www.ipfire.org/dnsbl/%(list)s;"
+ " metadata:dnsbl %(list)s.dnsbl.ipfire.org;"
+ ")\n"
+ ),
+
+ # HTTP
+ (
+ "alert http any any -> any any ("
+ " msg:\"IPFire DNSBL [%(name)s] Blocked HTTP Request\";"
+ " http.host;"
+ " domain;"
+ " dataset:isset,%(list)s,type string,load datasets/%(list)s.txt;"
+ " classtype:policy-violation;"
+ " priority:3;"
+ " sid:2;"
+ " rev:1;"
+ " reference:url,https://www.ipfire.org/dnsbl/%(list)s;"
+ " metadata:dnsbl %(list)s.dnsbl.ipfire.org;"
+ ")\n"
+ ),
+
+ # TLS
+ (
+ "alert tls any any -> any any ("
+ " msg:\"IPFire DNSBL [%(name)s] Blocked TLS SNI\";"
+ " tls.sni;"
+ " domain; "
+ " dataset:isset,%(list)s,type string,load datasets/%(list)s.txt;"
+ " classtype:policy-violation;"
+ " priority:3;"
+ " sid:3;"
+ " rev:1;"
+ " reference:url,https://www.ipfire.org/dnsbl/%(list)s;"
+ " metadata:dnsbl %(list)s.dnsbl.ipfire.org;"
+ ")\n"
+ ),
+
+ # QUIC
+ (
+ "alert quic any any -> any any ("
+ " msg:\"IPFire DNSBL [%(name)s] Blocked QUIC SNI\";"
+ " quic.sni;"
+ " domain; "
+ " dataset:isset,%(list)s,type string,load datasets/%(list)s.txt;"
+ " classtype:policy-violation;"
+ " priority:3;"
+ " sid:4;"
+ " rev:1;"
+ " reference:url,https://www.ipfire.org/dnsbl/%(list)s;"
+ " metadata:dnsbl %(list)s.dnsbl.ipfire.org;"
+ ")\n"
+ ),
+ )
+
+ # Write all rules
+ for rule in rules:
+ f.write(rule % args)
+
+
+class SuricataDatasetExporter(TextExporter):
+ """
+ Exports the domains encoded as base64
+ """
+ def export(self, f):
+ # This file cannot have a header because Suricata will try to base64-decode it, too
- return h
+ # Write all domains
+ for domain in self.list.domains:
+ # Convert the domain to bytes
+ domain = domain.encode()
+ # Convert into base64
+ domain = base64.b64encode(domain)
-class SuricataDNSExporter(SuricataRulesExporter):
- """
- Exports the lists as a Suricata ruleset that filters DNS queries.
- """
- sid_prefix = 0x10000000
-
- rule = (
- "alert dns any any -> any any ("
- " msg:\"IPFire DNSBL [%(list)s] Blocked DNS Query for *.%(domain)s\";"
- " dns.query; "
- " content:\"%(domain)s\";"
- " nocase;"
- " endswith;"
- " sid:%(sid)s;"
- " rev:%(rev)s;"
- ")\n"
- )
-
-
-class SuricataHTTPExporter(SuricataRulesExporter):
- """
- Exports the lists as a Suricata ruleset that filters HTTP requests.
- """
- sid_prefix = 0x20000000
-
- rule = (
- "alert dns any any -> any any ("
- " msg:\"IPFire DNSBL [%(list)s] Blocked HTTP Request to *.%(domain)s\";"
- " http.host;"
- " content:\"%(domain)s\";"
- " nocase;"
- " endswith;"
- " sid:%(sid)s;"
- " rev:%(rev)s;"
- ")\n"
- )
-
-
-class SuricataTLSExporter(SuricataRulesExporter):
- """
- Exports the lists as a Suricata ruleset that filters TLS connections.
- """
- sid_prefix = 0x30000000
-
- rule = (
- "alert dns any any -> any any ("
- " msg:\"IPFire DNSBL [%(list)s] Blocked TLS SNI to *.%(domain)s\";"
- " tls.sni;"
- " content:\"%(domain)s\";"
- " nocase;"
- " endswith;"
- " sid:%(sid)s;"
- " rev:%(rev)s;"
- ")\n"
- )
-
-
-class SuricataQUICExporter(SuricataRulesExporter):
- """
- Exports the lists as a Suricata ruleset that filters QUIC connections.
- """
- sid_prefix = 0x40000000
+ # Decode back to string
+ domain = domain.decode()
- rule = (
- "alert dns any any -> any any ("
- " msg:\"IPFire DNSBL [%(list)s] Blocked QUIC SNI to *.%(domain)s\";"
- " quic.sni;"
- " content:\"%(domain)s\";"
- " nocase;"
- " endswith;"
- " sid:%(sid)s;"
- " rev:%(rev)s;"
- ")\n"
- )
+ f.write("%s\n" % domain)
class SuricataExporter(TarballExporter):
- """
- Export a list in the format that squidguard can process it
- """
files = {
- "%(list)s-dns.rules" : SuricataDNSExporter,
- "%(list)s-http.rules" : SuricataHTTPExporter,
- "%(list)s-tls.rules" : SuricataTLSExporter,
- "%(list)s-quic.rules" : SuricataQUICExporter,
+ "%(list)s.rules" : SuricataRulesExporter,
+ "datasets/%(list)s.txt" : SuricataDatasetExporter,
}
"""
This is a special exporter which combines all Suricata rulesets into a tarball
"""
- files = {
- "%(list)s-dns.rules" : SuricataDNSExporter,
- "%(list)s-http.rules" : SuricataHTTPExporter,
- "%(list)s-tls.rules" : SuricataTLSExporter,
- "%(list)s-quic.rules" : SuricataQUICExporter,
- }
-
def __call__(self, f):
# Create a tar file
with tarfile.open(fileobj=f, mode="w|gz") as tarball: