except ImportError:
import json
import readline
-from socket import socket, AF_UNIX, error
import select
import sys
+from socket import AF_UNIX, error, socket
from suricata.sc.specs import argsd
"iface-list",
]
self.fn_commands = [
- "pcap-file ",
- "pcap-file-continuous ",
+ "pcap-file",
+ "pcap-file-continuous",
"iface-stat",
"conf-get",
"unregister-tenant-handler",
full_cmd = command.split()
cmd = full_cmd[0]
cmd_specs = argsd[cmd]
+ required_args_count = len([d["required"] for d in cmd_specs if d["required"] and not "val" in d])
arguments = dict()
for c, spec in enumerate(cmd_specs, 1):
spec_type = str if "type" not in spec else spec["type"]
if spec["required"]:
+ if spec.get("val"):
+ arguments[spec["name"]] = spec_type(spec["val"])
+ continue
try:
arguments[spec["name"]] = spec_type(full_cmd[c])
except IndexError:
- raise SuricataCommandException("Missing arguments")
+ phrase = " at least" if required_args_count != len(cmd_specs) else ""
+ msg = "Missing arguments: expected{} {}".format(phrase, required_args_count)
+ raise SuricataCommandException(msg)
elif c < len(full_cmd):
arguments[spec["name"]] = spec_type(full_cmd[c])
return cmd, arguments
def parse_command(self, command):
arguments = None
- cmd = command.split(maxsplit=2)[0]
+ cmd = command.split(maxsplit=1)[0] if command else None
if cmd in self.cmd_list:
if cmd in self.fn_commands:
cmd, arguments = getattr(self, "execute")(command=command)