Get the current event loop.
- If there is no current event loop set in the current OS thread,
- the OS thread is main, and :func:`set_event_loop` has not yet
- been called, asyncio will create a new event loop and set it as the
- current one.
+ When called from a coroutine or a callback (e.g. scheduled with
+ call_soon or similar API), this function will always return the
+ running event loop.
+
+ If there is no running event loop set, the function will return
+ the result of calling ``get_event_loop_policy().get_event_loop()``.
Because this function has rather complex behavior (especially
when custom event loop policies are in use), using the
instead of using these lower level functions to manually create and close an
event loop.
- .. deprecated:: 3.10
- Emits a deprecation warning if there is no running event loop.
- In future Python releases, this function may become an alias of
- :func:`get_running_loop` and will accordingly raise a
- :exc:`RuntimeError` if there is no running event loop.
+ .. note::
+ In Python versions 3.10.0--3.10.8 and 3.11.0 this function
+ (and other functions which used it implicitly) emitted a
+ :exc:`DeprecationWarning` if there was no running event loop, even if
+ the current loop was set.
.. function:: set_event_loop(loop)
- The **preferred** function to get the running event loop.
* - :func:`asyncio.get_event_loop`
- - Get an event loop instance (current or via the policy).
+ - Get an event loop instance (running or current via the current policy).
* - :func:`asyncio.set_event_loop`
- Set the event loop as current via the current policy.
On Windows, :class:`ProactorEventLoop` is now used by default.
+ .. versionchanged:: 3.12
+ :meth:`get_event_loop` now raises a :exc:`RuntimeError` if there is no
+ current event loop set.
+
.. class:: WindowsSelectorEventLoopPolicy
around process-global resources, which are best managed from the main interpreter.
(Contributed by Dong-hee Na in :gh:`99127`.)
+* :func:`asyncio.get_event_loop` and many other :mod:`asyncio` functions like
+ :func:`~asyncio.ensure_future`, :func:`~asyncio.shield` or
+ :func:`~asyncio.gather`, and also the
+ :meth:`~asyncio.BaseDefaultEventLoopPolicy.get_event_loop` method of
+ :class:`~asyncio.BaseDefaultEventLoopPolicy` now raise a :exc:`RuntimeError`
+ if called when there is no running event loop and the current event loop was
+ not set.
+ Previously they implicitly created and set a new current event loop.
+ :exc:`DeprecationWarning` is no longer emitted if there is no running
+ event loop but the current event loop is set in the policy.
+ (Contributed by Serhiy Storchaka in :gh:`93453`.)
+
Build Changes
=============
Returns an event loop object implementing the BaseEventLoop interface,
or raises an exception in case no event loop has been set for the
- current context and the current policy does not specify to create one.
+ current context.
It should never return None."""
raise NotImplementedError
Returns an instance of EventLoop or raises an exception.
"""
- if (self._local._loop is None and
- not self._local._set_called and
- threading.current_thread() is threading.main_thread()):
- self.set_event_loop(self.new_event_loop())
-
if self._local._loop is None:
raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)
the result of `get_event_loop_policy().get_event_loop()` call.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
- return _py__get_event_loop()
-
-
-def _get_event_loop(stacklevel=3):
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
- import warnings
- warnings.warn('There is no current event loop',
- DeprecationWarning, stacklevel=stacklevel)
return get_event_loop_policy().get_event_loop()
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
-_py__get_event_loop = _get_event_loop
try:
# functions in asyncio. Pure Python implementation is
# about 4 times slower than C-accelerated.
from _asyncio import (_get_running_loop, _set_running_loop,
- get_running_loop, get_event_loop, _get_event_loop)
+ get_running_loop, get_event_loop)
except ImportError:
pass
else:
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
- _c__get_event_loop = _get_event_loop
if hasattr(os, 'fork'):
the default event loop.
"""
if loop is None:
- self._loop = events._get_event_loop()
+ self._loop = events.get_event_loop()
else:
self._loop = loop
self._callbacks = []
assert isinstance(future, concurrent.futures.Future), \
f'concurrent.futures.Future is expected, got {future!r}'
if loop is None:
- loop = events._get_event_loop()
+ loop = events.get_event_loop()
new_future = loop.create_future()
_chain_future(future, new_future)
return new_future
def __init__(self, loop=None):
if loop is None:
- self._loop = events._get_event_loop(stacklevel=4)
+ self._loop = events.get_event_loop()
else:
self._loop = loop
self._paused = False
self._limit = limit
if loop is None:
- self._loop = events._get_event_loop()
+ self._loop = events.get_event_loop()
else:
self._loop = loop
self._buffer = bytearray()
from .queues import Queue # Import here to avoid circular import problem.
done = Queue()
- loop = events._get_event_loop()
+ loop = events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
'is required')
if loop is None:
- loop = events._get_event_loop(stacklevel=4)
+ loop = events.get_event_loop()
try:
return loop.create_task(coro_or_future)
except RuntimeError:
gather won't cancel any other awaitables.
"""
if not coros_or_futures:
- loop = events._get_event_loop()
+ loop = events.get_event_loop()
outer = loop.create_future()
outer.set_result([])
return outer
def test_env_var_debug(self):
code = '\n'.join((
'import asyncio',
- 'loop = asyncio.get_event_loop()',
+ 'loop = asyncio.new_event_loop()',
'print(loop.get_debug())'))
# Test with -E to not fail if the unit test was run with
def test_get_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
self.assertIsNone(policy._local._loop)
-
- loop = policy.get_event_loop()
- self.assertIsInstance(loop, asyncio.AbstractEventLoop)
-
- self.assertIs(policy._local._loop, loop)
- self.assertIs(loop, policy.get_event_loop())
- loop.close()
-
- def test_get_event_loop_calls_set_event_loop(self):
- policy = asyncio.DefaultEventLoopPolicy()
-
- with mock.patch.object(
- policy, "set_event_loop",
- wraps=policy.set_event_loop) as m_set_event_loop:
-
- loop = policy.get_event_loop()
-
- # policy._local._loop must be set through .set_event_loop()
- # (the unix DefaultEventLoopPolicy needs this call to attach
- # the child watcher correctly)
- m_set_event_loop.assert_called_with(loop)
-
- loop.close()
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ policy.get_event_loop()
def test_get_event_loop_after_set_none(self):
policy = asyncio.DefaultEventLoopPolicy()
def test_set_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
- old_loop = policy.get_event_loop()
+ old_loop = policy.new_event_loop()
+ policy.set_event_loop(old_loop)
self.assertRaises(TypeError, policy.set_event_loop, object())
asyncio.set_event_loop_policy(Policy())
loop = asyncio.new_event_loop()
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaises(TestError):
- asyncio.get_event_loop()
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaises(TestError):
+ asyncio.get_event_loop()
asyncio.set_event_loop(None)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaises(TestError):
- asyncio.get_event_loop()
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaises(TestError):
+ asyncio.get_event_loop()
with self.assertRaisesRegex(RuntimeError, 'no running'):
asyncio.get_running_loop()
loop.run_until_complete(func())
asyncio.set_event_loop(loop)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaises(TestError):
- asyncio.get_event_loop()
- self.assertEqual(cm.filename, __file__)
-
+ with self.assertRaises(TestError):
+ asyncio.get_event_loop()
asyncio.set_event_loop(None)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaises(TestError):
- asyncio.get_event_loop()
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaises(TestError):
+ asyncio.get_event_loop()
finally:
asyncio.set_event_loop_policy(old_policy)
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)
- with self.assertWarns(DeprecationWarning) as cm:
- loop2 = asyncio.get_event_loop()
- self.addCleanup(loop2.close)
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current'):
+ asyncio.get_event_loop()
asyncio.set_event_loop(None)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'no current'):
- asyncio.get_event_loop()
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current'):
+ asyncio.get_event_loop()
with self.assertRaisesRegex(RuntimeError, 'no running'):
asyncio.get_running_loop()
loop.run_until_complete(func())
asyncio.set_event_loop(loop)
- with self.assertWarns(DeprecationWarning) as cm:
- self.assertIs(asyncio.get_event_loop(), loop)
- self.assertEqual(cm.filename, __file__)
+ self.assertIs(asyncio.get_event_loop(), loop)
asyncio.set_event_loop(None)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'no current'):
- asyncio.get_event_loop()
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current'):
+ asyncio.get_event_loop()
finally:
asyncio.set_event_loop_policy(old_policy)
self.assertTrue(f.cancelled())
def test_constructor_without_loop(self):
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
- self._new_future()
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ self._new_future()
def test_constructor_use_running_loop(self):
async def test():
self.assertIs(f.get_loop(), self.loop)
def test_constructor_use_global_loop(self):
- # Deprecated in 3.10
+ # Deprecated in 3.10, undeprecated in 3.12
asyncio.set_event_loop(self.loop)
self.addCleanup(asyncio.set_event_loop, None)
- with self.assertWarns(DeprecationWarning) as cm:
- f = self._new_future()
- self.assertEqual(cm.filename, __file__)
+ f = self._new_future()
self.assertIs(f._loop, self.loop)
self.assertIs(f.get_loop(), self.loop)
return (arg, threading.get_ident())
ex = concurrent.futures.ThreadPoolExecutor(1)
f1 = ex.submit(run, 'oi')
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaises(RuntimeError):
- asyncio.wrap_future(f1)
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ asyncio.wrap_future(f1)
ex.shutdown(wait=True)
def test_wrap_future_use_running_loop(self):
ex.shutdown(wait=True)
def test_wrap_future_use_global_loop(self):
- # Deprecated in 3.10
+ # Deprecated in 3.10, undeprecated in 3.12
asyncio.set_event_loop(self.loop)
self.addCleanup(asyncio.set_event_loop, None)
def run(arg):
return (arg, threading.get_ident())
ex = concurrent.futures.ThreadPoolExecutor(1)
f1 = ex.submit(run, 'oi')
- with self.assertWarns(DeprecationWarning) as cm:
- f2 = asyncio.wrap_future(f1)
- self.assertEqual(cm.filename, __file__)
+ f2 = asyncio.wrap_future(f1)
self.assertIs(self.loop, f2._loop)
ex.shutdown(wait=True)
self.assertEqual(data, b'data')
def test_streamreader_constructor_without_loop(self):
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
- asyncio.StreamReader()
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ asyncio.StreamReader()
def test_streamreader_constructor_use_running_loop(self):
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
def test_streamreader_constructor_use_global_loop(self):
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
- # Deprecated in 3.10
+ # Deprecated in 3.10, undeprecated in 3.12
self.addCleanup(asyncio.set_event_loop, None)
asyncio.set_event_loop(self.loop)
- with self.assertWarns(DeprecationWarning) as cm:
- reader = asyncio.StreamReader()
- self.assertEqual(cm.filename, __file__)
+ reader = asyncio.StreamReader()
self.assertIs(reader._loop, self.loop)
def test_streamreaderprotocol_constructor_without_loop(self):
reader = mock.Mock()
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
- asyncio.StreamReaderProtocol(reader)
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ asyncio.StreamReaderProtocol(reader)
def test_streamreaderprotocol_constructor_use_running_loop(self):
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
def test_streamreaderprotocol_constructor_use_global_loop(self):
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
- # Deprecated in 3.10
+ # Deprecated in 3.10, undeprecated in 3.12
self.addCleanup(asyncio.set_event_loop, None)
asyncio.set_event_loop(self.loop)
reader = mock.Mock()
- with self.assertWarns(DeprecationWarning) as cm:
- protocol = asyncio.StreamReaderProtocol(reader)
- self.assertEqual(cm.filename, __file__)
+ protocol = asyncio.StreamReaderProtocol(reader)
self.assertIs(protocol._loop, self.loop)
def test_multiple_drain(self):
a = notmuch()
self.addCleanup(a.close)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
- asyncio.ensure_future(a)
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ asyncio.ensure_future(a)
async def test():
return asyncio.ensure_future(notmuch())
self.assertTrue(t.done())
self.assertEqual(t.result(), 'ok')
- # Deprecated in 3.10
+ # Deprecated in 3.10, undeprecated in 3.12
asyncio.set_event_loop(self.loop)
self.addCleanup(asyncio.set_event_loop, None)
- with self.assertWarns(DeprecationWarning) as cm:
- t = asyncio.ensure_future(notmuch())
- self.assertEqual(cm.filename, __file__)
+ t = asyncio.ensure_future(notmuch())
self.assertIs(t._loop, self.loop)
self.loop.run_until_complete(t)
self.assertTrue(t.done())
self.addCleanup(a.close)
futs = asyncio.as_completed([a])
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
- list(futs)
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ list(futs)
def test_as_completed_coroutine_use_running_loop(self):
loop = self.new_test_loop()
inner = coro()
self.addCleanup(inner.close)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
- asyncio.shield(inner)
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ asyncio.shield(inner)
def test_shield_coroutine_use_running_loop(self):
async def coro():
self.assertEqual(res, 42)
def test_shield_coroutine_use_global_loop(self):
- # Deprecated in 3.10
+ # Deprecated in 3.10, undeprecated in 3.12
async def coro():
return 42
asyncio.set_event_loop(self.loop)
self.addCleanup(asyncio.set_event_loop, None)
- with self.assertWarns(DeprecationWarning) as cm:
- outer = asyncio.shield(coro())
- self.assertEqual(cm.filename, __file__)
+ outer = asyncio.shield(coro())
self.assertEqual(outer._loop, self.loop)
res = self.loop.run_until_complete(outer)
self.assertEqual(res, 42)
self.assertIsNone(asyncio.current_task(loop=self.loop))
def test_current_task_no_running_loop_implicit(self):
- with self.assertRaises(RuntimeError):
+ with self.assertRaisesRegex(RuntimeError, 'no running event loop'):
asyncio.current_task()
def test_current_task_with_implicit_loop(self):
return asyncio.gather(*args, **kwargs)
def test_constructor_empty_sequence_without_loop(self):
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaises(RuntimeError):
- asyncio.gather()
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ asyncio.gather()
def test_constructor_empty_sequence_use_running_loop(self):
async def gather():
self.assertEqual(fut.result(), [])
def test_constructor_empty_sequence_use_global_loop(self):
- # Deprecated in 3.10
+ # Deprecated in 3.10, undeprecated in 3.12
asyncio.set_event_loop(self.one_loop)
self.addCleanup(asyncio.set_event_loop, None)
- with self.assertWarns(DeprecationWarning) as cm:
- fut = asyncio.gather()
- self.assertEqual(cm.filename, __file__)
+ fut = asyncio.gather()
self.assertIsInstance(fut, asyncio.Future)
self.assertIs(fut._loop, self.one_loop)
self._run_loop(self.one_loop)
self.addCleanup(gen1.close)
gen2 = coro()
self.addCleanup(gen2.close)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaises(RuntimeError):
- asyncio.gather(gen1, gen2)
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ asyncio.gather(gen1, gen2)
def test_constructor_use_running_loop(self):
async def coro():
self.one_loop.run_until_complete(fut)
def test_constructor_use_global_loop(self):
- # Deprecated in 3.10
+ # Deprecated in 3.10, undeprecated in 3.12
async def coro():
return 'abc'
asyncio.set_event_loop(self.other_loop)
self.addCleanup(asyncio.set_event_loop, None)
gen1 = coro()
gen2 = coro()
- with self.assertWarns(DeprecationWarning) as cm:
- fut = asyncio.gather(gen1, gen2)
- self.assertEqual(cm.filename, __file__)
+ fut = asyncio.gather(gen1, gen2)
self.assertIs(fut._loop, self.other_loop)
self.other_loop.run_until_complete(fut)
def test_child_watcher_replace_mainloop_existing(self):
policy = self.create_policy()
- loop = policy.get_event_loop()
+ loop = policy.new_event_loop()
+ policy.set_event_loop(loop)
# Explicitly setup SafeChildWatcher,
# default ThreadedChildWatcher has no _loop property
# child
try:
loop = asyncio.get_event_loop_policy().get_event_loop()
- os.write(w, str(id(loop)).encode())
+ except RuntimeError:
+ os.write(w, b'NO LOOP')
+ except:
+ os.write(w, b'ERROR:' + ascii(sys.exc_info()).encode())
finally:
os._exit(0)
else:
# parent
- child_loop = int(os.read(r, 100).decode())
- self.assertNotEqual(child_loop, id(loop))
+ self.assertEqual(os.read(r, 100), b'NO LOOP')
wait_process(pid, exitcode=0)
@hashlib_helper.requires_hashdigest('md5')
def test_unawaited_warning_during_shutdown(self):
code = ("import asyncio\n"
"async def f(): pass\n"
- "asyncio.gather(f())\n")
+ "async def t(): asyncio.gather(f())\n"
+ "asyncio.run(t())\n")
assert_python_ok("-c", code)
code = ("import sys\n"
--- /dev/null
+:func:`asyncio.get_event_loop` and many other :mod:`asyncio` functions like
+:func:`asyncio.ensure_future`, :func:`asyncio.shield` or
+:func:`asyncio.gather`, and also the
+:meth:`~asyncio.BaseDefaultEventLoopPolicy.get_event_loop` method of
+:class:`asyncio.BaseDefaultEventLoopPolicy` now raise a :exc:`RuntimeError`
+if called when there is no running event loop and the current event loop was
+not set. Previously they implicitly created and set a new current event
+loop. :exc:`DeprecationWarning` is no longer emitted if there is no running
+event loop but the current event loop was set.
static PyObject *
-get_event_loop(asyncio_state *state, int stacklevel)
+get_event_loop(asyncio_state *state)
{
PyObject *loop;
PyObject *policy;
return loop;
}
- if (PyErr_WarnEx(PyExc_DeprecationWarning,
- "There is no current event loop",
- stacklevel))
- {
- return NULL;
- }
-
policy = PyObject_CallNoArgs(state->asyncio_get_event_loop_policy);
if (policy == NULL) {
return NULL;
if (loop == Py_None) {
asyncio_state *state = get_asyncio_state_by_def((PyObject *)fut);
- loop = get_event_loop(state, 1);
+ loop = get_event_loop(state);
if (loop == NULL) {
return -1;
}
/*[clinic end generated code: output=2a2d8b2f824c648b input=9364bf2916c8655d]*/
{
asyncio_state *state = get_asyncio_state(module);
- return get_event_loop(state, 1);
-}
-
-/*[clinic input]
-_asyncio._get_event_loop
- stacklevel: int = 3
-[clinic start generated code]*/
-
-static PyObject *
-_asyncio__get_event_loop_impl(PyObject *module, int stacklevel)
-/*[clinic end generated code: output=9c1d6d3c802e67c9 input=d17aebbd686f711d]*/
-{
- asyncio_state *state = get_asyncio_state(module);
- return get_event_loop(state, stacklevel-1);
+ return get_event_loop(state);
}
/*[clinic input]
static PyMethodDef asyncio_methods[] = {
_ASYNCIO_GET_EVENT_LOOP_METHODDEF
- _ASYNCIO__GET_EVENT_LOOP_METHODDEF
_ASYNCIO_GET_RUNNING_LOOP_METHODDEF
_ASYNCIO__GET_RUNNING_LOOP_METHODDEF
_ASYNCIO__SET_RUNNING_LOOP_METHODDEF
return _asyncio_get_event_loop_impl(module);
}
-PyDoc_STRVAR(_asyncio__get_event_loop__doc__,
-"_get_event_loop($module, /, stacklevel=3)\n"
-"--\n"
-"\n");
-
-#define _ASYNCIO__GET_EVENT_LOOP_METHODDEF \
- {"_get_event_loop", _PyCFunction_CAST(_asyncio__get_event_loop), METH_FASTCALL|METH_KEYWORDS, _asyncio__get_event_loop__doc__},
-
-static PyObject *
-_asyncio__get_event_loop_impl(PyObject *module, int stacklevel);
-
-static PyObject *
-_asyncio__get_event_loop(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
-{
- PyObject *return_value = NULL;
- #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
-
- #define NUM_KEYWORDS 1
- static struct {
- PyGC_Head _this_is_not_used;
- PyObject_VAR_HEAD
- PyObject *ob_item[NUM_KEYWORDS];
- } _kwtuple = {
- .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
- .ob_item = { &_Py_ID(stacklevel), },
- };
- #undef NUM_KEYWORDS
- #define KWTUPLE (&_kwtuple.ob_base.ob_base)
-
- #else // !Py_BUILD_CORE
- # define KWTUPLE NULL
- #endif // !Py_BUILD_CORE
-
- static const char * const _keywords[] = {"stacklevel", NULL};
- static _PyArg_Parser _parser = {
- .keywords = _keywords,
- .fname = "_get_event_loop",
- .kwtuple = KWTUPLE,
- };
- #undef KWTUPLE
- PyObject *argsbuf[1];
- Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0;
- int stacklevel = 3;
-
- args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 0, 1, 0, argsbuf);
- if (!args) {
- goto exit;
- }
- if (!noptargs) {
- goto skip_optional_pos;
- }
- stacklevel = _PyLong_AsInt(args[0]);
- if (stacklevel == -1 && PyErr_Occurred()) {
- goto exit;
- }
-skip_optional_pos:
- return_value = _asyncio__get_event_loop_impl(module, stacklevel);
-
-exit:
- return return_value;
-}
-
PyDoc_STRVAR(_asyncio_get_running_loop__doc__,
"get_running_loop($module, /)\n"
"--\n"
exit:
return return_value;
}
-/*[clinic end generated code: output=550bc6603df89ed9 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=83580c190031241c input=a9049054013a1b77]*/