self._interrupt_count = 0
try:
- if self._set_event_loop:
- events.set_event_loop(self._loop)
return self._loop.run_until_complete(task)
except exceptions.CancelledError:
if self._interrupt_count > 0:
return
if self._loop_factory is None:
self._loop = events.new_event_loop()
- self._set_event_loop = True
+ if not self._set_event_loop:
+ # Call set_event_loop only once to avoid calling
+ # attach_loop multiple times on child watchers
+ events.set_event_loop(self._loop)
+ self._set_event_loop = True
else:
self._loop = self._loop_factory()
if self._debug is not None:
):
runner.run(coro())
+ def test_set_event_loop_called_once(self):
+ # See https://github.com/python/cpython/issues/95736
+ async def coro():
+ pass
+
+ policy = asyncio.get_event_loop_policy()
+ policy.set_event_loop = mock.Mock()
+ runner = asyncio.Runner()
+ runner.run(coro())
+ runner.run(coro())
+
+ self.assertEqual(1, policy.set_event_loop.call_count)
+ runner.close()
+
if __name__ == '__main__':
unittest.main()