* :class:`asyncio.WindowsProactorEventLoopPolicy`
* :func:`asyncio.get_event_loop_policy`
* :func:`asyncio.set_event_loop_policy`
- * :func:`asyncio.set_event_loop`
Users should use :func:`asyncio.run` or :class:`asyncio.Runner` with
*loop_factory* to use the desired event loop implementation.
.. note::
The :mod:`!asyncio` policy system is deprecated and will be removed
- in Python 3.16; from there on, this function will always return the
- running event loop.
-
+ in Python 3.16; from there on, this function will return the current
+ running event loop if present else it will return the
+ loop set by :func:`set_event_loop`.
.. function:: set_event_loop(loop)
Set *loop* as the current event loop for the current OS thread.
- .. deprecated:: 3.14
- The :func:`set_event_loop` function is deprecated and will be removed
- in Python 3.16.
-
.. function:: new_event_loop()
Create and return a new event loop object.
return_code = 0
loop = asyncio.new_event_loop()
- asyncio._set_event_loop(loop)
+ asyncio.set_event_loop(loop)
repl_locals = {'asyncio': asyncio}
for key in {'__name__', '__package__',
"_set_event_loop_policy",
"set_event_loop_policy",
"get_event_loop",
- "_set_event_loop",
"set_event_loop",
"new_event_loop",
"_set_running_loop",
return _get_event_loop_policy().get_event_loop()
-def _set_event_loop(loop):
- _get_event_loop_policy().set_event_loop(loop)
-
def set_event_loop(loop):
"""Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
- warnings._deprecated('asyncio.set_event_loop', remove=(3,16))
- _set_event_loop(loop)
+ _get_event_loop_policy().set_event_loop(loop)
def new_event_loop():
loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
finally:
if self._set_event_loop:
- events._set_event_loop(None)
+ events.set_event_loop(None)
loop.close()
self._loop = None
self._state = _State.CLOSED
if not self._set_event_loop:
# Call set_event_loop only once to avoid calling
# attach_loop multiple times on child watchers
- events._set_event_loop(self._loop)
+ events.set_event_loop(self._loop)
self._set_event_loop = True
else:
self._loop = self._loop_factory()
def setUp(self):
self.loop = asyncio.new_event_loop()
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
def tearDown(self):
self.loop.close()
def setUp(self):
self.loop = self.new_loop()
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
self.loop.set_exception_handler(self.loop_exception_handler)
self.__unhandled_exceptions = []
self.fail('unexpected calls to loop.call_exception_handler()')
finally:
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
self.loop = None
def tcp_server(self, server_prog, *,
if create_loop:
loop2 = base_events.BaseEventLoop()
try:
- asyncio._set_event_loop(loop2)
+ asyncio.set_event_loop(loop2)
self.check_thread(loop, debug)
finally:
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
loop2.close()
else:
self.check_thread(loop, debug)
loop = Loop()
self.addCleanup(loop.close)
- asyncio._set_event_loop(loop)
+ asyncio.set_event_loop(loop)
def run_loop():
def zero_error():
async def stop_loop_coro(loop):
loop.stop()
- asyncio._set_event_loop(self.loop)
+ asyncio.set_event_loop(self.loop)
self.loop.set_debug(True)
self.loop.slow_callback_duration = 0.0
return 'hello'
with contextlib.closing(asyncio.new_event_loop()) as loop:
- asyncio._set_event_loop(loop)
+ asyncio.set_event_loop(loop)
return loop.run_until_complete(doit())
class PolicyTests(unittest.TestCase):
- def test_asyncio_set_event_loop_deprecation(self):
- with self.assertWarnsRegex(
- DeprecationWarning, "'asyncio.set_event_loop' is deprecated"):
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- self.assertIs(loop, asyncio.get_event_loop())
- loop.close()
-
def test_abstract_event_loop_policy_deprecation(self):
with self.assertWarnsRegex(
DeprecationWarning, "'asyncio.AbstractEventLoopPolicy' is deprecated"):
super().setUp()
self.loop = asyncio.new_event_loop()
- asyncio._set_event_loop(self.loop)
+ asyncio.set_event_loop(self.loop)
def tearDown(self):
try:
super().tearDown()
finally:
self.loop.close()
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
events._get_running_loop = self._get_running_loop_saved
events._set_running_loop = self._set_running_loop_saved
with self.assertRaises(TestError):
asyncio.get_event_loop()
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()
loop.run_until_complete(func())
- asyncio._set_event_loop(loop)
+ asyncio.set_event_loop(loop)
with self.assertRaises(TestError):
asyncio.get_event_loop()
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()
with self.assertRaisesRegex(RuntimeError, 'no current'):
asyncio.get_event_loop()
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
with self.assertRaisesRegex(RuntimeError, 'no current'):
asyncio.get_event_loop()
loop.run_until_complete(func())
- asyncio._set_event_loop(loop)
+ asyncio.set_event_loop(loop)
self.assertIs(asyncio.get_event_loop(), loop)
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
with self.assertRaisesRegex(RuntimeError, 'no current'):
asyncio.get_event_loop()
def test_constructor_use_global_loop(self):
# Deprecated in 3.10, undeprecated in 3.12
- asyncio._set_event_loop(self.loop)
- self.addCleanup(asyncio._set_event_loop, None)
+ asyncio.set_event_loop(self.loop)
+ self.addCleanup(asyncio.set_event_loop, None)
f = self._new_future()
self.assertIs(f._loop, self.loop)
self.assertIs(f.get_loop(), self.loop)
def test_wrap_future_use_global_loop(self):
# Deprecated in 3.10, undeprecated in 3.12
- asyncio._set_event_loop(self.loop)
- self.addCleanup(asyncio._set_event_loop, None)
+ asyncio.set_event_loop(self.loop)
+ self.addCleanup(asyncio.set_event_loop, None)
def run(arg):
return (arg, threading.get_ident())
ex = concurrent.futures.ThreadPoolExecutor(1)
try:
reader, writer = self.loop.run_until_complete(open_connection_fut)
finally:
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
writer.write(b'GET / HTTP/1.0\r\n\r\n')
f = reader.read()
data = self.loop.run_until_complete(f)
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
# Deprecated in 3.10, undeprecated in 3.12
- self.addCleanup(asyncio._set_event_loop, None)
- asyncio._set_event_loop(self.loop)
+ self.addCleanup(asyncio.set_event_loop, None)
+ asyncio.set_event_loop(self.loop)
reader = asyncio.StreamReader()
self.assertIs(reader._loop, self.loop)
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
# Deprecated in 3.10, undeprecated in 3.12
- self.addCleanup(asyncio._set_event_loop, None)
- asyncio._set_event_loop(self.loop)
+ self.addCleanup(asyncio.set_event_loop, None)
+ asyncio.set_event_loop(self.loop)
reader = mock.Mock()
protocol = asyncio.StreamReaderProtocol(reader)
self.assertIs(protocol._loop, self.loop)
self.assertEqual(t.result(), 'ok')
# Deprecated in 3.10, undeprecated in 3.12
- asyncio._set_event_loop(self.loop)
- self.addCleanup(asyncio._set_event_loop, None)
+ asyncio.set_event_loop(self.loop)
+ self.addCleanup(asyncio.set_event_loop, None)
t = asyncio.ensure_future(notmuch())
self.assertIs(t._loop, self.loop)
self.loop.run_until_complete(t)
async def coro():
return 42
- asyncio._set_event_loop(self.loop)
- self.addCleanup(asyncio._set_event_loop, None)
+ asyncio.set_event_loop(self.loop)
+ self.addCleanup(asyncio.set_event_loop, None)
outer = asyncio.shield(coro())
self.assertEqual(outer._loop, self.loop)
res = self.loop.run_until_complete(outer)
def test_constructor_empty_sequence_use_global_loop(self):
# Deprecated in 3.10, undeprecated in 3.12
- asyncio._set_event_loop(self.one_loop)
- self.addCleanup(asyncio._set_event_loop, None)
+ asyncio.set_event_loop(self.one_loop)
+ self.addCleanup(asyncio.set_event_loop, None)
fut = asyncio.gather()
self.assertIsInstance(fut, asyncio.Future)
self.assertIs(fut._loop, self.one_loop)
# Deprecated in 3.10, undeprecated in 3.12
async def coro():
return 'abc'
- asyncio._set_event_loop(self.other_loop)
- self.addCleanup(asyncio._set_event_loop, None)
+ asyncio.set_event_loop(self.other_loop)
+ self.addCleanup(asyncio.set_event_loop, None)
gen1 = coro()
gen2 = coro()
fut = asyncio.gather(gen1, gen2)
def setUp(self):
self.loop = asyncio.new_event_loop()
- asyncio._set_event_loop(self.loop)
+ asyncio.set_event_loop(self.loop)
def tearDown(self):
self.loop.close()
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
def test_add_reader_invalid_argument(self):
def assert_raises():
if loop is None:
raise AssertionError('loop is None')
# ensure that the event loop is passed explicitly in asyncio
- events._set_event_loop(None)
+ events.set_event_loop(None)
if cleanup:
self.addCleanup(self.close_loop, loop)
self._thread_cleanup = threading_helper.threading_setup()
def tearDown(self):
- events._set_event_loop(None)
+ events.set_event_loop(None)
# Detect CPython bug #23353: ensure that yield/yield-from is not used
# in an except block of a generator
buffer.append('unreachable')
loop = asyncio.new_event_loop()
- asyncio._set_event_loop(loop)
+ asyncio.set_event_loop(loop)
try:
loop.run_until_complete(f())
except MyException:
def test_setup_get_event_loop(self):
# See https://github.com/python/cpython/issues/95736
# Make sure the default event loop is not used
- asyncio._set_event_loop(None)
+ asyncio.set_event_loop(None)
class TestCase1(unittest.IsolatedAsyncioTestCase):
def setUp(self):