def has_feature(self, feature):
return feature in self.features
+def find_value(name, obj):
+ """Find the value in an object for a field specified by name.
+
+ Example names:
+ event_type
+ alert.signature_id
+ smtp.rcpt_to[0]
+ """
+ parts = name.split(".")
+ for part in parts:
+ name = None
+ index = None
+ m = re.match("^(.*)\[(\d+)\]$", part)
+ if m:
+ name = m.group(1)
+ index = m.group(2)
+ else:
+ name = part
+
+ if not name in obj:
+ return None
+ obj = obj[name]
+
+ if index is not None:
+ obj = obj[int(index)]
+
+ return obj
+
+class ShellCheck:
+
+ def __init__(self, config):
+ self.config = config
+
+ def run(self):
+ output = subprocess.check_output(self.config["args"], shell=True)
+ if "expect" in self.config:
+ return str(self.config["expect"]) == output.strip()
+ return True
+
+class StatsCheck:
+
+ def __init__(self, config):
+ self.config = config
+
+ def run(self):
+ stats = None
+ with open(os.path.join("output", "eve.json"), "rb") as fileobj:
+ for line in fileobj:
+ event = json.loads(line)
+ if event["event_type"] == "stats":
+ stats = event["stats"]
+ for key in self.config:
+ val = find_value(key, stats)
+ if val != self.config[key]:
+ raise Exception("stats.%s: expected %s; got %s" % (
+ key, str(self.config[key]), str(val)))
+ return True
+
+class FilterCheck:
+
+ def __init__(self, config):
+ self.config = config
+
+ def run(self):
+ count = 0
+ with open(os.path.join("output", "eve.json"), "rb") as fileobj:
+ for line in fileobj:
+ event = json.loads(line)
+ if self.match(event):
+ count += 1
+ if count == self.config["count"]:
+ return True
+ raise Exception("expected %d matches; got %d for filter %s" % (
+ self.config["count"], count, str(self.config)))
+
+ def match(self, event):
+ for field in self.config["match"]:
+ val = find_value(field, event)
+ if val is None:
+ return False
+ if val != self.config["match"][field]:
+ return False
+ return True
+
class TestRunner:
def __init__(self, cwd, directory, suricata_config, verbose=False):
def check(self, test_config):
- if "checks" in test_config.config:
- for check in test_config.config["checks"]:
- for key in check:
- if key == "signature-id":
- if not self.check_signature_id(check[key]):
- raise Exception("signature-id %d not found" % (
- check[key]))
+ pdir = os.getcwd()
+ os.chdir(self.directory)
+ try:
+ if "checks" in test_config.config:
+ for check in test_config.config["checks"]:
+ for key in check:
+ if key == "filter":
+ if not FilterCheck(check[key]).run():
+ raise Exception("filter did not match: %s" % (
+ str(check[key])))
+ elif key == "shell":
+ if not ShellCheck(check[key]).run():
+ raise Exception(
+ "shell output did not match: %s" % (
+ str(check[key])))
+ elif key == "stats":
+ if not StatsCheck(check[key]).run():
+ raise Exception("stats check did not pass")
+ else:
+ raise Exception("Unknown check type: %s" % (key))
+ finally:
+ os.chdir(pdir)
if not os.path.exists(os.path.join(self.directory, "check.sh")):
print("OK")
print("OK")
return True
- def check_signature_id(self, sig_id):
- with open(
- os.path.join(
- self.directory, "output", "eve.json"), "rb") as fileobj:
- for line in fileobj:
- event = json.loads(line)
- if "alert" in event:
- if event["alert"]["signature_id"] == sig_id:
- return True
- return False
-
def default_args(self):
args = [
os.path.join(self.cwd, "src/suricata"),