import email.message
import email.utils
import gettext
+import json
import locale
import logging
import os
]),
))
+ # Translate app protocols
+ app_protocols = {
+ "bittorrent-dht" : _("BT DHT"),
+ "dns" : _("DNS"),
+ "http" : _("HTTP"),
+ "quic" : _("QUIC"),
+ "smb" : _("SMB"),
+ "smtp" : _("SMTP"),
+ "tls" : _("TLS"),
+
+ # Suricata has a special result if protocol detection failed
+ "failed" : None,
+ }
+
# Fetch the alerts
c = self.db.execute("""
SELECT
(event ->> '$.alert.severity') AS alert_severity,
(event ->> '$.alert.action') AS alert_action,
(event ->> '$.alert.gid') AS alert_gid,
- (event ->> '$.alert.rev') AS alert_rev
+ (event ->> '$.alert.rev') AS alert_rev,
+
+ -- App Protocol
+ (event ->> '$.app_proto') AS app_protocol,
+
+ -- DNS
+ (event ->> '$.dns.queries') AS dns_queries,
+
+ -- HTTP
+ (event ->> '$.http.protocol') AS http_protocol,
+ (event ->> '$.http.method') AS http_method,
+ (event ->> '$.http.hostname') AS http_hostname,
+ (event ->> '$.http.url') AS http_url,
+
+ -- TLS
+ (event ->> '$.tls.version') AS tls_version,
+ (event ->> '$.tls.sni') AS tls_sni,
+
+ -- QUIC
+ (event ->> '$.quic.sni') AS quic_sni
FROM
alerts
WHERE
if color:
style.add("BACKGROUND", (0, i), (0, i), color)
+ # Collect all protocols
+ protocols = [
+ app_protocols.get(row.app_protocol, row.app_protocol),
+ row.protocol,
+ ]
+
+ # Collect everything that goes into the signature column
+ signature = [
+ # Line 1
+ row.alert_signature,
+
+ # Line 2
+ "[%s:%s:%s] - %s" % (
+ row.alert_gid,
+ row.alert_signature_id,
+ row.alert_rev,
+ row.alert_category,
+ ),
+ ]
+
+ # Add more information if available for the app proto
+
+ # DNS
+ if row.app_protocol == "dns":
+ line = []
+
+ # Append all queries
+ for query in json.loads(row.dns_queries):
+ line.append(
+ "%s: %s" % (_("Query"), "%(rrname)s (%(rrtype)s)" % query),
+ )
+
+ # Append the line to the signature
+ signature.append(" - ".join(line))
+
+ # HTTP
+ elif row.app_protocol == "http":
+ line = []
+
+ # Add protocol
+ if row.http_protocol:
+ line.append("%s: %s" % (_("Protocol"), row.http_protocol))
+
+ # Add hostname
+ if row.http_hostname:
+ line.append("%s: %s" % (_("Hostname"), row.http_hostname))
+
+ # Add method
+ if row.http_method:
+ line.append(row.http_method)
+
+ # Add URL
+ if row.http_url:
+ line.append("%s: %s" % (_("URL"), row.http_url))
+
+ # Append the line to the signature
+ signature.append(" - ".join(line))
+
+ # TLS
+ elif row.app_protocol == "tls":
+ line = []
+
+ # Add TLS version
+ if row.tls_version and not row.tls_version == "UNDETERMINED":
+ line.append("%s: %s" % (_("TLS Version"), row.tls_version))
+
+ # Add SNI
+ if row.tls_sni:
+ line.append("%s: %s" % (_("SNI"), row.tls_sni))
+
+ # Append the line to the signature
+ signature.append(" - ".join(line))
+
+ # QUIC
+ elif row.app_protocol == "quic":
+ line = []
+
+ # Add SNI
+ if row.quic_sni:
+ line.append("%s: %s" % (_("SNI"), row.quic_sni))
+
+ # Append the line to the signature
+ signature.append(" - ".join(line))
+
# Append the row
rows.append((
# Severity
t.strftime("%H:%M:%S"),
# Signature
- reportlab.platypus.Paragraph(
- "%s<br />[%s:%s:%s] - %s" % (
- row.alert_signature,
- row.alert_gid,
- row.alert_signature_id,
- row.alert_rev,
- row.alert_category,
- ),
- self.styles["Small"],
- ),
+ reportlab.platypus.Paragraph("<br />".join(signature), self.styles["Small"]),
# Protocol
- row.protocol,
+ "/".join(p for p in protocols if p),
# Source/Destination
"%s:%s\n%s:%s" % (