From: Jason Ish Date: Mon, 19 Feb 2018 13:47:28 +0000 (-0600) Subject: handle rules in spaces in addr and port lists X-Git-Tag: 1.0.0rc1~22 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=37c66d9683c4d0242dd3024aa063b8bf56ecdd73;p=thirdparty%2Fsuricata-update.git handle rules in spaces in addr and port lists Based on an issue reported to py-idstools. --- diff --git a/suricata/update/rule.py b/suricata/update/rule.py index 72990b7..c34eadd 100644 --- a/suricata/update/rule.py +++ b/suricata/update/rule.py @@ -36,40 +36,17 @@ import io logger = logging.getLogger(__name__) +# Compile an re pattern for basic rule matching. +rule_pattern = re.compile(r"^(?P#)*[\s#]*" + r"(?P" + r"(?P
[^()]+)" + r"\((?P.*)\)" + r"$)") + # Rule actions we expect to see. actions = ( "alert", "log", "pass", "activate", "dynamic", "drop", "reject", "sdrop") -# Compiled regular expression to detect a rule and break out some of -# its parts. -rule_pattern = re.compile( - r"^(?P#)*[\s#]*" # Enabled/disabled - r"(?P" - r"(?P
" - r"(?P%s)\s*" # Action - r"(?P[^\s]*)\s*" # Protocol - r"[^\s]*\s*" # Source address(es) - r"[^\s]*\s*" # Source port - r"(?P[-><]+)\s*" # Direction - r"[^\s]*\s*" # Destination address(es) - r"[^\s]*" # Destination port - r")" # End of header. - r"\s*" # Trailing spaces after header. - r"\((?P.*)\)\s*" # Options - r")" - % "|".join(actions)) - -# Another compiled pattern to detect preprocessor rules. We could -# construct the general rule re to pick this up, but its much faster -# this way. -decoder_rule_pattern = re.compile( - r"^(?P#)*[\s#]*" # Enabled/disabled - r"(?P" - r"(?P%s)\s*" # Action - r"\((?P.*)\)\s*" # Options - r")" - % "|".join(actions)) - class NoEndOfOptionError(Exception): """Exception raised when the end of option terminator (semicolon) is missing.""" @@ -222,18 +199,77 @@ def parse(buf, group=None): if type(buf) == type(b""): buf = buf.decode("utf-8") + buf = buf.strip() - m = rule_pattern.match(buf) or decoder_rule_pattern.match(buf) + m = rule_pattern.match(buf) if not m: - return + return None + + if m.group("enabled") == "#": + enabled = False + else: + enabled = True - rule = Rule(enabled=True if m.group("enabled") is None else False, - action=m.group("action"), - group=group) + header = m.group("header").strip() + + # If a decoder rule, the header will be one word. + if len(header.split(" ")) == 1: + action = header + direction = None + else: + states = ["action", + "proto", + "source_addr", + "source_port", + "direction", + "dest_addr", + "dest_port", + ] + state = 0 + + rem = header + while state < len(states): + if not rem: + return None + if rem[0] == "[": + end = rem.find("]") + if end < 0: + return + end += 1 + token = rem[:end].strip() + rem = rem[end:].strip() + else: + end = rem.find(" ") + if end < 0: + token = rem + rem = "" + else: + token = rem[:end].strip() + rem = rem[end:].strip() + + if states[state] == "action": + action = token + elif states[state] == "proto": + proto = token + elif states[state] == "source_addr": + source_addr = token + elif states[state] == "source_port": + source_port = token + elif states[state] == "direction": + direction = token + elif states[state] == "dest_addr": + dest_addr = token + elif states[state] == "dest_port": + dest_port = token + + state += 1 + + if action not in actions: + return None - rule["header"] = m.groupdict().get("header", None) - rule["proto"] = m.groupdict().get("proto", None) - rule["direction"] = m.groupdict().get("direction", None) + rule = Rule(enabled=enabled, action=action, group=group) + rule["direction"] = direction + rule["header"] = header options = m.group("options") diff --git a/tests/test_rule.py b/tests/test_rule.py index 5f8af04..e1a3510 100644 --- a/tests/test_rule.py +++ b/tests/test_rule.py @@ -193,3 +193,20 @@ alert dnp3 any any -> any any (msg:"SURICATA DNP3 Request flood detected"; \ self.assertRaises( suricata.update.rule.NoEndOfOptionError, suricata.update.rule.parse, rule_buf) + + def test_parse_addr_list(self): + """Test parsing rules where the addresses and parts are lists with + spaces.""" + + rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] any -> any any (msg:"TEST"; sid:1; rev:1;)""") + self.assertIsNotNone(rule) + + rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> any any (msg:"TEST"; sid:1; rev:1;)""") + self.assertIsNotNone(rule) + + rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> [!$XNET, $YNET] any (msg:"TEST"; sid:1; rev:1;)""") + self.assertIsNotNone(rule) + + rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> [!$XNET, $YNET] [!2200] (msg:"TEST"; sid:1; rev:1;)""") + self.assertIsNotNone(rule) +