From 9216b62393ae523aa6c6cd1339a78b65d153e667 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Wed, 26 Apr 2023 13:43:14 +0000 Subject: [PATCH] daemon: Use the new control connection for sending stats Signed-off-by: Michael Tremer --- src/pakfire/daemon.py | 22 +++++++---------- src/pakfire/hub.py | 57 +++++++++++++++++++++---------------------- 2 files changed, 37 insertions(+), 42 deletions(-) diff --git a/src/pakfire/daemon.py b/src/pakfire/daemon.py index 6c84c4726..986586416 100644 --- a/src/pakfire/daemon.py +++ b/src/pakfire/daemon.py @@ -39,7 +39,7 @@ class Daemon(object): # Connect to the Pakfire Hub self.hub = self.connect_to_hub() - self.queue = None + self.control = None # Set when this process receives a shutdown signal self._shutdown_signalled = None @@ -69,14 +69,14 @@ class Daemon(object): # Initialize shutdown signal self._shutdown_signalled = asyncio.Event() - # Send builder information - self.stats = await self.hub.builder_stats() - - # Join the job queue - self.queue = await self.hub.queue(self.job_received) + # Create the control connection + self.control = await self.hub.control(daemon=self) # Run main loop while True: + # Submit stats + await self.control.submit_stats() + # Check if we are running by awaiting the shutdown signal try: await asyncio.wait_for(self._shutdown_signalled.wait(), timeout=5) @@ -84,10 +84,6 @@ class Daemon(object): except asyncio.TimeoutError: pass - # Send some information about this builder - if self.stats: - await self.stats.submit() - # Main loop has ended, but we wait until all workers have finished. self.terminate_all_workers() @@ -106,9 +102,9 @@ class Daemon(object): self.log.info(_("Shutting down...")) self._shutdown_signalled.set() - # Close queue connection so we won't receive any new jobs - if self.queue: - self.queue.close() + # Close the control connection + if self.control: + self.control.close() def terminate_all_workers(self): """ diff --git a/src/pakfire/hub.py b/src/pakfire/hub.py index 8545eb2a6..baeeca550 100644 --- a/src/pakfire/hub.py +++ b/src/pakfire/hub.py @@ -380,24 +380,11 @@ class Hub(object): # Builder - async def builder_stats(self): + async def control(self, *args, **kwargs): """ - Sends information about this host to the hub. - - This information is something that doesn't change during - the lifetime of the daemon. - """ - return await self._proxy(BuilderStats) - - async def queue(self, job_received_callback): + Creates a control connection """ - Connects to the hub and asks for a build job - """ - on_message_callback = functools.partial( - self._decode_json_message, job_received_callback, - ) - - return await self._proxy(QueueProxy, on_message_callback=on_message_callback) + return await self._proxy(ControlConnection, *args, **kwargs) @staticmethod def _decode_json_message(callback, message): @@ -437,7 +424,7 @@ class HubObject(object): self.conn = None # Callbacks - self.on_message_callback = on_message_callback + self._on_message_callback = on_message_callback # Perform custom initialization self.init(*args, **kwargs) @@ -500,6 +487,17 @@ class HubObject(object): else: return + def close(self): + """ + Closes the connection + """ + if self.conn: + self.conn.close() + + def on_message_callback(self, message): + if self._on_message_callback: + return self._on_message_callback(message) + async def write_message(self, message, **kwargs): """ Sends a message but encodes it into JSON first @@ -522,27 +520,26 @@ class HubObject(object): return await self.write_message(message, **kwargs) -class QueueProxy(HubObject): - url = "/api/v1/jobs/queue" +class ControlConnection(HubObject): + url = "/api/v1/builders/control" + def init(self, daemon): + self.daemon = daemon -class BuilderStats(HubObject): - """ - Proxy for Builder Stats - """ - def init(self): # Fetch processor information self.cpu = cpuinfo.get_cpu_info() # Fetch the native architecture self.native_arch = _pakfire.native_arch() - @property - def url(self): - return "/api/v1/builders/stats" + def on_message_callback(self, message): + print(message) - async def submit(self): - log.debug("Sending stat message to hub...") + async def submit_stats(self): + """ + Sends stats about this builder + """ + log.debug("Sending stats...") # Fetch processor information cpu_times = psutil.cpu_times_percent() @@ -555,6 +552,8 @@ class BuilderStats(HubObject): loadavg = psutil.getloadavg() await self.write_message({ + "type" : "stats", + # CPU info "cpu_model" : self.cpu.get("brand_raw"), "cpu_count" : self.cpu.get("count"), -- 2.39.5