NUMTASKS = 10
NUMTRIPS = 3
-POLL_SLEEP = 0.010 # seconds = 10 ms
_print_mutex = thread.allocate_lock()
with threading_helper.wait_threads_exit():
thread.start_new_thread(task, ())
- while not started:
- time.sleep(POLL_SLEEP)
+ for _ in support.sleeping_retry(support.LONG_TIMEOUT):
+ if started:
+ break
self.assertEqual(thread._count(), orig + 1)
+
# Allow the task to finish.
mut.release()
+
# The only reliable way to be sure that the thread ended from the
- # interpreter's point of view is to wait for the function object to be
- # destroyed.
+ # interpreter's point of view is to wait for the function object to
+ # be destroyed.
done = []
wr = weakref.ref(task, lambda _: done.append(None))
del task
- while not done:
- time.sleep(POLL_SLEEP)
+
+ for _ in support.sleeping_retry(support.LONG_TIMEOUT):
+ if done:
+ break
support.gc_collect() # For PyPy or other GCs.
self.assertEqual(thread._count(), orig)