import yaml
import glob
import re
+from collections import namedtuple
+
+import yaml
+
+class UnsatisfiedRequirementError(Exception):
+ pass
+
+SuricataVersion = namedtuple(
+ "SuricataVersion", ["major", "minor", "patch", "full", "short", "raw"])
+
+def parse_suricata_version(buf):
+ m = re.search("((\d+)\.(\d+)(\.(\d+))?(\w+)?)", str(buf).strip())
+ if m:
+ full = m.group(1)
+ major = int(m.group(2))
+ minor = int(m.group(3))
+ if not m.group(5):
+ patch = 0
+ else:
+ patch = int(m.group(5))
+ short = "%s.%s" % (major, minor)
+ return SuricataVersion(
+ major=major, minor=minor, patch=patch, short=short, full=full,
+ raw=buf)
+ return None
+
+def get_suricata_version():
+ output = subprocess.check_output(["./src/suricata", "-V"])
+ return parse_suricata_version(output)
def pipe_reader(fileobj, output=None, verbose=False):
for line in fileobj:
if verbose:
print(line.strip())
+class TestConfig:
+
+ def __init__(self, config, suricata_config):
+ self.config = config
+ self.suricata_config = suricata_config
+
+ def check_requires(self):
+ if "requires" in self.config:
+ requires = self.config["requires"]
+
+ if "min-version" in requires:
+ min_version = parse_suricata_version(requires["min-version"])
+ suri_version = self.suricata_config.version
+ if not self._version_gte(suri_version, min_version):
+ raise UnsatisfiedRequirementError(
+ "requires at least version %s" % (min_version.raw))
+
+ def _version_gte(self, v1, v2):
+ """Return True if v1 is great than or equal to v2."""
+ if v1.major < v2.major:
+ return False
+ elif v1.major > v2.major:
+ return True
+
+ if v1.minor < v2.minor:
+ return False
+ elif v1.minor > v2.minor:
+ return True
+
+ if v1.patch < v2.patch:
+ return False
+
+ return True
+
+class SuricataConfig:
+
+ def __init__(self, version):
+ self.version = version
+
class TestRunner:
- def __init__(self, cwd, directory, verbose=False):
+ def __init__(self, cwd, directory, suricata_config, verbose=False):
self.cwd = cwd
self.directory = directory
+ self.suricata_config = suricata_config
self.verbose = verbose
self.output = os.path.join(self.directory, "output")
sys.stdout.write("===> %s: " % os.path.basename(self.directory))
sys.stdout.flush()
+ if os.path.exists(os.path.join(self.directory, "test.yaml")):
+ test_config = yaml.load(
+ open(os.path.join(self.directory, "test.yaml"), "rb"))
+ test_config = TestConfig(test_config, self.suricata_config)
+ test_config.check_requires()
+
args = []
if os.path.exists(os.path.join(self.directory, "run.sh")):
args.append(os.path.join(self.directory, "run.sh"))
"suricata is not built")
return 1
+ # Create a SuricataConfig object that is passed to all tests.
+ suricata_config = SuricataConfig(get_suricata_version())
+
for dirpath, dirnames, filenames in os.walk(os.path.join(topdir, "tests")):
# The top directory is not a test...
break
if do_test:
- test_runner = TestRunner(cwd, dirpath, args.verbose)
+ test_runner = TestRunner(
+ cwd, dirpath, suricata_config, args.verbose)
try:
- success = test_runner.run()
+ test_runner.run()
+ passed += 1
+ except UnsatisfiedRequirementError as err:
+ print("SKIPPED: %s" % (str(err)))
+ skipped += 1
except Exception as err:
print("FAIL: exception: %s" % (str(err)))
- success = False
- if success:
- passed += 1
- else:
+ failed += 1
if args.fail:
return 1
- failed += 1
print("")
print("PASSED: %d" % (passed))