import subprocess
import re
import logging
+import yaml
from collections import namedtuple
logger = logging.getLogger()
SuricataVersion = namedtuple(
"SuricataVersion", ["major", "minor", "patch", "full", "short", "raw"])
+class Configuration:
+ """An abstraction over the Suricata configuration file."""
+
+ def __init__(self, conf):
+ self.conf = conf
+
+ def keys(self):
+ return self.conf.keys()
+
+ def is_true(self, key, truthy=[]):
+ if not key in self.conf:
+ logger.warning(
+ "Suricata configuration key does not exist: %s" % (key))
+ return False
+ if key in self.conf:
+ val = self.conf[key]
+ if val.lower() in ["1", "yes", "true"] + truthy:
+ return True
+ return False
+
+ @classmethod
+ def load(cls, config_filename, suricata_path=None):
+ env = {
+ "SC_LOG_FORMAT": "%t - <%d> -- ",
+ "SC_LOG_LEVEL": "Error",
+ "ASAN_OPTIONS": "detect_leaks=0",
+ }
+ if not suricata_path:
+ suricata_path = get_path()
+ if not suricata_path:
+ raise Exception("Suricata program could not be found.")
+ if not os.path.exists(suricata_path):
+ raise Exception("Suricata program %s does not exist.", suricata_path)
+ configuration_dump = subprocess.check_output(
+ [suricata_path, "-c", config_filename, "--dump-config"],
+ env=env)
+ conf = {}
+ for line in configuration_dump.splitlines():
+ try:
+ key, val = line.split(" = ")
+ conf[key] = val
+ except:
+ logger.warning("Failed to parse: %s", line)
+ return cls(conf)
+
def get_path(program="suricata"):
"""Find Suricata in the shell path."""
for path in os.environ["PATH"].split(os.pathsep):
return cls()
return None
+class ProtoRuleMatcher:
+ """A rule matcher that matches on the protocol of a rule."""
+
+ def __init__(self, proto):
+ self.proto = proto
+
+ def match(self, rule):
+ return rule.proto == self.proto
+
class IdRuleMatcher(object):
"""Matcher object to match an idstools rule object by its signature
ID."""
logger.info("Loading %s.", drop_conf_filename)
drop_filters += load_drop_filters(drop_conf_filename)
+ if os.path.exists("/etc/suricata/suricata.yaml") and \
+ suricata_path and os.path.exists(suricata_path):
+ logger.info("Loading /etc/suricata/suricata.yaml")
+ suriconf = suricata.update.engine.Configuration.load(
+ "/etc/suricata/suricata.yaml", suricata_path=suricata_path)
+ for key in suriconf.keys():
+ if key.startswith("app-layer.protocols") and \
+ key.endswith(".enabled"):
+ if not suriconf.is_true(key, ["detection-only"]):
+ proto = key.split(".")[2]
+ logger.info("Disabling rules with proto %s", proto)
+ disable_matchers.append(ProtoRuleMatcher(proto))
+
# Check that the cache directory exists and is writable.
if not os.path.exists(config.get_cache_dir()):
try:
r"(?P<raw>"
r"(?P<header>"
r"(?P<action>%s)\s*" # Action
- r"[^\s]*\s*" # Protocol
+ r"(?P<proto>[^\s]*)\s*" # Protocol
r"[^\s]*\s*" # Source address(es)
r"[^\s]*\s*" # Source port
r"(?P<direction>[-><]+)\s*" # Direction
disabled (commented)
- **action**: The action of the rule (alert, pass, etc) as a
string
+ - **proto**: The protocol of the rule.
- **direction**: The direction string of the rule.
- **gid**: The gid of the rule as an integer
- **sid**: The sid of the rule as an integer
dict.__init__(self)
self["enabled"] = enabled
self["action"] = action
+ self["proto"] = None
self["direction"] = None
self["group"] = group
self["gid"] = 1
action=m.group("action"),
group=group)
- rule["direction"] = m.groupdict().get("direction", None)
rule["header"] = m.groupdict().get("header", None)
+ rule["proto"] = m.groupdict().get("proto", None)
+ rule["direction"] = m.groupdict().get("direction", None)
options = m.group("options")