]> git.ipfire.org Git - pakfire.git/commitdiff
daemon: Receive jobs over the new control connection
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 26 Apr 2023 15:13:53 +0000 (15:13 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 26 Apr 2023 15:14:29 +0000 (15:14 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/daemon.py
src/pakfire/hub.py

index 9865864161c6f9134b181503c5706121901111c1..3278d37b495f37511004ee9e168e1a42a9a6832e 100644 (file)
@@ -168,14 +168,6 @@ class Daemon(object):
                """
                        Called when this builder was assigned a new job
                """
-               # Check for correct message type
-               if not job.get("message") == "job":
-                       raise RuntimeError("Received a message of an unknown type:\n%s" % job)
-
-               # Log what we have received
-               self.log.debug("Received job:")
-               self.log.debug("%s" % json.dumps(job, sort_keys=True, indent=4))
-
                # Launch a new worker
                worker = Worker(self, job)
                self.workers.append(worker)
index baeeca55068c05e4d61b6fa4fe640915d06594dc..615e1d231396dd95e774650a1ff21648220000ce 100644 (file)
@@ -378,16 +378,8 @@ class Hub(object):
 
                return "%s:%s" % (algo, h.hexdigest())
 
-       # Builder
-
-       async def control(self, *args, **kwargs):
-               """
-                       Creates a control connection
-               """
-               return await self._proxy(ControlConnection, *args, **kwargs)
-
        @staticmethod
-       def _decode_json_message(callback, message):
+       def _decode_json_message(message):
                """
                        Takes a received message and decodes it.
 
@@ -403,29 +395,33 @@ class Hub(object):
                        log.error("Could not decode JSON message:\n%s" % message)
                        return
 
-               return callback(message)
+               return message
+
+       # Builder
+
+       async def control(self, *args, **kwargs):
+               """
+                       Creates a control connection
+               """
+               return await self._proxy(ControlConnection, *args, **kwargs)
 
        async def job(self, job_id):
                """
-                       Connects to the given job
+                       Creates a control connection for a certain job
                """
-               # Return a Job proxy
-               return await self._proxy(Job, job_id)
+               return await self._proxy(JobControlConnection, job_id)
 
 
 class HubObject(object):
        # Disable Nagle's algorithm?
        nodelay = False
 
-       def __init__(self, hub, *args, on_message_callback=None, **kwargs):
+       def __init__(self, hub, *args, **kwargs):
                self.hub = hub
 
                # The active connection
                self.conn = None
 
-               # Callbacks
-               self._on_message_callback = on_message_callback
-
                # Perform custom initialization
                self.init(*args, **kwargs)
 
@@ -494,9 +490,8 @@ class HubObject(object):
                if self.conn:
                        self.conn.close()
 
-       def on_message_callback(self, message):
-               if self._on_message_callback:
-                       return self._on_message_callback(message)
+       async def on_message_callback(self, message):
+               raise NotImplementedError
 
        async def write_message(self, message, **kwargs):
                """
@@ -533,7 +528,26 @@ class ControlConnection(HubObject):
                self.native_arch = _pakfire.native_arch()
 
        def on_message_callback(self, message):
-               print(message)
+               message = self.hub._decode_json_message(message)
+
+               # Ignore empty messages
+               if message is None:
+                       return
+
+               # Log the received message
+               log.debug("Received message:\n%s" % json.dumps(message, indent=4))
+
+               # Fetch the message type & data
+               type = message.get("type")
+               data = message.get("data")
+
+               # Handle jobs
+               if type == "job":
+                       self.daemon.job_received(data)
+
+               # Log an error for unknown messages and ignore them
+               else:
+                       log.error("Received message of unknown type '%s'" % type)
 
        async def submit_stats(self):
                """
@@ -552,53 +566,54 @@ class ControlConnection(HubObject):
                loadavg = psutil.getloadavg()
 
                await self.write_message({
-                       "type"            : "stats",
-
-                       # CPU info
-                       "cpu_model"       : self.cpu.get("brand_raw"),
-                       "cpu_count"       : self.cpu.get("count"),
-                       "cpu_arch"        : self.native_arch,
-
-                       # Pakfire + OS
-                       "pakfire_version" : PAKFIRE_VERSION,
-                       "os_name"         : util.get_distro_name(),
-
-                       # CPU Times
-                       "cpu_user"       : cpu_times.user,
-                       "cpu_nice"       : cpu_times.nice,
-                       "cpu_system"     : cpu_times.system,
-                       "cpu_idle"       : cpu_times.idle,
-                       "cpu_iowait"     : cpu_times.iowait,
-                       "cpu_irq"        : cpu_times.irq,
-                       "cpu_softirq"    : cpu_times.softirq,
-                       "cpu_steal"      : cpu_times.steal,
-                       "cpu_guest"      : cpu_times.guest,
-                       "cpu_guest_nice" : cpu_times.guest_nice,
-
-                       # Load average
-                       "loadavg1"       : loadavg[0],
-                       "loadavg5"       : loadavg[1],
-                       "loadavg15"      : loadavg[2],
-
-                       # Memory
-                       "mem_total"      : mem.total,
-                       "mem_available"  : mem.available,
-                       "mem_used"       : mem.used,
-                       "mem_free"       : mem.free,
-                       "mem_active"     : mem.active,
-                       "mem_inactive"   : mem.inactive,
-                       "mem_buffers"    : mem.buffers,
-                       "mem_cached"     : mem.cached,
-                       "mem_shared"     : mem.shared,
-
-                       # Swap
-                       "swap_total"     : swap.total,
-                       "swap_used"      : swap.used,
-                       "swap_free"      : swap.free,
+                       "type" : "stats",
+                       "data" : {
+                               # CPU info
+                               "cpu_model"       : self.cpu.get("brand_raw"),
+                               "cpu_count"       : self.cpu.get("count"),
+                               "cpu_arch"        : self.native_arch,
+
+                               # Pakfire + OS
+                               "pakfire_version" : PAKFIRE_VERSION,
+                               "os_name"         : util.get_distro_name(),
+
+                               # CPU Times
+                               "cpu_user"       : cpu_times.user,
+                               "cpu_nice"       : cpu_times.nice,
+                               "cpu_system"     : cpu_times.system,
+                               "cpu_idle"       : cpu_times.idle,
+                               "cpu_iowait"     : cpu_times.iowait,
+                               "cpu_irq"        : cpu_times.irq,
+                               "cpu_softirq"    : cpu_times.softirq,
+                               "cpu_steal"      : cpu_times.steal,
+                               "cpu_guest"      : cpu_times.guest,
+                               "cpu_guest_nice" : cpu_times.guest_nice,
+
+                               # Load average
+                               "loadavg1"       : loadavg[0],
+                               "loadavg5"       : loadavg[1],
+                               "loadavg15"      : loadavg[2],
+
+                               # Memory
+                               "mem_total"      : mem.total,
+                               "mem_available"  : mem.available,
+                               "mem_used"       : mem.used,
+                               "mem_free"       : mem.free,
+                               "mem_active"     : mem.active,
+                               "mem_inactive"   : mem.inactive,
+                               "mem_buffers"    : mem.buffers,
+                               "mem_cached"     : mem.cached,
+                               "mem_shared"     : mem.shared,
+
+                               # Swap
+                               "swap_total"     : swap.total,
+                               "swap_used"      : swap.used,
+                               "swap_free"      : swap.free,
+                       },
                })
 
 
-class Job(HubObject):
+class JobControlConnection(HubObject):
        """
                Proxy for Build Jobs
        """