* :mod:`asyncio` policy system is deprecated and will be removed in Python 3.16.
In particular, the following classes and functions are deprecated:
- * :class:`asyncio.AbstractEventLoopPolicy`
- * :class:`asyncio.DefaultEventLoopPolicy`
- * :class:`asyncio.WindowsSelectorEventLoopPolicy`
- * :class:`asyncio.WindowsProactorEventLoopPolicy`
- * :func:`asyncio.get_event_loop_policy`
- * :func:`asyncio.set_event_loop_policy`
+ * :class:`!asyncio.AbstractEventLoopPolicy`
+ * :class:`!asyncio.DefaultEventLoopPolicy`
+ * :class:`!asyncio.WindowsSelectorEventLoopPolicy`
+ * :class:`!asyncio.WindowsProactorEventLoopPolicy`
+ * :func:`!asyncio.get_event_loop_policy`
+ * :func:`!asyncio.set_event_loop_policy`
Users should use :func:`asyncio.run` or :class:`asyncio.Runner` with
*loop_factory* to use the desired event loop implementation.
running event loop.
If there is no running event loop set, the function will return
- the result of the ``get_event_loop_policy().get_event_loop()`` call.
+ the loop set by :func:`set_event_loop`, or raise a :exc:`RuntimeError`
+ if no loop has been set.
- Because this function has rather complex behavior (especially
- when custom event loop policies are in use), using the
+ Because this function has rather complex behavior, using the
:func:`get_running_loop` function is preferred to :func:`get_event_loop`
in coroutines and callbacks.
.. versionchanged:: 3.14
Raises a :exc:`RuntimeError` if there is no current event loop.
- .. note::
-
- The :mod:`!asyncio` policy system is deprecated and will be removed
- in Python 3.16; from there on, this function will return the current
- running event loop if present else it will return the
- loop set by :func:`set_event_loop`.
-
.. function:: set_event_loop(loop)
Set *loop* as the current event loop for the current OS thread.
Create and return a new event loop object.
-Note that the behaviour of :func:`get_event_loop`, :func:`set_event_loop`,
-and :func:`new_event_loop` functions can be altered by
-:ref:`setting a custom event loop policy <asyncio-policies>`.
-
.. rubric:: Contents
- The **preferred** function to get the running event loop.
* - :func:`asyncio.get_event_loop`
- - Get an event loop instance (running or current via the current policy).
+ - Get the running event loop or the event loop set for the current thread.
* - :func:`asyncio.set_event_loop`
- - Set the event loop as current via the current policy.
+ - Set the event loop for the current thread.
* - :func:`asyncio.new_event_loop`
- Create a new event loop.
- Called when the child process has exited. It can be called before
:meth:`~SubprocessProtocol.pipe_data_received` and
:meth:`~SubprocessProtocol.pipe_connection_lost` methods.
-
-
-Event Loop Policies
-===================
-
-Policies is a low-level mechanism to alter the behavior of
-functions like :func:`asyncio.get_event_loop`. See also
-the main :ref:`policies section <asyncio-policies>` for more
-details.
-
-
-.. rubric:: Accessing Policies
-.. list-table::
- :widths: 50 50
- :class: full-width-table
-
- * - :meth:`asyncio.get_event_loop_policy`
- - Return the current process-wide policy.
-
- * - :meth:`asyncio.set_event_loop_policy`
- - Set a new process-wide policy.
-
- * - :class:`AbstractEventLoopPolicy`
- - Base class for policy objects.
+++ /dev/null
-.. currentmodule:: asyncio
-
-
-.. _asyncio-policies:
-
-========
-Policies
-========
-
-.. warning::
-
- Policies are deprecated and will be removed in Python 3.16.
- Users are encouraged to use the :func:`asyncio.run` function
- or the :class:`asyncio.Runner` with *loop_factory* to use
- the desired loop implementation.
-
-
-An event loop policy is a global object
-used to get and set the current :ref:`event loop <asyncio-event-loop>`,
-as well as create new event loops.
-The default policy can be :ref:`replaced <asyncio-policy-get-set>` with
-:ref:`built-in alternatives <asyncio-policy-builtin>`
-to use different event loop implementations,
-or substituted by a :ref:`custom policy <asyncio-custom-policies>`
-that can override these behaviors.
-
-The :ref:`policy object <asyncio-policy-objects>`
-gets and sets a separate event loop per *context*.
-This is per-thread by default,
-though custom policies could define *context* differently.
-
-Custom event loop policies can control the behavior of
-:func:`get_event_loop`, :func:`set_event_loop`, and :func:`new_event_loop`.
-
-Policy objects should implement the APIs defined
-in the :class:`AbstractEventLoopPolicy` abstract base class.
-
-
-.. _asyncio-policy-get-set:
-
-Getting and Setting the Policy
-==============================
-
-The following functions can be used to get and set the policy
-for the current process:
-
-.. function:: get_event_loop_policy()
-
- Return the current process-wide policy.
-
- .. deprecated:: 3.14
- The :func:`get_event_loop_policy` function is deprecated and
- will be removed in Python 3.16.
-
-.. function:: set_event_loop_policy(policy)
-
- Set the current process-wide policy to *policy*.
-
- If *policy* is set to ``None``, the default policy is restored.
-
- .. deprecated:: 3.14
- The :func:`set_event_loop_policy` function is deprecated and
- will be removed in Python 3.16.
-
-
-.. _asyncio-policy-objects:
-
-Policy Objects
-==============
-
-The abstract event loop policy base class is defined as follows:
-
-.. class:: AbstractEventLoopPolicy
-
- An abstract base class for asyncio policies.
-
- .. method:: get_event_loop()
-
- Get the event loop for the current context.
-
- Return an event loop object implementing the
- :class:`AbstractEventLoop` interface.
-
- This method should never return ``None``.
-
- .. versionchanged:: 3.6
-
- .. method:: set_event_loop(loop)
-
- Set the event loop for the current context to *loop*.
-
- .. method:: new_event_loop()
-
- Create and return a new event loop object.
-
- This method should never return ``None``.
-
- .. deprecated:: 3.14
- The :class:`AbstractEventLoopPolicy` class is deprecated and
- will be removed in Python 3.16.
-
-
-.. _asyncio-policy-builtin:
-
-asyncio ships with the following built-in policies:
-
-
-.. class:: DefaultEventLoopPolicy
-
- The default asyncio policy. Uses :class:`SelectorEventLoop`
- on Unix and :class:`ProactorEventLoop` on Windows.
-
- There is no need to install the default policy manually. asyncio
- is configured to use the default policy automatically.
-
- .. versionchanged:: 3.8
-
- On Windows, :class:`ProactorEventLoop` is now used by default.
-
- .. versionchanged:: 3.14
- The :meth:`get_event_loop` method of the default asyncio policy now
- raises a :exc:`RuntimeError` if there is no set event loop.
-
- .. deprecated:: 3.14
- The :class:`DefaultEventLoopPolicy` class is deprecated and
- will be removed in Python 3.16.
-
-
-.. class:: WindowsSelectorEventLoopPolicy
-
- An alternative event loop policy that uses the
- :class:`SelectorEventLoop` event loop implementation.
-
- .. availability:: Windows.
-
- .. deprecated:: 3.14
- The :class:`WindowsSelectorEventLoopPolicy` class is deprecated and
- will be removed in Python 3.16.
-
-
-.. class:: WindowsProactorEventLoopPolicy
-
- An alternative event loop policy that uses the
- :class:`ProactorEventLoop` event loop implementation.
-
- .. availability:: Windows.
-
- .. deprecated:: 3.14
- The :class:`WindowsProactorEventLoopPolicy` class is deprecated and
- will be removed in Python 3.16.
-
-
-.. _asyncio-custom-policies:
-
-Custom Policies
-===============
-
-To implement a new event loop policy, it is recommended to subclass
-:class:`DefaultEventLoopPolicy` and override the methods for which
-custom behavior is wanted, e.g.::
-
- class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
-
- def get_event_loop(self):
- """Get the event loop.
-
- This may be None or an instance of EventLoop.
- """
- loop = super().get_event_loop()
- # Do something with loop ...
- return loop
-
- asyncio.set_event_loop_policy(MyEventLoopPolicy())
otherwise :func:`asyncio.new_event_loop` is used. The loop is closed at the end.
This function should be used as a main entry point for asyncio programs,
and should ideally only be called once. It is recommended to use
- *loop_factory* to configure the event loop instead of policies.
- Passing :class:`asyncio.EventLoop` allows running asyncio without the
- policy system.
+ *loop_factory* to configure the event loop.
The executor is given a timeout duration of 5 minutes to shutdown.
If the executor hasn't finished within that duration, a warning is
*coro* can be any awaitable object.
- .. note::
-
- The :mod:`!asyncio` policy system is deprecated and will be removed
- in Python 3.16; from there on, an explicit *loop_factory* is needed
- to configure the event loop.
-
Runner context manager
======================
asyncio-eventloop.rst
asyncio-future.rst
asyncio-protocol.rst
- asyncio-policy.rst
asyncio-platforms.rst
asyncio-extending.rst
try:
checked_ids = checked[name]
except KeyError:
- successful = False
- print(f'{name}: (page missing)')
- print()
+ if (name, '(page missing)') not in excluded:
+ successful = False
+ print(f'{name}: (page missing)')
+ print()
else:
missing_ids = set(baseline_ids) - set(checked_ids)
if missing_ids:
# Removed sections
library/asyncio-task.html: terminating-a-task-group
+library/asyncio-llapi-index.html: event-loop-policies
deprecations/index.html: pending-removal-in-python-3-15
deprecations/index.html: pending-removal-in-python-3-16
deprecations/index.html: c-api-pending-removal-in-python-3-15
# Moved to a different page
c-api/typeobj.html: c.Py_tp_base
c-api/typeobj.html: c.Py_tp_bases
+
+# Removed pages
+library/asyncio-policy.html: (page missing)
and will be removed in Python 3.16.
In particular, the following classes and functions are deprecated:
- * :class:`asyncio.AbstractEventLoopPolicy`
- * :class:`asyncio.DefaultEventLoopPolicy`
- * :class:`asyncio.WindowsSelectorEventLoopPolicy`
- * :class:`asyncio.WindowsProactorEventLoopPolicy`
- * :func:`asyncio.get_event_loop_policy`
- * :func:`asyncio.set_event_loop_policy`
+ * :class:`!asyncio.AbstractEventLoopPolicy`
+ * :class:`!asyncio.DefaultEventLoopPolicy`
+ * :class:`!asyncio.WindowsSelectorEventLoopPolicy`
+ * :class:`!asyncio.WindowsProactorEventLoopPolicy`
+ * :func:`!asyncio.get_event_loop_policy`
+ * :func:`!asyncio.set_event_loop_policy`
Users should use :func:`asyncio.run` or :class:`asyncio.Runner` with
the *loop_factory* argument to use the desired event loop implementation.
which has been deprecated since Python 3.14.
Use :func:`inspect.iscoroutinefunction` instead.
+* The event loop policy system, including the
+ :class:`!asyncio.AbstractEventLoopPolicy`,
+ :class:`!asyncio.DefaultEventLoopPolicy`,
+ :class:`!asyncio.WindowsSelectorEventLoopPolicy` and
+ :class:`!asyncio.WindowsProactorEventLoopPolicy` classes and the
+ :func:`!asyncio.get_event_loop_policy` and
+ :func:`!asyncio.set_event_loop_policy` functions,
+ which have been deprecated since Python 3.14.
+ Use :func:`asyncio.run` or :class:`asyncio.Runner` with a custom
+ *loop_factory* instead.
+
functools
---------
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(generation));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(get));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(get_debug));
- _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(get_event_loop));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(get_loop));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(get_source));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(getattr));
STRUCT_FOR_ID(generation)
STRUCT_FOR_ID(get)
STRUCT_FOR_ID(get_debug)
- STRUCT_FOR_ID(get_event_loop)
STRUCT_FOR_ID(get_loop)
STRUCT_FOR_ID(get_source)
STRUCT_FOR_ID(getattr)
INIT_ID(generation), \
INIT_ID(get), \
INIT_ID(get_debug), \
- INIT_ID(get_event_loop), \
INIT_ID(get_loop), \
INIT_ID(get_source), \
INIT_ID(getattr), \
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
assert(PyUnicode_GET_LENGTH(string) != 1);
- string = &_Py_ID(get_event_loop);
- _PyUnicode_InternStatic(interp, &string);
- assert(_PyUnicode_CheckConsistency(string, 1));
- assert(PyUnicode_GET_LENGTH(string) != 1);
string = &_Py_ID(get_loop);
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
else:
from .unix_events import * # pragma: no cover
__all__ += unix_events.__all__
-
-def __getattr__(name: str):
- import warnings
-
- match name:
- case "AbstractEventLoopPolicy":
- warnings._deprecated(f"asyncio.{name}", remove=(3, 16))
- return events._AbstractEventLoopPolicy
- case "DefaultEventLoopPolicy":
- warnings._deprecated(f"asyncio.{name}", remove=(3, 16))
- if sys.platform == 'win32':
- return windows_events._DefaultEventLoopPolicy
- return unix_events._DefaultEventLoopPolicy
- case "WindowsSelectorEventLoopPolicy":
- if sys.platform == 'win32':
- warnings._deprecated(f"asyncio.{name}", remove=(3, 16))
- return windows_events._WindowsSelectorEventLoopPolicy
- # Else fall through to the AttributeError below.
- case "WindowsProactorEventLoopPolicy":
- if sys.platform == 'win32':
- warnings._deprecated(f"asyncio.{name}", remove=(3, 16))
- return windows_events._WindowsProactorEventLoopPolicy
- # Else fall through to the AttributeError below.
-
- raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
"AbstractServer",
"Handle",
"TimerHandle",
- "get_event_loop_policy",
- "set_event_loop_policy",
"get_event_loop",
"set_event_loop",
"new_event_loop",
import subprocess
import sys
import threading
-import warnings
from . import format_helpers
raise NotImplementedError
-class _AbstractEventLoopPolicy:
- """Abstract policy for accessing the event loop."""
+class _Local(threading.local):
+ _loop = None
- def get_event_loop(self):
- """Get the event loop for the current context.
- Returns an event loop object implementing the AbstractEventLoop interface,
- or raises an exception in case no event loop has been set for the
- current context and the current policy does not specify to create one.
-
- It should never return None."""
- raise NotImplementedError
-
- def set_event_loop(self, loop):
- """Set the event loop for the current context to loop."""
- raise NotImplementedError
-
- def new_event_loop(self):
- """Create and return a new event loop object according to this
- policy's rules. If there's need to set this loop as the event loop for
- the current context, set_event_loop must be called explicitly."""
- raise NotImplementedError
-
-class _BaseDefaultEventLoopPolicy(_AbstractEventLoopPolicy):
- """Default policy implementation for accessing the event loop.
-
- In this policy, each thread has its own event loop. However, we
- only automatically create an event loop by default for the main
- thread; other threads by default have no event loop.
-
- Other policies may have different rules (e.g. a single global
- event loop, or automatically creating an event loop per thread, or
- using some other notion of context to which an event loop is
- associated).
- """
-
- _loop_factory = None
-
- class _Local(threading.local):
- _loop = None
-
- def __init__(self):
- self._local = self._Local()
-
- def get_event_loop(self):
- """Get the event loop for the current context.
-
- Returns an instance of EventLoop or raises an exception.
- """
- if self._local._loop is None:
- raise RuntimeError('There is no current event loop in thread %r.'
- % threading.current_thread().name)
-
- return self._local._loop
-
- def set_event_loop(self, loop):
- """Set the event loop."""
- if loop is not None and not isinstance(loop, AbstractEventLoop):
- raise TypeError(f"loop must be an instance of AbstractEventLoop or None, not '{type(loop).__name__}'")
- self._local._loop = loop
-
- def new_event_loop(self):
- """Create a new event loop.
-
- You must call set_event_loop() to make this the current event
- loop.
- """
- return self._loop_factory()
-
-
-# Event loop policy. The policy itself is always global, even if the
-# policy's rules say that there is an event loop per thread (or other
-# notion of context). The default policy is installed by the first
-# call to get_event_loop_policy().
-_event_loop_policy = None
-
-# Lock for protecting the on-the-fly creation of the event loop policy.
-_lock = threading.Lock()
+_local = _Local()
# A TLS for the running event loop, used by _get_running_loop.
_running_loop.loop_pid = (loop, os.getpid())
-def _init_event_loop_policy():
- global _event_loop_policy
- with _lock:
- if _event_loop_policy is None: # pragma: no branch
- if sys.platform == 'win32':
- from .windows_events import _DefaultEventLoopPolicy
- else:
- from .unix_events import _DefaultEventLoopPolicy
- _event_loop_policy = _DefaultEventLoopPolicy()
-
+def _get_event_loop():
+ """Return the event loop set for the current thread.
-def _get_event_loop_policy():
- """Get the current event loop policy."""
- if _event_loop_policy is None:
- _init_event_loop_policy()
- return _event_loop_policy
-
-def get_event_loop_policy():
- warnings._deprecated('asyncio.get_event_loop_policy', remove=(3, 16))
- return _get_event_loop_policy()
-
-def _set_event_loop_policy(policy):
- """Set the current event loop policy.
-
- If policy is None, the default policy is restored."""
- global _event_loop_policy
- if policy is not None and not isinstance(policy, _AbstractEventLoopPolicy):
- raise TypeError(f"policy must be an instance of AbstractEventLoopPolicy or None, not '{type(policy).__name__}'")
- _event_loop_policy = policy
+ Raise a RuntimeError if no event loop has been set for the current
+ thread. This is the slow path of get_event_loop(); the running loop
+ is checked by the caller.
+ """
+ loop = _local._loop
+ if loop is None:
+ raise RuntimeError('There is no current event loop in thread %r.'
+ % threading.current_thread().name)
+ return loop
-def set_event_loop_policy(policy):
- warnings._deprecated('asyncio.set_event_loop_policy', remove=(3,16))
- _set_event_loop_policy(policy)
def get_event_loop():
"""Return an asyncio event loop.
or similar API), this function will always return the running event loop.
If there is no running event loop set, the function will return
- the result of `get_event_loop_policy().get_event_loop()` call.
+ the loop set by ``set_event_loop()``, or raise a RuntimeError if
+ no loop has been set.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
- return _get_event_loop_policy().get_event_loop()
+ return _get_event_loop()
def set_event_loop(loop):
- """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
- _get_event_loop_policy().set_event_loop(loop)
+ """Set the event loop for the current thread to loop.
+
+ If loop is None, the current event loop is unset.
+ """
+ if loop is not None and not isinstance(loop, AbstractEventLoop):
+ raise TypeError(f"loop must be an instance of AbstractEventLoop or None, not '{type(loop).__name__}'")
+ _local._loop = loop
def new_event_loop():
- """Equivalent to calling get_event_loop_policy().new_event_loop()."""
- return _get_event_loop_policy().new_event_loop()
+ """Create and return a new event loop object."""
+ if sys.platform == 'win32':
+ from .windows_events import EventLoop
+ else:
+ from .unix_events import EventLoop
+ return EventLoop()
# Alias pure-Python implementations for testing purposes.
if hasattr(os, 'fork'):
def on_fork():
# Reset the loop and wakeupfd in the forked child process.
- if _event_loop_policy is not None:
- _event_loop_policy._local = _BaseDefaultEventLoopPolicy._Local()
+ global _local
+ _local = _Local()
_set_running_loop(None)
signal.set_wakeup_fd(-1)
return True
-class _UnixDefaultEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
- """UNIX event loop policy"""
- _loop_factory = _UnixSelectorEventLoop
-
-
SelectorEventLoop = _UnixSelectorEventLoop
-_DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
EventLoop = SelectorEventLoop
import time
import weakref
-from . import events
from . import base_subprocess
from . import futures
from . import exceptions
__all__ = (
'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
- '_DefaultEventLoopPolicy', '_WindowsSelectorEventLoopPolicy',
- '_WindowsProactorEventLoopPolicy', 'EventLoop',
+ 'EventLoop',
)
SelectorEventLoop = _WindowsSelectorEventLoop
-
-class _WindowsSelectorEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
- _loop_factory = SelectorEventLoop
-
-
-class _WindowsProactorEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
- _loop_factory = ProactorEventLoop
-
-
-_DefaultEventLoopPolicy = _WindowsProactorEventLoopPolicy
EventLoop = ProactorEventLoop
'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
'files', 'locale', 'warnings.showwarning',
'shutil_archive_formats', 'shutil_unpack_formats',
- 'asyncio.events._event_loop_policy',
+ 'asyncio.events._local._loop',
'urllib.requests._url_tempfiles', 'urllib.requests._opener',
'stty_echo',
)
urllib_request = self.get_module('urllib.request')
urllib_request._opener = opener
- def get_asyncio_events__event_loop_policy(self):
+ def get_asyncio_events__local__loop(self):
self.try_get_module('asyncio')
- return support.maybe_get_event_loop_policy()
- def restore_asyncio_events__event_loop_policy(self, policy):
+ return support.maybe_get_event_loop()
+ def restore_asyncio_events__local__loop(self, loop):
asyncio = self.get_module('asyncio')
- asyncio.events._set_event_loop_policy(policy)
+ asyncio.events._local._loop = loop
def get_sys_argv(self):
return id(sys.argv), sys.argv, sys.argv[:]
SMALLEST = _SMALLEST()
-def maybe_get_event_loop_policy():
- """Return the global event loop policy if one is set, else return None."""
+def maybe_get_event_loop():
+ """Return the event loop set for the current thread, else return None."""
import asyncio.events
- return asyncio.events._event_loop_policy
+ return asyncio.events._local._loop
# Helpers for testing hashing.
NHASHBITS = sys.hash_info.width # number of bits in hash() result
def tearDown(self):
self.loop.close()
self.loop = None
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
def check_async_iterator_anext(self, ait_class):
with self.subTest(anext="pure-Python"):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
def mock_socket_module():
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class ReceiveStuffProto(asyncio.BufferedProtocol):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
@unittest.skipUnless(decimal.HAVE_CONTEXTVAR, "decimal is built with a thread-local context")
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class EagerTaskFactoryLoopTests:
from test.support import ALWAYS_EQ, LARGEST, SMALLEST
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
def broken_unix_getsockname():
class PolicyTests(unittest.TestCase):
- def test_abstract_event_loop_policy_deprecation(self):
- with self.assertWarnsRegex(
- DeprecationWarning, "'asyncio.AbstractEventLoopPolicy' is deprecated"):
- policy = asyncio.AbstractEventLoopPolicy()
- self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
-
- def test_default_event_loop_policy_deprecation(self):
- with self.assertWarnsRegex(
- DeprecationWarning, "'asyncio.DefaultEventLoopPolicy' is deprecated"):
- policy = asyncio.DefaultEventLoopPolicy()
- self.assertIsInstance(policy, asyncio.DefaultEventLoopPolicy)
-
- def test_event_loop_policy(self):
- policy = asyncio.events._AbstractEventLoopPolicy()
- self.assertRaises(NotImplementedError, policy.get_event_loop)
- self.assertRaises(NotImplementedError, policy.set_event_loop, object())
- self.assertRaises(NotImplementedError, policy.new_event_loop)
-
def test_get_event_loop(self):
- policy = test_utils.DefaultEventLoopPolicy()
- self.assertIsNone(policy._local._loop)
+ old_loop = asyncio.events._local._loop
+ try:
+ asyncio.set_event_loop(None)
+ self.assertIsNone(asyncio.events._local._loop)
- with self.assertRaises(RuntimeError):
- loop = policy.get_event_loop()
- self.assertIsNone(policy._local._loop)
+ with self.assertRaises(RuntimeError):
+ asyncio.get_event_loop()
+ self.assertIsNone(asyncio.events._local._loop)
+ finally:
+ asyncio.events._local._loop = old_loop
def test_get_event_loop_does_not_call_set_event_loop(self):
- policy = test_utils.DefaultEventLoopPolicy()
+ old_loop = asyncio.events._local._loop
+ try:
+ asyncio.set_event_loop(None)
- with mock.patch.object(
- policy, "set_event_loop",
- wraps=policy.set_event_loop) as m_set_event_loop:
+ with mock.patch(
+ 'asyncio.events.set_event_loop',
+ wraps=asyncio.events.set_event_loop) as m_set_event_loop:
- with self.assertRaises(RuntimeError):
- loop = policy.get_event_loop()
+ with self.assertRaises(RuntimeError):
+ asyncio.get_event_loop()
+
+ m_set_event_loop.assert_not_called()
- m_set_event_loop.assert_not_called()
+ self.assertIsNone(asyncio.events._local._loop)
+ finally:
+ asyncio.events._local._loop = old_loop
def test_get_event_loop_after_set_none(self):
- policy = test_utils.DefaultEventLoopPolicy()
- policy.set_event_loop(None)
- self.assertRaises(RuntimeError, policy.get_event_loop)
+ old_loop = asyncio.events._local._loop
+ try:
+ asyncio.set_event_loop(None)
+ self.assertRaises(RuntimeError, asyncio.get_event_loop)
+ finally:
+ asyncio.events._local._loop = old_loop
- @mock.patch('asyncio.events.threading.current_thread')
- def test_get_event_loop_thread(self, m_current_thread):
+ def test_get_event_loop_thread(self):
def f():
- policy = test_utils.DefaultEventLoopPolicy()
- self.assertRaises(RuntimeError, policy.get_event_loop)
+ self.assertRaises(RuntimeError, asyncio.get_event_loop)
th = threading.Thread(target=f)
th.start()
th.join()
def test_new_event_loop(self):
- policy = test_utils.DefaultEventLoopPolicy()
-
- loop = policy.new_event_loop()
+ loop = asyncio.new_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
loop.close()
def test_set_event_loop(self):
- policy = test_utils.DefaultEventLoopPolicy()
- old_loop = policy.new_event_loop()
- policy.set_event_loop(old_loop)
-
- self.assertRaises(TypeError, policy.set_event_loop, object())
-
- loop = policy.new_event_loop()
- policy.set_event_loop(loop)
- self.assertIs(loop, policy.get_event_loop())
- self.assertIsNot(old_loop, policy.get_event_loop())
- loop.close()
- old_loop.close()
-
- def test_get_event_loop_policy(self):
- with self.assertWarnsRegex(
- DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
- policy = asyncio.get_event_loop_policy()
- self.assertIsInstance(policy, asyncio.events._AbstractEventLoopPolicy)
- self.assertIs(policy, asyncio.get_event_loop_policy())
-
- def test_set_event_loop_policy(self):
- with self.assertWarnsRegex(
- DeprecationWarning, "'asyncio.set_event_loop_policy' is deprecated"):
- self.assertRaises(
- TypeError, asyncio.set_event_loop_policy, object())
-
- with self.assertWarnsRegex(
- DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
- old_policy = asyncio.get_event_loop_policy()
+ old_loop = asyncio.events._local._loop
+ loop = None
+ try:
+ self.assertRaises(TypeError, asyncio.set_event_loop, object())
- policy = test_utils.DefaultEventLoopPolicy()
- with self.assertWarnsRegex(
- DeprecationWarning, "'asyncio.set_event_loop_policy' is deprecated"):
- asyncio.set_event_loop_policy(policy)
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ self.assertIs(loop, asyncio.get_event_loop())
- with self.assertWarnsRegex(
- DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
- self.assertIs(policy, asyncio.get_event_loop_policy())
- self.assertIsNot(policy, old_policy)
+ asyncio.set_event_loop(None)
+ self.assertRaises(RuntimeError, asyncio.get_event_loop)
+ finally:
+ asyncio.events._local._loop = old_loop
+ if loop is not None:
+ loop.close()
class GetEventLoopTestsMixin:
def test_get_event_loop_returns_running_loop(self):
- class TestError(Exception):
- pass
-
- class Policy(test_utils.DefaultEventLoopPolicy):
- def get_event_loop(self):
- raise TestError
-
- old_policy = asyncio.events._get_event_loop_policy()
+ old_loop = asyncio.events._local._loop
+ loop = None
try:
- asyncio.events._set_event_loop_policy(Policy())
+ asyncio.set_event_loop(None)
loop = asyncio.new_event_loop()
- with self.assertRaises(TestError):
- asyncio.get_event_loop()
- asyncio.set_event_loop(None)
- with self.assertRaises(TestError):
+ # No running loop and no loop set for the thread: the running
+ # loop is checked first, then get_event_loop() falls back to
+ # the per-thread loop and raises RuntimeError.
+ with self.assertRaisesRegex(RuntimeError, 'no current'):
asyncio.get_event_loop()
with self.assertRaisesRegex(RuntimeError, 'no running'):
asyncio.get_running_loop()
self.assertIs(asyncio._get_running_loop(), None)
+ # While a loop is running, get_event_loop() returns it
+ # without consulting the per-thread loop.
async def func():
self.assertIs(asyncio.get_event_loop(), loop)
self.assertIs(asyncio.get_running_loop(), loop)
loop.run_until_complete(func())
- asyncio.set_event_loop(loop)
- with self.assertRaises(TestError):
- asyncio.get_event_loop()
+ # Back outside the running loop with no loop set: raises again.
asyncio.set_event_loop(None)
- with self.assertRaises(TestError):
+ with self.assertRaisesRegex(RuntimeError, 'no current'):
asyncio.get_event_loop()
finally:
- asyncio.events._set_event_loop_policy(old_policy)
+ asyncio.events._local._loop = old_loop
if loop is not None:
loop.close()
self.assertIs(asyncio._get_running_loop(), None)
def test_get_event_loop_returns_running_loop2(self):
- old_policy = asyncio.events._get_event_loop_policy()
+ old_loop = asyncio.events._local._loop
+ loop = None
try:
- asyncio.events._set_event_loop_policy(test_utils.DefaultEventLoopPolicy())
+ asyncio.set_event_loop(None)
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)
asyncio.get_event_loop()
finally:
- asyncio.events._set_event_loop_policy(old_policy)
+ asyncio.events._local._loop = old_loop
if loop is not None:
loop.close()
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class TestFreeThreading:
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
def _fakefunc(f):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class FutureTests:
# To prevent a warning "test altered the execution environment"
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
def capture_test_stack(*, fut=None, depth=1):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class LockTests(unittest.IsolatedAsyncioTestCase):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
# Test that asyncio.iscoroutine() uses collections.abc.Coroutine
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
def close_transport(transport):
def tearDownModule():
# not needed for the test file but added for uniformness with all other
# asyncio test files for the sake of unified cleanup
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class ProtocolsAbsTests(unittest.TestCase):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class QueueBasicTests(unittest.IsolatedAsyncioTestCase):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
def interrupt_self():
_thread.interrupt_main()
-class TestPolicy(asyncio.events._AbstractEventLoopPolicy):
-
- def __init__(self, loop_factory):
- self.loop_factory = loop_factory
- self.loop = None
-
- def get_event_loop(self):
- # shouldn't ever be called by asyncio.run()
- raise RuntimeError
-
- def new_event_loop(self):
- return self.loop_factory()
-
- def set_event_loop(self, loop):
- if loop is not None:
- # we want to check if the loop is closed
- # in BaseTest.tearDown
- self.loop = loop
-
-
class BaseTest(unittest.TestCase):
def new_loop(self):
loop.shutdown_ag_run = True
loop.shutdown_asyncgens = shutdown_asyncgens
+ # Track created loops so tearDown can assert they were closed and
+ # had their async generators shut down.
+ self._loops.append(loop)
return loop
def setUp(self):
super().setUp()
- policy = TestPolicy(self.new_loop)
- asyncio.events._set_event_loop_policy(policy)
+ # Isolate the current thread's event loop and use ``new_loop`` as
+ # an explicit ``loop_factory`` in the tests.
+ self._loops = []
+ asyncio.set_event_loop(None)
def tearDown(self):
- policy = asyncio.events._get_event_loop_policy()
- if policy.loop is not None:
- self.assertTrue(policy.loop.is_closed())
- self.assertTrue(policy.loop.shutdown_ag_run)
+ if self._loops:
+ loop = self._loops[-1]
+ self.assertTrue(loop.is_closed())
+ self.assertTrue(loop.shutdown_ag_run)
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
super().tearDown()
await asyncio.sleep(0)
return 42
- self.assertEqual(asyncio.run(main()), 42)
+ self.assertEqual(asyncio.run(main(), loop_factory=self.new_loop), 42)
def test_asyncio_run_raises(self):
async def main():
raise ValueError('spam')
with self.assertRaisesRegex(ValueError, 'spam'):
- asyncio.run(main())
+ asyncio.run(main(), loop_factory=self.new_loop)
def test_asyncio_run_only_coro(self):
for o in {1, lambda: None}:
loop = asyncio.get_event_loop()
self.assertIs(loop.get_debug(), expected)
- asyncio.run(main(False), debug=False)
- asyncio.run(main(True), debug=True)
+ asyncio.run(main(False), debug=False, loop_factory=self.new_loop)
+ asyncio.run(main(True), debug=True, loop_factory=self.new_loop)
with mock.patch('asyncio.coroutines._is_debug_mode', lambda: True):
- asyncio.run(main(True))
- asyncio.run(main(False), debug=False)
+ asyncio.run(main(True), loop_factory=self.new_loop)
+ asyncio.run(main(False), debug=False, loop_factory=self.new_loop)
with mock.patch('asyncio.coroutines._is_debug_mode', lambda: False):
- asyncio.run(main(True), debug=True)
- asyncio.run(main(False))
+ asyncio.run(main(True), debug=True, loop_factory=self.new_loop)
+ asyncio.run(main(False), loop_factory=self.new_loop)
def test_asyncio_run_from_running_loop(self):
async def main():
coro = main()
try:
- asyncio.run(coro)
+ asyncio.run(coro, loop_factory=self.new_loop)
finally:
coro.close() # Suppress ResourceWarning
with self.assertRaisesRegex(RuntimeError,
'cannot be called from a running'):
- asyncio.run(main())
+ asyncio.run(main(), loop_factory=self.new_loop)
def test_asyncio_run_cancels_hanging_tasks(self):
lo_task = None
lo_task = asyncio.create_task(leftover())
return 123
- self.assertEqual(asyncio.run(main()), 123)
+ self.assertEqual(asyncio.run(main(), loop_factory=self.new_loop), 123)
self.assertTrue(lo_task.done())
def test_asyncio_run_reports_hanging_tasks_errors(self):
lo_task = asyncio.create_task(leftover())
return 123
- self.assertEqual(asyncio.run(main()), 123)
+ self.assertEqual(asyncio.run(main(), loop_factory=self.new_loop), 123)
self.assertTrue(lo_task.done())
call_exc_handler_mock.assert_called_with({
raise FancyExit
with self.assertRaises(FancyExit):
- asyncio.run(main())
+ asyncio.run(main(), loop_factory=self.new_loop)
self.assertTrue(lazyboy.done())
await asyncio.sleep(0)
return 42
- policy = asyncio.events._get_event_loop_policy()
- policy.set_event_loop = mock.Mock()
- asyncio.run(main())
- self.assertTrue(policy.set_event_loop.called)
+ # asyncio.run() must set the event loop for the running thread.
+ # This only happens when no explicit loop_factory is passed, so
+ # patch the default loop creation to use a mock loop and verify
+ # set_event_loop() is called.
+ with mock.patch('asyncio.events.new_event_loop', self.new_loop), \
+ mock.patch('asyncio.runners.events.set_event_loop') as set_event_loop:
+ asyncio.run(main())
+ self.assertTrue(set_event_loop.called)
def test_asyncio_run_without_uncancel(self):
# See https://github.com/python/cpython/issues/95097
loop.set_task_factory(Task)
return loop
- asyncio.events._set_event_loop_policy(TestPolicy(new_event_loop))
with self.assertRaises(asyncio.CancelledError):
- asyncio.run(main())
+ asyncio.run(main(), loop_factory=new_event_loop)
def test_asyncio_run_loop_factory(self):
factory = mock.Mock()
async def coro():
pass
- policy = asyncio.events._get_event_loop_policy()
- policy.set_event_loop = mock.Mock()
- runner = asyncio.Runner()
- runner.run(coro())
- runner.run(coro())
-
- self.assertEqual(1, policy.set_event_loop.call_count)
- runner.close()
+ # The runner must call set_event_loop() exactly once even when
+ # run() is called multiple times. This only happens on the
+ # implicit loop_factory path, so patch the default loop creation.
+ with mock.patch('asyncio.events.new_event_loop', self.new_loop), \
+ mock.patch('asyncio.runners.events.set_event_loop') as set_event_loop:
+ runner = asyncio.Runner()
+ runner.run(coro())
+ runner.run(coro())
+
+ self.assertEqual(1, set_event_loop.call_count)
+ runner.close()
def test_no_repr_is_call_on_the_task_result(self):
# See https://github.com/python/cpython/issues/112559.
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class TestBaseSelectorEventLoop(BaseSelectorEventLoop):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class MySendfileProto(asyncio.Protocol):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class BaseStartServer(func_tests.FunctionalTestCaseMixin):
from test.test_asyncio import utils as test_utils
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class ServerContextvarsTestCase:
loop_factory = None # To be defined in subclasses
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class MyProto(asyncio.Protocol):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class MyBaseProto(asyncio.Protocol):
def new_loop(self):
return asyncio.new_event_loop()
- def new_policy(self):
- return asyncio.DefaultEventLoopPolicy()
-
async def wait_closed(self, obj):
if not isinstance(obj, asyncio.StreamWriter):
return
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
@unittest.skipIf(ssl is None, 'No ssl module')
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class StaggeredTests(unittest.IsolatedAsyncioTestCase):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class StreamTests(test_utils.TestCase):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
# To prevent a warning "test altered the execution environment"
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class MyExc(Exception):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
async def coroutine_function():
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class ToThreadTests(unittest.IsolatedAsyncioTestCase):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class TimeoutTests(unittest.IsolatedAsyncioTestCase):
def tearDownModule():
# not needed for the test file but added for uniformness with all other
# asyncio test files for the sake of unified cleanup
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class TransportTests(unittest.TestCase):
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
MOCK_ANY = mock.ANY
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
# The following value can be used as a very small timeout:
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class UpperProto(asyncio.Protocol):
thr.join()
-class WinPolicyTests(WindowsEventsTestCase):
-
- def test_selector_win_policy(self):
- async def main():
- self.assertIsInstance(asyncio.get_running_loop(), asyncio.SelectorEventLoop)
-
- old_policy = asyncio.events._get_event_loop_policy()
- try:
- with self.assertWarnsRegex(
- DeprecationWarning,
- "'asyncio.WindowsSelectorEventLoopPolicy' is deprecated",
- ):
- asyncio.events._set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
- asyncio.run(main())
- finally:
- asyncio.events._set_event_loop_policy(old_policy)
-
- def test_proactor_win_policy(self):
- async def main():
- self.assertIsInstance(
- asyncio.get_running_loop(),
- asyncio.ProactorEventLoop)
-
- old_policy = asyncio.events._get_event_loop_policy()
- try:
- with self.assertWarnsRegex(
- DeprecationWarning,
- "'asyncio.WindowsProactorEventLoopPolicy' is deprecated",
- ):
- asyncio.events._set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
- asyncio.run(main())
- finally:
- asyncio.events._set_event_loop_policy(old_policy)
-
-
if __name__ == '__main__':
unittest.main()
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class PipeTests(unittest.TestCase):
await asyncio.sleep(0)
if exc is not None:
raise exc
-
-
-if sys.platform == 'win32':
- DefaultEventLoopPolicy = asyncio.windows_events._DefaultEventLoopPolicy
-else:
- DefaultEventLoopPolicy = asyncio.unix_events._DefaultEventLoopPolicy
@classmethod
def setUpClass(cls):
- # Most uses of asyncio will implicitly call set_event_loop_policy()
- # with the default policy if a policy hasn't been set already.
+ # Most uses of asyncio will implicitly set a thread event loop
+ # if one hasn't been set already.
# If that happens in a test, like here, we'll end up with a failure
# when --fail-env-changed is used. That's why the other tests that
- # use asyncio are careful to set the policy back to None and why
+ # use asyncio are careful to set the loop back to None and why
# we're careful to do so here. We also validate that no other
- # tests left a policy in place, just in case.
- policy = support.maybe_get_event_loop_policy()
- assert policy is None, policy
- cls.addClassCleanup(lambda: asyncio.events._set_event_loop_policy(None))
+ # tests left a loop in place, just in case.
+ loop = support.maybe_get_event_loop()
+ assert loop is None, loop
+ cls.addClassCleanup(lambda: asyncio.set_event_loop(None))
def setUp(self):
super().setUp()
pass
finally:
loop.close()
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
self.assertEqual(buffer, [1, 2, 'MyException'])
@classmethod
def tearDownClass(cls):
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
def _asyncgenstate(self):
return inspect.getasyncgenstate(self.asyncgen)
logging.logAsyncioTasks = False
runner.run(make_record(self.assertIsNone))
finally:
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
@support.requires_working_socket()
def test_taskName_without_asyncio_imported(self):
logging.logAsyncioTasks = False
runner.run(make_record(self.assertIsNone))
finally:
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class BasicConfigTest(unittest.TestCase):
data = f.read().strip()
self.assertRegex(data, r'Task-\d+ - hello world')
finally:
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
if handler:
handler.close()
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class MiscTests(unittest.TestCase):
import zipapp
import zipfile
-from asyncio.events import _set_event_loop_policy
+from asyncio import set_event_loop
from contextlib import ExitStack, redirect_stdout
from io import StringIO
from test import support
# Ensure that asyncio state has been cleared at the end of the test.
# This prevents a "test altered the execution environment" warning if
# asyncio features are used.
- _set_event_loop_policy(None)
+ set_event_loop(None)
# A doctest of pdb could have residues. For example, pdb could still
# be running, or breakpoints might be left uncleared. These residues
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class TestCM:
class TestCase1(unittest.IsolatedAsyncioTestCase):
def setUp(self):
- asyncio.events._get_event_loop_policy().get_event_loop()
+ asyncio.get_event_loop()
async def test_demo1(self):
pass
self.assertTrue(result.wasSuccessful())
def test_loop_factory(self):
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class TestCase1(unittest.IsolatedAsyncioTestCase):
loop_factory = asyncio.EventLoop
test = TestCase1('test_demo1')
result = test.run()
self.assertTrue(result.wasSuccessful())
- self.assertIsNone(support.maybe_get_event_loop_policy())
+ self.assertIsNone(support.maybe_get_event_loop())
if __name__ == "__main__":
unittest.main()
def tearDownModule():
- asyncio.events._set_event_loop_policy(None)
+ asyncio.set_event_loop(None)
class AsyncClass:
# To share contextvars between setUp(), test and tearDown() we need to execute
# them inside the same task.
- # Note: the test case modifies event loop policy if the policy was not instantiated
- # yet, unless loop_factory=asyncio.EventLoop is set.
- # asyncio.get_event_loop_policy() creates a default policy on demand but never
- # returns None
+ # Note: the test case sets the per-thread event loop unless
+ # loop_factory=asyncio.EventLoop is set.
# I believe this is not an issue in user level tests but python itself for testing
- # should reset a policy in every test module
- # by calling asyncio.set_event_loop_policy(None) in tearDownModule()
+ # should reset the event loop in every test module
+ # by calling asyncio.set_event_loop(None) in tearDownModule()
# or set loop_factory=asyncio.EventLoop
loop_factory = None
--- /dev/null
+Remove the deprecated :mod:`asyncio` event loop policy system. Patch by Kumar Aditya.
PyObject *iscoroutine_typecache;
/* Imports from asyncio.events. */
- PyObject *asyncio_get_event_loop_policy;
+ PyObject *asyncio_get_event_loop;
/* Imports from asyncio.base_futures. */
PyObject *asyncio_future_repr_func;
get_event_loop(asyncio_state *state)
{
PyObject *loop;
- PyObject *policy;
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
loop = Py_XNewRef(ts->asyncio_running_loop);
return loop;
}
- policy = PyObject_CallNoArgs(state->asyncio_get_event_loop_policy);
- if (policy == NULL) {
- return NULL;
- }
-
- loop = PyObject_CallMethodNoArgs(policy, &_Py_ID(get_event_loop));
- Py_DECREF(policy);
- return loop;
+ return PyObject_CallNoArgs(state->asyncio_get_event_loop);
}
running event loop.
If there is no running event loop set, the function will return
-the result of `get_event_loop_policy().get_event_loop()` call.
+the loop set by `set_event_loop()`, or raise a RuntimeError if
+no loop has been set.
[clinic start generated code]*/
static PyObject *
_asyncio_get_event_loop_impl(PyObject *module)
-/*[clinic end generated code: output=2a2d8b2f824c648b input=9364bf2916c8655d]*/
+/*[clinic end generated code: output=2a2d8b2f824c648b input=fa104f00dc7995dc]*/
{
asyncio_state *state = get_asyncio_state(module);
return get_event_loop(state);
Py_VISIT(state->asyncio_mod);
Py_VISIT(state->traceback_extract_stack);
Py_VISIT(state->asyncio_future_repr_func);
- Py_VISIT(state->asyncio_get_event_loop_policy);
+ Py_VISIT(state->asyncio_get_event_loop);
Py_VISIT(state->asyncio_iscoroutine_func);
Py_VISIT(state->asyncio_task_get_stack_func);
Py_VISIT(state->asyncio_task_print_stack_func);
Py_CLEAR(state->asyncio_mod);
Py_CLEAR(state->traceback_extract_stack);
Py_CLEAR(state->asyncio_future_repr_func);
- Py_CLEAR(state->asyncio_get_event_loop_policy);
+ Py_CLEAR(state->asyncio_get_event_loop);
Py_CLEAR(state->asyncio_iscoroutine_func);
Py_CLEAR(state->asyncio_task_get_stack_func);
Py_CLEAR(state->asyncio_task_print_stack_func);
}
WITH_MOD("asyncio.events")
- GET_MOD_ATTR(state->asyncio_get_event_loop_policy, "_get_event_loop_policy")
+ GET_MOD_ATTR(state->asyncio_get_event_loop, "_get_event_loop")
WITH_MOD("asyncio.base_futures")
GET_MOD_ATTR(state->asyncio_future_repr_func, "_future_repr")
"running event loop.\n"
"\n"
"If there is no running event loop set, the function will return\n"
-"the result of `get_event_loop_policy().get_event_loop()` call.");
+"the loop set by `set_event_loop()`, or raise a RuntimeError if\n"
+"no loop has been set.");
#define _ASYNCIO_GET_EVENT_LOOP_METHODDEF \
{"get_event_loop", (PyCFunction)_asyncio_get_event_loop, METH_NOARGS, _asyncio_get_event_loop__doc__},
exit:
return return_value;
}
-/*[clinic end generated code: output=32996fb47c48245b input=a9049054013a1b77]*/
+/*[clinic end generated code: output=22e74568ff49f81f input=a9049054013a1b77]*/