logger = logging.getLogger(__name__)
+# Compile a regular expression for basic rule matching.
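+# The header is everything up to the first opening parenthesis; the options
+# are everything inside the outer parentheses.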
+rule_pattern = re.compile(r"^(?P<enabled>#)*[\s#]*"
+ r"(?P<raw>"
+ r"(?P<header>[^()]+)"
+ r"\((?P<options>.*)\)"
+ r"$)")
+
# Rule actions we expect to see.
actions = (
"alert", "log", "pass", "activate", "dynamic", "drop", "reject", "sdrop")
-# Compiled regular expression to detect a rule and break out some of
-# its parts.
-rule_pattern = re.compile(
- r"^(?P<enabled>#)*[\s#]*" # Enabled/disabled
- r"(?P<raw>"
- r"(?P<header>"
- r"(?P<action>%s)\s*" # Action
- r"(?P<proto>[^\s]*)\s*" # Protocol
- r"[^\s]*\s*" # Source address(es)
- r"[^\s]*\s*" # Source port
- r"(?P<direction>[-><]+)\s*" # Direction
- r"[^\s]*\s*" # Destination address(es)
- r"[^\s]*" # Destination port
- r")" # End of header.
- r"\s*" # Trailing spaces after header.
- r"\((?P<options>.*)\)\s*" # Options
- r")"
- % "|".join(actions))
-
-# Another compiled pattern to detect preprocessor rules. We could
-# construct the general rule re to pick this up, but its much faster
-# this way.
-decoder_rule_pattern = re.compile(
- r"^(?P<enabled>#)*[\s#]*" # Enabled/disabled
- r"(?P<raw>"
- r"(?P<action>%s)\s*" # Action
- r"\((?P<options>.*)\)\s*" # Options
- r")"
- % "|".join(actions))
-
class NoEndOfOptionError(Exception):
"""Exception raised when the end of option terminator (semicolon) is
missing."""
if type(buf) == type(b""):
buf = buf.decode("utf-8")
+ buf = buf.strip()
- m = rule_pattern.match(buf) or decoder_rule_pattern.match(buf)
+ m = rule_pattern.match(buf)
if not m:
- return
+ return None
+
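+ # A leading "#" means the rule is present but commented out (disabled).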
+ if m.group("enabled") == "#":
+ enabled = False
+ else:
+ enabled = True
- rule = Rule(enabled=True if m.group("enabled") is None else False,
- action=m.group("action"),
- group=group)
+ header = m.group("header").strip()
+
+ # If a decoder rule, the header will be one word.
+ if len(header.split(" ")) == 1:
+ action = header
+ direction = None
+ else:
+ states = ["action",
+ "proto",
+ "source_addr",
+ "source_port",
+ "direction",
+ "dest_addr",
+ "dest_port",
+ ]
+ state = 0
+
+ rem = header
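+ # Consume the header one token per state. A token starting with "[" is
+ # an address or port list that may contain spaces, so read through to
+ # the closing "]".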
+ while state < len(states):
+ if not rem:
+ return None
+ if rem[0] == "[":
+ end = rem.find("]")
+ if end < 0:
+ return None
+ end += 1
+ token = rem[:end].strip()
+ rem = rem[end:].strip()
+ else:
+ end = rem.find(" ")
+ if end < 0:
+ token = rem
+ rem = ""
+ else:
+ token = rem[:end].strip()
+ rem = rem[end:].strip()
+
+ if states[state] == "action":
+ action = token
+ elif states[state] == "proto":
+ proto = token
+ elif states[state] == "source_addr":
+ source_addr = token
+ elif states[state] == "source_port":
+ source_port = token
+ elif states[state] == "direction":
+ direction = token
+ elif states[state] == "dest_addr":
+ dest_addr = token
+ elif states[state] == "dest_port":
+ dest_port = token
+
+ state += 1
+
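+ # Reject the buffer if the action is not a known rule action.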
+ if action not in actions:
+ return None
- rule["header"] = m.groupdict().get("header", None)
- rule["proto"] = m.groupdict().get("proto", None)
- rule["direction"] = m.groupdict().get("direction", None)
+ rule = Rule(enabled=enabled, action=action, group=group)
+ rule["direction"] = direction
+ rule["header"] = header
options = m.group("options")
self.assertRaises(
suricata.update.rule.NoEndOfOptionError,
suricata.update.rule.parse, rule_buf)
+
+ def test_parse_addr_list(self):
+ """Test parsing rules where the address and port parts are lists with
+ spaces."""
+
+ rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] any -> any any (msg:"TEST"; sid:1; rev:1;)""")
+ self.assertIsNotNone(rule)
+
+ rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> any any (msg:"TEST"; sid:1; rev:1;)""")
+ self.assertIsNotNone(rule)
+
+ rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> [!$XNET, $YNET] any (msg:"TEST"; sid:1; rev:1;)""")
+ self.assertIsNotNone(rule)
+
+ rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> [!$XNET, $YNET] [!2200] (msg:"TEST"; sid:1; rev:1;)""")
+ self.assertIsNotNone(rule)
+