import queue
import signal
import socket
+import sqlite3
import subprocess
import sys
# Store the reporter
self.reporter = reporter
+ # Open the database
+ self.db = self._open_database()
+
@property
def config(self):
"""
"""
return self.reporter.config
+ def _open_database(self):
+ """
+ Opens the database
+ """
+ # Fetch the path
+ path = self.config.get("DEFAULT", "database",
+ fallback="/var/log/suricata/reporter.db")
+
+ # Open the database
+ db = sqlite3.connect(path)
+
+ # Enable the write-ahead-log
+ db.execute("PRAGMA journal_mode = WAL")
+
+ # Create the schema
+ db.executescript("""
+ -- Create the main table
+ CREATE TABLE IF NOT EXISTS alerts (
+ id INTEGER PRIMARY KEY,
+
+ -- Store the timestamp
+ timestamp INTEGER NOT NULL,
+
+ -- Store the entire JSON object
+ event JSONB NOT NULL
+ );
+
+ -- Index alerts by their timestamp
+ CREATE INDEX IF NOT EXISTS alerts_timestamp ON alerts(timestamp);
+ """)
+
+ return db
+
def run(self):
"""
This is the main entry point for workers...
# Process the event
self.process(event)
+ # Optimize the database before exiting
+ log.debug("Optimizing the database")
+
+ self.db.execute("PRAGMA optimize")
+ self.db.execute("PRAGMA wal_checkpoint = TRUNCATE")
+
log.debug("Worker %s terminated" % self.pid)
def process(self, event):
# Log the event
log.debug("Received alert: %s" % event)
+ # Write the event to the database
+ self.db.execute("INSERT INTO alerts(timestamp, event) VALUES(?, ?)",
+ (event.timestamp.timestamp(), event.json))
+ self.db.commit()
+
# Send to syslog
if self.config.getboolean("syslog", "enabled", fallback=False):
self.send_to_syslog(event)
def __str__(self):
return "%s" % self.data
+ @property
+ def json(self):
+ """
+ Returns all the data serialised as JSON
+ """
+ return json.dumps(self.data)
+
@property
def type(self):
return self.data.get("event_type")