def init(self, job):
self.job = job
+ # Lock when buffer is being modified
+ self._lock = asyncio.Lock()
+
# Buffer for messages
self.buffer = collections.deque(maxlen=BUFFER_MAX_SIZE)
# Store a reference to the consumer
self.consumers.append(consumer)
- # Create a (shallow) copy of the buffer
- # to avoid changes to the original one while this is running.
- buffer = self.buffer.copy()
-
# Send all messages in the buffer
- for i, message in enumerate(buffer):
- # Only sent up to limit messages
- if limit and i >= limit:
- break
+ async with self._lock:
+ for i, message in enumerate(self.buffer):
+ # Only sent up to limit messages
+ if limit and i >= limit:
+ break
- # Send the message
- await consumer.message(message)
+ # Send the message
+ await consumer.message(message)
log.debug("%s has joined the stream for %s" % (consumer, self.job))
}
# Append the message to the buffer
- self.buffer.append(m)
+ async with self._lock:
+ self.buffer.append(m)
# Send the message to all consumers
async with asyncio.TaskGroup() as tasks: