]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-109974: Fix threading lock_tests race conditions (#110057)
authorVictor Stinner <vstinner@python.org>
Fri, 29 Sep 2023 00:34:27 +0000 (02:34 +0200)
committerGitHub <noreply@github.com>
Fri, 29 Sep 2023 00:34:27 +0000 (02:34 +0200)
Fix race conditions in test_threading lock tests. Wait until a
condition is met rather than using time.sleep() with a hardcoded
number of seconds.

* Replace sleeping loops with support.sleeping_retry() which raises
  an exception on timeout.
* Add wait_threads_blocked(nthread) which computes a sleep depending
  on the number of threads. Remove _wait() function.
* test_set_and_clear(): use a way longer Event.wait() timeout.
* BarrierTests.test_repr(): wait until the 2 threads are waiting for
  the barrier. Use a way longer timeout for Barrier.wait() timeout.
* test_thread_leak() no longer needs to count
  len(threading.enumerate()): Bunch uses
  threading_helper.wait_threads_exit() internally which does it in
  wait_for_finished().
* Add BaseLockTests.wait_phase() which implements a timeout.
  test_reacquire() and test_recursion_count() use wait_phase().

Lib/test/lock_tests.py
Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst [new file with mode: 0644]

index e53e24b18f27609eae8b7900e09ade7b71aadb73..cbaae3afd6dde353752b5a2babcbaae123187b44 100644 (file)
@@ -19,22 +19,24 @@ requires_fork = unittest.skipUnless(support.has_fork_support,
                                      "(no _at_fork_reinit method)")
 
 
-def _wait():
-    # A crude wait/yield function not relying on synchronization primitives.
-    time.sleep(0.01)
+def wait_threads_blocked(nthread):
+    # Arbitrary sleep to wait until N threads are blocked,
+    # like waiting for a lock.
+    time.sleep(0.010 * nthread)
+
 
 class Bunch(object):
     """
     A bunch of threads.
     """
-    def __init__(self, f, n, wait_before_exit=False):
+    def __init__(self, func, nthread, wait_before_exit=False):
         """
-        Construct a bunch of `n` threads running the same function `f`.
+        Construct a bunch of `nthread` threads running the same function `func`.
         If `wait_before_exit` is True, the threads won't terminate until
         do_finish() is called.
         """
-        self.f = f
-        self.n = n
+        self.func = func
+        self.nthread = nthread
         self.started = []
         self.finished = []
         self._can_exit = not wait_before_exit
@@ -45,26 +47,30 @@ class Bunch(object):
             tid = threading.get_ident()
             self.started.append(tid)
             try:
-                f()
+                func()
             finally:
                 self.finished.append(tid)
-                while not self._can_exit:
-                    _wait()
+                for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+                    if self._can_exit:
+                        break
 
         try:
-            for i in range(n):
+            for i in range(nthread):
                 start_new_thread(task, ())
         except:
             self._can_exit = True
             raise
 
     def wait_for_started(self):
-        while len(self.started) < self.n:
-            _wait()
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if len(self.started) >= self.nthread:
+                break
 
     def wait_for_finished(self):
-        while len(self.finished) < self.n:
-            _wait()
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if len(self.finished) >= self.nthread:
+                break
+
         # Wait for threads exit
         self.wait_thread.__exit__(None, None, None)
 
@@ -94,6 +100,12 @@ class BaseLockTests(BaseTestCase):
     Tests for both recursive and non-recursive locks.
     """
 
+    def wait_phase(self, phase, expected):
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if len(phase) >= expected:
+                break
+        self.assertEqual(len(phase), expected)
+
     def test_constructor(self):
         lock = self.locktype()
         del lock
@@ -138,15 +150,18 @@ class BaseLockTests(BaseTestCase):
     def test_acquire_contended(self):
         lock = self.locktype()
         lock.acquire()
-        N = 5
         def f():
             lock.acquire()
             lock.release()
 
+        # Threads block on lock.acquire()
+        N = 5
         b = Bunch(f, N)
         b.wait_for_started()
-        _wait()
+        wait_threads_blocked(N)
         self.assertEqual(len(b.finished), 0)
+
+        # Threads unblocked
         lock.release()
         b.wait_for_finished()
         self.assertEqual(len(b.finished), N)
@@ -174,17 +189,10 @@ class BaseLockTests(BaseTestCase):
         def f():
             lock.acquire()
             lock.release()
-        n = len(threading.enumerate())
+
         # We run many threads in the hope that existing threads ids won't
         # be recycled.
         Bunch(f, 15).wait_for_finished()
-        if len(threading.enumerate()) != n:
-            # There is a small window during which a Thread instance's
-            # target function has finished running, but the Thread is still
-            # alive and registered.  Avoid spurious failures by waiting a
-            # bit more (seen on a buildbot).
-            time.sleep(0.4)
-            self.assertEqual(n, len(threading.enumerate()))
 
     def test_timeout(self):
         lock = self.locktype()
@@ -242,15 +250,13 @@ class LockTests(BaseLockTests):
             phase.append(None)
 
         with threading_helper.wait_threads_exit():
+            # Thread blocked on lock.acquire()
             start_new_thread(f, ())
-            while len(phase) == 0:
-                _wait()
-            _wait()
-            self.assertEqual(len(phase), 1)
+            self.wait_phase(phase, 1)
+
+            # Thread unblocked
             lock.release()
-            while len(phase) == 1:
-                _wait()
-            self.assertEqual(len(phase), 2)
+            self.wait_phase(phase, 2)
 
     def test_different_thread(self):
         # Lock can be released from a different thread.
@@ -349,21 +355,20 @@ class RLockTests(BaseLockTests):
         def f():
             lock.acquire()
             phase.append(None)
-            while len(phase) == 1:
-                _wait()
+
+            self.wait_phase(phase, 2)
             lock.release()
             phase.append(None)
 
         with threading_helper.wait_threads_exit():
+            # Thread blocked on lock.acquire()
             start_new_thread(f, ())
-            while len(phase) == 0:
-                _wait()
-            self.assertEqual(len(phase), 1)
+            self.wait_phase(phase, 1)
             self.assertEqual(0, lock._recursion_count())
+
+            # Thread unblocked
             phase.append(None)
-            while len(phase) == 2:
-                _wait()
-            self.assertEqual(len(phase), 3)
+            self.wait_phase(phase, 3)
             self.assertEqual(0, lock._recursion_count())
 
     def test_different_thread(self):
@@ -421,10 +426,14 @@ class EventTests(BaseTestCase):
         def f():
             results1.append(evt.wait())
             results2.append(evt.wait())
+
+        # Threads blocked on first evt.wait()
         b = Bunch(f, N)
         b.wait_for_started()
-        _wait()
+        wait_threads_blocked(N)
         self.assertEqual(len(results1), 0)
+
+        # Threads unblocked
         evt.set()
         b.wait_for_finished()
         self.assertEqual(results1, [True] * N)
@@ -464,19 +473,22 @@ class EventTests(BaseTestCase):
             self.assertTrue(r)
 
     def test_set_and_clear(self):
-        # Issue #13502: check that wait() returns true even when the event is
+        # gh-57711: check that wait() returns true even when the event is
         # cleared before the waiting thread is woken up.
-        evt = self.eventtype()
+        event = self.eventtype()
         results = []
-        timeout = 0.250
-        N = 5
         def f():
-            results.append(evt.wait(timeout * 4))
+            results.append(event.wait(support.LONG_TIMEOUT))
+
+        # Threads blocked on event.wait()
+        N = 5
         b = Bunch(f, N)
         b.wait_for_started()
-        time.sleep(timeout)
-        evt.set()
-        evt.clear()
+        wait_threads_blocked(N)
+
+        # Threads unblocked
+        event.set()
+        event.clear()
         b.wait_for_finished()
         self.assertEqual(results, [True] * N)
 
@@ -533,15 +545,14 @@ class ConditionTests(BaseTestCase):
         # Note that this test is sensitive to timing.  If the worker threads
         # don't execute in a timely fashion, the main thread may think they
         # are further along then they are.  The main thread therefore issues
-        # _wait() statements to try to make sure that it doesn't race ahead
-        # of the workers.
+        # wait_threads_blocked() statements to try to make sure that it doesn't
+        # race ahead of the workers.
         # Secondly, this test assumes that condition variables are not subject
         # to spurious wakeups.  The absence of spurious wakeups is an implementation
         # detail of Condition Variables in current CPython, but in general, not
         # a guaranteed property of condition variables as a programming
         # construct.  In particular, it is possible that this can no longer
         # be conveniently guaranteed should their implementation ever change.
-        N = 5
         ready = []
         results1 = []
         results2 = []
@@ -550,57 +561,84 @@ class ConditionTests(BaseTestCase):
             cond.acquire()
             ready.append(phase_num)
             result = cond.wait()
+
             cond.release()
             results1.append((result, phase_num))
+
             cond.acquire()
             ready.append(phase_num)
+
             result = cond.wait()
             cond.release()
             results2.append((result, phase_num))
+
+        N = 5
         b = Bunch(f, N)
         b.wait_for_started()
         # first wait, to ensure all workers settle into cond.wait() before
         # we continue. See issues #8799 and #30727.
-        while len(ready) < 5:
-            _wait()
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if len(ready) >= N:
+                break
+
         ready.clear()
         self.assertEqual(results1, [])
+
         # Notify 3 threads at first
+        count1 = 3
         cond.acquire()
-        cond.notify(3)
-        _wait()
+        cond.notify(count1)
+        wait_threads_blocked(count1)
+
+        # Phase 1
         phase_num = 1
         cond.release()
-        while len(results1) < 3:
-            _wait()
-        self.assertEqual(results1, [(True, 1)] * 3)
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if len(results1) >= count1:
+                break
+
+        self.assertEqual(results1, [(True, 1)] * count1)
         self.assertEqual(results2, [])
-        # make sure all awaken workers settle into cond.wait()
-        while len(ready) < 3:
-            _wait()
+
+        # Wait until awaken workers are blocked on cond.wait()
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if len(ready) >= count1 :
+                break
+
         # Notify 5 threads: they might be in their first or second wait
         cond.acquire()
         cond.notify(5)
-        _wait()
+        wait_threads_blocked(N)
+
+        # Phase 2
         phase_num = 2
         cond.release()
-        while len(results1) + len(results2) < 8:
-            _wait()
-        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
-        self.assertEqual(results2, [(True, 2)] * 3)
-        # make sure all workers settle into cond.wait()
-        while len(ready) < 5:
-            _wait()
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if len(results1) + len(results2) >= (N + count1):
+                break
+
+        count2 = N - count1
+        self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
+        self.assertEqual(results2, [(True, 2)] * count1)
+
+        # Make sure all workers settle into cond.wait()
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if len(ready) >= N:
+                break
+
         # Notify all threads: they are all in their second wait
         cond.acquire()
         cond.notify_all()
-        _wait()
+        wait_threads_blocked(N)
+
+        # Phase 3
         phase_num = 3
         cond.release()
-        while len(results2) < 5:
-            _wait()
-        self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
-        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if len(results2) >= N:
+                break
+        self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
+        self.assertEqual(results2, [(True, 2)] * count1 + [(True, 3)] * count2)
         b.wait_for_finished()
 
     def test_notify(self):
@@ -611,19 +649,22 @@ class ConditionTests(BaseTestCase):
 
     def test_timeout(self):
         cond = self.condtype()
+        timeout = 0.5
         results = []
-        N = 5
         def f():
             cond.acquire()
             t1 = time.monotonic()
-            result = cond.wait(0.5)
+            result = cond.wait(timeout)
             t2 = time.monotonic()
             cond.release()
             results.append((t2 - t1, result))
+
+        N = 5
         Bunch(f, N).wait_for_finished()
         self.assertEqual(len(results), N)
+
         for dt, result in results:
-            self.assertTimeout(dt, 0.5)
+            self.assertTimeout(dt, timeout)
             # Note that conceptually (that"s the condition variable protocol)
             # a wait() may succeed even if no one notifies us and before any
             # timeout occurs.  Spurious wakeups can occur.
@@ -636,13 +677,13 @@ class ConditionTests(BaseTestCase):
         state = 0
         def f():
             with cond:
-                result = cond.wait_for(lambda : state==4)
+                result = cond.wait_for(lambda: state == 4)
                 self.assertTrue(result)
                 self.assertEqual(state, 4)
         b = Bunch(f, 1)
         b.wait_for_started()
         for i in range(4):
-            time.sleep(0.01)
+            time.sleep(0.010)
             with cond:
                 state += 1
                 cond.notify()
@@ -660,14 +701,16 @@ class ConditionTests(BaseTestCase):
                 self.assertFalse(result)
                 self.assertTimeout(dt, 0.1)
                 success.append(None)
+
         b = Bunch(f, 1)
         b.wait_for_started()
         # Only increment 3 times, so state == 4 is never reached.
         for i in range(3):
-            time.sleep(0.01)
+            time.sleep(0.010)
             with cond:
                 state += 1
                 cond.notify()
+
         b.wait_for_finished()
         self.assertEqual(len(success), 1)
 
@@ -697,70 +740,107 @@ class BaseSemaphoreTests(BaseTestCase):
         del sem
 
     def test_acquire_contended(self):
-        sem = self.semtype(7)
+        sem_value = 7
+        sem = self.semtype(sem_value)
         sem.acquire()
-        N = 10
+
         sem_results = []
         results1 = []
         results2 = []
         phase_num = 0
-        def f():
+
+        def func():
             sem_results.append(sem.acquire())
             results1.append(phase_num)
+
             sem_results.append(sem.acquire())
             results2.append(phase_num)
-        b = Bunch(f, 10)
+
+        def wait_count(count):
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+                if len(results1) + len(results2) >= count:
+                    break
+
+        # Phase 0
+        N = 10
+        b = Bunch(func, N)
         b.wait_for_started()
-        while len(results1) + len(results2) < 6:
-            _wait()
-        self.assertEqual(results1 + results2, [0] * 6)
+        count1 = sem_value - 1
+        wait_count(count1)
+        self.assertEqual(results1 + results2, [0] * count1)
+
+        # Phase 1
         phase_num = 1
-        for i in range(7):
+        for i in range(sem_value):
             sem.release()
-        while len(results1) + len(results2) < 13:
-            _wait()
-        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
+        count2 = sem_value
+        wait_count(count1 + count2)
+        self.assertEqual(sorted(results1 + results2),
+                         [0] * count1 + [1] * count2)
+
+        # Phase 2
         phase_num = 2
-        for i in range(6):
+        count3 = (sem_value - 1)
+        for i in range(count3):
             sem.release()
-        while len(results1) + len(results2) < 19:
-            _wait()
-        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
+        wait_count(count1 + count2 + count3)
+        self.assertEqual(sorted(results1 + results2),
+                         [0] * count1 + [1] * count2 + [2] * count3)
         # The semaphore is still locked
         self.assertFalse(sem.acquire(False))
+
         # Final release, to let the last thread finish
+        count4 = 1
         sem.release()
         b.wait_for_finished()
-        self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
+        self.assertEqual(sem_results,
+                         [True] * (count1 + count2 + count3 + count4))
 
     def test_multirelease(self):
-        sem = self.semtype(7)
+        sem_value = 7
+        sem = self.semtype(sem_value)
         sem.acquire()
+
         results1 = []
         results2 = []
         phase_num = 0
-        def f():
+        def func():
             sem.acquire()
             results1.append(phase_num)
+
             sem.acquire()
             results2.append(phase_num)
-        b = Bunch(f, 10)
+
+        def wait_count(count):
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+                if len(results1) + len(results2) >= count:
+                    break
+
+        # Phase 0
+        b = Bunch(func, 10)
         b.wait_for_started()
-        while len(results1) + len(results2) < 6:
-            _wait()
-        self.assertEqual(results1 + results2, [0] * 6)
+        count1 = sem_value - 1
+        wait_count(count1)
+        self.assertEqual(results1 + results2, [0] * count1)
+
+        # Phase 1
         phase_num = 1
-        sem.release(7)
-        while len(results1) + len(results2) < 13:
-            _wait()
-        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
+        count2 = sem_value
+        sem.release(count2)
+        wait_count(count1 + count2)
+        self.assertEqual(sorted(results1 + results2),
+                         [0] * count1 + [1] * count2)
+
+        # Phase 2
         phase_num = 2
-        sem.release(6)
-        while len(results1) + len(results2) < 19:
-            _wait()
-        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
+        count3 = sem_value - 1
+        sem.release(count3)
+        wait_count(count1 + count2 + count3)
+        self.assertEqual(sorted(results1 + results2),
+                         [0] * count1 + [1] * count2 + [2] * count3)
         # The semaphore is still locked
         self.assertFalse(sem.acquire(False))
+
         # Final release, to let the last thread finish
         sem.release()
         b.wait_for_finished()
@@ -806,10 +886,14 @@ class BaseSemaphoreTests(BaseTestCase):
         def f():
             sem.acquire()
             sem.release()
+
+        # Thread blocked on sem.acquire()
         b = Bunch(f, 1)
         b.wait_for_started()
-        _wait()
+        wait_threads_blocked(1)
         self.assertFalse(b.finished)
+
+        # Thread unblocked
         sem.release()
         b.wait_for_finished()
 
@@ -882,6 +966,7 @@ class BarrierTests(BaseTestCase):
 
     def setUp(self):
         self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
+
     def tearDown(self):
         self.barrier.abort()
 
@@ -979,8 +1064,9 @@ class BarrierTests(BaseTestCase):
             i = self.barrier.wait()
             if i == self.N//2:
                 # Wait until the other threads are all in the barrier.
-                while self.barrier.n_waiting < self.N-1:
-                    time.sleep(0.001)
+                for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+                    if self.barrier.n_waiting >= (self.N - 1):
+                        break
                 self.barrier.reset()
             else:
                 try:
@@ -1068,16 +1154,29 @@ class BarrierTests(BaseTestCase):
         b.wait()
 
     def test_repr(self):
-        b = self.barriertype(3)
-        self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
+        barrier = self.barriertype(3)
+        timeout = support.LONG_TIMEOUT
+        self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>")
         def f():
-            b.wait(3)
-        bunch = Bunch(f, 2)
+            barrier.wait(timeout)
+
+        # Threads blocked on barrier.wait()
+        N = 2
+        bunch = Bunch(f, N)
         bunch.wait_for_started()
-        time.sleep(0.2)
-        self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=2/3>")
-        b.wait(3)
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if barrier.n_waiting >= N:
+                break
+        self.assertRegex(repr(barrier),
+                         r"<\w+\.Barrier at .*: waiters=2/3>")
+
+        # Threads unblocked
+        barrier.wait(timeout)
         bunch.wait_for_finished()
-        self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
-        b.abort()
-        self.assertRegex(repr(b), r"<\w+\.Barrier at .*: broken>")
+        self.assertRegex(repr(barrier),
+                         r"<\w+\.Barrier at .*: waiters=0/3>")
+
+        # Abort the barrier
+        barrier.abort()
+        self.assertRegex(repr(barrier),
+                         r"<\w+\.Barrier at .*: broken>")
diff --git a/Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst b/Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst
new file mode 100644 (file)
index 0000000..a130cf6
--- /dev/null
@@ -0,0 +1,3 @@
+Fix race conditions in test_threading lock tests. Wait until a condition is met
+rather than using :func:`time.sleep` with a hardcoded number of seconds. Patch
+by Victor Stinner.