import select
import sys
-SURICATASC_VERSION = "1.0"
+from suricata.sc.specs import argsd
+SURICATASC_VERSION = "1.0"
VERSION = "0.2"
INC_SIZE = 1024
class SuricataNetException(SuricataException):
"""
- Exception raised when network error occur.
+ Exception raised when a network error occurs
"""
class SuricataCommandException(SuricataException):
"""
- Exception raised when command is not correct.
+ Exception raised when the command is incorrect
"""
class SuricataReturnException(SuricataException):
"""
- Exception raised when return message is not correct.
+ Exception raised when return message is incorrect
"""
return None
return None
+
class SuricataSC:
def __init__(self, sck_path, verbose=False):
- self.cmd_list = ['shutdown', 'quit', 'pcap-file', 'pcap-file-continuous', 'pcap-file-number', 'pcap-file-list', 'pcap-last-processed', 'pcap-interrupt', 'iface-list', 'iface-stat', 'register-tenant', 'unregister-tenant', 'register-tenant-handler', 'unregister-tenant-handler', 'add-hostbit', 'remove-hostbit', 'list-hostbit', 'memcap-set', 'memcap-show']
+ self.basic_commands = [
+ "shutdown",
+ "quit",
+ "pcap-file-number",
+ "pcap-file-list",
+ "pcap-last-processed",
+ "pcap-interrupt",
+ "iface-list",
+ ]
+ self.fn_commands = [
+ "pcap-file ",
+ "pcap-file-continuous ",
+ "iface-stat",
+ "conf-get",
+ "unregister-tenant-handler",
+ "register-tenant-handler",
+ "unregister-tenant",
+ "register-tenant",
+ "reload-tenant",
+ "add-hostbit",
+ "remove-hostbit",
+ "list-hostbit",
+ "memcap-set",
+ "memcap-show",
+ ]
+ self.cmd_list = self.basic_commands + self.fn_commands
self.sck_path = sck_path
self.verbose = verbose
self.socket = socket(AF_UNIX)
def send_command(self, command, arguments=None):
if command not in self.cmd_list and command != 'command-list':
- raise SuricataCommandException("No such command: %s", command)
+ raise SuricataCommandException("Command not found: {}".format(command))
cmdmsg = {}
cmdmsg['command'] = command
cmdret = self.json_recv()
else:
cmdret = None
-
if not cmdret:
raise SuricataReturnException("Unable to get message from server")
self.cmd_list = cmdret["message"]["commands"]
self.cmd_list.append("quit")
-
def close(self):
self.socket.close()
+ def execute(self, command):
+ full_cmd = command.split()
+ cmd = full_cmd[0]
+ cmd_specs = argsd[cmd]
+ arguments = dict()
+ for c, spec in enumerate(cmd_specs, 1):
+ spec_type = str if "type" not in spec else spec["type"]
+ if spec["required"]:
+ try:
+ arguments[spec["name"]] = spec_type(full_cmd[c])
+ except IndexError:
+ raise SuricataCommandException("Missing arguments")
+ elif c < len(full_cmd):
+ arguments[spec["name"]] = spec_type(full_cmd[c])
+ return cmd, arguments
+
def parse_command(self, command):
arguments = None
- if command.split(' ', 2)[0] in self.cmd_list:
- if "pcap-file " in command:
- try:
- parts = command.split(' ')
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- cmd, filename, output = parts[0], parts[1], parts[2]
- tenant = None
- if len(parts) > 3:
- tenant = parts[3]
- continuous = None
- if len(parts) > 4:
- continuous = parts[4]
- delete_when_done = None
- if len(parts) > 5:
- delete_when_done = parts[5]
- if cmd != "pcap-file":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["filename"] = filename
- arguments["output-dir"] = output
- if tenant:
- arguments["tenant"] = int(tenant)
- if continuous:
- arguments["continuous"] = continuous
- if delete_when_done:
- arguments["delete-when-done"] = delete_when_done
- elif "pcap-file-continuous " in command:
- try:
- parts = command.split(' ')
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- cmd, filename, output = parts[0], parts[1], parts[2]
- tenant = None
- if len(parts) > 3:
- tenant = parts[3]
- delete_when_done = None
- if len(parts) > 4:
- delete_when_done = parts[4]
- if cmd != "pcap-file":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["filename"] = filename
- arguments["output-dir"] = output
- arguments["continuous"] = True
- if tenant:
- arguments["tenant"] = int(tenant)
- if delete_when_done:
- arguments["delete-when-done"] = delete_when_done
- elif "iface-stat" in command:
- try:
- [cmd, iface] = command.split(' ', 1)
- except:
- raise SuricataCommandException("Unable to split command '%s'" % (command))
- if cmd != "iface-stat":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["iface"] = iface
- elif "conf-get" in command:
- try:
- [cmd, variable] = command.split(' ', 1)
- except:
- raise SuricataCommandException("Unable to split command '%s'" % (command))
- if cmd != "conf-get":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["variable"] = variable
- elif "unregister-tenant-handler" in command:
- try:
- parts = command.split(' ')
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- cmd, tenantid, htype = parts[0], parts[1], parts[2]
- hargs = None
- if len(parts) > 3:
- hargs = parts[3]
- if cmd != "unregister-tenant-handler":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["id"] = int(tenantid)
- arguments["htype"] = htype
- if hargs:
- arguments["hargs"] = int(hargs)
- elif "register-tenant-handler" in command:
- try:
- parts = command.split(' ')
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- cmd, tenantid, htype = parts[0], parts[1], parts[2]
- hargs = None
- if len(parts) > 3:
- hargs = parts[3]
- if cmd != "register-tenant-handler":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["id"] = int(tenantid)
- arguments["htype"] = htype
- if hargs:
- arguments["hargs"] = int(hargs)
- elif "unregister-tenant" in command:
- try:
- [cmd, tenantid] = command.split(' ', 1)
- except:
- raise SuricataCommandException("Unable to split command '%s'" % (command))
- if cmd != "unregister-tenant":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["id"] = int(tenantid)
- elif "register-tenant" in command:
- try:
- [cmd, tenantid, filename] = command.split(' ', 2)
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- if cmd != "register-tenant":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["id"] = int(tenantid)
- arguments["filename"] = filename
- elif "reload-tenant" in command:
- try:
- [cmd, tenantid, filename] = command.split(' ', 2)
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- if cmd != "reload-tenant":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["id"] = int(tenantid)
- arguments["filename"] = filename
- elif "add-hostbit" in command:
- try:
- [cmd, ipaddress, hostbit, expire] = command.split(' ')
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- if cmd != "add-hostbit":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["ipaddress"] = ipaddress
- arguments["hostbit"] = hostbit
- arguments["expire"] = int(expire)
- elif "remove-hostbit" in command:
- try:
- [cmd, ipaddress, hostbit] = command.split(' ', 2)
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- if cmd != "remove-hostbit":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["ipaddress"] = ipaddress
- arguments["hostbit"] = hostbit
- elif "list-hostbit" in command:
- try:
- [cmd, ipaddress] = command.split(' ')
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- if cmd != "list-hostbit":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["ipaddress"] = ipaddress
- elif "memcap-set" in command:
- try:
- [cmd, config, memcap] = command.split(' ', 2)
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- if cmd != "memcap-set":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["config"] = config
- arguments["memcap"] = memcap
- elif "memcap-show" in command:
- try:
- [cmd, config] = command.split(' ')
- except:
- raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
- if cmd != "memcap-show":
- raise SuricataCommandException("Invalid command '%s'" % (command))
- else:
- arguments = {}
- arguments["config"] = config
+ cmd = command.split(maxsplit=2)[0]
+ if cmd in self.cmd_list:
+ if cmd in self.fn_commands:
+ cmd, arguments = getattr(self, "execute")(command=command)
else:
cmd = command
else:
- raise SuricataCommandException("Unknown command '%s'" % (command))
- return (cmd, arguments)
+ raise SuricataCommandException("Unknown command {}".format(command))
+ return cmd, arguments
def interactive(self):
print("Command list: " + ", ".join(self.cmd_list))
if command == "quit":
break
try:
- (cmd, arguments) = self.parse_command(command)
+ cmd, arguments = self.parse_command(command)
except SuricataCommandException as err:
print(err)
continue