import logging
import logging.handlers
import multiprocessing
+import os
import signal
+import socket
import sys
+SOCKET_PATH = "/var/run/suricata-reporter.socket"
+
log = logging.getLogger("suricata-reporter")
log.setLevel(logging.DEBUG)
for signo in (signal.SIGINT, signal.SIGTERM):
self.loop.add_signal_handler(signo, self.terminate)
+ # Create the socket
+ self.sock = self._create_socket()
+
+ def _create_socket(self):
+ """
+ Creates a new socket to receive messages on
+ """
+ # Create a new, non-blocking UNIX datagram socket
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM|socket.SOCK_NONBLOCK)
+
+ # Bind to the some path
+ try:
+ sock.bind(SOCKET_PATH)
+ except OSError as e:
+ log.error("Failed to bind to socket: %s" % e)
+
+ # Terminate immediately
+ raise SystemExit(1)
+
+ # Call something whenever we receive data on the socket
+ self.loop.add_reader(sock.fileno(), self._receive_message, sock)
+
+ # Return the socket
+ return sock
+
async def run(self):
"""
The main loop of the application.
# We are no longer running
self.is_running.clear()
+ # Remove the socket so we won't receive any more data
+ try:
+ os.unlink(SOCKET_PATH)
+ except OSError as e:
+ log.error("Failed to remove %s: %s" % (SOCKET_PATH, e))
+
# Terminate all workers
for worker in self.workers:
worker.terminate()
for worker in self.workers:
worker.join()
+ def _receive_message(self, sock):
+ """
+ Called when there is some socket activity.
+
+ It will read the entire datagram and push it into the queue.
+ """
+ # Read the data from the socket
+ data, _ = sock.recvfrom(65535)
+
+ # Push the data straight into the queue
+ try:
+ self.queue.put(data, block=False)
+
+ # Log a message if the queue is full
+ except queue.Full as e:
+ log.warning("Failed to push event into the queue. The queue seems to be full.")
+
class Worker(multiprocessing.Process):
def __init__(self, reporter):