git.ipfire.org Git - pbs.git/commitdiff
jobs: Fix log streaming
author Michael Tremer <michael.tremer@ipfire.org>
Fri, 7 Feb 2025 11:58:48 +0000 (11:58 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Fri, 7 Feb 2025 11:58:48 +0000 (11:58 +0000)
Log streaming is now turned on and off dynamically whenever it is needed.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
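
As a sketch of the protocol this introduces: the build service now sends
small JSON control messages to the builder, which is expected to start or
stop forwarding log lines accordingly. The builder-side handler below is
hypothetical (the builder is not part of this commit); only the "command"
and "job_id" fields are taken from the diff.

    # Hypothetical builder-side dispatch for the new control commands
    async def handle_control_message(message):
        command = message.get("command")
        job_id  = message.get("job_id")

        if command == "launch-log-stream":
            await start_log_stream(job_id)      # hypothetical helper
        elif command == "terminate-log-stream":
            await stop_log_stream(job_id)       # hypothetical helper
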
src/buildservice/jobs.py
src/buildservice/logstreams.py
src/web/builders.py

index 239cd2f602dec558fe17400d4c25000769487929..5adf9925c6516e596ec40e87e2283dfa73d64771 100644 (file)
@@ -828,6 +828,48 @@ class Job(database.Base, database.BackendMixin, database.SoftDeleteMixin):
                "User", foreign_keys=[aborted_by_id], lazy="selectin",
        )
 
+       # Log Streaming
+
+       async def enable_log_streaming(self):
+               """
+                       Enables log streaming for this job.
+               """
+               # Don't run this when the job has already finished
+               if self.has_finished():
+                       return
+
+               # Send a message to the builder
+               if self.builder:
+                       try:
+                               await self.builder.send_message({
+                                       "job_id"  : "%s" % self.uuid,
+                                       "command" : "launch-log-stream",
+                               })
+
+                       # Ignore if the builder is not online
+                       except BuilderNotOnlineError:
+                               pass
+
+       async def disable_log_streaming(self):
+               """
+                       Disables log streaming for this job.
+               """
+               # Don't run this when the job has already finished
+               if self.has_finished():
+                       return
+
+               # Send a message to the builder
+               if self.builder:
+                       try:
+                               await self.builder.send_message({
+                                       "job_id"  : "%s" % self.uuid,
+                                       "command" : "terminate-log-stream",
+                               })
+
+                       # Ignore if the builder is not online
+                       except BuilderNotOnlineError:
+                               pass
+
        # Message
 
        message = Column(Text, nullable=False, default="")
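
For reference, the control message produced by enable_log_streaming() would
look like this on the wire (the UUID is made up for illustration):

    # Illustrative payload; disable_log_streaming() sends the same shape
    # with the command "terminate-log-stream"
    {
        "job_id"  : "8f0c2b1e-55aa-4d3e-9c1f-0123456789ab",   # made-up UUID
        "command" : "launch-log-stream",
    }

Both methods silently swallow BuilderNotOnlineError, so toggling streaming
never fails just because the builder is currently away.
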
index 44f790f9e51b79ccb824cb300baa4b128adf0575..8ca62c1929a8a486cfeeaaf6a1c64819ad0a2425 100644 (file)
@@ -20,7 +20,9 @@
 
 import asyncio
 import collections
+import datetime
 import logging
+import uuid
 
 from . import base
 
@@ -32,22 +34,38 @@ BUFFER_MAX_SIZE = 2048
 class LogStreams(base.Object):
        streams = {}
 
-       def open(self, job):
-               stream = LogStream(self.backend, job)
+       async def open(self, job):
+               """
+                       Opens a new stream
+               """
+               # Fail if a stream already exists
+               if job.uuid in self.streams:
+                       raise ValueError("Stream for %s exists" % job)
 
-               # XXX Check that we are not replacing an existing stream for the same job
+               # Create a new stream
+               stream = LogStream(self.backend, job.uuid)
 
                # Register the stream
-               self.streams[job] = stream
+               self.streams[job.uuid] = stream
+
+               # Turn on streaming
+               await job.enable_log_streaming()
 
                return stream
 
-       def _close(self, job):
+       async def close(self, uuid):
                """
                        Closes the stream for a job
                """
+               # Fetch the job
+               job = await self.backend.jobs.get_by_uuid(uuid)
+
+               # Turn off the log stream
+               if job:
+                       await job.disable_log_streaming()
+
                try:
-                       del self.streams[job]
+                       del self.streams[uuid]
                except KeyError:
                        return
 
@@ -55,16 +73,46 @@ class LogStreams(base.Object):
                """
                        Joins the stream for the given job
                """
+               # Fetch the stream
                try:
-                       stream = self.streams[job]
+                       stream = self.streams[job.uuid]
+
+               # If the stream does not exist, open a new one
                except KeyError:
-                       return
+                       stream = await self.open(job)
 
                # Join the stream
                await stream.join(consumer, **kwargs)
 
                return stream
 
+       async def log(self, message):
+               """
+                       Receives a raw log message
+               """
+               job_id = uuid.UUID(message.get("job_id"))
+
+               try:
+                       stream = self.streams[job_id]
+
+               # Close the stream if nobody is listening
+               except KeyError:
+                       return await self.close(job_id)
+
+               # Fetch the payload
+               data = message.get("data")
+
+               # Extract all fields
+               timestamp = data.get("timestamp")
+               priority  = data.get("priority")
+               line      = data.get("line")
+
+               # Parse the timestamp
+               timestamp = datetime.datetime.fromisoformat(timestamp)
+
+               # Process the message
+               await stream.message(timestamp, priority, line)
+
 
 class LogStream(base.Object):
        levels = {
@@ -74,8 +122,8 @@ class LogStream(base.Object):
                logging.ERROR   : "ERROR",
        }
 
-       def init(self, job):
-               self.job = job
+       def init(self, uuid):
+               self.uuid = uuid
 
                # Lock when buffer is being modified
                self._lock = asyncio.Lock()
@@ -87,14 +135,14 @@ class LogStream(base.Object):
                self.consumers = []
 
        def __repr__(self):
-               return "<%s %s>" % (self.__class__.__name__, self.job)
+               return "<%s %s>" % (self.__class__.__name__, self.uuid)
 
-       def close(self):
+       async def close(self):
                """
                        Called to close all connections to consumers
                """
                # De-register the stream
-               self.backend.logstreams._close(self)
+               await self.backend.logstreams.close(self.uuid)
 
                # Close all connections to consumers
                for consumer in self.consumers:
@@ -107,7 +155,7 @@ class LogStream(base.Object):
                # Store a reference to the consumer
                self.consumers.append(consumer)
 
-               log.debug("%s has joined the stream for %s" % (consumer, self.job))
+               log.debug("%s has joined the stream for %s" % (consumer, self.uuid))
 
                # Select all messages we want to send
                async with self._lock:
@@ -126,35 +174,31 @@ class LogStream(base.Object):
                except IndexError:
                        pass
 
-               log.debug("%s has left the stream for %s" % (consumer, self.job))
+               log.debug("%s has left the stream for %s" % (consumer, self.uuid))
 
-       async def message(self, timestamp, level, message):
-               # Skip empty messages
-               if message is None:
-                       return
+               # Close the stream if there are no consumers left
+               if not self.consumers:
+                       self.backend.run_task(self.close)
 
+       async def message(self, timestamp, level, line):
                # Translate the level
                try:
                        level = self.levels[level]
                except KeyError:
                        level = "UNKNOWN"
 
-               # Queue the message line by line
-               for line in message.splitlines():
-                       # Form a message object that we will send to the consumers
-                       m = {
-                               "timestamp" : timestamp,
-                               "level"     : level,
-                               "message"   : line,
-                       }
-
-                       # Append the message to the buffer
-                       async with self._lock:
-                               self.buffer.append(m)
-
-                       # Send the message to all consumers
-                       async with asyncio.TaskGroup() as tasks:
-                               for c in self.consumers:
-                                       tasks.create_task(
-                                               c.message(m),
-                                       )
+               # Form a message object that we will send to the consumers
+               m = {
+                       "timestamp" : timestamp.isoformat(),
+                       "level"     : level,
+                       "message"   : line,
+               }
+
+               # Append the message to the buffer
+               async with self._lock:
+                       self.buffer.append(m)
+
+               # Send the message to all consumers
+               async with asyncio.TaskGroup() as tasks:
+                       for c in self.consumers:
+                               tasks.create_task(c.message(m))
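
Taken together, streams are now opened lazily and live only as long as they
have consumers. A minimal lifecycle sketch, assuming a backend, a job and a
consumer object with an async message() coroutine (the leave() name is
inferred from the surrounding hunk):

    # Sketch of the new stream lifecycle (all names assumed from context)
    stream = await backend.logstreams.join(job, consumer)
    # join() opens the stream on demand, which registers it under
    # job.uuid and calls job.enable_log_streaming()

    await stream.leave(consumer)
    # once the last consumer has left, close() runs as a background
    # task and calls job.disable_log_streaming()
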
index 89b3c938e10438f043adfd7a531e61c32be25488..afcecee25a529696e49479505e375c7bff93f5ea 100644 (file)
@@ -64,8 +64,12 @@ class APIv1ControlHandler(base.APIMixin, base.BackendMixin, tornado.websocket.We
                type = message.get("type")
                data = message.get("data")
 
+               # Fast path for log messages
+               if type == "log":
+                       return await self.backend.logstreams.log(message)
+
                # Handle stats
-               if type == "stats":
+               elif type == "stats":
                        async with await self.db.transaction():
                                await builder.log_stats(**data)
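
For completeness, a "log" message as handled by the fast path would carry
roughly this shape (field names are taken from LogStreams.log(); all values
are illustrative):

    # Illustrative websocket payload for the "log" fast path
    {
        "type"   : "log",
        "job_id" : "8f0c2b1e-55aa-4d3e-9c1f-0123456789ab",   # made-up UUID
        "data"   : {
            "timestamp" : "2025-02-07T11:58:48+00:00",  # parsed with datetime.fromisoformat()
            "priority"  : 20,                           # e.g. logging.INFO, mapped via LogStream.levels
            "line"      : "make: Nothing to be done for 'all'.",
        },
    }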