From: Michael Tremer Date: Sat, 9 Aug 2025 14:14:53 +0000 (+0100) Subject: suricata-reporter: Add ability to reload the configuration X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=7be9123884f7c977e8922b296711b357dd8b3556;p=ipfire-2.x.git suricata-reporter: Add ability to reload the configuration Signed-off-by: Michael Tremer --- diff --git a/config/suricata/suricata-reporter b/config/suricata/suricata-reporter index b711d9fcc..01b9d5151 100644 --- a/config/suricata/suricata-reporter +++ b/config/suricata/suricata-reporter @@ -55,19 +55,11 @@ class Reporter(object): """ This is the main class that handles all the things... """ - def __init__(self, config): - # Parse the configuration file - self.config = configparser.ConfigParser() - self.config.read(config) - - # Fetch CPU count - workers = self.config.getint("DEFAULT", "workers", - fallback=multiprocessing.cpu_count()) + def __init__(self, config_path): + self.config_path = config_path - # Check if workers are a positive number - if workers < 1: - log.error("Invalid number of workers: %s" % workers) - raise SystemExit(1) + # Parse the configuration file + self.config = self.read_config() # Fetch the current event loop self.loop = asyncio.get_running_loop() @@ -78,18 +70,28 @@ class Reporter(object): # Create an events queue self.queue = multiprocessing.Queue(1024) - # Create as many workers as we have processors - self.workers = [ - Worker(reporter=self) for _ in range(workers) - ] + # Keep references to our workers + self.workers = [] # Register any signals for signo in (signal.SIGINT, signal.SIGTERM): self.loop.add_signal_handler(signo, self.terminate) + # Reload the configuration on SIGHUP + self.loop.add_signal_handler(signal.SIGHUP, self.reload) + # Create the socket self.sock = self._create_socket() + def read_config(self): + """ + Reads or re-reads the configuration. + """ + config = configparser.ConfigParser() + config.read(self.config_path) + + return config + @property def socket_path(self): return self.config.get("DEFAULT", "socket", @@ -142,19 +144,73 @@ class Reporter(object): # Return the socket return sock + def launch_workers(self): + """ + Launches workers... + """ + cpu_count = multiprocessing.cpu_count() + + # Fetch CPU count + workers = self.config.getint("DEFAULT", "workers", fallback=cpu_count) + + # Reset workers if a negative number was provided + if workers < 1: + workers = cpu_count + + # Create as many workers as we have processors + self.workers = [ + Worker(reporter=self) for _ in range(workers) + ] + + # Start them all immediately + for worker in self.workers: + worker.start() + + def terminate_workers(self): + """ + Terminates all running workers. + """ + log.debug("Terminating workers...") + + # Terminate all workers + for worker in self.workers: + worker.terminate() + + log.debug("Waiting for all workers to terminate...") + + # Wait until all workers have terminated + for worker in self.workers: + worker.join() + + log.debug("All workers have terminated...") + + # Reset the workers + self.workers = [] + async def run(self): """ The main loop of the application. """ log.debug("Starting reporter...") - # Start all workers - for worker in self.workers: - worker.start() + # Launch all workers + self.launch_workers() # Wait until we have terminated await self.is_terminated.wait() + # Remove the socket so we won't receive any more data + try: + os.unlink(self.socket_path) + except OSError as e: + log.error("Failed to remove %s: %s" % (self.socket_path, e)) + + # Close the queue + self.queue.close() + + # Terminate all workers + self.terminate_workers() + log.debug("Reporter has exited") def terminate(self): @@ -166,22 +222,20 @@ class Reporter(object): # We are no longer running self.is_terminated.set() - # Remove the socket so we won't receive any more data - try: - os.unlink(self.socket_path) - except OSError as e: - log.error("Failed to remove %s: %s" % (self.socket_path, e)) + def reload(self): + """ + Called on SIGHUP. + """ + log.info("Reloading...") - # Close the queue - self.queue.close() + # Re-read the configuration + self.config = self.read_config() # Terminate all workers - for worker in self.workers: - worker.terminate() + self.terminate_workers() - # Wait until all workers have terminated - for worker in self.workers: - worker.join() + # Launch a new set of workers + self.launch_workers() def _receive_message(self, sock): """