This method should never return ``None``.
+ .. deprecated:: next
+ The :class:`AbstractEventLoopPolicy` class is deprecated and
+ will be removed in Python 3.16.
+
.. _asyncio-policy-builtin:
The :meth:`get_event_loop` method of the default asyncio policy now
raises a :exc:`RuntimeError` if there is no set event loop.
+ .. deprecated:: next
+ The :class:`DefaultEventLoopPolicy` class is deprecated and
+ will be removed in Python 3.16.
+
.. class:: WindowsSelectorEventLoopPolicy
.. availability:: Windows.
+ .. deprecated:: next
+ The :class:`WindowsSelectorEventLoopPolicy` class is deprecated and
+ will be removed in Python 3.16.
+
.. class:: WindowsProactorEventLoopPolicy
.. availability:: Windows.
+ .. deprecated:: next
+ The :class:`WindowsProactorEventLoopPolicy` class is deprecated and
+ will be removed in Python 3.16.
+
.. _asyncio-custom-policies:
else:
from .unix_events import * # pragma: no cover
__all__ += unix_events.__all__
+
+def __getattr__(name: str):
+ import warnings
+
+ deprecated = {
+ "AbstractEventLoopPolicy",
+ "DefaultEventLoopPolicy",
+ "WindowsSelectorEventLoopPolicy",
+ "WindowsProactorEventLoopPolicy",
+ }
+ if name in deprecated:
+ warnings._deprecated(f"asyncio.{name}", remove=(3, 16))
+ # deprecated things have underscores in front of them
+ return globals()["_" + name]
+
+ raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc. http://magic.io
__all__ = (
- 'AbstractEventLoopPolicy',
+ '_AbstractEventLoopPolicy',
'AbstractEventLoop', 'AbstractServer',
'Handle', 'TimerHandle',
'_get_event_loop_policy',
raise NotImplementedError
-class AbstractEventLoopPolicy:
+class _AbstractEventLoopPolicy:
"""Abstract policy for accessing the event loop."""
def get_event_loop(self):
the current context, set_event_loop must be called explicitly."""
raise NotImplementedError
-class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
+class _BaseDefaultEventLoopPolicy(_AbstractEventLoopPolicy):
"""Default policy implementation for accessing the event loop.
In this policy, each thread has its own event loop. However, we
global _event_loop_policy
with _lock:
if _event_loop_policy is None: # pragma: no branch
- from . import DefaultEventLoopPolicy
- _event_loop_policy = DefaultEventLoopPolicy()
+ from . import _DefaultEventLoopPolicy
+ _event_loop_policy = _DefaultEventLoopPolicy()
def _get_event_loop_policy():
If policy is None, the default policy is restored."""
global _event_loop_policy
- if policy is not None and not isinstance(policy, AbstractEventLoopPolicy):
+ if policy is not None and not isinstance(policy, _AbstractEventLoopPolicy):
raise TypeError(f"policy must be an instance of AbstractEventLoopPolicy or None, not '{type(policy).__name__}'")
_event_loop_policy = policy
def on_fork():
# Reset the loop and wakeupfd in the forked child process.
if _event_loop_policy is not None:
- _event_loop_policy._local = BaseDefaultEventLoopPolicy._Local()
+ _event_loop_policy._local = _BaseDefaultEventLoopPolicy._Local()
_set_running_loop(None)
signal.set_wakeup_fd(-1)
__all__ = (
'SelectorEventLoop',
- 'DefaultEventLoopPolicy',
+ '_DefaultEventLoopPolicy',
'EventLoop',
)
return True
-class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
+class _UnixDefaultEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
"""UNIX event loop policy"""
_loop_factory = _UnixSelectorEventLoop
SelectorEventLoop = _UnixSelectorEventLoop
-DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
+_DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
EventLoop = SelectorEventLoop
__all__ = (
'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
- 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
- 'WindowsProactorEventLoopPolicy', 'EventLoop',
+ '_DefaultEventLoopPolicy', '_WindowsSelectorEventLoopPolicy',
+ '_WindowsProactorEventLoopPolicy', 'EventLoop',
)
SelectorEventLoop = _WindowsSelectorEventLoop
-class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
+class _WindowsSelectorEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
_loop_factory = SelectorEventLoop
-class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
+class _WindowsProactorEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
_loop_factory = ProactorEventLoop
-DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
+_DefaultEventLoopPolicy = _WindowsProactorEventLoopPolicy
EventLoop = ProactorEventLoop
class PolicyTests(unittest.TestCase):
+ def test_abstract_event_loop_policy_deprecation(self):
+ with self.assertWarnsRegex(
+ DeprecationWarning, "'asyncio.AbstractEventLoopPolicy' is deprecated"):
+ policy = asyncio.AbstractEventLoopPolicy()
+ self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
+
+ def test_default_event_loop_policy_deprecation(self):
+ with self.assertWarnsRegex(
+ DeprecationWarning, "'asyncio.DefaultEventLoopPolicy' is deprecated"):
+ policy = asyncio.DefaultEventLoopPolicy()
+ self.assertIsInstance(policy, asyncio.DefaultEventLoopPolicy)
+
def test_event_loop_policy(self):
- policy = asyncio.AbstractEventLoopPolicy()
+ policy = asyncio._AbstractEventLoopPolicy()
self.assertRaises(NotImplementedError, policy.get_event_loop)
self.assertRaises(NotImplementedError, policy.set_event_loop, object())
self.assertRaises(NotImplementedError, policy.new_event_loop)
def test_get_event_loop(self):
- policy = asyncio.DefaultEventLoopPolicy()
+ policy = asyncio._DefaultEventLoopPolicy()
self.assertIsNone(policy._local._loop)
with self.assertRaises(RuntimeError):
self.assertIsNone(policy._local._loop)
def test_get_event_loop_does_not_call_set_event_loop(self):
- policy = asyncio.DefaultEventLoopPolicy()
+ policy = asyncio._DefaultEventLoopPolicy()
with mock.patch.object(
policy, "set_event_loop",
m_set_event_loop.assert_not_called()
def test_get_event_loop_after_set_none(self):
- policy = asyncio.DefaultEventLoopPolicy()
+ policy = asyncio._DefaultEventLoopPolicy()
policy.set_event_loop(None)
self.assertRaises(RuntimeError, policy.get_event_loop)
def test_get_event_loop_thread(self, m_current_thread):
def f():
- policy = asyncio.DefaultEventLoopPolicy()
+ policy = asyncio._DefaultEventLoopPolicy()
self.assertRaises(RuntimeError, policy.get_event_loop)
th = threading.Thread(target=f)
th.join()
def test_new_event_loop(self):
- policy = asyncio.DefaultEventLoopPolicy()
+ policy = asyncio._DefaultEventLoopPolicy()
loop = policy.new_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
loop.close()
def test_set_event_loop(self):
- policy = asyncio.DefaultEventLoopPolicy()
+ policy = asyncio._DefaultEventLoopPolicy()
old_loop = policy.new_event_loop()
policy.set_event_loop(old_loop)
with self.assertWarnsRegex(
DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
policy = asyncio.get_event_loop_policy()
- self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
+ self.assertIsInstance(policy, asyncio._AbstractEventLoopPolicy)
self.assertIs(policy, asyncio.get_event_loop_policy())
def test_set_event_loop_policy(self):
DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
old_policy = asyncio.get_event_loop_policy()
- policy = asyncio.DefaultEventLoopPolicy()
+ policy = asyncio._DefaultEventLoopPolicy()
with self.assertWarnsRegex(
DeprecationWarning, "'asyncio.set_event_loop_policy' is deprecated"):
asyncio.set_event_loop_policy(policy)
class TestError(Exception):
pass
- class Policy(asyncio.DefaultEventLoopPolicy):
+ class Policy(asyncio._DefaultEventLoopPolicy):
def get_event_loop(self):
raise TestError
def test_get_event_loop_returns_running_loop2(self):
old_policy = asyncio._get_event_loop_policy()
try:
- asyncio._set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
+ asyncio._set_event_loop_policy(asyncio._DefaultEventLoopPolicy())
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)
_thread.interrupt_main()
-class TestPolicy(asyncio.AbstractEventLoopPolicy):
+class TestPolicy(asyncio._AbstractEventLoopPolicy):
def __init__(self, loop_factory):
self.loop_factory = loop_factory
def test_selector_win_policy(self):
async def main():
- self.assertIsInstance(
- asyncio.get_running_loop(),
- asyncio.SelectorEventLoop)
+ self.assertIsInstance(asyncio.get_running_loop(), asyncio.SelectorEventLoop)
old_policy = asyncio._get_event_loop_policy()
try:
- asyncio._set_event_loop_policy(
- asyncio.WindowsSelectorEventLoopPolicy())
+ with self.assertWarnsRegex(
+ DeprecationWarning,
+ "'asyncio.WindowsSelectorEventLoopPolicy' is deprecated",
+ ):
+ asyncio._set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
finally:
asyncio._set_event_loop_policy(old_policy)
old_policy = asyncio._get_event_loop_policy()
try:
- asyncio._set_event_loop_policy(
- asyncio.WindowsProactorEventLoopPolicy())
+ with self.assertWarnsRegex(
+ DeprecationWarning,
+ "'asyncio.WindowsProactorEventLoopPolicy' is deprecated",
+ ):
+ asyncio._set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
asyncio.run(main())
finally:
asyncio._set_event_loop_policy(old_policy)