import sys
import threading
-import time
import unittest
from concurrent import futures
from test import support
+from test.support import threading_helper
from .util import (
CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
def mul(x, y):
return x * y
-def sleep_and_raise(t):
- time.sleep(t)
+def wait_and_raise(e):
+ e.wait()
raise Exception('this is an exception')
class WaitTests:
def test_20369(self):
# See https://bugs.python.org/issue20369
- future = self.executor.submit(time.sleep, 1.5)
+ future = self.executor.submit(mul, 1, 2)
done, not_done = futures.wait([future, future],
return_when=futures.ALL_COMPLETED)
self.assertEqual({future}, done)
def test_first_completed(self):
+ event = self.create_event()
future1 = self.executor.submit(mul, 21, 2)
- future2 = self.executor.submit(time.sleep, 1.5)
+ future2 = self.executor.submit(event.wait)
- done, not_done = futures.wait(
- [CANCELLED_FUTURE, future1, future2],
- return_when=futures.FIRST_COMPLETED)
+ try:
+ done, not_done = futures.wait(
+ [CANCELLED_FUTURE, future1, future2],
+ return_when=futures.FIRST_COMPLETED)
- self.assertEqual(set([future1]), done)
- self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
+ self.assertEqual(set([future1]), done)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
+ finally:
+ event.set()
+ future2.result() # wait for job to finish
def test_first_completed_some_already_completed(self):
- future1 = self.executor.submit(time.sleep, 1.5)
-
- finished, pending = futures.wait(
- [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
- return_when=futures.FIRST_COMPLETED)
+ event = self.create_event()
+ future1 = self.executor.submit(event.wait)
- self.assertEqual(
- set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
- finished)
- self.assertEqual(set([future1]), pending)
+ try:
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEqual(
+ set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+ finished)
+ self.assertEqual(set([future1]), pending)
+ finally:
+ event.set()
+ future1.result() # wait for job to finish
- @support.requires_resource('walltime')
def test_first_exception(self):
- future1 = self.executor.submit(mul, 2, 21)
- future2 = self.executor.submit(sleep_and_raise, 1.5)
- future3 = self.executor.submit(time.sleep, 3)
+ event1 = self.create_event()
+ event2 = self.create_event()
+ try:
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(wait_and_raise, event1)
+ future3 = self.executor.submit(event2.wait)
- finished, pending = futures.wait(
- [future1, future2, future3],
- return_when=futures.FIRST_EXCEPTION)
+ # Ensure that future1 is completed before future2 finishes
+ def wait_for_future1():
+ future1.result()
+ event1.set()
+
+ t = threading.Thread(target=wait_for_future1)
+ t.start()
+
+ finished, pending = futures.wait(
+ [future1, future2, future3],
+ return_when=futures.FIRST_EXCEPTION)
- self.assertEqual(set([future1, future2]), finished)
- self.assertEqual(set([future3]), pending)
+ self.assertEqual(set([future1, future2]), finished)
+ self.assertEqual(set([future3]), pending)
+
+ threading_helper.join_thread(t)
+ finally:
+ event1.set()
+ event2.set()
+ future3.result() # wait for job to finish
def test_first_exception_some_already_complete(self):
+ event = self.create_event()
future1 = self.executor.submit(divmod, 21, 0)
- future2 = self.executor.submit(time.sleep, 1.5)
-
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2],
- return_when=futures.FIRST_EXCEPTION)
+ future2 = self.executor.submit(event.wait)
- self.assertEqual(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1]), finished)
- self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
+ try:
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
+ finally:
+ event.set()
+ future2.result() # wait for job to finish
def test_first_exception_one_already_failed(self):
- future1 = self.executor.submit(time.sleep, 2)
+ event = self.create_event()
+ future1 = self.executor.submit(event.wait)
- finished, pending = futures.wait(
- [EXCEPTION_FUTURE, future1],
- return_when=futures.FIRST_EXCEPTION)
+ try:
+ finished, pending = futures.wait(
+ [EXCEPTION_FUTURE, future1],
+ return_when=futures.FIRST_EXCEPTION)
- self.assertEqual(set([EXCEPTION_FUTURE]), finished)
- self.assertEqual(set([future1]), pending)
+ self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+ self.assertEqual(set([future1]), pending)
+ finally:
+ event.set()
+ future1.result() # wait for job to finish
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
def test_timeout(self):
short_timeout = 0.050
- long_timeout = short_timeout * 10
- future = self.executor.submit(time.sleep, long_timeout)
+ event = self.create_event()
+ future = self.executor.submit(event.wait)
- finished, pending = futures.wait(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future],
- timeout=short_timeout,
- return_when=futures.ALL_COMPLETED)
-
- self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE]),
- finished)
- self.assertEqual(set([future]), pending)
+ try:
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future],
+ timeout=short_timeout,
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE]),
+ finished)
+ self.assertEqual(set([future]), pending)
+ finally:
+ event.set()
+ future.result() # wait for job to finish
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
import multiprocessing
import sys
+import threading
import time
import unittest
from concurrent import futures
max_workers=self.worker_count,
mp_context=self.get_context(),
**self.executor_kwargs)
+ self.manager = self.get_context().Manager()
else:
self.executor = self.executor_type(
max_workers=self.worker_count,
**self.executor_kwargs)
+ self.manager = None
def tearDown(self):
self.executor.shutdown(wait=True)
self.executor = None
+ if self.manager is not None:
+ self.manager.shutdown()
+ self.manager = None
dt = time.monotonic() - self.t1
if support.verbose:
class ThreadPoolMixin(ExecutorMixin):
executor_type = futures.ThreadPoolExecutor
+ def create_event(self):
+ return threading.Event()
+
class ProcessPoolForkMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
self.skipTest("TSAN doesn't support threads after fork")
return super().get_context()
+ def create_event(self):
+ return self.manager.Event()
+
class ProcessPoolSpawnMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
self.skipTest("ProcessPoolExecutor unavailable on this system")
return super().get_context()
+ def create_event(self):
+ return self.manager.Event()
+
class ProcessPoolForkserverMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
self.skipTest("TSAN doesn't support threads after fork")
return super().get_context()
+ def create_event(self):
+ return self.manager.Event()
+
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
executor_mixins=(ThreadPoolMixin,