of each message with the handler's level, and only passes a message to a
handler if it's appropriate to do so.
+.. versionchanged:: next
+ The :class:`QueueListener` can be started (and stopped) via the
+ :keyword:`with` statement. For example:
+
+ .. code-block:: python
+
+ with QueueListener(que, handler) as listener:
+ # The queue listener automatically starts
+ # when the 'with' block is entered.
+ pass
+ # The queue listener automatically stops once
+ # the 'with' block is exited.
+
.. _network-logging:
Sending and receiving logging events across a network
.. versionchanged:: 3.5
The ``respect_handler_level`` argument was added.
+ .. versionchanged:: next
+ :class:`QueueListener` can now be used as a context manager via
+ :keyword:`with`. When entering the context, the listener is started. When
+ exiting the context, the listener is stopped.
+ :meth:`~contextmanager.__enter__` returns the
+ :class:`QueueListener` object.
+
.. method:: dequeue(block)
Dequeues a record and return it, optionally blocking.
self._thread = None
self.respect_handler_level = respect_handler_level
+ def __enter__(self):
+ """
+ For use as a context manager. Starts the listener.
+ """
+ self.start()
+ return self
+
+ def __exit__(self, *args):
+ """
+ For use as a context manager. Stops the listener.
+ """
+ self.stop()
+
def dequeue(self, block):
"""
Dequeue a record and return it, optionally blocking.
self.assertEqual(formatted_msg, log_record.msg)
self.assertEqual(formatted_msg, log_record.message)
- @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
- 'logging.handlers.QueueListener required for this test')
def test_queue_listener(self):
handler = TestHandler(support.Matcher())
listener = logging.handlers.QueueListener(self.queue, handler)
self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
handler.close()
- @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
- 'logging.handlers.QueueListener required for this test')
+ def test_queue_listener_context_manager(self):
+ handler = TestHandler(support.Matcher())
+ with logging.handlers.QueueListener(self.queue, handler) as listener:
+ self.assertIsInstance(listener, logging.handlers.QueueListener)
+ self.assertIsNotNone(listener._thread)
+ self.assertIsNone(listener._thread)
+
+ # doesn't hurt to call stop() more than once.
+ listener.stop()
+ self.assertIsNone(listener._thread)
+
def test_queue_listener_with_StreamHandler(self):
# Test that traceback and stack-info only appends once (bpo-34334, bpo-46755).
listener = logging.handlers.QueueListener(self.queue, self.root_hdlr)
self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1)
self.assertEqual(self.stream.getvalue().strip().count('Stack'), 1)
- @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
- 'logging.handlers.QueueListener required for this test')
def test_queue_listener_with_multiple_handlers(self):
# Test that queue handler format doesn't affect other handler formats (bpo-35726).
self.que_hdlr.setFormatter(self.root_formatter)