import os
import subprocess
import shutil
+import tempfile
+import suricata.update.rule
DATA_DIR = "./tests/tmp"
"add-source", "--http-header", "Authorization: Basic dXNlcjE6cGFzc3dvcmQx",
"testing-header-with-spaces", "file:///doesnotexist"
])
+
+
+class IntegrationTest:
+ def __init__(self, configs={}):
+ self.directory = tempfile.mkdtemp(dir=DATA_DIR)
+ self.configs = configs
+ self.args = []
+ self.write_configs()
+
+ if not "update.yaml" in self.configs:
+ self.args += ["-c", "./tests/empty"]
+
+ def write_configs(self):
+ for config in self.configs:
+ config_filename = "%s/%s" % (self.directory, config)
+ with open(config_filename, "w") as of:
+ of.write(self.configs[config])
+ if config == "modify.conf":
+ self.args += ["--modify-conf", config_filename]
+ elif config == "drop.conf":
+ self.args += ["--drop-conf", config_filename]
+ elif config == "enable.conf":
+ self.args += ["--enable-conf", config_filename]
+ elif config == "disable.conf":
+ self.args += ["--disable-conf", config_filename]
+
+ def run(self):
+ args = [
+ sys.executable,
+ "./bin/suricata-update",
+ "-D",
+ self.directory,
+ "--no-test",
+ "--no-reload",
+ "--suricata-conf",
+ "./tests/suricata.yaml",
+ ] + self.args
+ subprocess.check_call(args)
+ self.check()
+ self.clean()
+
+ def clean(self):
+ if self.directory.startswith(DATA_DIR):
+ shutil.rmtree(self.directory)
+
+ def check(self):
+ pass
+
+ def get_rule_by_sid(self, sid):
+ """ Return all rules where the provided substring is found. """
+ with open("%s/rules/suricata.rules" % (self.directory)) as inf:
+ for line in inf:
+ rule = suricata.update.rule.parse(line)
+ if rule.sid == sid:
+ return rule
+ return None
+
+
+class MultipleModifyTest(IntegrationTest):
+
+ configs = {
+ "modify.conf":
+ """
+modifysid emerging-exploit.rules "^alert" | "drop"
+modifysid * "^drop(.*)noalert(.*)" | "alert${1}noalert${2}"
+ """
+ }
+
+ def __init__(self):
+ IntegrationTest.__init__(self, self.configs)
+
+ def check(self):
+ # This rule should have been converted to drop.
+ rule1 = self.get_rule_by_sid(2103461)
+ assert(rule1.action == "drop")
+
+ # This one should have been converted back to alert.
+ rule2 = self.get_rule_by_sid(2023184)
+ assert(rule2.action == "alert")
+
+class DropAndModifyTest(IntegrationTest):
+
+ configs = {
+ "drop.conf": """
+2024029
+ """,
+ "modify.conf": """
+2024029 "ET INFO" "TEST INFO"
+ """
+ }
+
+ def __init__(self):
+ IntegrationTest.__init__(self, self.configs)
+
+ def check(self):
+ rule1 = self.get_rule_by_sid(2024029)
+ assert(rule1.action == "drop")
+ assert(rule1.msg.startswith("TEST INFO"))
+
+
+MultipleModifyTest().run()
+DropAndModifyTest().run()