]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-121723: Relax constraints on queue objects for `logging.handlers.QueueHandler...
authorBénédikt Tran <10796600+picnixz@users.noreply.github.com>
Fri, 2 Aug 2024 11:16:32 +0000 (13:16 +0200)
committerGitHub <noreply@github.com>
Fri, 2 Aug 2024 11:16:32 +0000 (12:16 +0100)
Doc/library/logging.config.rst
Lib/logging/config.py
Lib/test/test_logging.py
Misc/NEWS.d/next/Library/2024-07-23-10-59-38.gh-issue-121723.iJEf7e.rst [new file with mode: 0644]

index dfbf0b1cf2f9ff9da12deb32fd03c41cd16a1ff8..0ddbc1a5f880480f37bc913b8170e6476db453ae 100644 (file)
@@ -753,9 +753,12 @@ The ``queue`` and ``listener`` keys are optional.
 
 If the ``queue`` key is present, the corresponding value can be one of the following:
 
-* An actual instance of :class:`queue.Queue` or a subclass thereof. This is of course
-  only possible if you are constructing or modifying the configuration dictionary in
-  code.
+* An object implementing the :class:`queue.Queue` public API. For instance,
+  this may be an actual instance of :class:`queue.Queue` or a subclass thereof,
+  or a proxy obtained by :meth:`multiprocessing.managers.SyncManager.Queue`.
+
+  This is of course only possible if you are constructing or modifying
+  the configuration dictionary in code.
 
 * A string that resolves to a callable which, when called with no arguments, returns
   the :class:`queue.Queue` instance to use. That callable could be a
index 95e129ae988c24fd1bbb562ecdcecaef4a9b714b..3781cb1aeb9ae2193df498cd092aeeea2dd3cceb 100644 (file)
@@ -497,6 +497,33 @@ class BaseConfigurator(object):
             value = tuple(value)
         return value
 
+def _is_queue_like_object(obj):
+    """Check that *obj* implements the Queue API."""
+    if isinstance(obj, queue.Queue):
+        return True
+    # defer importing multiprocessing as much as possible
+    from multiprocessing.queues import Queue as MPQueue
+    if isinstance(obj, MPQueue):
+        return True
+    # Depending on the multiprocessing start context, we cannot create
+    # a multiprocessing.managers.BaseManager instance 'mm' to get the
+    # runtime type of mm.Queue() or mm.JoinableQueue() (see gh-119819).
+    #
+    # Since we only need an object implementing the Queue API, we only
+    # do a protocol check, but we do not use typing.runtime_checkable()
+    # and typing.Protocol to reduce import time (see gh-121723).
+    #
+    # Ideally, we would have wanted to simply use strict type checking
+    # instead of a protocol-based type checking since the latter does
+    # not check the method signatures.
+    queue_interface = [
+        'empty', 'full', 'get', 'get_nowait',
+        'put', 'put_nowait', 'join', 'qsize',
+        'task_done',
+    ]
+    return all(callable(getattr(obj, method, None))
+               for method in queue_interface)
+
 class DictConfigurator(BaseConfigurator):
     """
     Configure logging using a dictionary-like object to describe the
@@ -791,32 +818,8 @@ class DictConfigurator(BaseConfigurator):
                         if '()' not in qspec:
                             raise TypeError('Invalid queue specifier %r' % qspec)
                         config['queue'] = self.configure_custom(dict(qspec))
-                    else:
-                        from multiprocessing.queues import Queue as MPQueue
-
-                        if not isinstance(qspec, (queue.Queue, MPQueue)):
-                            # Safely check if 'qspec' is an instance of Manager.Queue
-                            # / Manager.JoinableQueue
-
-                            from multiprocessing import Manager as MM
-                            from multiprocessing.managers import BaseProxy
-
-                            # if it's not an instance of BaseProxy, it also can't be
-                            # an instance of Manager.Queue / Manager.JoinableQueue
-                            if isinstance(qspec, BaseProxy):
-                                # Sometimes manager or queue creation might fail
-                                # (e.g. see issue gh-120868). In that case, any
-                                # exception during the creation of these queues will
-                                # propagate up to the caller and be wrapped in a
-                                # `ValueError`, whose cause will indicate the details of
-                                # the failure.
-                                mm = MM()
-                                proxy_queue = mm.Queue()
-                                proxy_joinable_queue = mm.JoinableQueue()
-                                if not isinstance(qspec, (type(proxy_queue), type(proxy_joinable_queue))):
-                                    raise TypeError('Invalid queue specifier %r' % qspec)
-                            else:
-                                raise TypeError('Invalid queue specifier %r' % qspec)
+                    elif not _is_queue_like_object(qspec):
+                        raise TypeError('Invalid queue specifier %r' % qspec)
 
                 if 'listener' in config:
                     lspec = config['listener']
index 6d688d4b81bbf4f836f23a12eb8669323ce5c495..49523756e115c625cb0d308a12367c7e619d1828 100644 (file)
@@ -2368,6 +2368,26 @@ class CustomListener(logging.handlers.QueueListener):
 class CustomQueue(queue.Queue):
     pass
 
+class CustomQueueProtocol:
+    def __init__(self, maxsize=0):
+        self.queue = queue.Queue(maxsize)
+
+    def __getattr__(self, attribute):
+        queue = object.__getattribute__(self, 'queue')
+        return getattr(queue, attribute)
+
+class CustomQueueFakeProtocol(CustomQueueProtocol):
+    # An object implementing the Queue API (incorrect signatures).
+    # The object will be considered a valid queue class since we
+    # do not check the signatures (only callability of methods)
+    # but will NOT be usable in production since a TypeError will
+    # be raised due to a missing argument.
+    def empty(self, x):
+        pass
+
+class CustomQueueWrongProtocol(CustomQueueProtocol):
+    empty = None
+
 def queueMaker():
     return queue.Queue()
 
@@ -3901,18 +3921,16 @@ class ConfigDictTest(BaseTest):
     @threading_helper.requires_working_threading()
     @support.requires_subprocess()
     def test_config_queue_handler(self):
-        q = CustomQueue()
-        dq = {
-            '()': __name__ + '.CustomQueue',
-            'maxsize': 10
-        }
+        qs = [CustomQueue(), CustomQueueProtocol()]
+        dqs = [{'()': f'{__name__}.{cls}', 'maxsize': 10}
+               for cls in ['CustomQueue', 'CustomQueueProtocol']]
         dl = {
             '()': __name__ + '.listenerMaker',
             'arg1': None,
             'arg2': None,
             'respect_handler_level': True
         }
-        qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', dq, q)
+        qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', *dqs, *qs)
         lvalues = (None, __name__ + '.CustomListener', dl, CustomListener)
         for qspec, lspec in itertools.product(qvalues, lvalues):
             self.do_queuehandler_configuration(qspec, lspec)
@@ -3932,15 +3950,21 @@ class ConfigDictTest(BaseTest):
     @support.requires_subprocess()
     @patch("multiprocessing.Manager")
     def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager):
-        # gh-120868
+        # gh-120868, gh-121723
 
         from multiprocessing import Queue as MQ
 
         q1 = {"()": "queue.Queue", "maxsize": -1}
         q2 = MQ()
         q3 = queue.Queue()
-
-        for qspec in (q1, q2, q3):
+        # CustomQueueFakeProtocol passes the checks but will not be usable
+        # since the signatures are incompatible. Checking the Queue API
+        # without testing the type of the actual queue is a trade-off
+        # between usability and the work we need to do in order to safely
+        # check that the queue object correctly implements the API.
+        q4 = CustomQueueFakeProtocol()
+
+        for qspec in (q1, q2, q3, q4):
             self.apply_config(
                 {
                     "version": 1,
@@ -3956,21 +3980,62 @@ class ConfigDictTest(BaseTest):
 
     @patch("multiprocessing.Manager")
     def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager):
-        # gh-120868
+        # gh-120868, gh-121723
 
-        with self.assertRaises(ValueError):
-            self.apply_config(
-                {
-                    "version": 1,
-                    "handlers": {
-                        "queue_listener": {
-                            "class": "logging.handlers.QueueHandler",
-                            "queue": object(),
+        for qspec in [object(), CustomQueueWrongProtocol()]:
+            with self.assertRaises(ValueError):
+                self.apply_config(
+                    {
+                        "version": 1,
+                        "handlers": {
+                            "queue_listener": {
+                                "class": "logging.handlers.QueueHandler",
+                                "queue": qspec,
+                            },
                         },
-                    },
+                    }
+                )
+            manager.assert_not_called()
+
+    @skip_if_tsan_fork
+    @support.requires_subprocess()
+    @unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
+                                           "assertions in multiprocessing")
+    def test_config_queue_handler_multiprocessing_context(self):
+        # regression test for gh-121723
+        if support.MS_WINDOWS:
+            start_methods = ['spawn']
+        else:
+            start_methods = ['spawn', 'fork', 'forkserver']
+        for start_method in start_methods:
+            with self.subTest(start_method=start_method):
+                ctx = multiprocessing.get_context(start_method)
+                with ctx.Manager() as manager:
+                    q = manager.Queue()
+                    records = []
+                    # use 1 process and 1 task per child to put 1 record
+                    with ctx.Pool(1, initializer=self._mpinit_issue121723,
+                                  initargs=(q, "text"), maxtasksperchild=1):
+                        records.append(q.get(timeout=60))
+                    self.assertTrue(q.empty())
+                self.assertEqual(len(records), 1)
+
+    @staticmethod
+    def _mpinit_issue121723(qspec, message_to_log):
+        # static method for pickling support
+        logging.config.dictConfig({
+            'version': 1,
+            'disable_existing_loggers': True,
+            'handlers': {
+                'log_to_parent': {
+                    'class': 'logging.handlers.QueueHandler',
+                    'queue': qspec
                 }
-            )
-        manager.assert_not_called()
+            },
+            'root': {'handlers': ['log_to_parent'], 'level': 'DEBUG'}
+        })
+        # log a message (this creates a record put in the queue)
+        logging.getLogger().info(message_to_log)
 
     @skip_if_tsan_fork
     @support.requires_subprocess()
diff --git a/Misc/NEWS.d/next/Library/2024-07-23-10-59-38.gh-issue-121723.iJEf7e.rst b/Misc/NEWS.d/next/Library/2024-07-23-10-59-38.gh-issue-121723.iJEf7e.rst
new file mode 100644 (file)
index 0000000..cabb402
--- /dev/null
@@ -0,0 +1,3 @@
+Make :func:`logging.config.dictConfig` accept any object implementing the
+Queue public API. See the :ref:`queue configuration <configure-queue>`
+section for details. Patch by Bénédikt Tran.