"User", foreign_keys=[aborted_by_id], lazy="selectin",
)
+ # Log Streaming
+
+ async def enable_log_streaming(self):
+ """
+ Enables log streaming for this job.
+ """
+ # Don't run this when the job has already finished
+ if self.has_finished():
+ return
+
+ # Send a message to the builder
+ if self.builder:
+ try:
+ await self.builder.send_message({
+ "job_id" : "%s" % self.uuid,
+ "command" : "launch-log-stream",
+ })
+
+ # Ignore if the builder is not online
+ except BuilderNotOnlineError:
+ pass
+
+ async def disable_log_streaming(self):
+ """
+ Disables log streaming for this job.
+ """
+ # Don't run this when the job has already finished
+ if self.has_finished():
+ return
+
+ # Send a message to the builder
+ if self.builder:
+ try:
+ await self.builder.send_message({
+ "job_id" : "%s" % self.uuid,
+ "command" : "terminate-log-stream",
+ })
+
+ # Ignore if the builder is not online
+ except BuilderNotOnlineError:
+ pass
+
# Message
message = Column(Text, nullable=False, default="")
import asyncio
import collections
+import datetime
import logging
+import uuid
from . import base
class LogStreams(base.Object):
streams = {}
- def open(self, job):
- stream = LogStream(self.backend, job)
+ async def open(self, job):
+ """
+ Opens a new stream
+ """
+ # Fail if a stream already exists
+ if job.uuid in self.streams:
+ raise ValueError("Stream for %s exists" % job)
- # XXX Check that we are not replacing an existing stream for the same job
+ # Create a new stream
+ stream = LogStream(self.backend, job.uuid)
# Register the stream
- self.streams[job] = stream
+ self.streams[job.uuid] = stream
+
+ # Turn on streaming
+ await job.enable_log_streaming()
return stream
- def _close(self, job):
+ async def close(self, uuid):
"""
Closes the stream for a job
"""
+ # Fetch the job
+ job = await self.backend.jobs.get_by_uuid(uuid)
+
+ # Turn off the log stream
+ if job:
+ await job.disable_log_streaming()
+
try:
- del self.streams[job]
+ del self.streams[uuid]
except KeyError:
return
"""
Joins the stream for the given job
"""
+ # Fetch the stream
try:
- stream = self.streams[job]
+ stream = self.streams[job.uuid]
+
+ # If the stream does not exist, open a new one
except KeyError:
- return
+ stream = await self.open(job)
# Join the stream
await stream.join(consumer, **kwargs)
return stream
+ async def log(self, message):
+ """
+ Receives a raw log message
+ """
+ job_id = uuid.UUID(message.get("job_id"))
+
+ try:
+ stream = self.streams[job_id]
+
+ # Close the session if nobody is listening
+ except KeyError:
+ return await self.close(job_id)
+
+ # Fetch the payload
+ data = message.get("data")
+
+ # Extract all fields
+ timestamp = data.get("timestamp")
+ priority = data.get("priority")
+ line = data.get("line")
+
+ # Parse the timestamp
+ timestamp = datetime.datetime.fromisoformat(timestamp)
+
+ # Process the message
+ await stream.message(timestamp, priority, line)
+
class LogStream(base.Object):
levels = {
logging.ERROR : "ERROR",
}
- def init(self, job):
- self.job = job
+ def init(self, uuid):
+ self.uuid = uuid
# Lock when buffer is being modified
self._lock = asyncio.Lock()
self.consumers = []
def __repr__(self):
- return "<%s %s>" % (self.__class__.__name__, self.job)
+ return "<%s %s>" % (self.__class__.__name__, self.uuid)
- def close(self):
+ async def close(self):
"""
Called to close all connections to consumers
"""
# De-register the stream
- self.backend.logstreams._close(self)
+ await self.backend.logstreams.close(self.uuid)
# Close all connections to consumers
for consumer in self.consumers:
# Store a reference to the consumer
self.consumers.append(consumer)
- log.debug("%s has joined the stream for %s" % (consumer, self.job))
+ log.debug("%s has joined the stream for %s" % (consumer, self.uuid))
# Select all messages we want to send
async with self._lock:
except IndexError:
pass
- log.debug("%s has left the stream for %s" % (consumer, self.job))
+ log.debug("%s has left the stream for %s" % (consumer, self.uuid))
- async def message(self, timestamp, level, message):
- # Skip empty messages
- if message is None:
- return
+ # Close the stream if there are no consumers left
+ if not self.consumers:
+ self.backend.run_task(self.close)
+ async def message(self, timestamp, level, line):
# Translate the level
try:
level = self.levels[level]
except KeyError:
level = "UNKNOWN"
- # Queue the message line by line
- for line in message.splitlines():
- # Form a message object that we will send to the consumers
- m = {
- "timestamp" : timestamp,
- "level" : level,
- "message" : line,
- }
-
- # Append the message to the buffer
- async with self._lock:
- self.buffer.append(m)
-
- # Send the message to all consumers
- async with asyncio.TaskGroup() as tasks:
- for c in self.consumers:
- tasks.create_task(
- c.message(m),
- )
+ # Form a message object that we will send to the consumers
+ m = {
+ "timestamp" : timestamp.isoformat(),
+ "level" : level,
+ "message" : line,
+ }
+
+ # Append the message to the buffer
+ async with self._lock:
+ self.buffer.append(m)
+
+ # Send the message to all consumers
+ async with asyncio.TaskGroup() as tasks:
+ for c in self.consumers:
+ await c.message(m)