def __init__(self, version):
self.version = version
self.features = set()
-
+ self.config = {}
self.load_build_info()
def load_build_info(self):
if line.decode().startswith("Features:"):
self.features = set(line.decode().split()[1:])
+ def load_config(self, config_filename):
+ output = subprocess.check_output([
+ "./src/suricata",
+ "-c", config_filename,
+ "--dump-config"])
+ self.config = {}
+ for line in output.split("\n"):
+ parts = [p.strip() for p in line.split("=", 1)]
+ if parts and parts[0]:
+ if len(parts) > 1:
+ val = parts[1]
+ else:
+ val = ""
+ self.config[parts[0]] = val
+
def has_feature(self, feature):
return feature in self.features
# Load the test configuration.
self.load_config()
+ self.suricata_config.load_config(self.get_suricata_yaml_path())
+
def load_config(self):
if os.path.exists(os.path.join(self.directory, "test.yaml")):
self.config = yaml.safe_load(
else:
requires = {}
+ if "config" in requires:
+ for key_pattern, need_val in requires["config"].items():
+ for key, val in self.suricata_config.config.items():
+ if re.match(key_pattern, key):
+ if need_val != val:
+ raise UnsatisfiedRequirementError(
+ "requires %s = %s" % (
+ key, need_val))
+
if "min-version" in requires:
min_version = parse_suricata_version(requires["min-version"])
suri_version = self.suricata_config.version
if "ips" in self.name:
args.append("--simulate-ips")
- if os.path.exists(os.path.join(self.directory, "suricata.yaml")):
- args += ["-c", os.path.join(self.directory, "suricata.yaml")]
- else:
- args += ["-c", os.path.join(self.cwd, "suricata.yaml")]
+ args += ["-c", self.get_suricata_yaml_path()]
# Find pcaps.
if "pcap" in self.config:
return args
+ def get_suricata_yaml_path(self):
+ """Return the path to the suricata.yaml that will be used for this
+ test."""
+ if os.path.exists(os.path.join(self.directory, "suricata.yaml")):
+ return os.path.join(self.directory, "suricata.yaml")
+ return os.path.join(self.cwd, "suricata.yaml")
+
def start_reader(self, input, output):
t = threading.Thread(
target=pipe_reader, args=(input, output, self.verbose))