]> git.ipfire.org Git - thirdparty/suricata-update.git/commitdiff
handle rules in spaces in addr and port lists
authorJason Ish <ish@unx.ca>
Mon, 19 Feb 2018 13:47:28 +0000 (07:47 -0600)
committerJason Ish <ish@unx.ca>
Mon, 19 Feb 2018 13:47:28 +0000 (07:47 -0600)
Based on an issue reported to py-idstools.

suricata/update/rule.py
tests/test_rule.py

index 72990b7a982d448799688ed8f86c1a3341f62790..c34eadd633d6b825ce61ea172f9c648839f23145 100644 (file)
@@ -36,40 +36,17 @@ import io
 
 logger = logging.getLogger(__name__)
 
+# Compile an re pattern for basic rule matching.
+rule_pattern = re.compile(r"^(?P<enabled>#)*[\s#]*"
+                r"(?P<raw>"
+                r"(?P<header>[^()]+)"
+                r"\((?P<options>.*)\)"
+                r"$)")
+
 # Rule actions we expect to see.
 actions = (
     "alert", "log", "pass", "activate", "dynamic", "drop", "reject", "sdrop")
 
-# Compiled regular expression to detect a rule and break out some of
-# its parts.
-rule_pattern = re.compile(
-    r"^(?P<enabled>#)*[\s#]*"      # Enabled/disabled
-    r"(?P<raw>"
-    r"(?P<header>"
-    r"(?P<action>%s)\s*"        # Action
-    r"(?P<proto>[^\s]*)\s*"     # Protocol
-    r"[^\s]*\s*"                # Source address(es)
-    r"[^\s]*\s*"                # Source port
-    r"(?P<direction>[-><]+)\s*"        # Direction
-    r"[^\s]*\s*"                       # Destination address(es)
-    r"[^\s]*"                   # Destination port
-    r")"                        # End of header.
-    r"\s*"                      # Trailing spaces after header.
-    r"\((?P<options>.*)\)\s*"  # Options
-    r")"
-    % "|".join(actions))
-
-# Another compiled pattern to detect preprocessor rules.  We could
-# construct the general rule re to pick this up, but its much faster
-# this way.
-decoder_rule_pattern = re.compile(
-    r"^(?P<enabled>#)*[\s#]*"  # Enabled/disabled
-    r"(?P<raw>"
-    r"(?P<action>%s)\s*"           # Action
-    r"\((?P<options>.*)\)\s*"  # Options
-    r")"
-    % "|".join(actions))
-
 class NoEndOfOptionError(Exception):
     """Exception raised when the end of option terminator (semicolon) is
     missing."""
@@ -222,18 +199,77 @@ def parse(buf, group=None):
 
     if type(buf) == type(b""):
         buf = buf.decode("utf-8")
+    buf = buf.strip()
 
-    m = rule_pattern.match(buf) or decoder_rule_pattern.match(buf)
+    m = rule_pattern.match(buf)
     if not m:
-        return
+        return None
+
+    if m.group("enabled") == "#":
+        enabled = False
+    else:
+        enabled = True
 
-    rule = Rule(enabled=True if m.group("enabled") is None else False,
-                action=m.group("action"),
-                group=group)
+    header = m.group("header").strip()
+
+    # If a decoder rule, the header will be one word.
+    if len(header.split(" ")) == 1:
+        action = header
+        direction = None
+    else:
+        states = ["action",
+                  "proto",
+                  "source_addr",
+                  "source_port",
+                  "direction",
+                  "dest_addr",
+                  "dest_port",
+                  ]
+        state = 0
+
+        rem = header
+        while state < len(states):
+            if not rem:
+                return None
+            if rem[0] == "[":
+                end = rem.find("]")
+                if end < 0:
+                    return
+                end += 1
+                token = rem[:end].strip()
+                rem = rem[end:].strip()
+            else:
+                end = rem.find(" ")
+                if end < 0:
+                    token = rem
+                    rem = ""
+                else:
+                    token = rem[:end].strip()
+                    rem = rem[end:].strip()
+
+            if states[state] == "action":
+                action = token
+            elif states[state] == "proto":
+                proto = token
+            elif states[state] == "source_addr":
+                source_addr = token
+            elif states[state] == "source_port":
+                source_port = token
+            elif states[state] == "direction":
+                direction = token
+            elif states[state] == "dest_addr":
+                dest_addr = token
+            elif states[state] == "dest_port":
+                dest_port = token
+
+            state += 1
+
+    if action not in actions:
+        return None
 
-    rule["header"] = m.groupdict().get("header", None)
-    rule["proto"] = m.groupdict().get("proto", None)
-    rule["direction"] = m.groupdict().get("direction", None)
+    rule = Rule(enabled=enabled, action=action, group=group)
+    rule["direction"] = direction
+    rule["header"] = header
 
     options = m.group("options")
 
index 5f8af046bc37b9282cdd057af9982707a6642f43..e1a351079473b19c557560f1cebf469adc54090b 100644 (file)
@@ -193,3 +193,20 @@ alert dnp3 any any -> any any (msg:"SURICATA DNP3 Request flood detected"; \
         self.assertRaises(
             suricata.update.rule.NoEndOfOptionError,
             suricata.update.rule.parse, rule_buf)
+
+    def test_parse_addr_list(self):
+        """Test parsing rules where the addresses and parts are lists with
+        spaces."""
+    
+        rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] any -> any any (msg:"TEST"; sid:1; rev:1;)""")
+        self.assertIsNotNone(rule)
+        
+        rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> any any (msg:"TEST"; sid:1; rev:1;)""")
+        self.assertIsNotNone(rule)
+
+        rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> [!$XNET, $YNET] any (msg:"TEST"; sid:1; rev:1;)""")
+        self.assertIsNotNone(rule)
+
+        rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> [!$XNET, $YNET] [!2200] (msg:"TEST"; sid:1; rev:1;)""")
+        self.assertIsNotNone(rule)
+