]> git.ipfire.org Git - thirdparty/suricata-verify.git/commitdiff
Add createst script 235/head
authorShivani Bhardwaj <shivanib134@gmail.com>
Tue, 9 Jul 2019 17:55:35 +0000 (23:25 +0530)
committerJason Ish <jason.ish@oisf.net>
Thu, 21 May 2020 17:49:16 +0000 (11:49 -0600)
createst is a script to produce a test directory with test.yaml as per
the PCAP and configuration provided.

This currently implements the functionality of creating the "checks" block
in `test.yaml` from a given `eve.json`. You can add other configuration
in the file thus created.

Usage
=====
```
usage: createst.py [-h] [--output-path <output-path>] [--eventtype-only]
                   [--allow-events [ALLOW_EVENTS]]
                   <test-name> <pcap-file>

Create tests with a given PCAP. Execute the script from a valid Suricata
source directory.

positional arguments:
  <test-name>           Name of the test folder
  <pcap-file>           Path to the PCAP file

optional arguments:
  -h, --help            show this help message and exit
  --output-path <output-path>
                        Path to the folder where generated test.yaml should be
                        put
  --eventtype-only      Create filter blocks based on event types only
  --allow-events [ALLOW_EVENTS]
                        Create filter blocks for the specified events
```

README.md
createst.py [new file with mode: 0755]

