From: Michael Tremer Date: Tue, 25 Jul 2023 14:15:08 +0000 (+0000) Subject: logstream: Lock buffer when it is being modified X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cc58f07490eefefe7c340e19e81928cfb5e79aee;p=pbs.git logstream: Lock buffer when it is being modified This might be cheaper than copying it when it becomes large. Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/logstreams.py b/src/buildservice/logstreams.py index 8769fd2b..673a27d7 100644 --- a/src/buildservice/logstreams.py +++ b/src/buildservice/logstreams.py @@ -77,6 +77,9 @@ class LogStream(base.Object): def init(self, job): self.job = job + # Lock when buffer is being modified + self._lock = asyncio.Lock() + # Buffer for messages self.buffer = collections.deque(maxlen=BUFFER_MAX_SIZE) @@ -104,18 +107,15 @@ class LogStream(base.Object): # Store a reference to the consumer self.consumers.append(consumer) - # Create a (shallow) copy of the buffer - # to avoid changes to the original one while this is running. - buffer = self.buffer.copy() - # Send all messages in the buffer - for i, message in enumerate(buffer): - # Only sent up to limit messages - if limit and i >= limit: - break + async with self._lock: + for i, message in enumerate(self.buffer): + # Only sent up to limit messages + if limit and i >= limit: + break - # Send the message - await consumer.message(message) + # Send the message + await consumer.message(message) log.debug("%s has joined the stream for %s" % (consumer, self.job)) @@ -151,7 +151,8 @@ class LogStream(base.Object): } # Append the message to the buffer - self.buffer.append(m) + async with self._lock: + self.buffer.append(m) # Send the message to all consumers async with asyncio.TaskGroup() as tasks: