try:
import simplejson as json
-except:
+except ImportError:
import json
-import re
import readline
from socket import socket, AF_UNIX, error
-from time import sleep
import select
import sys
VERSION = "0.2"
INC_SIZE = 1024
+
class SuricataException(Exception):
"""
Generic class for suricatasc exception
"""
def __init__(self, value):
+ super(SuricataException, self).__init__(value)
self.value = value
def __str__(self):
return str(self.value)
+
class SuricataNetException(SuricataException):
"""
Exception raised when network error occur.
"""
- pass
+
class SuricataCommandException(SuricataException):
"""
Exception raised when command is not correct.
"""
- pass
+
class SuricataReturnException(SuricataException):
"""
Exception raised when return message is not correct.
"""
- pass
class SuricataCompleter:
class SuricataSC:
def __init__(self, sck_path, verbose=False):
- self.cmd_list=['shutdown','quit','pcap-file','pcap-file-continuous','pcap-file-number','pcap-file-list','pcap-last-processed','pcap-interrupt','iface-list','iface-stat','register-tenant','unregister-tenant','register-tenant-handler','unregister-tenant-handler', 'add-hostbit', 'remove-hostbit', 'list-hostbit', 'memcap-set', 'memcap-show']
+ self.cmd_list = ['shutdown', 'quit', 'pcap-file', 'pcap-file-continuous', 'pcap-file-number', 'pcap-file-list', 'pcap-last-processed', 'pcap-interrupt', 'iface-list', 'iface-stat', 'register-tenant', 'unregister-tenant', 'register-tenant-handler', 'unregister-tenant-handler', 'add-hostbit', 'remove-hostbit', 'list-hostbit', 'memcap-set', 'memcap-show']
self.sck_path = sck_path
self.verbose = verbose
+ self.socket = socket(AF_UNIX)
def json_recv(self):
cmdret = None
break
return cmdret
- def send_command(self, command, arguments = None):
+ def send_command(self, command, arguments=None):
if command not in self.cmd_list and command != 'command-list':
raise SuricataCommandException("No such command: %s", command)
cmdmsg = {}
cmdmsg['command'] = command
- if (arguments != None):
+ if arguments:
cmdmsg['arguments'] = arguments
if self.verbose:
print("SND: " + json.dumps(cmdmsg))
else:
cmdret = None
- if cmdret == None:
+ if not cmdret:
raise SuricataReturnException("Unable to get message from server")
if self.verbose:
def connect(self):
try:
- self.socket = socket(AF_UNIX)
self.socket.connect(self.sck_path)
except error as err:
raise SuricataNetException(err)
else:
cmdret = None
- if cmdret == None:
+ if not cmdret:
raise SuricataReturnException("Unable to get message from server")
if self.verbose:
arguments = {}
arguments["filename"] = filename
arguments["output-dir"] = output
- if tenant != None:
+ if tenant:
arguments["tenant"] = int(tenant)
- if continuous != None:
+ if continuous:
arguments["continuous"] = continuous
- if delete_when_done != None:
+ if delete_when_done:
arguments["delete-when-done"] = delete_when_done
elif "pcap-file-continuous " in command:
try:
arguments["filename"] = filename
arguments["output-dir"] = output
arguments["continuous"] = True
- if tenant != None:
+ if tenant:
arguments["tenant"] = int(tenant)
- if delete_when_done != None:
+ if delete_when_done:
arguments["delete-when-done"] = delete_when_done
elif "iface-stat" in command:
try:
arguments = {}
arguments["id"] = int(tenantid)
arguments["htype"] = htype
- if hargs != None:
+ if hargs:
arguments["hargs"] = int(hargs)
elif "register-tenant-handler" in command:
try:
arguments = {}
arguments["id"] = int(tenantid)
arguments["htype"] = htype
- if hargs != None:
+ if hargs:
arguments["hargs"] = int(hargs)
elif "unregister-tenant" in command:
try:
else:
command = input(">>> ").strip()
if command == "quit":
- break;
+ break
try:
(cmd, arguments) = self.parse_command(command)
except SuricataCommandException as err: