From: Shivani Bhardwaj Date: Tue, 9 Jul 2019 17:55:35 +0000 (+0530) Subject: Add createst script X-Git-Tag: suricata-6.0.4~296 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b1f345dff8b5beb8151766d38bd4a7c475ce6b0e;p=thirdparty%2Fsuricata-verify.git Add createst script createst is a script to produce a test directory with test.yaml as per the PCAP and configuration provided. This currently implements the functionality of creating the "checks" block in `test.yaml` from a given `eve.json`. You can add other configuration in the file thus created. Usage ===== ``` usage: createst.py [-h] [--output-path ] [--eventtype-only] [--allow-events [ALLOW_EVENTS]] Create tests with a given PCAP. Execute the script from a valid Suricata source directory. positional arguments: Name of the test folder Path to the PCAP file optional arguments: -h, --help show this help message and exit --output-path Path to the folder where generated test.yaml should be put --eventtype-only Create filter blocks based on event types only --allow-events [ALLOW_EVENTS] Create filter blocks for the specified events ``` --- diff --git a/README.md b/README.md index 00124f097..26632dbf7 100644 --- a/README.md +++ b/README.md @@ -129,3 +129,31 @@ checks: args: cat fast.log | wc -l | xargs expect: 1 ``` + +## Adding a new test the automated way: createst + +Script to create a test directory with test.yaml for a given PCAP. This +needs to be run from a valid Suricata source directory. + +### Usage +``` +usage: createst.py [-h] [--output-path ] [--eventtype-only] + [--allow-events [ALLOW_EVENTS]] + + +Create tests with a given PCAP. Execute the script from a valid Suricata source +directory. + +positional arguments: + Name of the test folder + Path to the PCAP file + +optional arguments: + -h, --help show this help message and exit + --output-path + Path to the folder where generated test.yaml should be + put + --eventtype-only Create filter blocks based on event types only + --allow-events [ALLOW_EVENTS] + Create filter blocks for the specified events +``` diff --git a/createst.py b/createst.py new file mode 100755 index 000000000..6b8fa4c25 --- /dev/null +++ b/createst.py @@ -0,0 +1,408 @@ +#! /bin/python +# +# Copyright (C) 2019 Open Information Security Foundation +# +# You can copy, redistribute or modify this Program under the terms of +# the GNU General Public License version 2 as published by the Free +# Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# version 2 along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +import argparse +import glob +import json +import logging +import os +import subprocess +import sys +from collections import defaultdict +from shutil import copyfile + +import yaml +from yaml.representer import Representer + +# Get a logger instance +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +# Define YAML representer +yaml.add_representer(defaultdict, Representer.represent_dict) + +WIN32 = sys.platform == "win32" +suricata_bin = "src\suricata.exe" if WIN32 else "./src/suricata" +suricata_yaml = "suricata.yaml" if WIN32 else "./suricata.yaml" +CUR_DIR = os.path.dirname(os.path.realpath(__file__)) +DEFAULT_TEST_DIR = os.path.join(CUR_DIR, "tests") + +# Fields to exclude from the filter block +skip_fields = ["timestamp", "flow_id", "last_reload"] + + +def init_logger(): + """ + Initialize logger handler and format. + """ + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + + +def init_global_params(): + """ + Initialize global parameters before getting into major functions. + """ + global args + global test_dir + global cwd + args = vars(parse_args()) + test_dir = os.path.join(args["output_path"], args["test-name"]) + # Get current working directory. This should be a valid Suricata source directory. + cwd = os.getcwd() + + +def modify_eve_dict(eve_dict): + """ + Modify nested event dictionary to make it follow a more flattened + approach so as to use full functionality of the run.py function + `find_value(name, obj)`. The dictionaries of the format + { + "event_type":"http", + "http":{ + "hostname":"supportingfunctions.us.newsweaver.com", + "http_port":443 + } + } + + will be converted to + + { + "event_type":"http", + "http.hostname":"supportingfunctions.us.newsweaver.com", + "http.http_port":443 + } + """ + modified = {} + + def flatten(x, name=''): + """ + Recursive function to flatten the nested dictionaries. + """ + if type(x) is dict: + for val in x: + flatten(x[val], name + val + '.') + elif type(x) is list: + i = 0 + for val in x: + name = name.rstrip('.') + flatten(val, name + '[' + str(i) + ']' + '.') + i += 1 + else: + if name[:-1] not in ["flow.start", "flow.end"]: + modified[name[:-1]] = x + + flatten(eve_dict) + return modified + + +def create_directory(path): + """ + Helper function to create output directories. + """ + dir_path = os.path.join(args["output_path"], path) + try: + os.mkdir(dir_path) + except OSError as e: + logger.error(e) + sys.exit(1) + + +def write_to_file(data): + """ + Check for the output test.yaml file if it exists, else create one and write + to it. + """ + test_yaml_path = os.path.join(test_dir, "test.yaml") + try: + os.remove(test_yaml_path) + except FileNotFoundError: + logger.info("{} not found. Creating...".format(test_yaml_path)) + except IsADirectoryError as e: + logger.error(e) + sys.exit(1) + with open(test_yaml_path, "w+") as fp: + fp.write("# *** Add configuration here ***\n\n") + fp.write(data) + + +def test_yaml_format(func): + """ + Decorator to convert all events to test.yaml format. + """ + def decorated(*args, **kwargs): + """ + Add filters per event in eve.json. + """ + eve_list = func(kwargs["eve_ds"]) + all_eve_list = [] + for item in eve_list: + eve_dict = { + "filter": { + "count": item[1], + "match": modify_eve_dict(item[0]), + }, + } + all_eve_list.append(eve_dict) + all_eve_dict = {"checks": all_eve_list} + all_eve_yaml = yaml.dump(all_eve_dict, default_flow_style=False) + return all_eve_yaml + return decorated + + +def is_valid_suri_directory(): + """ + Check if the current working directory is a valid Suricata source + directory by checking presence of suricata.yaml and `src/suricata`. + """ + if not (os.path.exists(suricata_yaml) and + os.path.exists(suricata_bin)): + logger.error("This is not a Suricata source directory or " + + "Suricata is not built") + return False + return True + + +def get_manipulated_list(): + """ + Manipulate eve.json to load json successfully and skip the fields + mentioned in `skip_fields` variable. + """ + eve_path = os.path.join(test_dir, "output", "eve.json") + allow_events = args["allow_events"].strip().split(",") if args["allow_events"] else [] + with open(eve_path, "r") as fp: + content = fp.read() + content_list = content.strip().split("\n") + jcontent_list = [json.loads(e) for e in content_list] + all_content_list = [] + for e in jcontent_list: + md = {k: v for k, v in e.items() if k not in skip_fields} + if "event_type" in md and md["event_type"] == "stats": + continue + all_content_list.append(md) + if allow_events: + def_eve_content_list = [item for item in all_content_list if item["event_type"] in allow_events] + if not def_eve_content_list: + logger.error("No matching events found.") + sys.exit(0) + return def_eve_content_list + return all_content_list + + +def filter_event_type_params(eve_rules): + """ + Create a filter block based on *ALL* the parameters of any event. + """ + mlist = get_manipulated_list() + all_eve_list = get_all_eve_list(eve_ds=mlist) + write_to_file(data=all_eve_list) + + +def filter_event_type(event_types): + """ + Filter based *ONLY* on the event types. + """ + all_eve_list = get_eve_list_by_type(eve_ds=event_types) + write_to_file(data=all_eve_list) + + +@test_yaml_format +def get_eve_list_by_type(eve_ds): + eve_list = [] + for k, v in eve_ds.items(): + key = {"event_type": k} + eve_list.append((key, v)) + return eve_list + + +@test_yaml_format +def get_all_eve_list(eve_ds): + eve_list = [] + for item in eve_ds: + eve_list.append((item, 1)) + return eve_list + + +def get_suricata_yaml_path(): + """ + Return the path to the suricata.yaml particular to the current test. + """ + if os.path.exists(os.path.join(cwd, "suricata.yaml")): + return os.path.join(cwd, "suricata.yaml") + return os.path.join(test_dir, "suricata.yaml") + + +def create_local_args(): + """ + Return a list of all the arguments required to be passed to a particular + local test. + """ + test_output_dir = os.path.join(test_dir, "output") + pcap_path = os.path.join(test_dir, "input.pcap") + # Copy PCAP to the test directory + copyfile(args["pcap"], pcap_path) + + largs = [ + os.path.join(cwd, "src/suricata"), + '-r', pcap_path, + '-l', test_output_dir, + "--init-errors-fatal", + "-c", get_suricata_yaml_path(), + ] + # In Suricata 5.0 the classification.config and + # reference.config were moved into the etc/ directory. For now + # check there and the top level directory to still support + # 4.1. + classification_configs = [ + os.path.join(cwd, "etc", "classification.config"), + os.path.join(cwd, "classification.config"), + ] + + for config in classification_configs: + if os.path.exists(config): + largs += ["--set", "classification-file=%s" % config] + break + + reference_configs = [ + os.path.join(cwd, "etc", "reference.config"), + os.path.join(cwd, "reference.config"), + ] + + for config in reference_configs: + if os.path.exists(config): + largs += ["--set", "reference-config-file=%s" % config] + break + + # Check if rules were provided with args + if args["rules"]: + rules_path = os.path.join(test_dir, "test.rules") + copyfile(args["rules"], rules_path) + largs += ["-S", rules_path] + else: + largs.append("--disable-detection") + + return largs + + +def create_env(): + """ + Return environment for the test being created. + """ + test_output_dir = os.path.join(test_dir, "output") + extraenv = { + # The suricata source directory. + "SRCDIR": cwd, + "TZ": "UTC", + "output_dir": test_output_dir, + "ASAN_OPTIONS": "detect_leaks=0", + } + env = os.environ.copy() + env.update(extraenv) + + return env + + +## Parser +def parse_args(): + """ + Parse arguments and return them to main for processing. + """ + parser = argparse.ArgumentParser( + description="Create tests with a given PCAP. Execute the script" + " from a valid Suricata source directory.") + parser.add_argument("test-name", metavar="", + help="Name of the test folder") + parser.add_argument("pcap", metavar="", + help="Path to the PCAP file") + parser.add_argument("--rules", metavar="", + help="Path to rule file") + parser.add_argument("--output-path", default=DEFAULT_TEST_DIR, metavar="", + help="Path to the folder where generated test.yaml should be put") + parser.add_argument("--eventtype-only", default=None, action="store_true", + help="Create filter blocks based on event types only") + parser.add_argument("--allow-events", nargs="?", default=None, + help="Create filter blocks for the specified events") + + # add arg to allow stdout only + args = parser.parse_args() + + return args + + +def eve2test(): + """ + Process the provided eve.json file and write the required checks in the + provided output file. + """ + logger.info("Running eve2test...") + content = list() + event_types = defaultdict(int) + eve_path = os.path.join(test_dir, "output", "eve.json") + with open(eve_path, "r") as fp: + for line in fp: + eve_rule = json.loads(line) + content.append(eve_rule) + eve_type = eve_rule.get("event_type") + event_types[eve_type] += 1 + if args["eventtype_only"]: + if args["allow_events"]: + logger.warning("--allow-events shall not be used with --eventtype-only") + filter_event_type(event_types=event_types) + return + filter_event_type_params(eve_rules=content) + + +def generate_eve(): + """ + Create eve.json with Suricata as per the given configuration and PCAP. + In case of successful run, call eve2test to convert eve.json thus + created into test.yaml. + """ + largs = create_local_args() + env = create_env() + + p = subprocess.Popen( + largs, cwd=cwd, env=env, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + _, stderr = p.communicate() + exit_status = p.wait() + if exit_status == 0: + eve2test() + else: + logger.error(stderr.decode("utf-8")) + sys.exit(1) + + +def main(): + init_logger() + + # Check if its a valid Suricata directory + if not is_valid_suri_directory(): + sys.exit(1) + + init_global_params() + create_directory(path=args["test-name"]) + create_directory(path=os.path.join(args["test-name"], "output")) + generate_eve() + + +if __name__ == "__main__": + main()