async def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
if delay <= 0:
await __sleep0()
return result
if loop is None:
loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
future = loop.create_future()
h = loop.call_later(delay,
after catching an exception (raised by one of the awaitables) from
gather won't cancel any other awaitables.
"""
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
if not coros_or_futures:
if loop is None:
loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
outer = loop.create_future()
outer.set_result([])
return outer
import os
-from test.support import load_package_tests, import_module
+from test import support
+import unittest
# Skip tests if we don't have concurrent.futures.
-import_module('concurrent.futures')
+support.import_module('concurrent.futures')
-def load_tests(*args):
- return load_package_tests(os.path.dirname(__file__), *args)
+
+def load_tests(loader, _, pattern):
+ pkg_dir = os.path.dirname(__file__)
+ suite = AsyncioTestSuite()
+ return support.load_package_tests(pkg_dir, loader, suite, pattern)
+
+
+class AsyncioTestSuite(unittest.TestSuite):
+ """A custom test suite that also runs setup/teardown for the whole package.
+
+ Normally unittest only runs setUpModule() and tearDownModule() within each
+ test module part of the test suite. Copying those functions to each file
+ would be tedious, let's run this once and for all.
+ """
+ def run(self, result, debug=False):
+ ignore = support.ignore_deprecations_from
+ tokens = {
+ ignore("asyncio.base_events", like=r".*loop argument.*"),
+ ignore("asyncio.unix_events", like=r".*loop argument.*"),
+ ignore("asyncio.futures", like=r".*loop argument.*"),
+ ignore("asyncio.runners", like=r".*loop argument.*"),
+ ignore("asyncio.subprocess", like=r".*loop argument.*"),
+ ignore("asyncio.tasks", like=r".*loop argument.*"),
+ ignore("test.test_asyncio.test_queues", like=r".*loop argument.*"),
+ ignore("test.test_asyncio.test_tasks", like=r".*loop argument.*"),
+ }
+ try:
+ super().run(result, debug=debug)
+ finally:
+ support.clear_ignored_deprecations(*tokens)