]> git.ipfire.org Git - pbs.git/commitdiff
jobs: Permanently stream logs
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 27 Mar 2025 10:47:59 +0000 (10:47 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 27 Mar 2025 10:47:59 +0000 (10:47 +0000)
This is just a lot easier than the turning on and off which turned out
to be a bit fragile and racy.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/buildservice/jobs.py
src/buildservice/logstreams.py

index 85557d702d2fcf7d0c0bdbb0739a1986098508b1..7b356b042c94979b64f8f222a34c7ce7e165988d 100644 (file)
@@ -844,48 +844,6 @@ class Job(database.Base, database.BackendMixin, database.SoftDeleteMixin):
                "User", foreign_keys=[aborted_by_id], lazy="selectin",
        )
 
-       # Log Streaming
-
-       async def enable_log_streaming(self):
-               """
-                       Enables log streaming for this job.
-               """
-               # Don't run this when the job has already finished
-               if self.has_finished():
-                       return
-
-               # Send a message to the builder
-               if self.builder:
-                       try:
-                               await self.builder.send_message({
-                                       "job_id"  : "%s" % self.uuid,
-                                       "command" : "launch-log-stream",
-                               })
-
-                       # Ignore if the builder is not online
-                       except BuilderNotOnlineError:
-                               pass
-
-       async def disable_log_streaming(self):
-               """
-                       Disables log streaming for this job.
-               """
-               # Don't run this when the job has already finished
-               if self.has_finished():
-                       return
-
-               # Send a message to the builder
-               if self.builder:
-                       try:
-                               await self.builder.send_message({
-                                       "job_id"  : "%s" % self.uuid,
-                                       "command" : "terminate-log-stream",
-                               })
-
-                       # Ignore if the builder is not online
-                       except BuilderNotOnlineError:
-                               pass
-
        # Message
 
        message = Column(Text, nullable=False, default="")
index 8ca62c1929a8a486cfeeaaf6a1c64819ad0a2425..e451af8ea26478b2104daa5bd340e9036a7e460a 100644 (file)
@@ -34,22 +34,19 @@ BUFFER_MAX_SIZE = 2048
 class LogStreams(base.Object):
        streams = {}
 
-       async def open(self, job):
+       async def open(self, uuid):
                """
                        Opens a new stream
                """
                # Fail if a stream already exists
-               if job.uuid in self.streams:
-                       raise ValueError("Stream for %s exists" % job)
+               if uuid in self.streams:
+                       raise ValueError("Stream for %s exists" % uuid)
 
                # Create a new stream
-               stream = LogStream(self.backend, job.uuid)
+               stream = LogStream(self.backend, uuid)
 
                # Register the stream
-               self.streams[job.uuid] = stream
-
-               # Turn on streaming
-               await job.enable_log_streaming()
+               self.streams[uuid] = stream
 
                return stream
 
@@ -60,10 +57,6 @@ class LogStreams(base.Object):
                # Fetch the job
                job = await self.backend.jobs.get_by_uuid(uuid)
 
-               # Turn off the log stream
-               if job:
-                       await job.disable_log_streaming()
-
                try:
                        del self.streams[uuid]
                except KeyError:
@@ -79,7 +72,7 @@ class LogStreams(base.Object):
 
                # If the stream does not exist, open a new one
                except KeyError:
-                       stream = await self.open(job)
+                       stream = await self.open(job.uuid)
 
                # Join the stream
                await stream.join(consumer, **kwargs)
@@ -95,9 +88,9 @@ class LogStreams(base.Object):
                try:
                        stream = self.streams[job_id]
 
-               # Close the session if nobody is listening
+               # If the stream does not exist, yet, we create a new one
                except KeyError:
-                       return await self.close(job_id)
+                       stream = await self.open(job_id)
 
                # Fetch the payload
                data = message.get("data")