return "%s:%s" % (algo, h.hexdigest())
- # Builder
-
- async def control(self, *args, **kwargs):
- """
- Creates a control connection
- """
- return await self._proxy(ControlConnection, *args, **kwargs)
-
@staticmethod
- def _decode_json_message(callback, message):
+ def _decode_json_message(message):
"""
Takes a received message and decodes it.
log.error("Could not decode JSON message:\n%s" % message)
return
- return callback(message)
+ return message
+
+ # Builder
+
+ async def control(self, *args, **kwargs):
+ """
+ Creates a control connection
+ """
+ return await self._proxy(ControlConnection, *args, **kwargs)
async def job(self, job_id):
"""
- Connects to the given job
+ Creates a control connection for a certain job
"""
- # Return a Job proxy
- return await self._proxy(Job, job_id)
+ return await self._proxy(JobControlConnection, job_id)
class HubObject(object):
# Disable Nagle's algorithm?
nodelay = False
- def __init__(self, hub, *args, on_message_callback=None, **kwargs):
+ def __init__(self, hub, *args, **kwargs):
self.hub = hub
# The active connection
self.conn = None
- # Callbacks
- self._on_message_callback = on_message_callback
-
# Perform custom initialization
self.init(*args, **kwargs)
if self.conn:
self.conn.close()
- def on_message_callback(self, message):
- if self._on_message_callback:
- return self._on_message_callback(message)
+ async def on_message_callback(self, message):
+ raise NotImplementedError
async def write_message(self, message, **kwargs):
"""
self.native_arch = _pakfire.native_arch()
def on_message_callback(self, message):
- print(message)
+ message = self.hub._decode_json_message(message)
+
+ # Ignore empty messages
+ if message is None:
+ return
+
+ # Log the received message
+ log.debug("Received message:\n%s" % json.dumps(message, indent=4))
+
+ # Fetch the message type & data
+ type = message.get("type")
+ data = message.get("data")
+
+ # Handle jobs
+ if type == "job":
+ self.daemon.job_received(data)
+
+ # Log an error for unknown messages and ignore them
+ else:
+ log.error("Received message of unknown type '%s'" % type)
async def submit_stats(self):
"""
loadavg = psutil.getloadavg()
await self.write_message({
- "type" : "stats",
-
- # CPU info
- "cpu_model" : self.cpu.get("brand_raw"),
- "cpu_count" : self.cpu.get("count"),
- "cpu_arch" : self.native_arch,
-
- # Pakfire + OS
- "pakfire_version" : PAKFIRE_VERSION,
- "os_name" : util.get_distro_name(),
-
- # CPU Times
- "cpu_user" : cpu_times.user,
- "cpu_nice" : cpu_times.nice,
- "cpu_system" : cpu_times.system,
- "cpu_idle" : cpu_times.idle,
- "cpu_iowait" : cpu_times.iowait,
- "cpu_irq" : cpu_times.irq,
- "cpu_softirq" : cpu_times.softirq,
- "cpu_steal" : cpu_times.steal,
- "cpu_guest" : cpu_times.guest,
- "cpu_guest_nice" : cpu_times.guest_nice,
-
- # Load average
- "loadavg1" : loadavg[0],
- "loadavg5" : loadavg[1],
- "loadavg15" : loadavg[2],
-
- # Memory
- "mem_total" : mem.total,
- "mem_available" : mem.available,
- "mem_used" : mem.used,
- "mem_free" : mem.free,
- "mem_active" : mem.active,
- "mem_inactive" : mem.inactive,
- "mem_buffers" : mem.buffers,
- "mem_cached" : mem.cached,
- "mem_shared" : mem.shared,
-
- # Swap
- "swap_total" : swap.total,
- "swap_used" : swap.used,
- "swap_free" : swap.free,
+ "type" : "stats",
+ "data" : {
+ # CPU info
+ "cpu_model" : self.cpu.get("brand_raw"),
+ "cpu_count" : self.cpu.get("count"),
+ "cpu_arch" : self.native_arch,
+
+ # Pakfire + OS
+ "pakfire_version" : PAKFIRE_VERSION,
+ "os_name" : util.get_distro_name(),
+
+ # CPU Times
+ "cpu_user" : cpu_times.user,
+ "cpu_nice" : cpu_times.nice,
+ "cpu_system" : cpu_times.system,
+ "cpu_idle" : cpu_times.idle,
+ "cpu_iowait" : cpu_times.iowait,
+ "cpu_irq" : cpu_times.irq,
+ "cpu_softirq" : cpu_times.softirq,
+ "cpu_steal" : cpu_times.steal,
+ "cpu_guest" : cpu_times.guest,
+ "cpu_guest_nice" : cpu_times.guest_nice,
+
+ # Load average
+ "loadavg1" : loadavg[0],
+ "loadavg5" : loadavg[1],
+ "loadavg15" : loadavg[2],
+
+ # Memory
+ "mem_total" : mem.total,
+ "mem_available" : mem.available,
+ "mem_used" : mem.used,
+ "mem_free" : mem.free,
+ "mem_active" : mem.active,
+ "mem_inactive" : mem.inactive,
+ "mem_buffers" : mem.buffers,
+ "mem_cached" : mem.cached,
+ "mem_shared" : mem.shared,
+
+ # Swap
+ "swap_total" : swap.total,
+ "swap_used" : swap.used,
+ "swap_free" : swap.free,
+ },
})
-class Job(HubObject):
+class JobControlConnection(HubObject):
"""
Proxy for Build Jobs
"""