From: Michael Tremer Date: Thu, 27 Mar 2025 10:47:59 +0000 (+0000) Subject: jobs: Permanently stream logs X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=b0f26f338729547fda87cc170e0c60c5cb4b3525;p=pbs.git jobs: Permanently stream logs This is just a lot easier than the turning on and off which turned out to be a bit fragile and racy. Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py index 85557d70..7b356b04 100644 --- a/src/buildservice/jobs.py +++ b/src/buildservice/jobs.py @@ -844,48 +844,6 @@ class Job(database.Base, database.BackendMixin, database.SoftDeleteMixin): "User", foreign_keys=[aborted_by_id], lazy="selectin", ) - # Log Streaming - - async def enable_log_streaming(self): - """ - Enables log streaming for this job. - """ - # Don't run this when the job has already finished - if self.has_finished(): - return - - # Send a message to the builder - if self.builder: - try: - await self.builder.send_message({ - "job_id" : "%s" % self.uuid, - "command" : "launch-log-stream", - }) - - # Ignore if the builder is not online - except BuilderNotOnlineError: - pass - - async def disable_log_streaming(self): - """ - Disables log streaming for this job. - """ - # Don't run this when the job has already finished - if self.has_finished(): - return - - # Send a message to the builder - if self.builder: - try: - await self.builder.send_message({ - "job_id" : "%s" % self.uuid, - "command" : "terminate-log-stream", - }) - - # Ignore if the builder is not online - except BuilderNotOnlineError: - pass - # Message message = Column(Text, nullable=False, default="") diff --git a/src/buildservice/logstreams.py b/src/buildservice/logstreams.py index 8ca62c19..e451af8e 100644 --- a/src/buildservice/logstreams.py +++ b/src/buildservice/logstreams.py @@ -34,22 +34,19 @@ BUFFER_MAX_SIZE = 2048 class LogStreams(base.Object): streams = {} - async def open(self, job): + async def open(self, uuid): """ Opens a new stream """ # Fail if a stream already exists - if job.uuid in self.streams: - raise ValueError("Stream for %s exists" % job) + if uuid in self.streams: + raise ValueError("Stream for %s exists" % uuid) # Create a new stream - stream = LogStream(self.backend, job.uuid) + stream = LogStream(self.backend, uuid) # Register the stream - self.streams[job.uuid] = stream - - # Turn on streaming - await job.enable_log_streaming() + self.streams[uuid] = stream return stream @@ -60,10 +57,6 @@ class LogStreams(base.Object): # Fetch the job job = await self.backend.jobs.get_by_uuid(uuid) - # Turn off the log stream - if job: - await job.disable_log_streaming() - try: del self.streams[uuid] except KeyError: @@ -79,7 +72,7 @@ class LogStreams(base.Object): # If the stream does not exist, open a new one except KeyError: - stream = await self.open(job) + stream = await self.open(job.uuid) # Join the stream await stream.join(consumer, **kwargs) @@ -95,9 +88,9 @@ class LogStreams(base.Object): try: stream = self.streams[job_id] - # Close the session if nobody is listening + # If the stream does not exist, yet, we create a new one except KeyError: - return await self.close(job_id) + stream = await self.open(job_id) # Fetch the payload data = message.get("data")