ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
-async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
+async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures and coroutines given by fs to complete.
The fs iterable must not be empty.
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
- if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
fs = set(fs)
waiter.set_result(None)
-async def wait_for(fut, timeout, *, loop=None):
+async def wait_for(fut, timeout):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
This function is a coroutine.
"""
- if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
if timeout is None:
return await fut
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
-def as_completed(fs, *, loop=None, timeout=None):
+def as_completed(fs, *, timeout=None):
"""Return an iterator whose values are coroutines.
When waiting for the yielded coroutines you'll get the results (or
from .queues import Queue # Import here to avoid circular import problem.
done = Queue()
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
yield
-async def sleep(delay, result=None, *, loop=None):
+async def sleep(delay, result=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay <= 0:
await __sleep0()
return result
- if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
return ret
-def gather(*coros_or_futures, loop=None, return_exceptions=False):
+def gather(*coros_or_futures, return_exceptions=False):
"""Return a future aggregating results from the given coroutines/futures.
Coroutines will be wrapped in a future and scheduled in the event
gather won't cancel any other awaitables.
"""
if not coros_or_futures:
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_event_loop()
outer = loop.create_future()
outer.set_result([])
return outer
children = []
nfuts = 0
nfinished = 0
+ loop = None
for arg in coros_or_futures:
if arg not in arg_to_fut:
fut = ensure_future(arg, loop=loop)
return outer
-def shield(arg, *, loop=None):
+def shield(arg):
"""Wait for a future, shielding it from cancellation.
The statement
except CancelledError:
res = None
"""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
- inner = ensure_future(arg, loop=loop)
+ inner = ensure_future(arg)
if inner.done():
# Shortcut.
return inner
self.loop.run_until_complete(
asyncio.gather(*[
self.new_task(self.loop, run()) for _ in range(100)
- ], loop=self.loop))
+ ]))
def test_other_loop_future(self):
other_loop = asyncio.new_event_loop()
async def foo():
values = []
- for f in asyncio.as_completed([b, c, a], loop=loop):
+ for f in asyncio.as_completed([b, c, a]):
values.append(await f)
return values
- with self.assertWarns(DeprecationWarning):
- res = loop.run_until_complete(self.new_task(loop, foo()))
+
+ res = loop.run_until_complete(self.new_task(loop, foo()))
self.assertAlmostEqual(0.15, loop.time())
self.assertTrue('a' in res[:2])
self.assertTrue('b' in res[:2])
self.assertEqual(res[2], 'c')
# Doing it again should take no time and exercise a different path.
- with self.assertWarns(DeprecationWarning):
- res = loop.run_until_complete(self.new_task(loop, foo()))
+ res = loop.run_until_complete(self.new_task(loop, foo()))
self.assertAlmostEqual(0.15, loop.time())
def test_as_completed_with_timeout(self):
async def foo():
values = []
- for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
+ for f in asyncio.as_completed([a, b], timeout=0.12):
if values:
loop.advance_time(0.02)
try:
values.append((2, exc))
return values
- with self.assertWarns(DeprecationWarning):
- res = loop.run_until_complete(self.new_task(loop, foo()))
+ res = loop.run_until_complete(self.new_task(loop, foo()))
self.assertEqual(len(res), 2, res)
self.assertEqual(res[0], (1, 'a'))
self.assertEqual(res[1][0], 2)
a = asyncio.sleep(0.01, 'a')
async def foo():
- for f in asyncio.as_completed([a], timeout=1, loop=loop):
+ for f in asyncio.as_completed([a], timeout=1):
v = await f
self.assertEqual(v, 'a')
- with self.assertWarns(DeprecationWarning):
- loop.run_until_complete(self.new_task(loop, foo()))
+ loop.run_until_complete(self.new_task(loop, foo()))
def test_as_completed_reverse_wait(self):
yield 0
loop = self.new_test_loop(gen)
+ asyncio.set_event_loop(loop)
a = asyncio.sleep(0.05, 'a')
b = asyncio.sleep(0.10, 'b')
fs = {a, b}
- with self.assertWarns(DeprecationWarning):
- futs = list(asyncio.as_completed(fs, loop=loop))
+ futs = list(asyncio.as_completed(fs))
self.assertEqual(len(futs), 2)
x = loop.run_until_complete(futs[1])
yield 0.05
loop = self.new_test_loop(gen)
+ asyncio.set_event_loop(loop)
a = asyncio.sleep(0.05, 'a')
b = asyncio.sleep(0.05, 'b')
fs = {a, b}
- with self.assertWarns(DeprecationWarning):
- futs = list(asyncio.as_completed(fs, loop=loop))
+
+ futs = list(asyncio.as_completed(fs))
self.assertEqual(len(futs), 2)
waiter = asyncio.wait(futs)
# Deprecation from passing coros in futs to asyncio.wait()
def runner():
result = []
c = coro('ham')
- for f in asyncio.as_completed([c, c, coro('spam')],
- loop=self.loop):
+ for f in asyncio.as_completed([c, c, coro('spam')]):
result.append((yield from f))
return result
- with self.assertWarns(DeprecationWarning):
- fut = self.new_task(self.loop, runner())
- self.loop.run_until_complete(fut)
+ fut = self.new_task(self.loop, runner())
+ self.loop.run_until_complete(fut)
result = fut.result()
self.assertEqual(set(result), {'ham', 'spam'})
self.assertEqual(len(result), 2)
self.assertIsNone(asyncio.current_task(loop=self.loop))
async def coro(loop):
- self.assertIs(asyncio.current_task(loop=loop), task)
+ self.assertIs(asyncio.current_task(), task)
self.assertIs(asyncio.current_task(None), task)
self.assertIs(asyncio.current_task(), task)
fut2 = self.new_future(self.loop)
async def coro1(loop):
- self.assertTrue(asyncio.current_task(loop=loop) is task1)
+ self.assertTrue(asyncio.current_task() is task1)
await fut1
- self.assertTrue(asyncio.current_task(loop=loop) is task1)
+ self.assertTrue(asyncio.current_task() is task1)
fut2.set_result(True)
async def coro2(loop):
- self.assertTrue(asyncio.current_task(loop=loop) is task2)
+ self.assertTrue(asyncio.current_task() is task2)
fut1.set_result(True)
await fut2
- self.assertTrue(asyncio.current_task(loop=loop) is task2)
+ self.assertTrue(asyncio.current_task() is task2)
task1 = self.new_task(self.loop, coro1(self.loop))
task2 = self.new_task(self.loop, coro2(self.loop))
# as_completed() expects a list of futures, not a future instance
self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.as_completed(fut, loop=self.loop))
+ asyncio.as_completed(fut))
coro = coroutine_function()
self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.as_completed(coro, loop=self.loop))
+ asyncio.as_completed(coro))
coro.close()
def test_wait_invalid_args(self):
"""Ensure that a gathering future refuses to be cancelled once all
children are done"""
loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
self.addCleanup(loop.close)
fut = self.new_future(loop)
# gathering task is done at the same time as the child future
def child_coro():
return (yield from fut)
- gather_future = asyncio.gather(child_coro(), loop=loop)
+ gather_future = asyncio.gather(child_coro())
gather_task = asyncio.ensure_future(gather_future, loop=loop)
cancel_result = None
while True:
time += 0.05
await asyncio.gather(asyncio.sleep(0.05),
- return_exceptions=True,
- loop=loop)
+ return_exceptions=True)
if time > 1:
return
task = loop.create_task(sub(random.randint(0, 10)))
tasks.append(task)
- await asyncio.gather(*tasks, loop=loop)
+ await asyncio.gather(*tasks)
loop = asyncio.new_event_loop()
try:
self._run_loop(self.one_loop)
self.assertTrue(fut.done())
self.assertEqual(fut.result(), [])
- with self.assertWarns(DeprecationWarning):
- fut = asyncio.gather(*seq_or_iter, loop=self.other_loop)
- self.assertIs(fut._loop, self.other_loop)
def test_constructor_empty_sequence(self):
self._check_empty_sequence([])
fut2 = self.other_loop.create_future()
with self.assertRaises(ValueError):
asyncio.gather(fut1, fut2)
- with self.assertRaises(ValueError):
- asyncio.gather(fut1, loop=self.other_loop)
def test_constructor_homogenous_futures(self):
children = [self.other_loop.create_future() for i in range(3)]
self.assertIs(fut._loop, self.other_loop)
self._run_loop(self.other_loop)
self.assertFalse(fut.done())
- fut = asyncio.gather(*children, loop=self.other_loop)
+ fut = asyncio.gather(*children)
self.assertIs(fut._loop, self.other_loop)
self._run_loop(self.other_loop)
self.assertFalse(fut.done())
self.one_loop.run_until_complete(fut)
self.set_event_loop(self.other_loop, cleanup=False)
+ asyncio.set_event_loop(self.other_loop)
gen3 = coro()
gen4 = coro()
- fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop)
+ fut2 = asyncio.gather(gen3, gen4)
self.assertIs(fut2._loop, self.other_loop)
self.other_loop.run_until_complete(fut2)
def coro(s):
return s
c = coro('abc')
- fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop)
+ fut = asyncio.gather(c, c, coro('def'), c)
self._run_loop(self.one_loop)
self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])
async def outer():
nonlocal proof, gatherer
- gatherer = asyncio.gather(child1, child2, loop=self.one_loop)
+ gatherer = asyncio.gather(child1, child2)
await gatherer
proof += 100
b = self.one_loop.create_future()
async def outer():
- await asyncio.gather(inner(a), inner(b), loop=self.one_loop)
+ await asyncio.gather(inner(a), inner(b))
f = asyncio.ensure_future(outer(), loop=self.one_loop)
test_utils.run_briefly(self.one_loop)
self.loop.run_until_complete(coro())
self.assertEqual(result, 11)
- def test_loop_argument_is_deprecated(self):
- # Remove test when loop argument is removed in Python 3.10
- with self.assertWarns(DeprecationWarning):
- self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop))
-
class WaitTests(test_utils.TestCase):
def setUp(self):
self.loop = None
super().tearDown()
- def test_loop_argument_is_deprecated_in_wait(self):
- # Remove test when loop argument is removed in Python 3.10
- with self.assertWarns(DeprecationWarning):
- self.loop.run_until_complete(
- asyncio.wait([coroutine_function()], loop=self.loop))
-
- def test_loop_argument_is_deprecated_in_wait_for(self):
- # Remove test when loop argument is removed in Python 3.10
- with self.assertWarns(DeprecationWarning):
- self.loop.run_until_complete(
- asyncio.wait_for(coroutine_function(), 0.01, loop=self.loop))
-
def test_coro_is_deprecated_in_wait(self):
# Remove test when passing coros to asyncio.wait() is removed in 3.11
with self.assertWarns(DeprecationWarning):
return 'ok2'
async def inner():
- return await asyncio.gather(coro1(), coro2(), loop=self.loop)
+ return await asyncio.gather(coro1(), coro2())
result = self.loop.run_until_complete(inner())
self.assertEqual(['ok1', 'ok2'], result)