# Store a reference to the consumer
self.consumers.append(consumer)
- # Send all messages in the buffer
- async with self._lock:
- for i, message in enumerate(self.buffer):
- # Only sent up to limit messages
- if limit and i >= limit:
- break
+ log.debug("%s has joined the stream for %s" % (consumer, self.job))
- # Send the message
- await consumer.message(message)
+ # Select all messages we want to send
+ async with self._lock:
+ buffer = collections.deque(self.buffer, limit)
- log.debug("%s has joined the stream for %s" % (consumer, self.job))
+ # Send all messages
+ for message in buffer:
+ await consumer.message(message)
def leave(self, consumer):
"""