]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-91231: Add shutdown_timeout to multiprocessing BaseManager (#32112)
authorVictor Stinner <vstinner@python.org>
Tue, 19 Apr 2022 14:27:00 +0000 (16:27 +0200)
committerGitHub <noreply@github.com>
Tue, 19 Apr 2022 14:27:00 +0000 (16:27 +0200)
Add an optional keyword 'shutdown_timeout' parameter to the
multiprocessing.BaseManager constructor. Kill the process if
terminate() takes longer than the timeout.

Multiprocessing tests pass test.support.SHORT_TIMEOUT
to BaseManager.shutdown_timeout.

Doc/library/multiprocessing.rst
Lib/multiprocessing/managers.py
Lib/test/_test_multiprocessing.py
Misc/NEWS.d/next/Library/2022-04-19-15-30-06.gh-issue-91231.AWy4Cs.rst [new file with mode: 0644]

index ee40688781690d14726eb6b9cd91330fe5df3529..83aa5cb87f49f31f7087835071047d08ffb1e32f 100644 (file)
@@ -1676,7 +1676,7 @@ Manager processes will be shutdown as soon as they are garbage collected or
 their parent process exits.  The manager classes are defined in the
 :mod:`multiprocessing.managers` module:
 
-.. class:: BaseManager([address[, authkey]])
+.. class:: BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)
 
    Create a BaseManager object.
 
@@ -1691,6 +1691,20 @@ their parent process exits.  The manager classes are defined in the
    *authkey* is ``None`` then ``current_process().authkey`` is used.
    Otherwise *authkey* is used and it must be a byte string.
 
+   *serializer* must be ``'pickle'`` (use :mod:`pickle` serialization) or
+   ``'xmlrpclib'`` (use :mod:`xmlrpc.client` serialization).
+
+   *ctx* is a context object, or ``None`` (use the current context). See the
+   :func:`get_context` function.
+
+   *shutdown_timeout* is a timeout in seconds used to wait until the process
+   used by the manager completes in the :meth:`shutdown` method. If the
+   shutdown times out, the process is terminated. If terminating the process
+   also times out, the process is killed.
+
+   .. versionchanged: 3.11
+      Added the *shutdown_timeout* parameter.
+
    .. method:: start([initializer[, initargs]])
 
       Start a subprocess to start the manager.  If *initializer* is not ``None``
index d97381926d47bcece939ec63469a5d7322702185..3f6479b7e3a693a14324631365183281e54700b6 100644 (file)
@@ -497,7 +497,7 @@ class BaseManager(object):
     _Server = Server
 
     def __init__(self, address=None, authkey=None, serializer='pickle',
-                 ctx=None):
+                 ctx=None, *, shutdown_timeout=1.0):
         if authkey is None:
             authkey = process.current_process().authkey
         self._address = address     # XXX not final address if eg ('', 0)
@@ -507,6 +507,7 @@ class BaseManager(object):
         self._serializer = serializer
         self._Listener, self._Client = listener_client[serializer]
         self._ctx = ctx or get_context()
