This deprecates `asyncio.get_event_loop_policy` and will be removed in Python 3.16.
Return the current process-wide policy.
+ .. deprecated:: next
+ The :func:`get_event_loop_policy` function is deprecated and
+ will be removed in Python 3.16.
+
.. function:: set_event_loop_policy(policy)
Set the current process-wide policy to *policy*.
'AbstractEventLoopPolicy',
'AbstractEventLoop', 'AbstractServer',
'Handle', 'TimerHandle',
+ '_get_event_loop_policy',
'get_event_loop_policy',
'_set_event_loop_policy',
'set_event_loop_policy',
_event_loop_policy = DefaultEventLoopPolicy()
-def get_event_loop_policy():
+def _get_event_loop_policy():
"""Get the current event loop policy."""
if _event_loop_policy is None:
_init_event_loop_policy()
return _event_loop_policy
+def get_event_loop_policy():
+ warnings._deprecated('asyncio.get_event_loop_policy', remove=(3, 16))
+ return _get_event_loop_policy()
def _set_event_loop_policy(policy):
"""Set the current event loop policy.
_event_loop_policy = policy
def set_event_loop_policy(policy):
- warnings._deprecated('set_event_loop_policy', remove=(3,16))
+ warnings._deprecated('asyncio.set_event_loop_policy', remove=(3,16))
_set_event_loop_policy(policy)
def get_event_loop():
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
- return get_event_loop_policy().get_event_loop()
+ return _get_event_loop_policy().get_event_loop()
def set_event_loop(loop):
"""Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
- get_event_loop_policy().set_event_loop(loop)
+ _get_event_loop_policy().set_event_loop(loop)
def new_event_loop():
"""Equivalent to calling get_event_loop_policy().new_event_loop()."""
- return get_event_loop_policy().new_event_loop()
+ return _get_event_loop_policy().new_event_loop()
# Alias pure-Python implementations for testing purposes.
self.assertRegex(repr(h), regex)
def test_handle_source_traceback(self):
- loop = asyncio.get_event_loop_policy().new_event_loop()
+ loop = asyncio.new_event_loop()
loop.set_debug(True)
self.set_event_loop(loop)
old_loop.close()
def test_get_event_loop_policy(self):
- policy = asyncio.get_event_loop_policy()
- self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
- self.assertIs(policy, asyncio.get_event_loop_policy())
+ with self.assertWarnsRegex(
+ DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
+ policy = asyncio.get_event_loop_policy()
+ self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
+ self.assertIs(policy, asyncio.get_event_loop_policy())
def test_set_event_loop_policy(self):
with self.assertWarnsRegex(
- DeprecationWarning, "'set_event_loop_policy' is deprecated"):
+ DeprecationWarning, "'asyncio.set_event_loop_policy' is deprecated"):
self.assertRaises(
TypeError, asyncio.set_event_loop_policy, object())
- old_policy = asyncio.get_event_loop_policy()
+ with self.assertWarnsRegex(
+ DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
+ old_policy = asyncio.get_event_loop_policy()
policy = asyncio.DefaultEventLoopPolicy()
with self.assertWarnsRegex(
- DeprecationWarning, "'set_event_loop_policy' is deprecated"):
+ DeprecationWarning, "'asyncio.set_event_loop_policy' is deprecated"):
asyncio.set_event_loop_policy(policy)
- self.assertIs(policy, asyncio.get_event_loop_policy())
- self.assertIsNot(policy, old_policy)
+
+ with self.assertWarnsRegex(
+ DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
+ self.assertIs(policy, asyncio.get_event_loop_policy())
+ self.assertIsNot(policy, old_policy)
class GetEventLoopTestsMixin:
def get_event_loop(self):
raise TestError
- old_policy = asyncio.get_event_loop_policy()
+ old_policy = asyncio._get_event_loop_policy()
try:
asyncio._set_event_loop_policy(Policy())
loop = asyncio.new_event_loop()
self.assertIs(asyncio._get_running_loop(), None)
def test_get_event_loop_returns_running_loop2(self):
- old_policy = asyncio.get_event_loop_policy()
+ old_policy = asyncio._get_event_loop_policy()
try:
asyncio._set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
loop = asyncio.new_event_loop()
asyncio._set_event_loop_policy(policy)
def tearDown(self):
- policy = asyncio.get_event_loop_policy()
+ policy = asyncio._get_event_loop_policy()
if policy.loop is not None:
self.assertTrue(policy.loop.is_closed())
self.assertTrue(policy.loop.shutdown_ag_run)
await asyncio.sleep(0)
return 42
- policy = asyncio.get_event_loop_policy()
+ policy = asyncio._get_event_loop_policy()
policy.set_event_loop = mock.Mock()
asyncio.run(main())
self.assertTrue(policy.set_event_loop.called)
async def coro():
pass
- policy = asyncio.get_event_loop_policy()
+ policy = asyncio._get_event_loop_policy()
policy.set_event_loop = mock.Mock()
runner = asyncio.Runner()
runner.run(coro())
def setUp(self):
super().setUp()
- policy = asyncio.get_event_loop_policy()
- self.loop = policy.new_event_loop()
+ self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
def test_watcher_implementation(self):
asyncio.get_running_loop(),
asyncio.SelectorEventLoop)
- old_policy = asyncio.get_event_loop_policy()
+ old_policy = asyncio._get_event_loop_policy()
try:
asyncio._set_event_loop_policy(
asyncio.WindowsSelectorEventLoopPolicy())
asyncio.get_running_loop(),
asyncio.ProactorEventLoop)
- old_policy = asyncio.get_event_loop_policy()
+ old_policy = asyncio._get_event_loop_policy()
try:
asyncio._set_event_loop_policy(
asyncio.WindowsProactorEventLoopPolicy())
class SyncAsyncExitStack(AsyncExitStack):
@staticmethod
def run_coroutine(coro):
- loop = asyncio.get_event_loop_policy().get_event_loop()
+ loop = asyncio.new_event_loop()
t = loop.create_task(coro)
t.add_done_callback(lambda f: loop.stop())
loop.run_forever()
class TestCase1(unittest.IsolatedAsyncioTestCase):
def setUp(self):
- asyncio.get_event_loop_policy().get_event_loop()
+ asyncio._get_event_loop_policy().get_event_loop()
async def test_demo1(self):
pass
}
WITH_MOD("asyncio.events")
- GET_MOD_ATTR(state->asyncio_get_event_loop_policy, "get_event_loop_policy")
+ GET_MOD_ATTR(state->asyncio_get_event_loop_policy, "_get_event_loop_policy")
WITH_MOD("asyncio.base_futures")
GET_MOD_ATTR(state->asyncio_future_repr_func, "_future_repr")