self._loop = None
self._context = None
self._interrupt_count = 0
+ self._set_event_loop = False
def __enter__(self):
self._lazy_init()
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
+ if self._set_event_loop:
+ events.set_event_loop(None)
loop.close()
self._loop = None
self._state = _State.CLOSED
self._interrupt_count = 0
try:
+ if self._set_event_loop:
+ events.set_event_loop(self._loop)
return self._loop.run_until_complete(task)
except exceptions.CancelledError:
if self._interrupt_count > 0 and task.uncancel() == 0:
return
if self._loop_factory is None:
self._loop = events.new_event_loop()
+ self._set_event_loop = True
else:
self._loop = self._loop_factory()
if self._debug is not None:
self.assertIsNone(spinner.ag_frame)
self.assertFalse(spinner.ag_running)
+ def test_asyncio_run_set_event_loop(self):
+ #See https://github.com/python/cpython/issues/93896
+
+ async def main():
+ await asyncio.sleep(0)
+ return 42
+
+ policy = asyncio.get_event_loop_policy()
+ policy.set_event_loop = mock.Mock()
+ asyncio.run(main())
+ self.assertTrue(policy.set_event_loop.called)
+
class RunnerTests(BaseTest):