From a04ccbf5afe42714209d87203b1c298c30f72229 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Wed, 26 Apr 2023 17:19:57 +0000 Subject: [PATCH] hub: Refactor message handling Signed-off-by: Michael Tremer --- src/pakfire/daemon.py | 2 +- src/pakfire/hub.py | 93 +++++++++++++++++++------------------------ 2 files changed, 42 insertions(+), 53 deletions(-) diff --git a/src/pakfire/daemon.py b/src/pakfire/daemon.py index 561301f86..43f79971d 100644 --- a/src/pakfire/daemon.py +++ b/src/pakfire/daemon.py @@ -227,7 +227,7 @@ class Worker(multiprocessing.Process): raise ValueError("Did not received a package URL") # Connect to the hub - self.job = await self.hub.job(job_id, abort_callback=self.abort) + self.job = await self.hub.job(job_id, worker=self) # Setup build logger logger = BuildLogger(self.log, self.job) diff --git a/src/pakfire/hub.py b/src/pakfire/hub.py index e7459b1ac..3e95b8647 100644 --- a/src/pakfire/hub.py +++ b/src/pakfire/hub.py @@ -422,6 +422,9 @@ class HubObject(object): # The active connection self.conn = None + # Callbacks + self.callbacks = {} + # Perform custom initialization self.init(*args, **kwargs) @@ -490,8 +493,36 @@ class HubObject(object): if self.conn: self.conn.close() - async def on_message_callback(self, message): - raise NotImplementedError + def on_message_callback(self, message): + # Fail if no callbacks have been set + if not self.callbacks: + raise NotImplementedError + + # Decode the message + message = self.hub._decode_json_message(message) + + # Ignore empty messages + if message is None: + return + + # Log the received message + log.debug("Received message:\n%s" % json.dumps(message, indent=4)) + + # Fetch the message type & data + type = message.get("type") + data = message.get("data") + + # Find a suitable callback + try: + callback = self.callbacks[type] + + # Log an error for unknown messages and ignore them + except KeyError: + log.error("Received message of unknown type '%s'" % type) + return + + # Call the callback + callback(data) async def write_message(self, message, **kwargs): """ @@ -521,34 +552,17 @@ class ControlConnection(HubObject): def init(self, daemon): self.daemon = daemon + # Callbacks + self.callbacks = { + "job" : self.daemon.job_received, + } + # Fetch processor information self.cpu = cpuinfo.get_cpu_info() # Fetch the native architecture self.native_arch = _pakfire.native_arch() - def on_message_callback(self, message): - message = self.hub._decode_json_message(message) - - # Ignore empty messages - if message is None: - return - - # Log the received message - log.debug("Received message:\n%s" % json.dumps(message, indent=4)) - - # Fetch the message type & data - type = message.get("type") - data = message.get("data") - - # Handle jobs - if type == "job": - self.daemon.job_received(data) - - # Log an error for unknown messages and ignore them - else: - log.error("Received message of unknown type '%s'" % type) - async def submit_stats(self): """ Sends stats about this builder @@ -617,43 +631,18 @@ class JobControlConnection(HubObject): """ Proxy for Build Jobs """ - def init(self, id, abort_callback=None): + def init(self, id, worker): self.id = id + # Callbacks self.callbacks = { - "abort" : abort_callback, + "abort" : worker.abort, } @property def url(self): return "/api/v1/jobs/%s" % self.id - def on_message_callback(self, message): - message = self.hub._decode_json_message(message) - - # Ignore empty messages - if message is None: - return - - # Log the received message - log.debug("Received message:\n%s" % json.dumps(message, indent=4)) - - # Fetch the message type & data - type = message.get("type") - data = message.get("data") - - # Find a suitable callback - try: - callback = self.callbacks[type] - - # Log an error for unknown messages and ignore them - except KeyError: - log.error("Received message of unknown type '%s'" % type) - return - - # Call the callback - callback(data) - async def finished(self, success, packages=None, logfile=None): """ Will tell the hub that a job has finished -- 2.39.5