def test_timeout(self):
short_timeout = 0.050
+ long_timeout = short_timeout * 10
- future = self.executor.submit(self.event.wait)
+ future = self.executor.submit(time.sleep, long_timeout)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
finished)
self.assertEqual(set([future]), pending)
- # Set the event to allow the future to complete
- self.event.set()
-
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
import multiprocessing
import sys
-import threading
import time
import unittest
from concurrent import futures
self.t1 = time.monotonic()
if hasattr(self, "ctx"):
- self.manager = multiprocessing.Manager()
- self.event = self.manager.Event()
self.executor = self.executor_type(
max_workers=self.worker_count,
mp_context=self.get_context(),
**self.executor_kwargs)
else:
- self.event = threading.Event()
self.executor = self.executor_type(
max_workers=self.worker_count,
**self.executor_kwargs)
def tearDown(self):
self.executor.shutdown(wait=True)
self.executor = None
- if hasattr(self, "ctx"):
- self.manager.shutdown()
- self.manager = None
dt = time.monotonic() - self.t1
if support.verbose: