their parent process exits. The manager classes are defined in the
:mod:`multiprocessing.managers` module:
-.. class:: BaseManager([address[, authkey]])
+.. class:: BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)
Create a BaseManager object.
*authkey* is ``None`` then ``current_process().authkey`` is used.
Otherwise *authkey* is used and it must be a byte string.
+ *serializer* must be ``'pickle'`` (use :mod:`pickle` serialization) or
+ ``'xmlrpclib'`` (use :mod:`xmlrpc.client` serialization).
+
+ *ctx* is a context object, or ``None`` (use the current context). See the
+ :func:`get_context` function.
+
+ *shutdown_timeout* is a timeout in seconds used to wait until the process
+ used by the manager completes in the :meth:`shutdown` method. If the
+ shutdown times out, the process is terminated. If terminating the process
+ also times out, the process is killed.
+
+ .. versionchanged: 3.11
+ Added the *shutdown_timeout* parameter.
+
.. method:: start([initializer[, initargs]])
Start a subprocess to start the manager. If *initializer* is not ``None``
_Server = Server
def __init__(self, address=None, authkey=None, serializer='pickle',
- ctx=None):
+ ctx=None, *, shutdown_timeout=1.0):
if authkey is None:
authkey = process.current_process().authkey
self._address = address # XXX not final address if eg ('', 0)
self._serializer = serializer
self._Listener, self._Client = listener_client[serializer]
self._ctx = ctx or get_context()
+ self._shutdown_timeout = shutdown_timeout
def get_server(self):
'''
self._state.value = State.STARTED
self.shutdown = util.Finalize(
self, type(self)._finalize_manager,
- args=(self._process, self._address, self._authkey,
- self._state, self._Client),
+ args=(self._process, self._address, self._authkey, self._state,
+ self._Client, self._shutdown_timeout),
exitpriority=0
)
self.shutdown()
@staticmethod
- def _finalize_manager(process, address, authkey, state, _Client):
+ def _finalize_manager(process, address, authkey, state, _Client,
+ shutdown_timeout):
'''
Shutdown the manager process; will be registered as a finalizer
'''
except Exception:
pass
- process.join(timeout=1.0)
+ process.join(timeout=shutdown_timeout)
if process.is_alive():
util.info('manager still alive')
if hasattr(process, 'terminate'):
util.info('trying to `terminate()` manager process')
process.terminate()
- process.join(timeout=0.1)
+ process.join(timeout=shutdown_timeout)
if process.is_alive():
util.info('manager still alive after terminate')
+ process.kill()
+ process.join()
state.value = State.SHUTDOWN
try:
else:
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
+# BaseManager.shutdown_timeout
+SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT
+
HAVE_GETVALUE = not getattr(_multiprocessing,
'HAVE_BROKEN_SEM_GETVALUE', False)
ALLOWED_TYPES = ('manager',)
def test_mymanager(self):
- manager = MyManager()
+ manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
self.common(manager)
manager.shutdown()
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
def test_mymanager_context(self):
- with MyManager() as manager:
+ manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
+ with manager:
self.common(manager)
# bpo-30356: BaseManager._finalize_manager() sends SIGTERM
# to the manager process if it takes longer than 1 second to stop,
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
def test_mymanager_context_prestarted(self):
- manager = MyManager()
+ manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
with manager:
self.common(manager)
@classmethod
def _putter(cls, address, authkey):
manager = QueueManager2(
- address=address, authkey=authkey, serializer=SERIALIZER
- )
+ address=address, authkey=authkey, serializer=SERIALIZER,
+ shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.connect()
queue = manager.get_queue()
# Note that xmlrpclib will deserialize object as a list not a tuple
authkey = os.urandom(32)
manager = QueueManager(
- address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
- )
+ address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER,
+ shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
self.addCleanup(manager.shutdown)
p.start()
manager2 = QueueManager2(
- address=manager.address, authkey=authkey, serializer=SERIALIZER
- )
+ address=manager.address, authkey=authkey, serializer=SERIALIZER,
+ shutdown_timeout=SHUTDOWN_TIMEOUT)
manager2.connect()
queue = manager2.get_queue()
@classmethod
def _putter(cls, address, authkey):
manager = QueueManager(
- address=address, authkey=authkey, serializer=SERIALIZER)
+ address=address, authkey=authkey, serializer=SERIALIZER,
+ shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.connect()
queue = manager.get_queue()
queue.put('hello world')
def test_rapid_restart(self):
authkey = os.urandom(32)
manager = QueueManager(
- address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
+ address=(socket_helper.HOST, 0), authkey=authkey,
+ serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT)
try:
srvr = manager.get_server()
addr = srvr.address
manager.shutdown()
manager = QueueManager(
- address=addr, authkey=authkey, serializer=SERIALIZER)
+ address=addr, authkey=authkey, serializer=SERIALIZER,
+ shutdown_timeout=SHUTDOWN_TIMEOUT)
try:
manager.start()
self.addCleanup(manager.shutdown)
# (sporadic failure on buildbots)
time.sleep(1.0)
manager = QueueManager(
- address=addr, authkey=authkey, serializer=SERIALIZER)
+ address=addr, authkey=authkey, serializer=SERIALIZER,
+ shutdown_timeout=SHUTDOWN_TIMEOUT)
if hasattr(manager, "shutdown"):
self.addCleanup(manager.shutdown)