Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.job)
- async def close(self):
+ def close(self):
"""
Called to close all connections to consumers
"""
self.backend.logstreams._close(self)
# Close all connections to consumers
- if self.consumers:
- asyncio.gather(
- *(c.close() for c in self.consumers)
- )
+ for consumer in self.consumers:
+ consumer.close()
async def join(self, consumer):
"""
# Open a new log stream
self.logstream = self.backend.logstreams.open(self.job)
- async def on_close(self):
+ def on_close(self):
# Close the logstream
- await self.logstream.close()
+ self.logstream.close()
async def on_message(self, message):
message = self._decode_json_message(message)