]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.12] gh-111284: Make multiprocessing tests with threads faster and more reliable...
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>
Mon, 30 Oct 2023 18:00:57 +0000 (19:00 +0100)
committerGitHub <noreply@github.com>
Mon, 30 Oct 2023 18:00:57 +0000 (18:00 +0000)
(cherry picked from commit 624ace5a2f02715d084c29eaf2211cd0dd550690)

Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
Lib/test/_test_multiprocessing.py

index 83e9d165c411daa9a25f2a8ec1f8b03df2b163df..d52b10c2ec183cd0d3586b030e5c172527dd3f48 100644 (file)
@@ -2438,8 +2438,11 @@ class _TestContainers(BaseTestCase):
 #
 #
 
-def sqr(x, wait=0.0):
-    time.sleep(wait)
+def sqr(x, wait=0.0, event=None):
+    if event is None:
+        time.sleep(wait)
+    else:
+        event.wait(wait)
     return x*x
 
 def mul(x, y):
@@ -2578,10 +2581,18 @@ class _TestPool(BaseTestCase):
         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
 
     def test_async_timeout(self):
-        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT))
-        get = TimingWrapper(res.get)
-        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
-        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
+        p = self.Pool(3)
+        try:
+            event = threading.Event() if self.TYPE == 'threads' else None
+            res = p.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT, event))
+            get = TimingWrapper(res.get)
+            self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
+            self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
+        finally:
+            if event is not None:
+                event.set()
+            p.terminate()
+            p.join()
 
     def test_imap(self):
         it = self.pool.imap(sqr, list(range(10)))
@@ -2683,10 +2694,11 @@ class _TestPool(BaseTestCase):
 
     def test_terminate(self):
         # Simulate slow tasks which take "forever" to complete
+        p = self.Pool(3)
         args = [support.LONG_TIMEOUT for i in range(10_000)]
-        result = self.pool.map_async(time.sleep, args, chunksize=1)
-        self.pool.terminate()
-        self.pool.join()
+        result = p.map_async(time.sleep, args, chunksize=1)
+        p.terminate()
+        p.join()
 
     def test_empty_iterable(self):
         # See Issue 12157