]> git.ipfire.org Git - pakfire.git/commitdiff
daemon: Use the new control connection for sending stats
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 26 Apr 2023 13:43:14 +0000 (13:43 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 26 Apr 2023 15:14:29 +0000 (15:14 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/daemon.py
src/pakfire/hub.py

index 6c84c472675b8b46d91ba91df856824c972bc4f7..9865864161c6f9134b181503c5706121901111c1 100644 (file)
@@ -39,7 +39,7 @@ class Daemon(object):
 
                # Connect to the Pakfire Hub
                self.hub = self.connect_to_hub()
-               self.queue = None
+               self.control = None
 
                # Set when this process receives a shutdown signal
                self._shutdown_signalled = None
@@ -69,14 +69,14 @@ class Daemon(object):
                # Initialize shutdown signal
                self._shutdown_signalled = asyncio.Event()
 
-               # Send builder information
-               self.stats = await self.hub.builder_stats()
-
-               # Join the job queue
-               self.queue = await self.hub.queue(self.job_received)
+               # Create the control connection
+               self.control = await self.hub.control(daemon=self)
 
                # Run main loop
                while True:
+                       # Submit stats
+                       await self.control.submit_stats()
+
                        # Check if we are running by awaiting the shutdown signal
                        try:
                                await asyncio.wait_for(self._shutdown_signalled.wait(), timeout=5)
@@ -84,10 +84,6 @@ class Daemon(object):
                        except asyncio.TimeoutError:
                                pass
 
-                       # Send some information about this builder
-                       if self.stats:
-                               await self.stats.submit()
-
                # Main loop has ended, but we wait until all workers have finished.
                self.terminate_all_workers()
 
@@ -106,9 +102,9 @@ class Daemon(object):
                self.log.info(_("Shutting down..."))
                self._shutdown_signalled.set()
 
-               # Close queue connection so we won't receive any new jobs
-               if self.queue:
-                       self.queue.close()
+               # Close the control connection
+               if self.control:
+                       self.control.close()
 
        def terminate_all_workers(self):
                """
index 8545eb2a69e7039e857e3d0641ccb8fec1be13ba..baeeca55068c05e4d61b6fa4fe640915d06594dc 100644 (file)
@@ -380,24 +380,11 @@ class Hub(object):
 
        # Builder
 
-       async def builder_stats(self):
+       async def control(self, *args, **kwargs):
                """
-                       Sends information about this host to the hub.
-
-                       This information is something that doesn't change during
-                       the lifetime of the daemon.
-               """
-               return await self._proxy(BuilderStats)
-
-       async def queue(self, job_received_callback):
+                       Creates a control connection
                """
-                       Connects to the hub and asks for a build job
-               """
-               on_message_callback = functools.partial(
-                       self._decode_json_message, job_received_callback,
-               )
-
-               return await self._proxy(QueueProxy, on_message_callback=on_message_callback)
+               return await self._proxy(ControlConnection, *args, **kwargs)
 
        @staticmethod
        def _decode_json_message(callback, message):
@@ -437,7 +424,7 @@ class HubObject(object):
                self.conn = None
 
                # Callbacks
-               self.on_message_callback = on_message_callback
+               self._on_message_callback = on_message_callback
 
                # Perform custom initialization
                self.init(*args, **kwargs)
@@ -500,6 +487,17 @@ class HubObject(object):
                        else:
                                return
 
+       def close(self):
+               """
+                       Closes the connection
+               """
+               if self.conn:
+                       self.conn.close()
+
+       def on_message_callback(self, message):
+               if self._on_message_callback:
+                       return self._on_message_callback(message)
+
        async def write_message(self, message, **kwargs):
                """
                        Sends a message but encodes it into JSON first
@@ -522,27 +520,26 @@ class HubObject(object):
                        return await self.write_message(message, **kwargs)
 
 
-class QueueProxy(HubObject):
-       url = "/api/v1/jobs/queue"
+class ControlConnection(HubObject):
+       url = "/api/v1/builders/control"
 
+       def init(self, daemon):
+               self.daemon = daemon
 
-class BuilderStats(HubObject):
-       """
-               Proxy for Builder Stats
-       """
-       def init(self):
                # Fetch processor information
                self.cpu = cpuinfo.get_cpu_info()
 
                # Fetch the native architecture
                self.native_arch = _pakfire.native_arch()
 
-       @property
-       def url(self):
-               return "/api/v1/builders/stats"
+       def on_message_callback(self, message):
+               print(message)
 
-       async def submit(self):
-               log.debug("Sending stat message to hub...")
+       async def submit_stats(self):
+               """
+                       Sends stats about this builder
+               """
+               log.debug("Sending stats...")
 
                # Fetch processor information
                cpu_times = psutil.cpu_times_percent()
@@ -555,6 +552,8 @@ class BuilderStats(HubObject):
                loadavg = psutil.getloadavg()
 
                await self.write_message({
+                       "type"            : "stats",
+
                        # CPU info
                        "cpu_model"       : self.cpu.get("brand_raw"),
                        "cpu_count"       : self.cpu.get("count"),