raise ValueError(f'Invalid return_when value: {return_when}')
if loop is None:
- loop = events.get_event_loop()
+ loop = events.get_running_loop()
+ else:
+ warnings.warn("The loop argument is deprecated and scheduled for"
+ "removal in Python 4.0.",
+ DeprecationWarning, stacklevel=2)
fs = {ensure_future(f, loop=loop) for f in set(fs)}
This function is a coroutine.
"""
if loop is None:
- loop = events.get_event_loop()
+ loop = events.get_running_loop()
+ else:
+ warnings.warn("The loop argument is deprecated and scheduled for"
+ "removal in Python 4.0.",
+ DeprecationWarning, stacklevel=2)
if timeout is None:
return await fut
return result
if loop is None:
- loop = events.get_event_loop()
+ loop = events.get_running_loop()
+ else:
+ warnings.warn("The loop argument is deprecated and scheduled for"
+ "removal in Python 4.0.",
+ DeprecationWarning, stacklevel=2)
+
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
self.loop.run_until_complete(coro())
self.assertEqual(result, 11)
+ def test_loop_argument_is_deprecated(self):
+ # Remove test when loop argument is removed in Python 4.0
+ with self.assertWarns(DeprecationWarning):
+ self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop))
+
+
+class WaitTests(test_utils.TestCase):
+ def setUp(self):
+ super().setUp()
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(None)
+
+ def tearDown(self):
+ self.loop.close()
+ self.loop = None
+ super().tearDown()
+
+ def test_loop_argument_is_deprecated_in_wait(self):
+ # Remove test when loop argument is removed in Python 4.0
+ with self.assertWarns(DeprecationWarning):
+ self.loop.run_until_complete(
+ asyncio.wait([coroutine_function()], loop=self.loop))
+
+ def test_loop_argument_is_deprecated_in_wait_for(self):
+ # Remove test when loop argument is removed in Python 4.0
+ with self.assertWarns(DeprecationWarning):
+ self.loop.run_until_complete(
+ asyncio.wait_for(coroutine_function(), 0.01, loop=self.loop))
+
class CompatibilityTests(test_utils.TestCase):
# Tests for checking a bridge between old-styled coroutines