output = subprocess.check_output([suricata_bin, "-V"])
return parse_suricata_version(output)
-def version_equal(a, b):
- """Check if version a and version b are equal in a semantic way.
- For example:
- - 4 would match 4, 4.x and 4.x.y.
- - 4.0 would match 4.0.x.
- - 4.0.3 would match only 4.0.3.
- """
- if not a.major == b.major:
- return False
+def pipe_reader(fileobj, output=None, verbose=False):
+ for line in fileobj:
+ line = line.decode()
+ if output:
+ output.write(line)
+ if verbose:
+ print(line.strip())
- if a.minor is not None and b.minor is not None:
- if a.minor != b.minor:
- return False
- if a.patch is not None and b.patch is not None:
- if a.patch != b.patch:
+def handle_exceptions(func):
+ def applicator(*args, **kwargs):
+ try:
+ func(*args,**kwargs)
+ except TestError as te:
+ print("FAIL : {}".format(te))
+ check_args_fail()
+ kwargs["count"]["failure"] += 1
+ except UnsatisfiedRequirementError as ue:
+ print("SKIPPED : {}".format(ue))
+ kwargs["count"]["skipped"] += 1
+ else:
+ print("OK")
+ kwargs["count"]["success"] += 1
+ return kwargs["count"]
+ return applicator
+
+
+class Version:
+ """
+ Class to compare Suricata versions.
+ """
+ def is_equal(self, a, b):
+ """Check if version a and version b are equal in a semantic way.
+
+ For example:
+ - 4 would match 4, 4.x and 4.x.y.
+ - 4.0 would match 4.0.x.
+ - 4.0.3 would match only 4.0.3.
+ """
+ if not a.major == b.major:
return False
- return True
+ if a.minor is not None and b.minor is not None:
+ if a.minor != b.minor:
+ return False
-def version_gte(v1, v2):
- """Return True if v1 is great than or equal to v2."""
- if v1.major < v2.major:
- return False
- elif v1.major > v2.major:
- return True
+ if a.patch is not None and b.patch is not None:
+ if a.patch != b.patch:
+ return False
- if v1.minor < v2.minor:
- return False
- elif v1.minor > v2.minor:
return True
- if v1.patch < v2.patch:
- return False
+ def is_gte(self, v1, v2):
+ """Return True if v1 is great than or equal to v2."""
+ if v1.major < v2.major:
+ return False
+ elif v1.major > v2.major:
+ return True
- return True
+ if v1.minor < v2.minor:
+ return False
+ elif v1.minor > v2.minor:
+ return True
+
+ if v1.patch < v2.patch:
+ return False
+
+ return True
-def pipe_reader(fileobj, output=None, verbose=False):
- for line in fileobj:
- line = line.decode()
- if output:
- output.write(line)
- if verbose:
- print(line.strip())
class SuricataConfig:
return obj
+
+def is_version_compatible(version, suri_version, expr):
+ config_version = parse_suricata_version(version)
+ version_obj = Version()
+ func = getattr(version_obj, "is_{}".format(expr))
+ if not func(suri_version, config_version):
+ return False
+ return True
+
+
class ShellCheck:
def __init__(self, config):
class FilterCheck:
- def __init__(self, config, outdir):
+ def __init__(self, config, outdir, suri_version):
self.config = config
self.outdir = outdir
+ self.suri_version = suri_version
def run(self):
+ req_version = self.config.get("version")
+ min_version = self.config.get("min-version")
+ expr = "equal" if req_version else "gte"
+ if (req_version == None) ^ (min_version == None):
+ version = req_version or min_version
+ if not is_version_compatible(version=version,
+ suri_version=self.suri_version, expr=expr):
+ raise UnsatisfiedRequirementError(
+ "Suricata v{} not found".format(version))
+ elif req_version and min_version:
+ raise TestError("Specify either min-version or version")
+
if "filename" in self.config:
json_filename = self.config["filename"]
else:
return True
else:
requires = {}
-
+ suri_version = self.suricata_config.version
for key in requires:
-
if key == "min-version":
- min_version = parse_suricata_version(requires["min-version"])
- suri_version = self.suricata_config.version
- if not version_gte(suri_version, min_version):
+ min_version = requires["min-version"]
+ if not is_version_compatible(version=min_version,
+ suri_version=suri_version, expr="gte"):
raise UnsatisfiedRequirementError(
- "requires at least version %s" % (
- requires["min-version"]))
-
+ "requires at least version {}".format(min_version))
elif key == "version":
- requires_version = parse_suricata_version(requires["version"])
- if not version_equal(
- self.suricata_config.version,
- requires_version):
+ req_version = requires["version"]
+ if not is_version_compatible(version=req_version,
+ suri_version=suri_version, expr="equal"):
raise UnsatisfiedRequirementError(
- "only for version %s" % (requires["version"]))
-
+ "only for version {}".format(req_version))
elif key == "features":
for feature in requires["features"]:
if not self.suricata_config.has_feature(feature):
raise TestError("got exit code %d, expected %d" % (
r, expected_exit_code));
- if not self.check():
- return False
+ check_value = self.check()
+ if check_value["check_sh"]:
+ return check_value
print("OK%s" % (" (%dx)" % count if count > 1 else ""))
- return True
+ return check_value
def pre_check(self):
if "pre-check" in self.config:
subprocess.call(self.config["pre-check"], shell=True)
- def check(self):
+ @handle_exceptions
+ def perform_filter_checks(self, check, count):
+ count = FilterCheck(check, self.output, self.suricata_config.version).run()
+ return count
+ @handle_exceptions
+ def perform_shell_checks(self, check, count):
+ count = ShellCheck(check).run()
+ return count
+
+ @handle_exceptions
+ def perform_stats_checks(self, check, count):
+ count = StatsCheck(check, self.output).run()
+ return count
+
+ def check(self):
pdir = os.getcwd()
os.chdir(self.output)
+ count = {
+ "success": 0,
+ "failure": 0,
+ "skipped": 0,
+ "check_sh": 0,
+ }
try:
self.pre_check()
if "checks" in self.config:
- for check in self.config["checks"]:
+ for check_count, check in enumerate(self.config["checks"]):
+ print("\n|\n --> Sub test #{}: ".format(check_count + 1),
+ end="")
for key in check:
- if key == "filter":
- if not FilterCheck(check[key], self.output).run():
- raise TestError("filter did not match: %s" % (
- str(check[key])))
- elif key == "shell":
- if not ShellCheck(check[key]).run():
- raise TestError(
- "shell output did not match: %s" % (
- str(check[key])))
- elif key == "stats":
- if not StatsCheck(check[key], self.output).run():
- raise TestError("stats check did not pass")
+ if key in ["filter", "shell", "stats"]:
+ func = getattr(self, "perform_{}_checks".format(key))
+ count = func(check=check[key], count=count)
else:
- raise TestError("Unknown check type: %s" % (key))
+ print("FAIL: Unknown check type: {}".format(key))
finally:
os.chdir(pdir)
+ if count["failure"] or count["skipped"]:
+ return count
+
# Old style check script.
pdir = os.getcwd()
os.chdir(self.output)
try:
if not os.path.exists(os.path.join(self.directory, "check.sh")):
- return True
+ success_c = count["success"]
+ # Covering cases like "tests/show-help" which do not have
+ # check.sh and/or no checks in test.yaml should be counted
+ # successful
+ count["success"] = 1 if not success_c else success_c
+ return count
extraenv = {
# The suricata source directory.
"SRCDIR": self.cwd,
[os.path.join(self.directory, "check.sh")], env=env)
if r != 0:
print("FAILED: verification failed")
- return False
- return True
+ count["failure"] = 1
+ count["check_sh"] = 1
+ return count
+ else:
+ count["success"] = 1
+ return count
finally:
os.chdir(pdir)
t.start()
self.readers.append(t)
+
+def check_args_fail():
+ if args.fail:
+ sys.exit(1)
+
+
def check_deps():
try:
cmd = "jq --version > nil" if WIN32 else "jq --version > /dev/null 2>&1"
def main():
global TOPDIR
+ global args
if not check_deps():
return 1
test_runner = TestRunner(
cwd, dirpath, outdir, suricata_config, args.verbose)
try:
- if test_runner.run():
- passed += 1
- else:
- failed += 1
- if args.fail:
- return 1
- except UnsatisfiedRequirementError as err:
- print("SKIPPED: %s" % (str(err)))
+ results = test_runner.run()
+ passed += results["success"]
+ failed += results["failure"]
+ skipped += results["skipped"]
+ except UnsatisfiedRequirementError as ue:
+ print("SKIPPED: {}".format(ue))
skipped += 1
- except TestError as err:
- print("FAIL: %s" % (str(err)))
+ except TestError as te:
+ print("FAILED: {}".format(te))
+ check_args_fail()
failed += 1
- if args.fail:
- return 1
- except Exception as err:
- raise
print("")
print("PASSED: %d" % (passed))