]> git.ipfire.org Git - pbs.git/commitdiff
logstream: Lock buffer when it is being modified
authorMichael Tremer <michael.tremer@ipfire.org>
Tue, 25 Jul 2023 14:15:08 +0000 (14:15 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Tue, 25 Jul 2023 14:15:08 +0000 (14:15 +0000)
This might be cheaper than copying it when it becomes large.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/buildservice/logstreams.py

index 8769fd2b017d19deade83795163abaaddd40f851..673a27d782373fcc19f96f4dcb737cbd8b422348 100644 (file)
@@ -77,6 +77,9 @@ class LogStream(base.Object):
        def init(self, job):
                self.job = job
 
+               # Lock when buffer is being modified
+               self._lock = asyncio.Lock()
+
                # Buffer for messages
                self.buffer = collections.deque(maxlen=BUFFER_MAX_SIZE)
 
@@ -104,18 +107,15 @@ class LogStream(base.Object):
                # Store a reference to the consumer
                self.consumers.append(consumer)
 
-               # Create a (shallow) copy of the buffer
-               # to avoid changes to the original one while this is running.
-               buffer = self.buffer.copy()
-
                # Send all messages in the buffer
-               for i, message in enumerate(buffer):
-                       # Only sent up to limit messages
-                       if limit and i >= limit:
-                               break
+               async with self._lock:
+                       for i, message in enumerate(self.buffer):
+                               # Only sent up to limit messages
+                               if limit and i >= limit:
+                                       break
 
-                       # Send the message
-                       await consumer.message(message)
+                               # Send the message
+                               await consumer.message(message)
 
                log.debug("%s has joined the stream for %s" % (consumer, self.job))
 
@@ -151,7 +151,8 @@ class LogStream(base.Object):
                        }
 
                        # Append the message to the buffer
-                       self.buffer.append(m)
+                       async with self._lock:
+                               self.buffer.append(m)
 
                        # Send the message to all consumers
                        async with asyncio.TaskGroup() as tasks: