Deprecate `asyncio.set_event_loop` to be removed in Python 3.16.
Set *loop* as the current event loop for the current OS thread.
+ .. deprecated:: next
+ The :func:`set_event_loop` function is deprecated and will be removed
+ in Python 3.16.
+
.. function:: new_event_loop()
Create and return a new event loop object.
return_code = 0
loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
+ asyncio._set_event_loop(loop)
repl_locals = {'asyncio': asyncio}
for key in {'__name__', '__package__',
# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc. http://magic.io
__all__ = (
- '_AbstractEventLoopPolicy',
- 'AbstractEventLoop', 'AbstractServer',
- 'Handle', 'TimerHandle',
- '_get_event_loop_policy',
- 'get_event_loop_policy',
- '_set_event_loop_policy',
- 'set_event_loop_policy',
- 'get_event_loop', 'set_event_loop', 'new_event_loop',
- '_set_running_loop', 'get_running_loop',
- '_get_running_loop',
+ "_AbstractEventLoopPolicy",
+ "AbstractEventLoop",
+ "AbstractServer",
+ "Handle",
+ "TimerHandle",
+ "_get_event_loop_policy",
+ "get_event_loop_policy",
+ "_set_event_loop_policy",
+ "set_event_loop_policy",
+ "get_event_loop",
+ "_set_event_loop",
+ "set_event_loop",
+ "new_event_loop",
+ "_set_running_loop",
+ "get_running_loop",
+ "_get_running_loop",
)
import contextvars
return _get_event_loop_policy().get_event_loop()
+def _set_event_loop(loop):
+ _get_event_loop_policy().set_event_loop(loop)
+
def set_event_loop(loop):
"""Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
- _get_event_loop_policy().set_event_loop(loop)
+ warnings._deprecated('asyncio.set_event_loop', remove=(3,16))
+ _set_event_loop(loop)
def new_event_loop():
loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
finally:
if self._set_event_loop:
- events.set_event_loop(None)
+ events._set_event_loop(None)
loop.close()
self._loop = None
self._state = _State.CLOSED
if not self._set_event_loop:
# Call set_event_loop only once to avoid calling
# attach_loop multiple times on child watchers
- events.set_event_loop(self._loop)
+ events._set_event_loop(self._loop)
self._set_event_loop = True
else:
self._loop = self._loop_factory()
def setUp(self):
self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
def tearDown(self):
self.loop.close()
def setUp(self):
self.loop = self.new_loop()
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
self.loop.set_exception_handler(self.loop_exception_handler)
self.__unhandled_exceptions = []
self.fail('unexpected calls to loop.call_exception_handler()')
finally:
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
self.loop = None
def tcp_server(self, server_prog, *,
if create_loop:
loop2 = base_events.BaseEventLoop()
try:
- asyncio.set_event_loop(loop2)
+ asyncio._set_event_loop(loop2)
self.check_thread(loop, debug)
finally:
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
loop2.close()
else:
self.check_thread(loop, debug)
loop = Loop()
self.addCleanup(loop.close)
- asyncio.set_event_loop(loop)
+ asyncio._set_event_loop(loop)
def run_loop():
def zero_error():
async def stop_loop_coro(loop):
loop.stop()
- asyncio.set_event_loop(self.loop)
+ asyncio._set_event_loop(self.loop)
self.loop.set_debug(True)
self.loop.slow_callback_duration = 0.0
return 'hello'
loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
+ asyncio._set_event_loop(loop)
return loop.run_until_complete(doit())
class PolicyTests(unittest.TestCase):
+ def test_asyncio_set_event_loop_deprecation(self):
+ with self.assertWarnsRegex(
+ DeprecationWarning, "'asyncio.set_event_loop' is deprecated"):
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ self.assertIs(loop, asyncio.get_event_loop())
+ loop.close()
+
def test_abstract_event_loop_policy_deprecation(self):
with self.assertWarnsRegex(
DeprecationWarning, "'asyncio.AbstractEventLoopPolicy' is deprecated"):
super().setUp()
self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(self.loop)
+ asyncio._set_event_loop(self.loop)
def tearDown(self):
try:
super().tearDown()
finally:
self.loop.close()
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
events._get_running_loop = self._get_running_loop_saved
events._set_running_loop = self._set_running_loop_saved
with self.assertRaises(TestError):
asyncio.get_event_loop()
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()
loop.run_until_complete(func())
- asyncio.set_event_loop(loop)
+ asyncio._set_event_loop(loop)
with self.assertRaises(TestError):
asyncio.get_event_loop()
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()
with self.assertRaisesRegex(RuntimeError, 'no current'):
asyncio.get_event_loop()
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
with self.assertRaisesRegex(RuntimeError, 'no current'):
asyncio.get_event_loop()
loop.run_until_complete(func())
- asyncio.set_event_loop(loop)
+ asyncio._set_event_loop(loop)
self.assertIs(asyncio.get_event_loop(), loop)
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
with self.assertRaisesRegex(RuntimeError, 'no current'):
asyncio.get_event_loop()
def test_constructor_use_global_loop(self):
# Deprecated in 3.10, undeprecated in 3.12
- asyncio.set_event_loop(self.loop)
- self.addCleanup(asyncio.set_event_loop, None)
+ asyncio._set_event_loop(self.loop)
+ self.addCleanup(asyncio._set_event_loop, None)
f = self._new_future()
self.assertIs(f._loop, self.loop)
self.assertIs(f.get_loop(), self.loop)
def test_wrap_future_use_global_loop(self):
# Deprecated in 3.10, undeprecated in 3.12
- asyncio.set_event_loop(self.loop)
- self.addCleanup(asyncio.set_event_loop, None)
+ asyncio._set_event_loop(self.loop)
+ self.addCleanup(asyncio._set_event_loop, None)
def run(arg):
return (arg, threading.get_ident())
ex = concurrent.futures.ThreadPoolExecutor(1)
try:
reader, writer = self.loop.run_until_complete(open_connection_fut)
finally:
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
writer.write(b'GET / HTTP/1.0\r\n\r\n')
f = reader.read()
data = self.loop.run_until_complete(f)
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
# Deprecated in 3.10, undeprecated in 3.12
- self.addCleanup(asyncio.set_event_loop, None)
- asyncio.set_event_loop(self.loop)
+ self.addCleanup(asyncio._set_event_loop, None)
+ asyncio._set_event_loop(self.loop)
reader = asyncio.StreamReader()
self.assertIs(reader._loop, self.loop)
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
# Deprecated in 3.10, undeprecated in 3.12
- self.addCleanup(asyncio.set_event_loop, None)
- asyncio.set_event_loop(self.loop)
+ self.addCleanup(asyncio._set_event_loop, None)
+ asyncio._set_event_loop(self.loop)
reader = mock.Mock()
protocol = asyncio.StreamReaderProtocol(reader)
self.assertIs(protocol._loop, self.loop)
self.assertEqual(t.result(), 'ok')
# Deprecated in 3.10, undeprecated in 3.12
- asyncio.set_event_loop(self.loop)
- self.addCleanup(asyncio.set_event_loop, None)
+ asyncio._set_event_loop(self.loop)
+ self.addCleanup(asyncio._set_event_loop, None)
t = asyncio.ensure_future(notmuch())
self.assertIs(t._loop, self.loop)
self.loop.run_until_complete(t)
async def coro():
return 42
- asyncio.set_event_loop(self.loop)
- self.addCleanup(asyncio.set_event_loop, None)
+ asyncio._set_event_loop(self.loop)
+ self.addCleanup(asyncio._set_event_loop, None)
outer = asyncio.shield(coro())
self.assertEqual(outer._loop, self.loop)
res = self.loop.run_until_complete(outer)
self.assertEqual(self.all_tasks(loop=self.loop), {task})
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
# execute the task so it waits for future
self.loop._run_once()
def test_constructor_empty_sequence_use_global_loop(self):
# Deprecated in 3.10, undeprecated in 3.12
- asyncio.set_event_loop(self.one_loop)
- self.addCleanup(asyncio.set_event_loop, None)
+ asyncio._set_event_loop(self.one_loop)
+ self.addCleanup(asyncio._set_event_loop, None)
fut = asyncio.gather()
self.assertIsInstance(fut, asyncio.Future)
self.assertIs(fut._loop, self.one_loop)
# Deprecated in 3.10, undeprecated in 3.12
async def coro():
return 'abc'
- asyncio.set_event_loop(self.other_loop)
- self.addCleanup(asyncio.set_event_loop, None)
+ asyncio._set_event_loop(self.other_loop)
+ self.addCleanup(asyncio._set_event_loop, None)
gen1 = coro()
gen2 = coro()
fut = asyncio.gather(gen1, gen2)
def setUp(self):
self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(self.loop)
+ asyncio._set_event_loop(self.loop)
def tearDown(self):
self.loop.close()
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
def test_add_reader_invalid_argument(self):
def assert_raises():
if loop is None:
raise AssertionError('loop is None')
# ensure that the event loop is passed explicitly in asyncio
- events.set_event_loop(None)
+ events._set_event_loop(None)
if cleanup:
self.addCleanup(self.close_loop, loop)
self._thread_cleanup = threading_helper.threading_setup()
def tearDown(self):
- events.set_event_loop(None)
+ events._set_event_loop(None)
# Detect CPython bug #23353: ensure that yield/yield-from is not used
# in an except block of a generator
buffer.append('unreachable')
loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
+ asyncio._set_event_loop(loop)
try:
loop.run_until_complete(f())
except MyException:
co = get_coroutine()
- self.addCleanup(asyncio.set_event_loop_policy, None)
+ self.addCleanup(asyncio._set_event_loop_policy, None)
a, b = asyncio.run(co())
self.assertIsInstance(a, TypeVar)
def test_setup_get_event_loop(self):
# See https://github.com/python/cpython/issues/95736
# Make sure the default event loop is not used
- asyncio.set_event_loop(None)
+ asyncio._set_event_loop(None)
class TestCase1(unittest.IsolatedAsyncioTestCase):
def setUp(self):