# Fetch CPU count
cpu_count = multiprocessing.cpu_count()
+ # Fetch the current event loop
+ self.loop = asyncio.get_running_loop()
+
+ # Are we running?
+ self.is_running = multiprocessing.Event()
+
# Create an events queue
self.queue = multiprocessing.Queue(1024)
Worker(reporter=self) for _ in range(cpu_count)
]
+ # Register any signals
+ for signo in (signal.SIGINT, signal.SIGTERM):
+ self.loop.add_signal_handler(signo, self.terminate)
+
async def run(self):
"""
The main loop of the application.
"""
log.debug("Starting reporter...")
+ # We are now running
+ self.is_running.set()
+
# Start all workers
for worker in self.workers:
worker.start()
# Sleep for forever
- while True:
+ while self.is_running.is_set():
await asyncio.sleep(1)
# Write some data into the queue
self.queue.put("ABC", block=False)
+ log.debug("Reporter has exited")
+
+ def terminate(self):
+ """
+ Called when the reporter is supposed to terminate.
+ """
+ log.debug("Terminating...")
+
+ # We are no longer running
+ self.is_running.clear()
+
# Terminate all workers
for worker in self.workers:
worker.terminate()
for worker in self.workers:
worker.join()
- log.debug("Reporter has exited")
-
class Worker(multiprocessing.Process):
def __init__(self, reporter):
"""
log.debug("Worker %s launched" % self.pid)
+ # Reset signal handlers
+ for signo in (signal.SIGINT, signal.SIGTERM):
+ signal.signal(signo, signal.SIG_DFL)
+
# Loop for forever
- while True:
+ while self.reporter.is_running.is_set():
event = self.reporter.queue.get(block=True)
# Log the event