if h:
try:
h.acquire()
- h.flush()
+ # MemoryHandlers might not want to be flushed on close,
+ # but circular imports prevent us scoping this to just
+ # those handlers. hence the default to True.
+ if getattr(h, 'flushOnClose', True):
+ h.flush()
h.close()
except (OSError, ValueError):
# Ignore errors which might be caused
# assert that no new lines have been added
self.assert_log_lines(lines) # no change
+ def test_shutdown_flush_on_close(self):
+ """
+ Test that the flush-on-close configuration is respected by the
+ shutdown method.
+ """
+ self.mem_logger.debug(self.next_message())
+ self.assert_log_lines([])
+ self.mem_logger.info(self.next_message())
+ self.assert_log_lines([])
+ # Default behaviour is to flush on close. Check that it happens.
+ logging.shutdown(handlerList=[logging.weakref.ref(self.mem_hdlr)])
+ lines = [
+ ('DEBUG', '1'),
+ ('INFO', '2'),
+ ]
+ self.assert_log_lines(lines)
+ # Now configure for flushing not to be done on close.
+ self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
+ self.root_hdlr,
+ False)
+ self.mem_logger.addHandler(self.mem_hdlr)
+ self.mem_logger.debug(self.next_message())
+ self.assert_log_lines(lines) # no change
+ self.mem_logger.info(self.next_message())
+ self.assert_log_lines(lines) # no change
+ # assert that no new lines have been added after shutdown
+ logging.shutdown(handlerList=[logging.weakref.ref(self.mem_hdlr)])
+ self.assert_log_lines(lines) # no change
+
@threading_helper.requires_working_threading()
def test_race_between_set_target_and_flush(self):
class MockRaceConditionHandler: