output = subprocess.check_output(["./src/suricata", "-V"])
return parse_suricata_version(output)
+def version_gte(v1, v2):
+ """Return True if v1 is great than or equal to v2."""
+ if v1.major < v2.major:
+ return False
+ elif v1.major > v2.major:
+ return True
+
+ if v1.minor < v2.minor:
+ return False
+ elif v1.minor > v2.minor:
+ return True
+
+ if v1.patch < v2.patch:
+ return False
+
+ return True
+
def pipe_reader(fileobj, output=None, verbose=False):
for line in fileobj:
line = line.decode()
if verbose:
print(line.strip())
-class TestConfig:
-
- def __init__(self, config, suricata_config):
- self.config = config
- self.suricata_config = suricata_config
-
- def check_requires(self):
- if "requires" in self.config:
- requires = self.config["requires"]
-
- if "min-version" in requires:
- min_version = parse_suricata_version(requires["min-version"])
- suri_version = self.suricata_config.version
- if not self._version_gte(suri_version, min_version):
- raise UnsatisfiedRequirementError(
- "requires at least version %s" % (min_version.raw))
-
- if "features" in requires:
- for feature in requires["features"]:
- if not self.suricata_config.has_feature(feature):
- raise UnsatisfiedRequirementError(
- "requires feature %s" % (feature))
-
- if "not-features" in requires:
- for feature in requires["not-features"]:
- if self.suricata_config.has_feature(feature):
- if requires["not-features"][feature]:
- comment = "%s" % (
- requires["not-features"][feature])
- else:
- comment = "not for feature %s" % (feature)
- raise UnsatisfiedRequirementError(comment)
-
- if "env" in requires:
- for env in requires["env"]:
- if not env in os.environ:
- raise UnsatisfiedRequirementError(
- "requires env var %s" % (env))
-
- if "files" in requires:
- for filename in requires["files"]:
- if not os.path.exists(filename):
- raise UnsatisfiedRequirementError(
- "requires file %s" % (filename))
-
- def has_command(self):
- return "command" in self.config
-
- def get_command(self):
- return self.config["command"]
-
- def _version_gte(self, v1, v2):
- """Return True if v1 is great than or equal to v2."""
- if v1.major < v2.major:
- return False
- elif v1.major > v2.major:
- return True
-
- if v1.minor < v2.minor:
- return False
- elif v1.minor > v2.minor:
- return True
-
- if v1.patch < v2.patch:
- return False
-
- return True
-
class SuricataConfig:
def __init__(self, version):
# List of thread readers.
self.readers = []
+ # Load the test configuration.
self.load_config()
def load_config(self):
return
requires = self.config["requires"]
+ if "min-version" in requires:
+ min_version = parse_suricata_version(requires["min-version"])
+ suri_version = self.suricata_config.version
+ if not version_gte(suri_version, min_version):
+ raise UnsatisfiedRequirementError(
+ "requires at least version %s" % (min_version.raw))
+
+ if "features" in requires:
+ for feature in requires["features"]:
+ if not self.suricata_config.has_feature(feature):
+ raise UnsatisfiedRequirementError(
+ "requires feature %s" % (feature))
+
+ if "not-features" in requires:
+ for feature in requires["not-features"]:
+ if self.suricata_config.has_feature(feature):
+ if requires["not-features"][feature]:
+ comment = "%s" % (
+ requires["not-features"][feature])
+ else:
+ comment = "not for feature %s" % (feature)
+ raise UnsatisfiedRequirementError(comment)
+
+ if "env" in requires:
+ for env in requires["env"]:
+ if not env in os.environ:
+ raise UnsatisfiedRequirementError(
+ "requires env var %s" % (env))
+
+ if "files" in requires:
+ for filename in requires["files"]:
+ if not os.path.exists(filename):
+ raise UnsatisfiedRequirementError(
+ "requires file %s" % (filename))
+
# Check if a pcap is required or not. By default a pcap is
# required unless a "command" has been provided.
if not "command" in self.config:
sys.stdout.write("===> %s: " % os.path.basename(self.directory))
sys.stdout.flush()
- if os.path.exists(os.path.join(self.directory, "test.yaml")):
- test_config = yaml.load(
- open(os.path.join(self.directory, "test.yaml"), "rb"))
- test_config = TestConfig(test_config, self.suricata_config)
- else:
- test_config = TestConfig({}, self.suricata_config)
-
# Cleanup the output directory.
if os.path.exists(self.output):
shutil.rmtree(self.output)
os.makedirs(self.output)
- test_config.check_requires()
self.check_requires()
self.setup()
shell = False
- if test_config.has_command():
- args = test_config.get_command()
+ if "command" in self.config:
+ args = self.config["command"]
shell = True
else:
args = self.default_args()
print("FAIL: process returned with non-0 exit code: %d" % r)
return False
- return self.check(test_config)
+ return self.check()
- def check(self, test_config):
+ def check(self):
pdir = os.getcwd()
os.chdir(self.directory)
try:
- if "checks" in test_config.config:
- for check in test_config.config["checks"]:
+ if "checks" in self.config:
+ for check in self.config["checks"]:
for key in check:
if key == "filter":
if not FilterCheck(check[key]).run():