import test.support
import test.support.script_helper
from test import support
+from test.support import hashlib_helper
from test.support import socket_helper
from test.support import threading_helper
# Make queue finalizer run before the server is stopped
del queue
+
+@hashlib_helper.requires_hashdigest('md5')
class _TestManagerRestart(BaseTestCase):
@classmethod
#
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
+@hashlib_helper.requires_hashdigest('md5')
class _TestPicklingConnections(BaseTestCase):
ALLOWED_TYPES = ('processes',)
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
+@hashlib_helper.requires_hashdigest('md5')
class _TestSharedMemory(BaseTestCase):
ALLOWED_TYPES = ('processes',)
+@hashlib_helper.requires_hashdigest('md5')
class OtherTest(unittest.TestCase):
# TODO: add more tests for deliver/answer challenge.
def test_deliver_challenge_auth_failure(self):
def initializer(ns):
ns.test += 1
+@hashlib_helper.requires_hashdigest('md5')
class TestInitializers(unittest.TestCase):
def setUp(self):
self.mgr = multiprocessing.Manager()
any(process.is_alive() for process in forked_processes))
+@hashlib_helper.requires_hashdigest('md5')
class TestSyncManagerTypes(unittest.TestCase):
"""Test all the types which can be shared between a parent and a
child process by using a manager which acts as an intermediary
Mixin = local_globs[type_.capitalize() + 'Mixin']
class Temp(base, Mixin, unittest.TestCase):
pass
+ if type_ == 'manager':
+ Temp = hashlib_helper.requires_hashdigest('md5')(Temp)
Temp.__name__ = Temp.__qualname__ = newname
Temp.__module__ = __module__
remote_globs[newname] = Temp
# Skip tests if sem_open implementation is broken.
support.import_module('multiprocessing.synchronize')
+from test.support import hashlib_helper
from test.support.script_helper import assert_python_ok
import contextlib
self.assertIn('raise RuntimeError(123) # some comment',
f1.getvalue())
+ @hashlib_helper.requires_hashdigest('md5')
def test_ressources_gced_in_workers(self):
# Ensure that argument for a job are correctly gc-ed after the job
# is finished