"User", foreign_keys=[aborted_by_id], lazy="selectin",
)
- # Log Streaming
-
- async def enable_log_streaming(self):
- """
- Enables log streaming for this job.
- """
- # Don't run this when the job has already finished
- if self.has_finished():
- return
-
- # Send a message to the builder
- if self.builder:
- try:
- await self.builder.send_message({
- "job_id" : "%s" % self.uuid,
- "command" : "launch-log-stream",
- })
-
- # Ignore if the builder is not online
- except BuilderNotOnlineError:
- pass
-
- async def disable_log_streaming(self):
- """
- Disables log streaming for this job.
- """
- # Don't run this when the job has already finished
- if self.has_finished():
- return
-
- # Send a message to the builder
- if self.builder:
- try:
- await self.builder.send_message({
- "job_id" : "%s" % self.uuid,
- "command" : "terminate-log-stream",
- })
-
- # Ignore if the builder is not online
- except BuilderNotOnlineError:
- pass
-
# Message
message = Column(Text, nullable=False, default="")
class LogStreams(base.Object):
streams = {}
- async def open(self, job):
+ async def open(self, uuid):
"""
Opens a new stream
"""
# Fail if a stream already exists
- if job.uuid in self.streams:
- raise ValueError("Stream for %s exists" % job)
+ if uuid in self.streams:
+ raise ValueError("Stream for %s exists" % uuid)
# Create a new stream
- stream = LogStream(self.backend, job.uuid)
+ stream = LogStream(self.backend, uuid)
# Register the stream
- self.streams[job.uuid] = stream
-
- # Turn on streaming
- await job.enable_log_streaming()
+ self.streams[uuid] = stream
return stream
# Fetch the job
job = await self.backend.jobs.get_by_uuid(uuid)
- # Turn off the log stream
- if job:
- await job.disable_log_streaming()
-
try:
del self.streams[uuid]
except KeyError:
# If the stream does not exist, open a new one
except KeyError:
- stream = await self.open(job)
+ stream = await self.open(job.uuid)
# Join the stream
await stream.join(consumer, **kwargs)
try:
stream = self.streams[job_id]
- # Close the session if nobody is listening
+ # If the stream does not exist, yet, we create a new one
except KeyError:
- return await self.close(job_id)
+ stream = await self.open(job_id)
# Fetch the payload
data = message.get("data")