]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
script/eve-parity: add script for checking eve/keyword parity
authorJason Ish <jason.ish@oisf.net>
Wed, 19 Feb 2025 22:36:57 +0000 (16:36 -0600)
committerVictor Julien <victor@inliniac.net>
Wed, 5 Mar 2025 14:59:58 +0000 (15:59 +0100)
Currently this script has two commands: "missing" and "having".

"missing" will show eve fields that do not map to any keywords.

"having" will sohw eve fields along with their keyword mappsings,
while also validating that those keywords really exist.

Related to tickets: #6463, #4772

scripts/eve-parity.py [new file with mode: 0755]

diff --git a/scripts/eve-parity.py b/scripts/eve-parity.py
new file mode 100755 (executable)
index 0000000..07b25eb
--- /dev/null
@@ -0,0 +1,164 @@
+#! /usr/bin/env python3
+#
+# Tool for checking parity between the EVE schema and Suricata
+# keywords.
+#
+# Usage: ./scripts/eve-parity.py [missing|having]
+#
+# ## unmapped-keywords
+#
+# Display all known keywords that are not mapped to an EVE field.
+#
+# ## unmapped-fields
+#
+# Display all eve fields that do not have a keyword mapping.
+#
+# ## mapped-fields
+#
+# Display all EVE fields that have a keyword mapping.
+
+
+import sys
+import subprocess
+import json
+import argparse
+
+
+def main():
+    parser = argparse.ArgumentParser(description="EVE Parity Check Tool")
+    parser.add_argument(
+        "command", choices=["mapped-fields", "unmapped-keywords", "unmapped-fields"]
+    )
+    args = parser.parse_args()
+
+    keywords = load_known_keywords()
+    keys = load_schema()
+
+    if args.command == "mapped-fields":
+        mapped_fields(keywords, keys)
+    elif args.command == "unmapped-keywords":
+        unmapped_keywords(keywords, keys)
+    elif args.command == "unmapped-fields":
+        unmapped_fields(keywords, keys)
+
+
+def unmapped_keywords(keywords, keys):
+    """Report known keywords that are not mapped to an EVE field."""
+    schema_keywords = set()
+    for key in keys.keys():
+        if "keywords" in keys[key] and keys[key]["keywords"]:
+            for keyword in keys[key]["keywords"]:
+                schema_keywords.add(keyword)
+    unmapped = keywords - schema_keywords
+    for keyword in sorted(unmapped):
+        print(keyword)
+
+
+def unmapped_fields(keywords, keys):
+    with_missing = set()
+
+    for key in keys.keys():
+        if "keywords" not in keys[key]:
+            with_missing.add(key)
+
+    # Print sorted.
+    for key in sorted(with_missing):
+        print(key)
+
+
+def mapped_fields(keywords, keys):
+    for key in keys.keys():
+        if "keywords" in keys[key] and keys[key]["keywords"]:
+            for keyword in keys[key]["keywords"]:
+                if keyword not in keywords:
+                    errprint("ERROR: Unknown keyword: {}".format(keyword))
+            print("{} -> [{}]".format(key, ", ".join(keys[key]["keywords"])))
+
+
+def load_schema():
+    schema = json.load(open("etc/schema.json"))
+    stack = [(schema, [])]
+    keys = {}
+
+    while stack:
+        (current, path) = stack.pop(0)
+
+        for name, props in current["properties"].items():
+            if "$ref" in props:
+                ref = find_ref(schema, props["$ref"])
+                if not ref:
+                    raise Exception("$ref not found: {}".format(props["$ref"]))
+                props = ref
+            if props["type"] in ["string", "integer", "boolean", "number"]:
+                # End of the line...
+                key = ".".join(path + [name])
+                keys[key] = props.get("suricata", {})
+            elif props["type"] == "object":
+                #  An object can set "suricata.keywords" to false to
+                #  disable descending into it. For examples, "stats".
+                keywords = props.get("suricata", {}).get("keywords")
+                if keywords is False:
+                    # print("Skipping object {}, keywords disabled".format(".".join(path + [name])))
+                    continue
+
+                if "properties" in props:
+                    stack.insert(0, (props, path + [name]))
+                else:
+                    # May want to warn that this object has no properties.
+                    key = ".".join(path + [name])
+                    keys[key] = {}
+            elif props["type"] == "array":
+                if "items" in props and "type" in props["items"]:
+                    if "properties" in props["items"]:
+                        stack.insert(
+                            0,
+                            (
+                                props["items"],
+                                path + ["{}".format(name)],
+                            ),
+                        )
+                    else:
+                        # May want to warn that this array has no properties.
+                        key = ".".join(path + [name])
+                        keys[key] = {}
+                else:
+                    # May want to warn that this array has no items.
+                    key = ".".join(path + [name])
+                    keys[key] = {}
+            else:
+                raise Exception("Unsupported type: {}".format(props["type"]))
+
+    return keys
+
+
+def load_known_keywords():
+    keywords = set()
+    result = subprocess.check_output(["./src/suricata", "--list-keywords=csv"])
+    lines = result.decode().split("\n")
+    # Skip first line, as its a header line.
+    for line in lines[1:]:
+        parts = line.split(";")
+        if parts:
+            keywords.add(parts[0])
+    return keywords
+
+
+def errprint(*args, **kwargs):
+    print(*args, file=sys.stderr, **kwargs)
+
+
+def find_ref(schema: dict, ref: str) -> dict:
+    parts = ref.split("/")
+
+    root = parts.pop(0)
+    if root != "#":
+        raise Exception("Unsupported reference: {}".format(ref))
+
+    while parts:
+        schema = schema[parts.pop(0)]
+
+    return schema
+
+
+if __name__ == "__main__":
+    sys.exit(main())