From 2804140693b9efdb5e3fc2266e29d21205979b6e Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Wed, 26 Apr 2023 15:13:53 +0000 Subject: [PATCH] daemon: Receive jobs over the new control connection Signed-off-by: Michael Tremer --- src/pakfire/daemon.py | 8 --- src/pakfire/hub.py | 145 +++++++++++++++++++++++------------------- 2 files changed, 80 insertions(+), 73 deletions(-) diff --git a/src/pakfire/daemon.py b/src/pakfire/daemon.py index 986586416..3278d37b4 100644 --- a/src/pakfire/daemon.py +++ b/src/pakfire/daemon.py @@ -168,14 +168,6 @@ class Daemon(object): """ Called when this builder was assigned a new job """ - # Check for correct message type - if not job.get("message") == "job": - raise RuntimeError("Received a message of an unknown type:\n%s" % job) - - # Log what we have received - self.log.debug("Received job:") - self.log.debug("%s" % json.dumps(job, sort_keys=True, indent=4)) - # Launch a new worker worker = Worker(self, job) self.workers.append(worker) diff --git a/src/pakfire/hub.py b/src/pakfire/hub.py index baeeca550..615e1d231 100644 --- a/src/pakfire/hub.py +++ b/src/pakfire/hub.py @@ -378,16 +378,8 @@ class Hub(object): return "%s:%s" % (algo, h.hexdigest()) - # Builder - - async def control(self, *args, **kwargs): - """ - Creates a control connection - """ - return await self._proxy(ControlConnection, *args, **kwargs) - @staticmethod - def _decode_json_message(callback, message): + def _decode_json_message(message): """ Takes a received message and decodes it. @@ -403,29 +395,33 @@ class Hub(object): log.error("Could not decode JSON message:\n%s" % message) return - return callback(message) + return message + + # Builder + + async def control(self, *args, **kwargs): + """ + Creates a control connection + """ + return await self._proxy(ControlConnection, *args, **kwargs) async def job(self, job_id): """ - Connects to the given job + Creates a control connection for a certain job """ - # Return a Job proxy - return await self._proxy(Job, job_id) + return await self._proxy(JobControlConnection, job_id) class HubObject(object): # Disable Nagle's algorithm? nodelay = False - def __init__(self, hub, *args, on_message_callback=None, **kwargs): + def __init__(self, hub, *args, **kwargs): self.hub = hub # The active connection self.conn = None - # Callbacks - self._on_message_callback = on_message_callback - # Perform custom initialization self.init(*args, **kwargs) @@ -494,9 +490,8 @@ class HubObject(object): if self.conn: self.conn.close() - def on_message_callback(self, message): - if self._on_message_callback: - return self._on_message_callback(message) + async def on_message_callback(self, message): + raise NotImplementedError async def write_message(self, message, **kwargs): """ @@ -533,7 +528,26 @@ class ControlConnection(HubObject): self.native_arch = _pakfire.native_arch() def on_message_callback(self, message): - print(message) + message = self.hub._decode_json_message(message) + + # Ignore empty messages + if message is None: + return + + # Log the received message + log.debug("Received message:\n%s" % json.dumps(message, indent=4)) + + # Fetch the message type & data + type = message.get("type") + data = message.get("data") + + # Handle jobs + if type == "job": + self.daemon.job_received(data) + + # Log an error for unknown messages and ignore them + else: + log.error("Received message of unknown type '%s'" % type) async def submit_stats(self): """ @@ -552,53 +566,54 @@ class ControlConnection(HubObject): loadavg = psutil.getloadavg() await self.write_message({ - "type" : "stats", - - # CPU info - "cpu_model" : self.cpu.get("brand_raw"), - "cpu_count" : self.cpu.get("count"), - "cpu_arch" : self.native_arch, - - # Pakfire + OS - "pakfire_version" : PAKFIRE_VERSION, - "os_name" : util.get_distro_name(), - - # CPU Times - "cpu_user" : cpu_times.user, - "cpu_nice" : cpu_times.nice, - "cpu_system" : cpu_times.system, - "cpu_idle" : cpu_times.idle, - "cpu_iowait" : cpu_times.iowait, - "cpu_irq" : cpu_times.irq, - "cpu_softirq" : cpu_times.softirq, - "cpu_steal" : cpu_times.steal, - "cpu_guest" : cpu_times.guest, - "cpu_guest_nice" : cpu_times.guest_nice, - - # Load average - "loadavg1" : loadavg[0], - "loadavg5" : loadavg[1], - "loadavg15" : loadavg[2], - - # Memory - "mem_total" : mem.total, - "mem_available" : mem.available, - "mem_used" : mem.used, - "mem_free" : mem.free, - "mem_active" : mem.active, - "mem_inactive" : mem.inactive, - "mem_buffers" : mem.buffers, - "mem_cached" : mem.cached, - "mem_shared" : mem.shared, - - # Swap - "swap_total" : swap.total, - "swap_used" : swap.used, - "swap_free" : swap.free, + "type" : "stats", + "data" : { + # CPU info + "cpu_model" : self.cpu.get("brand_raw"), + "cpu_count" : self.cpu.get("count"), + "cpu_arch" : self.native_arch, + + # Pakfire + OS + "pakfire_version" : PAKFIRE_VERSION, + "os_name" : util.get_distro_name(), + + # CPU Times + "cpu_user" : cpu_times.user, + "cpu_nice" : cpu_times.nice, + "cpu_system" : cpu_times.system, + "cpu_idle" : cpu_times.idle, + "cpu_iowait" : cpu_times.iowait, + "cpu_irq" : cpu_times.irq, + "cpu_softirq" : cpu_times.softirq, + "cpu_steal" : cpu_times.steal, + "cpu_guest" : cpu_times.guest, + "cpu_guest_nice" : cpu_times.guest_nice, + + # Load average + "loadavg1" : loadavg[0], + "loadavg5" : loadavg[1], + "loadavg15" : loadavg[2], + + # Memory + "mem_total" : mem.total, + "mem_available" : mem.available, + "mem_used" : mem.used, + "mem_free" : mem.free, + "mem_active" : mem.active, + "mem_inactive" : mem.inactive, + "mem_buffers" : mem.buffers, + "mem_cached" : mem.cached, + "mem_shared" : mem.shared, + + # Swap + "swap_total" : swap.total, + "swap_used" : swap.used, + "swap_free" : swap.free, + }, }) -class Job(HubObject): +class JobControlConnection(HubObject): """ Proxy for Build Jobs """ -- 2.39.5