async def on_message(self, message):
message = self._decode_json_message(message)
- # Get message type
- t = message.get("message")
+ # Get message type & data
+ type = message.get("type")
+ data = message.get("data")
# Handle log messages
- if t == "log":
- await self.logstream.message(
- timestamp=message.get("timestamp"),
- level=message.get("level"),
- message=message.get("log"),
- )
+ if type == "log":
+ await self._handle_log(**data)
# Handle finished message
- elif t == "finished":
- await self._handle_finished(**message)
+ elif type == "finished":
+ await self._handle_finished(**data)
# Unknown message
else:
log.warning("Received a message of an unknown type: %s" % t)
+ async def _handle_log(self, timestamp=None, level=None, message=None, **kwargs):
+ """
+ Called when a new log message has been received
+ """
+ await self.logstream.message(timestamp, level, message)
+
async def _handle_finished(self, success=False, logfile=None, packages=[], **kwargs):
"""
Called when a job has finished - whether successfully or not