]> git.ipfire.org Git - thirdparty/suricata-verify.git/commitdiff
add new checks to test.yaml
authorJason Ish <ish@unx.ca>
Wed, 20 Dec 2017 23:02:16 +0000 (17:02 -0600)
committerJason Ish <ish@unx.ca>
Wed, 20 Dec 2017 23:02:16 +0000 (17:02 -0600)
shell - for checking shell command output
filter - for counting number of events matching a filter
stats: for matching values in stats

run.py

diff --git a/run.py b/run.py
index d893c6a7d864a5c5bfc96b4fd1d47412b1e39b35..5c91a0891809110cbbdff5e6e7a687ec5b336afd 100755 (executable)
--- a/run.py
+++ b/run.py
@@ -147,6 +147,90 @@ class SuricataConfig:
     def has_feature(self, feature):
         return feature in self.features
 
+def find_value(name, obj):
+    """Find the value in an object for a field specified by name.
+
+    Example names:
+      event_type
+      alert.signature_id
+      smtp.rcpt_to[0]
+    """
+    parts = name.split(".")
+    for part in parts:
+        name = None
+        index = None
+        m = re.match("^(.*)\[(\d+)\]$", part)
+        if m:
+            name = m.group(1)
+            index = m.group(2)
+        else:
+            name = part
+
+        if not name in obj:
+            return None
+        obj = obj[name]
+
+        if index is not None:
+            obj = obj[int(index)]
+
+    return obj
+
+class ShellCheck:
+
+    def __init__(self, config):
+        self.config = config
+
+    def run(self):
+        output = subprocess.check_output(self.config["args"], shell=True)
+        if "expect" in self.config:
+            return str(self.config["expect"]) == output.strip()
+        return True
+
+class StatsCheck:
+
+    def __init__(self, config):
+        self.config = config
+
+    def run(self):
+        stats = None
+        with open(os.path.join("output", "eve.json"), "rb") as fileobj:
+            for line in fileobj:
+                event = json.loads(line)
+                if event["event_type"] == "stats":
+                    stats = event["stats"]
+        for key in self.config:
+            val = find_value(key, stats)
+            if val != self.config[key]:
+                raise Exception("stats.%s: expected %s; got %s" % (
+                    key, str(self.config[key]), str(val)))
+        return True
+
+class FilterCheck:
+
+    def __init__(self, config):
+        self.config = config
+
+    def run(self):
+        count = 0
+        with open(os.path.join("output", "eve.json"), "rb") as fileobj:
+            for line in fileobj:
+                event = json.loads(line)
+                if self.match(event):
+                    count += 1
+        if count == self.config["count"]:
+            return True
+        raise Exception("expected %d matches; got %d for filter %s" % (
+            self.config["count"], count, str(self.config)))
+
+    def match(self, event):
+        for field in self.config["match"]:
+            val = find_value(field, event)
+            if val is None:
+                return False
+            if val != self.config["match"][field]:
+                return False
+        return True
+
 class TestRunner:
 
     def __init__(self, cwd, directory, suricata_config, verbose=False):
@@ -229,13 +313,28 @@ class TestRunner:
 
     def check(self, test_config):
 
-        if "checks" in test_config.config:
-            for check in test_config.config["checks"]:
-                for key in check:
-                    if key == "signature-id":
-                        if not self.check_signature_id(check[key]):
-                            raise Exception("signature-id %d not found" % (
-                                check[key]))
+        pdir = os.getcwd()
+        os.chdir(self.directory)
+        try:
+            if "checks" in test_config.config:
+                for check in test_config.config["checks"]:
+                    for key in check:
+                        if key == "filter":
+                            if not FilterCheck(check[key]).run():
+                                raise Exception("filter did not match: %s" % (
+                                    str(check[key])))
+                        elif key == "shell":
+                            if not ShellCheck(check[key]).run():
+                                raise Exception(
+                                    "shell output did not match: %s" % (
+                                        str(check[key])))
+                        elif key == "stats":
+                            if not StatsCheck(check[key]).run():
+                                raise Exception("stats check did not pass")
+                        else:
+                            raise Exception("Unknown check type: %s" % (key))
+        finally:
+            os.chdir(pdir)
 
         if not os.path.exists(os.path.join(self.directory, "check.sh")):
             print("OK")
@@ -247,17 +346,6 @@ class TestRunner:
         print("OK")
         return True
         
-    def check_signature_id(self, sig_id):
-        with open(
-                os.path.join(
-                    self.directory, "output", "eve.json"), "rb") as fileobj:
-            for line in fileobj:
-                event = json.loads(line)
-                if "alert" in event:
-                    if event["alert"]["signature_id"] == sig_id:
-                        return True
-        return False
-
     def default_args(self):
         args = [
             os.path.join(self.cwd, "src/suricata"),