If the ``queue`` key is present, the corresponding value can be one of the following:
-* An object implementing the :class:`queue.Queue` public API. For instance,
- this may be an actual instance of :class:`queue.Queue` or a subclass thereof,
- or a proxy obtained by :meth:`multiprocessing.managers.SyncManager.Queue`.
+* An object implementing the :meth:`Queue.put_nowait <queue.Queue.put_nowait>`
+ and :meth:`Queue.get <queue.Queue.get>` public API. For instance, this may be
+ an actual instance of :class:`queue.Queue` or a subclass thereof, or a proxy
+ obtained by :meth:`multiprocessing.managers.SyncManager.Queue`.
This is of course only possible if you are constructing or modifying
the configuration dictionary in code.
* A string that resolves to a callable which, when called with no arguments, returns
- the :class:`queue.Queue` instance to use. That callable could be a
- :class:`queue.Queue` subclass or a function which returns a suitable queue instance,
+ the queue instance to use. That callable could be a :class:`queue.Queue` subclass
+ or a function which returns a suitable queue instance,
such as ``my.module.queue_factory()``.
* A dict with a ``'()'`` key which is constructed in the usual way as discussed in
def _is_queue_like_object(obj):
"""Check that *obj* implements the Queue API."""
- if isinstance(obj, queue.Queue):
+ if isinstance(obj, (queue.Queue, queue.SimpleQueue)):
return True
# defer importing multiprocessing as much as possible
from multiprocessing.queues import Queue as MPQueue
# Ideally, we would have wanted to simply use strict type checking
# instead of a protocol-based type checking since the latter does
# not check the method signatures.
- queue_interface = [
- 'empty', 'full', 'get', 'get_nowait',
- 'put', 'put_nowait', 'join', 'qsize',
- 'task_done',
- ]
+ #
+ # Note that only 'put_nowait' and 'get' are required by the logging
+ # queue handler and queue listener (see gh-124653) and that other
+ # methods are either optional or unused.
+ minimal_queue_interface = ['put_nowait', 'get']
return all(callable(getattr(obj, method, None))
- for method in queue_interface)
+ for method in minimal_queue_interface)
class DictConfigurator(BaseConfigurator):
"""
return getattr(queue, attribute)
class CustomQueueFakeProtocol(CustomQueueProtocol):
- # An object implementing the Queue API (incorrect signatures).
+ # An object implementing the minimial Queue API for
+ # the logging module but with incorrect signatures.
+ #
# The object will be considered a valid queue class since we
# do not check the signatures (only callability of methods)
# but will NOT be usable in production since a TypeError will
- # be raised due to a missing argument.
- def empty(self, x):
+ # be raised due to the extra argument in 'put_nowait'.
+ def put_nowait(self):
pass
class CustomQueueWrongProtocol(CustomQueueProtocol):
- empty = None
+ put_nowait = None
+
+class MinimalQueueProtocol:
+ def put_nowait(self, x): pass
+ def get(self): pass
def queueMaker():
return queue.Queue()
msg = str(ctx.exception)
self.assertEqual(msg, "Unable to configure handler 'ah'")
+ def _apply_simple_queue_listener_configuration(self, qspec):
+ self.apply_config({
+ "version": 1,
+ "handlers": {
+ "queue_listener": {
+ "class": "logging.handlers.QueueHandler",
+ "queue": qspec,
+ },
+ },
+ })
+
@threading_helper.requires_working_threading()
@support.requires_subprocess()
@patch("multiprocessing.Manager")
def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager):
- # gh-120868, gh-121723
-
- from multiprocessing import Queue as MQ
-
- q1 = {"()": "queue.Queue", "maxsize": -1}
- q2 = MQ()
- q3 = queue.Queue()
- # CustomQueueFakeProtocol passes the checks but will not be usable
- # since the signatures are incompatible. Checking the Queue API
- # without testing the type of the actual queue is a trade-off
- # between usability and the work we need to do in order to safely
- # check that the queue object correctly implements the API.
- q4 = CustomQueueFakeProtocol()
-
- for qspec in (q1, q2, q3, q4):
- self.apply_config(
- {
- "version": 1,
- "handlers": {
- "queue_listener": {
- "class": "logging.handlers.QueueHandler",
- "queue": qspec,
- },
- },
- }
- )
- manager.assert_not_called()
+ # gh-120868, gh-121723, gh-124653
+
+ for qspec in [
+ {"()": "queue.Queue", "maxsize": -1},
+ queue.Queue(),
+ # queue.SimpleQueue does not inherit from queue.Queue
+ queue.SimpleQueue(),
+ # CustomQueueFakeProtocol passes the checks but will not be usable
+ # since the signatures are incompatible. Checking the Queue API
+ # without testing the type of the actual queue is a trade-off
+ # between usability and the work we need to do in order to safely
+ # check that the queue object correctly implements the API.
+ CustomQueueFakeProtocol(),
+ MinimalQueueProtocol(),
+ ]:
+ with self.subTest(qspec=qspec):
+ self._apply_simple_queue_listener_configuration(qspec)
+ manager.assert_not_called()
@patch("multiprocessing.Manager")
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager):
# gh-120868, gh-121723
for qspec in [object(), CustomQueueWrongProtocol()]:
- with self.assertRaises(ValueError):
- self.apply_config(
- {
- "version": 1,
- "handlers": {
- "queue_listener": {
- "class": "logging.handlers.QueueHandler",
- "queue": qspec,
- },
- },
- }
- )
- manager.assert_not_called()
+ with self.subTest(qspec=qspec), self.assertRaises(ValueError):
+ self._apply_simple_queue_listener_configuration(qspec)
+ manager.assert_not_called()
+
+ @skip_if_tsan_fork
+ @support.requires_subprocess()
+ @unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
+ " assertions in multiprocessing")
+ def test_config_reject_simple_queue_handler_multiprocessing_context(self):
+ # multiprocessing.SimpleQueue does not implement 'put_nowait'
+ # and thus cannot be used as a queue-like object (gh-124653)
+
+ import multiprocessing
+
+ if support.MS_WINDOWS:
+ start_methods = ['spawn']
+ else:
+ start_methods = ['spawn', 'fork', 'forkserver']
+
+ for start_method in start_methods:
+ with self.subTest(start_method=start_method):
+ ctx = multiprocessing.get_context(start_method)
+ qspec = ctx.SimpleQueue()
+ with self.assertRaises(ValueError):
+ self._apply_simple_queue_listener_configuration(qspec)
@skip_if_tsan_fork
@support.requires_subprocess()