import select
import sys
from socket import AF_UNIX, error, socket
+from inspect import currentframe
from suricata.sc.specs import argsd
INC_SIZE = 1024
+def get_linenumber():
+ cf = currentframe()
+ return cf.f_back.f_lineno
+
+
class SuricataException(Exception):
"""
Generic class for suricatasc exception
def send_command(self, command, arguments=None):
if command not in self.cmd_list and command != 'command-list':
- raise SuricataCommandException("Command not found: {}".format(command))
+ raise SuricataCommandException("L{}: Command not found: {}".format(get_linenumber(), command))
cmdmsg = {}
cmdmsg['command'] = command
else:
cmdret = None
if not cmdret:
- raise SuricataReturnException("Unable to get message from server")
+ raise SuricataReturnException("L{}: Unable to get message from server".format(get_linenumber))
if self.verbose:
print("RCV: "+ json.dumps(cmdret))
self.socket = socket(AF_UNIX)
self.socket.connect(self.sck_path)
except error as err:
- raise SuricataNetException(err)
+ raise SuricataNetException("L{}: {}".format(get_linenumber(), err))
self.socket.settimeout(10)
#send version
cmdret = None
if not cmdret:
- raise SuricataReturnException("Unable to get message from server")
+ raise SuricataReturnException("L{}: Unable to get message from server".format(get_linenumber()))
if self.verbose:
print("RCV: "+ json.dumps(cmdret))
if cmdret["return"] == "NOK":
- raise SuricataReturnException("Error: %s" % (cmdret["message"]))
+ raise SuricataReturnException("L{}: Error: {}".format(get_linenumber(), cmdret["message"]))
cmdret = self.send_command("command-list")
except IndexError:
phrase = " at least" if required_args_count != len(cmd_specs) else ""
msg = "Missing arguments: expected{} {}".format(phrase, required_args_count)
- raise SuricataCommandException(msg)
+ raise SuricataCommandException("L{}: {}".format(get_linenumber(), msg))
except ValueError as ve:
- raise SuricataCommandException("Erroneous arguments: {}".format(ve))
+ raise SuricataCommandException("L{}: Erroneous arguments: {}".format(get_linenumber(), ve))
elif c < len(full_cmd):
arguments[spec["name"]] = spec_type(full_cmd[c])
return cmd, arguments
if cmd in self.fn_commands:
cmd, arguments = getattr(self, "execute")(command=command)
else:
- raise SuricataCommandException("Unknown command: {}".format(command))
+ raise SuricataCommandException("L{}: Unknown command: {}".format(get_linenumber(), command))
return cmd, arguments
def interactive(self):