+        self._shutdown_timeout = shutdown_timeout
 
     def get_server(self):
         '''
@@ -570,8 +571,8 @@ class BaseManager(object):
         self._state.value = State.STARTED
         self.shutdown = util.Finalize(
             self, type(self)._finalize_manager,
-            args=(self._process, self._address, self._authkey,
-                  self._state, self._Client),
+            args=(self._process, self._address, self._authkey, self._state,
+                  self._Client, self._shutdown_timeout),
             exitpriority=0
             )
 
@@ -656,7 +657,8 @@ class BaseManager(object):
         self.shutdown()
 
     @staticmethod
-    def _finalize_manager(process, address, authkey, state, _Client):
+    def _finalize_manager(process, address, authkey, state, _Client,
+                          shutdown_timeout):
         '''
         Shutdown the manager process; will be registered as a finalizer
         '''
@@ -671,15 +673,17 @@ class BaseManager(object):
             except Exception:
                 pass
 
-            process.join(timeout=1.0)
+            process.join(timeout=shutdown_timeout)
             if process.is_alive():
                 util.info('manager still alive')
                 if hasattr(process, 'terminate'):
                     util.info('trying to `terminate()` manager process')
                     process.terminate()
-                    process.join(timeout=0.1)
+                    process.join(timeout=shutdown_timeout)
                     if process.is_alive():
                         util.info('manager still alive after terminate')
+                        process.kill()
+                        process.join()
 
         state.value = State.SHUTDOWN
         try:
index bb73d9e7cc75e4a22b4e9d7ac8f01aee61af768f..427fc0c47a3cab4358e8a8b3b16df529ff599363 100644 (file)
@@ -119,6 +119,9 @@ if CHECK_TIMINGS:
 else:
     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
 
+# BaseManager.shutdown_timeout
+SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT
+
 HAVE_GETVALUE = not getattr(_multiprocessing,
                             'HAVE_BROKEN_SEM_GETVALUE', False)
 
@@ -2897,7 +2900,7 @@ class _TestMyManager(BaseTestCase):
     ALLOWED_TYPES = ('manager',)
 
     def test_mymanager(self):
-        manager = MyManager()
+        manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
         manager.start()
         self.common(manager)
         manager.shutdown()
@@ -2908,7 +2911,8 @@ class _TestMyManager(BaseTestCase):
         self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
 
     def test_mymanager_context(self):
-        with MyManager() as manager:
+        manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
+        with manager:
             self.common(manager)
         # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
         # to the manager process if it takes longer than 1 second to stop,
@@ -2916,7 +2920,7 @@ class _TestMyManager(BaseTestCase):
         self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
 
     def test_mymanager_context_prestarted(self):
-        manager = MyManager()
+        manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
         manager.start()
         with manager:
             self.common(manager)
@@ -2978,8 +2982,8 @@ class _TestRemoteManager(BaseTestCase):
     @classmethod
     def _putter(cls, address, authkey):
         manager = QueueManager2(
-            address=address, authkey=authkey, serializer=SERIALIZER
-            )
+            address=address, authkey=authkey, serializer=SERIALIZER,
+            shutdown_timeout=SHUTDOWN_TIMEOUT)
         manager.connect()
         queue = manager.get_queue()
         # Note that xmlrpclib will deserialize object as a list not a tuple
@@ -2989,8 +2993,8 @@ class _TestRemoteManager(BaseTestCase):
         authkey = os.urandom(32)
 
         manager = QueueManager(
-            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
-            )
+            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER,
+            shutdown_timeout=SHUTDOWN_TIMEOUT)
         manager.start()
         self.addCleanup(manager.shutdown)
 
@@ -2999,8 +3003,8 @@ class _TestRemoteManager(BaseTestCase):
         p.start()
 
         manager2 = QueueManager2(
-            address=manager.address, authkey=authkey, serializer=SERIALIZER
-            )
+            address=manager.address, authkey=authkey, serializer=SERIALIZER,
+            shutdown_timeout=SHUTDOWN_TIMEOUT)
         manager2.connect()
         queue = manager2.get_queue()
 
@@ -3020,7 +3024,8 @@ class _TestManagerRestart(BaseTestCase):
     @classmethod
     def _putter(cls, address, authkey):
         manager = QueueManager(
-            address=address, authkey=authkey, serializer=SERIALIZER)
+            address=address, authkey=authkey, serializer=SERIALIZER,
+            shutdown_timeout=SHUTDOWN_TIMEOUT)
         manager.connect()
         queue = manager.get_queue()
         queue.put('hello world')
@@ -3028,7 +3033,8 @@ class _TestManagerRestart(BaseTestCase):
     def test_rapid_restart(self):
         authkey = os.urandom(32)
         manager = QueueManager(
-            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
+            address=(socket_helper.HOST, 0), authkey=authkey,
+            serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT)
         try:
             srvr = manager.get_server()
             addr = srvr.address
@@ -3048,7 +3054,8 @@ class _TestManagerRestart(BaseTestCase):
                 manager.shutdown()
 
         manager = QueueManager(
-            address=addr, authkey=authkey, serializer=SERIALIZER)
+            address=addr, authkey=authkey, serializer=SERIALIZER,
+            shutdown_timeout=SHUTDOWN_TIMEOUT)
         try:
             manager.start()
             self.addCleanup(manager.shutdown)
@@ -3059,7 +3066,8 @@ class _TestManagerRestart(BaseTestCase):
             # (sporadic failure on buildbots)
             time.sleep(1.0)
             manager = QueueManager(
-                address=addr, authkey=authkey, serializer=SERIALIZER)
+                address=addr, authkey=authkey, serializer=SERIALIZER,
+                shutdown_timeout=SHUTDOWN_TIMEOUT)
             if hasattr(manager, "shutdown"):
                 self.addCleanup(manager.shutdown)
 
diff --git a/Misc/NEWS.d/next/Library/2022-04-19-15-30-06.gh-issue-91231.AWy4Cs.rst b/Misc/NEWS.d/next/Library/2022-04-19-15-30-06.gh-issue-91231.AWy4Cs.rst
new file mode 100644 (file)
index 0000000..a61fd8b
--- /dev/null
@@ -0,0 +1,3 @@
+Add an optional keyword *shutdown_timeout* parameter to the
+:class:`multiprocessing.BaseManager` constructor. Kill the process if
+terminate() takes longer than the timeout. Patch by Victor Stinner.