]> git.ipfire.org Git - pakfire.git/commitdiff
hub: Refactor message handling
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 26 Apr 2023 17:19:57 +0000 (17:19 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 26 Apr 2023 17:19:57 +0000 (17:19 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/daemon.py
src/pakfire/hub.py

index 561301f86c1d733ccf96c10dd8922cf80c75477f..43f79971d7edc3976cd6824893ae4559c93d00d1 100644 (file)
@@ -227,7 +227,7 @@ class Worker(multiprocessing.Process):
                        raise ValueError("Did not received a package URL")
 
                # Connect to the hub
-               self.job = await self.hub.job(job_id, abort_callback=self.abort)
+               self.job = await self.hub.job(job_id, worker=self)
 
                # Setup build logger
                logger = BuildLogger(self.log, self.job)
index e7459b1ac784eb0cf003e9482b988a0c43000a4c..3e95b864725a15d9b76c8de8b87cdf393de80616 100644 (file)
@@ -422,6 +422,9 @@ class HubObject(object):
                # The active connection
                self.conn = None
 
+               # Callbacks
+               self.callbacks = {}
+
                # Perform custom initialization
                self.init(*args, **kwargs)
 
@@ -490,8 +493,36 @@ class HubObject(object):
                if self.conn:
                        self.conn.close()
 
-       async def on_message_callback(self, message):
-               raise NotImplementedError
+       def on_message_callback(self, message):
+               # Fail if no callbacks have been set
+               if not self.callbacks:
+                       raise NotImplementedError
+
+               # Decode the message
+               message = self.hub._decode_json_message(message)
+
+               # Ignore empty messages
+               if message is None:
+                       return
+
+               # Log the received message
+               log.debug("Received message:\n%s" % json.dumps(message, indent=4))
+
+               # Fetch the message type & data
+               type = message.get("type")
+               data = message.get("data")
+
+               # Find a suitable callback
+               try:
+                       callback = self.callbacks[type]
+
+               # Log an error for unknown messages and ignore them
+               except KeyError:
+                       log.error("Received message of unknown type '%s'" % type)
+                       return
+
+               # Call the callback
+               callback(data)
 
        async def write_message(self, message, **kwargs):
                """
@@ -521,34 +552,17 @@ class ControlConnection(HubObject):
        def init(self, daemon):
                self.daemon = daemon
 
+               # Callbacks
+               self.callbacks = {
+                       "job" : self.daemon.job_received,
+               }
+
                # Fetch processor information
                self.cpu = cpuinfo.get_cpu_info()
 
                # Fetch the native architecture
                self.native_arch = _pakfire.native_arch()
 
-       def on_message_callback(self, message):
-               message = self.hub._decode_json_message(message)
-
-               # Ignore empty messages
-               if message is None:
-                       return
-
-               # Log the received message
-               log.debug("Received message:\n%s" % json.dumps(message, indent=4))
-
-               # Fetch the message type & data
-               type = message.get("type")
-               data = message.get("data")
-
-               # Handle jobs
-               if type == "job":
-                       self.daemon.job_received(data)
-
-               # Log an error for unknown messages and ignore them
-               else:
-                       log.error("Received message of unknown type '%s'" % type)
-
        async def submit_stats(self):
                """
                        Sends stats about this builder
@@ -617,43 +631,18 @@ class JobControlConnection(HubObject):
        """
                Proxy for Build Jobs
        """
-       def init(self, id, abort_callback=None):
+       def init(self, id, worker):
                self.id = id
 
+               # Callbacks
                self.callbacks = {
-                       "abort" : abort_callback,
+                       "abort" : worker.abort,
                }
 
        @property
        def url(self):
                return "/api/v1/jobs/%s" % self.id
 
-       def on_message_callback(self, message):
-               message = self.hub._decode_json_message(message)
-
-               # Ignore empty messages
-               if message is None:
-                       return
-
-               # Log the received message
-               log.debug("Received message:\n%s" % json.dumps(message, indent=4))
-
-               # Fetch the message type & data
-               type = message.get("type")
-               data = message.get("data")
-
-               # Find a suitable callback
-               try:
-                       callback = self.callbacks[type]
-
-               # Log an error for unknown messages and ignore them
-               except KeyError:
-                       log.error("Received message of unknown type '%s'" % type)
-                       return
-
-               # Call the callback
-               callback(data)
-
        async def finished(self, success, packages=None, logfile=None):
                """
                        Will tell the hub that a job has finished