self.assertIsNone(self.loop._default_executor)
def test_shutdown_default_executor_timeout(self):
+ event = threading.Event()
+
class DummyExecutor(concurrent.futures.ThreadPoolExecutor):
def shutdown(self, wait=True, *, cancel_futures=False):
if wait:
- time.sleep(0.1)
+ event.wait()
self.loop._process_events = mock.Mock()
self.loop._write_to_self = mock.Mock()
executor = DummyExecutor()
self.loop.set_default_executor(executor)
- with self.assertWarnsRegex(RuntimeWarning,
- "The executor did not finishing joining"):
- self.loop.run_until_complete(
- self.loop.shutdown_default_executor(timeout=0.01))
+ try:
+ with self.assertWarnsRegex(RuntimeWarning,
+ "The executor did not finishing joining"):
+ self.loop.run_until_complete(
+ self.loop.shutdown_default_executor(timeout=0.01))
+ finally:
+ event.set()
def test_call_soon(self):
def cb():