# The active connection
self.conn = None
+ # Callbacks
+ self.callbacks = {}
+
# Perform custom initialization
self.init(*args, **kwargs)
if self.conn:
self.conn.close()
- async def on_message_callback(self, message):
- raise NotImplementedError
+ def on_message_callback(self, message):
+ # Fail if no callbacks have been set
+ if not self.callbacks:
+ raise NotImplementedError
+
+ # Decode the message
+ message = self.hub._decode_json_message(message)
+
+ # Ignore empty messages
+ if message is None:
+ return
+
+ # Log the received message
+ log.debug("Received message:\n%s" % json.dumps(message, indent=4))
+
+ # Fetch the message type & data
+ type = message.get("type")
+ data = message.get("data")
+
+ # Find a suitable callback
+ try:
+ callback = self.callbacks[type]
+
+ # Log an error for unknown messages and ignore them
+ except KeyError:
+ log.error("Received message of unknown type '%s'" % type)
+ return
+
+ # Call the callback
+ callback(data)
async def write_message(self, message, **kwargs):
"""
def init(self, daemon):
self.daemon = daemon
+ # Callbacks
+ self.callbacks = {
+ "job" : self.daemon.job_received,
+ }
+
# Fetch processor information
self.cpu = cpuinfo.get_cpu_info()
# Fetch the native architecture
self.native_arch = _pakfire.native_arch()
- def on_message_callback(self, message):
- message = self.hub._decode_json_message(message)
-
- # Ignore empty messages
- if message is None:
- return
-
- # Log the received message
- log.debug("Received message:\n%s" % json.dumps(message, indent=4))
-
- # Fetch the message type & data
- type = message.get("type")
- data = message.get("data")
-
- # Handle jobs
- if type == "job":
- self.daemon.job_received(data)
-
- # Log an error for unknown messages and ignore them
- else:
- log.error("Received message of unknown type '%s'" % type)
-
async def submit_stats(self):
"""
Sends stats about this builder
"""
Proxy for Build Jobs
"""
- def init(self, id, abort_callback=None):
+ def init(self, id, worker):
self.id = id
+ # Callbacks
self.callbacks = {
- "abort" : abort_callback,
+ "abort" : worker.abort,
}
@property
def url(self):
return "/api/v1/jobs/%s" % self.id
- def on_message_callback(self, message):
- message = self.hub._decode_json_message(message)
-
- # Ignore empty messages
- if message is None:
- return
-
- # Log the received message
- log.debug("Received message:\n%s" % json.dumps(message, indent=4))
-
- # Fetch the message type & data
- type = message.get("type")
- data = message.get("data")
-
- # Find a suitable callback
- try:
- callback = self.callbacks[type]
-
- # Log an error for unknown messages and ignore them
- except KeyError:
- log.error("Received message of unknown type '%s'" % type)
- return
-
- # Call the callback
- callback(data)
-
async def finished(self, success, packages=None, logfile=None):
"""
Will tell the hub that a job has finished