index 00124f09780ee433bb032a3ace437404e75c8060..26632dbf78538e5cadfdcd95c0d9f9001f2419cb 100644 (file)
--- a/README.md
+++ b/README.md
@@ -129,3 +129,31 @@ checks:
       args: cat fast.log | wc -l | xargs
       expect: 1
 ```            
+
+## Adding a new test the automated way: createst
+
+Script to create a test directory with test.yaml for a given PCAP. This
+needs to be run from a valid Suricata source directory.
+
+### Usage
+```
+usage: createst.py [-h] [--output-path <output-path>] [--eventtype-only]
+                   [--allow-events [ALLOW_EVENTS]]
+                   <test-name> <pcap-file>
+
+Create tests with a given PCAP. Execute the script from a valid Suricata source
+directory.
+
+positional arguments:
+  <test-name>           Name of the test folder
+  <pcap-file>           Path to the PCAP file
+
+optional arguments:
+  -h, --help            show this help message and exit
+  --output-path <output-path>
+                        Path to the folder where generated test.yaml should be
+                        put
+  --eventtype-only      Create filter blocks based on event types only
+  --allow-events [ALLOW_EVENTS]
+                        Create filter blocks for the specified events
+```
diff --git a/createst.py b/createst.py
new file mode 100755 (executable)
index 0000000..6b8fa4c
--- /dev/null
@@ -0,0 +1,408 @@
+#! /bin/python
+#
+# Copyright (C) 2019 Open Information Security Foundation
+#
+# You can copy, redistribute or modify this Program under the terms of
+# the GNU General Public License version 2 as published by the Free
+# Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# version 2 along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+import argparse
+import glob
+import json
+import logging
+import os
+import subprocess
+import sys
+from collections import defaultdict
+from shutil import copyfile
+
+import yaml
+from yaml.representer import Representer
+
+# Get a logger instance
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+# Define YAML representer
+yaml.add_representer(defaultdict, Representer.represent_dict)
+
+WIN32 = sys.platform == "win32"
+suricata_bin = "src\suricata.exe" if WIN32 else "./src/suricata"
+suricata_yaml = "suricata.yaml" if WIN32 else "./suricata.yaml"
+CUR_DIR = os.path.dirname(os.path.realpath(__file__))
+DEFAULT_TEST_DIR = os.path.join(CUR_DIR, "tests")
+
+# Fields to exclude from the filter block
+skip_fields = ["timestamp", "flow_id", "last_reload"]
+
+
+def init_logger():
+    """
+    Initialize logger handler and format.
+    """
+    handler = logging.StreamHandler(sys.stdout)
+    handler.setLevel(logging.DEBUG)
+    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+
+
+def init_global_params():
+    """
+    Initialize global parameters before getting into major functions.
+    """
+    global args
+    global test_dir
+    global cwd
+    args = vars(parse_args())
+    test_dir = os.path.join(args["output_path"], args["test-name"])
+    # Get current working directory. This should be a valid Suricata source directory.
+    cwd = os.getcwd()
+
+
+def modify_eve_dict(eve_dict):
+    """
+    Modify nested event dictionary to make it follow a more flattened
+    approach so as to use full functionality of the run.py function
+    `find_value(name, obj)`. The dictionaries of the format
+    {
+        "event_type":"http",
+        "http":{
+            "hostname":"supportingfunctions.us.newsweaver.com",
+            "http_port":443
+            }
+    }
+
+    will be converted to
+
+    {
+        "event_type":"http",
+        "http.hostname":"supportingfunctions.us.newsweaver.com",
+        "http.http_port":443
+    }
+    """
+    modified = {}
+
+    def flatten(x, name=''):
+        """
+        Recursive function to flatten the nested dictionaries.
+        """
+        if type(x) is dict:
+            for val in x:
+                flatten(x[val], name + val + '.')
+        elif type(x) is list:
+            i = 0
+            for val in x:
+                name = name.rstrip('.')
+                flatten(val, name + '[' + str(i) + ']' + '.')
+                i += 1
+        else:
+            if name[:-1] not in ["flow.start", "flow.end"]:
+                modified[name[:-1]] = x
+
+    flatten(eve_dict)
+    return modified
+
+
+def create_directory(path):
+    """
+    Helper function to create output directories.
+    """
+    dir_path = os.path.join(args["output_path"], path)
+    try:
+        os.mkdir(dir_path)
+    except OSError as e:
+        logger.error(e)
+        sys.exit(1)
+
+
+def write_to_file(data):
+    """
+    Check for the output test.yaml file if it exists, else create one and write
+    to it.
+    """
+    test_yaml_path = os.path.join(test_dir, "test.yaml")
+    try:
+        os.remove(test_yaml_path)
+    except FileNotFoundError:
+        logger.info("{} not found. Creating...".format(test_yaml_path))
+    except IsADirectoryError as e:
+        logger.error(e)
+        sys.exit(1)
+    with open(test_yaml_path, "w+") as fp:
+        fp.write("# *** Add configuration here ***\n\n")
+        fp.write(data)
+
+
+def test_yaml_format(func):
+    """
+    Decorator to convert all events to test.yaml format.
+    """
+    def decorated(*args, **kwargs):
+        """
+        Add filters per event in eve.json.
+        """
+        eve_list = func(kwargs["eve_ds"])
+        all_eve_list = []
+        for item in eve_list:
+            eve_dict = {
+                    "filter": {
+                        "count": item[1],
+                        "match": modify_eve_dict(item[0]),
+                        },
+                    }
+            all_eve_list.append(eve_dict)
+        all_eve_dict = {"checks": all_eve_list}
+        all_eve_yaml = yaml.dump(all_eve_dict, default_flow_style=False)
+        return all_eve_yaml
+    return decorated
+
+
+def is_valid_suri_directory():
+    """
+    Check if the current working directory is a valid Suricata source
+    directory by checking presence of suricata.yaml and `src/suricata`.
+    """
+    if not (os.path.exists(suricata_yaml) and
+            os.path.exists(suricata_bin)):
+        logger.error("This is not a Suricata source directory or " +
+              "Suricata is not built")
+        return False
+    return True
+
+
+def get_manipulated_list():
+    """
+    Manipulate eve.json to load json successfully and skip the fields
+    mentioned in `skip_fields` variable.
+    """
+    eve_path = os.path.join(test_dir, "output", "eve.json")
+    allow_events = args["allow_events"].strip().split(",") if args["allow_events"] else []
+    with open(eve_path, "r") as fp:
+        content = fp.read()
+    content_list = content.strip().split("\n")
+    jcontent_list = [json.loads(e) for e in content_list]
+    all_content_list = []
+    for e in jcontent_list:
+        md = {k: v for k, v in e.items() if k not in skip_fields}
+        if "event_type" in md and md["event_type"] == "stats":
+            continue
+        all_content_list.append(md)
+    if allow_events:
+        def_eve_content_list = [item for item in all_content_list if item["event_type"] in allow_events]
+        if not def_eve_content_list:
+            logger.error("No matching events found.")
+            sys.exit(0)
+        return def_eve_content_list
+    return all_content_list
+
+
+def filter_event_type_params(eve_rules):
+    """
+    Create a filter block based on *ALL* the parameters of any event.
+    """
+    mlist = get_manipulated_list()
+    all_eve_list = get_all_eve_list(eve_ds=mlist)
+    write_to_file(data=all_eve_list)
+
+
+def filter_event_type(event_types):
+    """
+    Filter based *ONLY* on the event types.
+    """
+    all_eve_list = get_eve_list_by_type(eve_ds=event_types)
+    write_to_file(data=all_eve_list)
+
+
+@test_yaml_format
+def get_eve_list_by_type(eve_ds):
+    eve_list = []
+    for k, v in eve_ds.items():
+        key = {"event_type": k}
+        eve_list.append((key, v))
+    return eve_list
+
+
+@test_yaml_format
+def get_all_eve_list(eve_ds):
+    eve_list = []
+    for item in eve_ds:
+        eve_list.append((item, 1))
+    return eve_list
+
+
+def get_suricata_yaml_path():
+    """
+    Return the path to the suricata.yaml particular to the current test.
+    """
+    if os.path.exists(os.path.join(cwd, "suricata.yaml")):
+        return os.path.join(cwd, "suricata.yaml")
+    return os.path.join(test_dir, "suricata.yaml")
+
+
+def create_local_args():
+    """
+    Return a list of all the arguments required to be passed to a particular
+    local test.
+    """
+    test_output_dir = os.path.join(test_dir, "output")
+    pcap_path = os.path.join(test_dir, "input.pcap")
+    # Copy PCAP to the test directory
+    copyfile(args["pcap"], pcap_path)
+
+    largs = [
+        os.path.join(cwd, "src/suricata"),
+        '-r', pcap_path,
+        '-l', test_output_dir,
+        "--init-errors-fatal",
+        "-c", get_suricata_yaml_path(),
+        ]
+    # In Suricata 5.0 the classification.config and
+    # reference.config were moved into the etc/ directory. For now
+    # check there and the top level directory to still support
+    # 4.1.
+    classification_configs = [
+        os.path.join(cwd, "etc", "classification.config"),
+        os.path.join(cwd, "classification.config"),
+    ]
+
+    for config in classification_configs:
+        if os.path.exists(config):
+            largs += ["--set", "classification-file=%s" % config]
+            break
+
+    reference_configs = [
+        os.path.join(cwd, "etc", "reference.config"),
+        os.path.join(cwd, "reference.config"),
+    ]
+
+    for config in reference_configs:
+        if os.path.exists(config):
+            largs += ["--set", "reference-config-file=%s" % config]
+            break
+
+    # Check if rules were provided with args
+    if args["rules"]:
+        rules_path = os.path.join(test_dir, "test.rules")
+        copyfile(args["rules"], rules_path)
+        largs += ["-S", rules_path]
+    else:
+        largs.append("--disable-detection")
+
+    return largs
+
+
+def create_env():
+    """
+    Return environment for the test being created.
+    """
+    test_output_dir = os.path.join(test_dir, "output")
+    extraenv = {
+        # The suricata source directory.
+        "SRCDIR": cwd,
+        "TZ": "UTC",
+        "output_dir": test_output_dir,
+        "ASAN_OPTIONS": "detect_leaks=0",
+    }
+    env = os.environ.copy()
+    env.update(extraenv)
+
+    return env
+
+
+## Parser
+def parse_args():
+    """
+    Parse arguments and return them to main for processing.
+    """
+    parser = argparse.ArgumentParser(
+        description="Create tests with a given PCAP. Execute the script"
+                " from a valid Suricata source directory.")
+    parser.add_argument("test-name", metavar="<test-name>",
+                        help="Name of the test folder")
+    parser.add_argument("pcap", metavar="<pcap-file>",
+                        help="Path to the PCAP file")
+    parser.add_argument("--rules", metavar="<rules>",
+                        help="Path to rule file")
+    parser.add_argument("--output-path", default=DEFAULT_TEST_DIR, metavar="<output-path>",
+                        help="Path to the folder where generated test.yaml should be put")
+    parser.add_argument("--eventtype-only", default=None, action="store_true",
+                        help="Create filter blocks based on event types only")
+    parser.add_argument("--allow-events", nargs="?", default=None,
+                        help="Create filter blocks for the specified events")
+
+    # add arg to allow stdout only
+    args = parser.parse_args()
+
+    return args
+
+
+def eve2test():
+    """
+    Process the provided eve.json file and write the required checks in the
+    provided output file.
+    """
+    logger.info("Running eve2test...")
+    content = list()
+    event_types = defaultdict(int)
+    eve_path = os.path.join(test_dir, "output", "eve.json")
+    with open(eve_path, "r") as fp:
+        for line in fp:
+            eve_rule = json.loads(line)
+            content.append(eve_rule)
+            eve_type = eve_rule.get("event_type")
+            event_types[eve_type] += 1
+    if args["eventtype_only"]:
+        if args["allow_events"]:
+            logger.warning("--allow-events shall not be used with --eventtype-only")
+        filter_event_type(event_types=event_types)
+        return
+    filter_event_type_params(eve_rules=content)
+
+
+def generate_eve():
+    """
+    Create eve.json with Suricata as per the given configuration and PCAP.
+    In case of successful run, call eve2test to convert eve.json thus
+    created into test.yaml.
+    """
+    largs = create_local_args()
+    env = create_env()
+
+    p = subprocess.Popen(
+        largs, cwd=cwd, env=env,
+        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    _, stderr = p.communicate()
+    exit_status = p.wait()
+    if exit_status == 0:
+        eve2test()
+    else:
+        logger.error(stderr.decode("utf-8"))
+        sys.exit(1)
+
+
+def main():
+    init_logger()
+
+    # Check if its a valid Suricata directory
+    if not is_valid_suri_directory():
+        sys.exit(1)
+
+    init_global_params()
+    create_directory(path=args["test-name"])
+    create_directory(path=os.path.join(args["test-name"], "output"))
+    generate_eve()
+
+
+if __name__ == "__main__":
+    main()