From 1d4be0a65f66c2eecefb8b8bd4d115a82fa61067 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 1 Jun 2018 19:39:10 +0200 Subject: [PATCH] bpo-33532: Fix test_multiprocessing_forkserver.test_ignore() (GH-7323) Use also support.SOCK_MAX_SIZE, not only support.PIPE_MAX_SIZE, to get the size for a blocking send into a multiprocessing pipe. Replace also test_support with support. --- Lib/test/test_multiprocessing.py | 48 +++++++++++++++++--------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index b293e2fbc4ea..b6747effc2c5 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -16,16 +16,16 @@ import logging import errno import weakref import test.script_helper -from test import test_support +from test import support from StringIO import StringIO -_multiprocessing = test_support.import_module('_multiprocessing') +_multiprocessing = support.import_module('_multiprocessing') # import threading after _multiprocessing to raise a more relevant error # message: "No module named _multiprocessing". _multiprocessing is not compiled # without thread support. import threading # Work around broken sem_open implementations -test_support.import_module('multiprocessing.synchronize') +support.import_module('multiprocessing.synchronize') import multiprocessing.dummy import multiprocessing.connection @@ -341,8 +341,8 @@ class _TestProcess(BaseTestCase): if self.TYPE == 'threads': self.skipTest('test not appropriate for {}'.format(self.TYPE)) - testfn = test_support.TESTFN - self.addCleanup(test_support.unlink, testfn) + testfn = support.TESTFN + self.addCleanup(support.unlink, testfn) for reason, code in (([1, 2, 3], 1), ('ignore this', 1)): p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) @@ -640,7 +640,7 @@ class _TestQueue(BaseTestCase): p.join() def test_no_import_lock_contention(self): - with test_support.temp_cwd(): + with support.temp_cwd(): module_name = 'imported_by_an_imported_module' with open(module_name + '.py', 'w') as f: f.write("""if 1: @@ -652,7 +652,7 @@ class _TestQueue(BaseTestCase): q.close() """) - with test_support.DirsOnSysPath(os.getcwd()): + with support.DirsOnSysPath(os.getcwd()): try: __import__(module_name) except Queue.Empty: @@ -1517,10 +1517,10 @@ class _TestRemoteManager(BaseTestCase): #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8 ] result = values[:] - if test_support.have_unicode: + if support.have_unicode: #result[-1] = u'hall\xe5 v\xe4rlden' - uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 ' - r'\u0441\u0432\u0456\u0442') + uvalue = support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 ' + r'\u0441\u0432\u0456\u0442') values.append(uvalue) result.append(uvalue) @@ -1538,7 +1538,7 @@ class _TestRemoteManager(BaseTestCase): authkey = os.urandom(32) manager = QueueManager( - address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER + address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER ) manager.start() @@ -1575,7 +1575,7 @@ class _TestManagerRestart(BaseTestCase): def test_rapid_restart(self): authkey = os.urandom(32) manager = QueueManager( - address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER) + address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) srvr = manager.get_server() addr = srvr.address # Close the connection.Listener socket which gets opened as a part @@ -1786,13 +1786,13 @@ class _TestConnection(BaseTestCase): p = self.Process(target=self._writefd, args=(child_conn, b"foo")) p.daemon = True p.start() - with open(test_support.TESTFN, "wb") as f: + with open(support.TESTFN, "wb") as f: fd = f.fileno() if msvcrt: fd = msvcrt.get_osfhandle(fd) reduction.send_handle(conn, fd, p.pid) p.join() - with open(test_support.TESTFN, "rb") as f: + with open(support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"foo") @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") @@ -1811,7 +1811,7 @@ class _TestConnection(BaseTestCase): p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) p.daemon = True p.start() - with open(test_support.TESTFN, "wb") as f: + with open(support.TESTFN, "wb") as f: fd = f.fileno() for newfd in range(256, MAXFD): if not self._is_fd_assigned(newfd): @@ -1824,7 +1824,7 @@ class _TestConnection(BaseTestCase): finally: os.close(newfd) p.join() - with open(test_support.TESTFN, "rb") as f: + with open(support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"bar") @classmethod @@ -2207,7 +2207,7 @@ class _TestFinalize(BaseTestCase): gc.set_threshold(5, 5, 5) threads = [threading.Thread(target=run_finalizers), threading.Thread(target=make_finalizers)] - with test_support.start_threads(threads): + with support.start_threads(threads): time.sleep(4.0) # Wait a bit to trigger race condition finish = True if exc: @@ -2635,7 +2635,7 @@ class TestFlags(unittest.TestCase): flags = (tuple(sys.flags), grandchild_flags) print(json.dumps(flags)) - @test_support.requires_unicode # XXX json needs unicode support + @support.requires_unicode # XXX json needs unicode support def test_flags(self): import json, subprocess # start child process using unusual flags @@ -2681,6 +2681,9 @@ class TestForkAwareThreadLock(unittest.TestCase): class TestIgnoreEINTR(unittest.TestCase): + # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block + CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) + @classmethod def _test_ignore(cls, conn): def handler(signum, frame): @@ -2689,7 +2692,7 @@ class TestIgnoreEINTR(unittest.TestCase): conn.send('ready') x = conn.recv() conn.send(x) - conn.send_bytes(b'x' * test_support.PIPE_MAX_SIZE) + conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') def test_ignore(self): @@ -2708,8 +2711,7 @@ class TestIgnoreEINTR(unittest.TestCase): self.assertEqual(conn.recv(), 1234) time.sleep(0.1) os.kill(p.pid, signal.SIGUSR1) - self.assertEqual(conn.recv_bytes(), - b'x' * test_support.PIPE_MAX_SIZE) + self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) time.sleep(0.1) p.join() finally: @@ -2766,7 +2768,7 @@ def test_main(run=None): check_enough_semaphores() if run is None: - from test.test_support import run_unittest as run + from test.support import run_unittest as run util.get_temp_dir() # creates temp directory for use by all processes @@ -2791,7 +2793,7 @@ def test_main(run=None): # module during these tests is at least platform dependent and possibly # non-deterministic on any given platform. So we don't mind if the listed # warnings aren't actually raised. - with test_support.check_py3k_warnings( + with support.check_py3k_warnings( (".+__(get|set)slice__ has been removed", DeprecationWarning), (r"sys.exc_clear\(\) not supported", DeprecationWarning), quiet=True): -- 2.47.3