From: Michael Tremer
Date: Fri, 7 Feb 2025 11:58:48 +0000 (+0000)
Subject: jobs: Fix log streaming
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=4b8f1396db9e9ee98a8cd60956db74e0f09c2363;p=pbs.git

jobs: Fix log streaming

Log streaming is now turned on and off dynamically, whenever it is
needed.

Signed-off-by: Michael Tremer
---

diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py
index 239cd2f6..5adf9925 100644
--- a/src/buildservice/jobs.py
+++ b/src/buildservice/jobs.py
@@ -828,6 +828,48 @@ class Job(database.Base, database.BackendMixin, database.SoftDeleteMixin):
 		"User", foreign_keys=[aborted_by_id], lazy="selectin",
 	)
 
+	# Log Streaming
+
+	async def enable_log_streaming(self):
+		"""
+			Enables log streaming for this job.
+		"""
+		# Don't run this when the job has already finished
+		if self.has_finished():
+			return
+
+		# Send a message to the builder
+		if self.builder:
+			try:
+				await self.builder.send_message({
+					"job_id"  : "%s" % self.uuid,
+					"command" : "launch-log-stream",
+				})
+
+			# Ignore if the builder is not online
+			except BuilderNotOnlineError:
+				pass
+
+	async def disable_log_streaming(self):
+		"""
+			Disables log streaming for this job.
+		"""
+		# Don't run this when the job has already finished
+		if self.has_finished():
+			return
+
+		# Send a message to the builder
+		if self.builder:
+			try:
+				await self.builder.send_message({
+					"job_id"  : "%s" % self.uuid,
+					"command" : "terminate-log-stream",
+				})
+
+			# Ignore if the builder is not online
+			except BuilderNotOnlineError:
+				pass
+
 	# Message
 	message = Column(Text, nullable=False, default="")
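As a rough illustration of the other side of this exchange, here is a
minimal sketch of a builder dispatching the two commands defined above.
The envelope keys ("job_id", "command") and the command names are taken
from the patch; the Builder class and its bookkeeping are assumptions
made purely for illustration and are not pbs code:

    import asyncio

    class Builder:
        # Hypothetical builder-side stub, illustration only
        def __init__(self):
            # Job UUIDs (as strings) that should currently stream their log
            self.active_streams = set()

        async def on_message(self, message):
            job_id  = message.get("job_id")
            command = message.get("command")

            # Start shipping log lines for this job
            if command == "launch-log-stream":
                self.active_streams.add(job_id)

            # Stop shipping log lines for this job
            elif command == "terminate-log-stream":
                self.active_streams.discard(job_id)

    async def main():
        builder = Builder()
        job_id  = "b2fb4bf2-..."  # illustrative only; pbs sends the job's UUID

        # Simulate the two messages sent by enable/disable_log_streaming()
        await builder.on_message({"job_id" : job_id, "command" : "launch-log-stream"})
        assert builder.active_streams

        await builder.on_message({"job_id" : job_id, "command" : "terminate-log-stream"})
        assert not builder.active_streams

    asyncio.run(main())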
diff --git a/src/buildservice/logstreams.py b/src/buildservice/logstreams.py
index 44f790f9..8ca62c19 100644
--- a/src/buildservice/logstreams.py
+++ b/src/buildservice/logstreams.py
@@ -20,7 +20,9 @@
 import asyncio
 import collections
+import datetime
 import logging
+import uuid
 
 from . import base
@@ -32,22 +34,38 @@ BUFFER_MAX_SIZE = 2048
 
 class LogStreams(base.Object):
 	streams = {}
 
-	def open(self, job):
-		stream = LogStream(self.backend, job)
+	async def open(self, job):
+		"""
+			Opens a new stream
+		"""
+		# Fail if a stream already exists
+		if job.uuid in self.streams:
+			raise ValueError("Stream for %s exists" % job)
 
-		# XXX Check that we are not replacing an existing stream for the same job
+		# Create a new stream
+		stream = LogStream(self.backend, job.uuid)
 
 		# Register the stream
-		self.streams[job] = stream
+		self.streams[job.uuid] = stream
+
+		# Turn on streaming
+		await job.enable_log_streaming()
 
 		return stream
 
-	def _close(self, job):
+	async def close(self, uuid):
 		"""
 			Closes the stream for a job
 		"""
+		# Fetch the job
+		job = await self.backend.jobs.get_by_uuid(uuid)
+
+		# Turn off the log stream
+		if job:
+			await job.disable_log_streaming()
+
 		try:
-			del self.streams[job]
+			del self.streams[uuid]
 		except KeyError:
 			return
@@ -55,16 +73,46 @@ class LogStreams(base.Object):
 		"""
 			Joins the stream for the given job
 		"""
+		# Fetch the stream
 		try:
-			stream = self.streams[job]
+			stream = self.streams[job.uuid]
+
+		# If the stream does not exist, open a new one
 		except KeyError:
-			return
+			stream = await self.open(job)
 
 		# Join the stream
 		await stream.join(consumer, **kwargs)
 
 		return stream
 
+	async def log(self, message):
+		"""
+			Receives a raw log message
+		"""
+		job_id = uuid.UUID(message.get("job_id"))
+
+		try:
+			stream = self.streams[job_id]
+
+		# Close the session if nobody is listening
+		except KeyError:
+			return await self.close(job_id)
+
+		# Fetch the payload
+		data = message.get("data")
+
+		# Extract all fields
+		timestamp = data.get("timestamp")
+		priority  = data.get("priority")
+		line      = data.get("line")
+
+		# Parse the timestamp
+		timestamp = datetime.datetime.fromisoformat(timestamp)
+
+		# Process the message
+		await stream.message(timestamp, priority, line)
+
 
 class LogStream(base.Object):
 	levels = {
@@ -74,8 +122,8 @@
 		logging.ERROR   : "ERROR",
 	}
 
-	def init(self, job):
-		self.job = job
+	def init(self, uuid):
+		self.uuid = uuid
 
 		# Lock when buffer is being modified
 		self._lock = asyncio.Lock()
@@ -87,14 +135,14 @@
 		self.consumers = []
 
 	def __repr__(self):
-		return "<%s %s>" % (self.__class__.__name__, self.job)
+		return "<%s %s>" % (self.__class__.__name__, self.uuid)
 
-	def close(self):
+	async def close(self):
 		"""
 			Called to close all connections to consumers
 		"""
 		# De-register the stream
-		self.backend.logstreams._close(self)
+		await self.backend.logstreams.close(self.uuid)
 
 		# Close all connections to consumers
 		for consumer in self.consumers:
@@ -107,7 +155,7 @@
 		# Store a reference to the consumer
 		self.consumers.append(consumer)
 
-		log.debug("%s has joined the stream for %s" % (consumer, self.job))
+		log.debug("%s has joined the stream for %s" % (consumer, self.uuid))
 
 		# Select all messages we want to send
 		async with self._lock:
@@ -126,35 +174,31 @@
 			except IndexError:
 				pass
 
-		log.debug("%s has left the stream for %s" % (consumer, self.job))
+		log.debug("%s has left the stream for %s" % (consumer, self.uuid))
 
-	async def message(self, timestamp, level, message):
-		# Skip empty messages
-		if message is None:
-			return
+		# Close the stream if there are no consumers left
+		if not self.consumers:
+			self.backend.run_task(self.close)
 
+	async def message(self, timestamp, level, line):
 		# Translate the level
 		try:
 			level = self.levels[level]
 		except KeyError:
 			level = "UNKNOWN"
 
-		# Queue the message line by line
-		for line in message.splitlines():
-			# Form a message object that we will send to the consumers
-			m = {
-				"timestamp" : timestamp,
-				"level"     : level,
-				"message"   : line,
-			}
-
-			# Append the message to the buffer
-			async with self._lock:
-				self.buffer.append(m)
-
-			# Send the message to all consumers
-			async with asyncio.TaskGroup() as tasks:
-				for c in self.consumers:
-					tasks.create_task(
-						c.message(m),
-					)
+		# Form a message object that we will send to the consumers
+		m = {
+			"timestamp" : timestamp.isoformat(),
+			"level"     : level,
+			"message"   : line,
+		}
+
+		# Append the message to the buffer
+		async with self._lock:
+			self.buffer.append(m)
+
+		# Send the message to all consumers
+		async with asyncio.TaskGroup() as tasks:
+			for c in self.consumers:
+				tasks.create_task(c.message(m))
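Taken together, these changes give the streams an on-demand lifecycle:
the first consumer to join opens the stream (which tells the builder to
start sending), and when the last consumer leaves, the stream closes
itself again (which tells the builder to stop). A condensed,
self-contained model of that lifecycle, assuming stubbed-out Job and
consumer objects that are illustration only, not pbs code:

    import asyncio

    class Job:
        # Stand-in for buildservice.jobs.Job, illustration only
        def __init__(self, uuid):
            self.uuid = uuid
            self.streaming = False

        async def enable_log_streaming(self):
            self.streaming = True

        async def disable_log_streaming(self):
            self.streaming = False

    class Streams:
        # Stand-in for buildservice.logstreams.LogStreams, illustration only
        def __init__(self):
            self.streams = {}

        async def join(self, job, consumer):
            # The first consumer opens the stream and enables streaming
            if job.uuid not in self.streams:
                self.streams[job.uuid] = {"job" : job, "consumers" : []}
                await job.enable_log_streaming()

            self.streams[job.uuid]["consumers"].append(consumer)

        async def leave(self, job, consumer):
            stream = self.streams[job.uuid]
            stream["consumers"].remove(consumer)

            # The last consumer tears the stream down and disables streaming
            if not stream["consumers"]:
                del self.streams[job.uuid]
                await job.disable_log_streaming()

    async def main():
        streams = Streams()
        job = Job(uuid="0b77-...")  # illustrative UUID

        await streams.join(job, consumer="websocket-1")
        assert job.streaming is True

        await streams.leave(job, consumer="websocket-1")
        assert job.streaming is False

    asyncio.run(main())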
"UNKNOWN" - # Queue the message line by line - for line in message.splitlines(): - # Form a message object that we will send to the consumers - m = { - "timestamp" : timestamp, - "level" : level, - "message" : line, - } - - # Append the message to the buffer - async with self._lock: - self.buffer.append(m) - - # Send the message to all consumers - async with asyncio.TaskGroup() as tasks: - for c in self.consumers: - tasks.create_task( - c.message(m), - ) + # Form a message object that we will send to the consumers + m = { + "timestamp" : timestamp.isoformat(), + "level" : level, + "message" : line, + } + + # Append the message to the buffer + async with self._lock: + self.buffer.append(m) + + # Send the message to all consumers + async with asyncio.TaskGroup() as tasks: + for c in self.consumers: + await c.message(m) diff --git a/src/web/builders.py b/src/web/builders.py index 89b3c938..afcecee2 100644 --- a/src/web/builders.py +++ b/src/web/builders.py @@ -64,8 +64,12 @@ class APIv1ControlHandler(base.APIMixin, base.BackendMixin, tornado.websocket.We type = message.get("type") data = message.get("data") + # Fast path for log messages + if type == "log": + return self.backend.logstreams.log(message) + # Handle stats - if type == "stats": + elif type == "stats": async with await self.db.transaction(): await builder.log_stats(**data)