"""
This is the main class that handles all the things...
"""
- def __init__(self, config):
- # Parse the configuration file
- self.config = configparser.ConfigParser()
- self.config.read(config)
-
- # Fetch CPU count
- workers = self.config.getint("DEFAULT", "workers",
- fallback=multiprocessing.cpu_count())
+ def __init__(self, config_path):
+ self.config_path = config_path
- # Check if workers are a positive number
- if workers < 1:
- log.error("Invalid number of workers: %s" % workers)
- raise SystemExit(1)
+ # Parse the configuration file
+ self.config = self.read_config()
# Fetch the current event loop
self.loop = asyncio.get_running_loop()
# Create an events queue
self.queue = multiprocessing.Queue(1024)
- # Create as many workers as we have processors
- self.workers = [
- Worker(reporter=self) for _ in range(workers)
- ]
+ # Keep references to our workers
+ self.workers = []
# Register any signals
for signo in (signal.SIGINT, signal.SIGTERM):
self.loop.add_signal_handler(signo, self.terminate)
+ # Reload the configuration on SIGHUP
+ self.loop.add_signal_handler(signal.SIGHUP, self.reload)
+
# Create the socket
self.sock = self._create_socket()
+ def read_config(self):
+ """
+ Reads or re-reads the configuration.
+ """
+ config = configparser.ConfigParser()
+ config.read(self.config_path)
+
+ return config
+
@property
def socket_path(self):
return self.config.get("DEFAULT", "socket",
# Return the socket
return sock
+ def launch_workers(self):
+ """
+ Launches workers...
+ """
+ cpu_count = multiprocessing.cpu_count()
+
+ # Fetch CPU count
+ workers = self.config.getint("DEFAULT", "workers", fallback=cpu_count)
+
+ # Reset workers if a negative number was provided
+ if workers < 1:
+ workers = cpu_count
+
+ # Create as many workers as we have processors
+ self.workers = [
+ Worker(reporter=self) for _ in range(workers)
+ ]
+
+ # Start them all immediately
+ for worker in self.workers:
+ worker.start()
+
+ def terminate_workers(self):
+ """
+ Terminates all running workers.
+ """
+ log.debug("Terminating workers...")
+
+ # Terminate all workers
+ for worker in self.workers:
+ worker.terminate()
+
+ log.debug("Waiting for all workers to terminate...")
+
+ # Wait until all workers have terminated
+ for worker in self.workers:
+ worker.join()
+
+ log.debug("All workers have terminated...")
+
+ # Reset the workers
+ self.workers = []
+
async def run(self):
"""
The main loop of the application.
"""
log.debug("Starting reporter...")
- # Start all workers
- for worker in self.workers:
- worker.start()
+ # Launch all workers
+ self.launch_workers()
# Wait until we have terminated
await self.is_terminated.wait()
+ # Remove the socket so we won't receive any more data
+ try:
+ os.unlink(self.socket_path)
+ except OSError as e:
+ log.error("Failed to remove %s: %s" % (self.socket_path, e))
+
+ # Close the queue
+ self.queue.close()
+
+ # Terminate all workers
+ self.terminate_workers()
+
log.debug("Reporter has exited")
def terminate(self):
# We are no longer running
self.is_terminated.set()
- # Remove the socket so we won't receive any more data
- try:
- os.unlink(self.socket_path)
- except OSError as e:
- log.error("Failed to remove %s: %s" % (self.socket_path, e))
+ def reload(self):
+ """
+ Called on SIGHUP.
+ """
+ log.info("Reloading...")
- # Close the queue
- self.queue.close()
+ # Re-read the configuration
+ self.config = self.read_config()
# Terminate all workers
- for worker in self.workers:
- worker.terminate()
+ self.terminate_workers()
- # Wait until all workers have terminated
- for worker in self.workers:
- worker.join()
+ # Launch a new set of workers
+ self.launch_workers()
def _receive_message(self, sock):
"""