'Handle', 'TimerHandle',
'get_event_loop_policy', 'set_event_loop_policy',
'get_event_loop', 'set_event_loop', 'new_event_loop',
- 'get_child_watcher', 'set_child_watcher',
'_set_running_loop', 'get_running_loop',
'_get_running_loop',
)
the current context, set_event_loop must be called explicitly."""
raise NotImplementedError
- # Child processes handling (Unix only).
-
- def get_child_watcher(self):
- "Get the watcher for child processes."
- raise NotImplementedError
-
- def set_child_watcher(self, watcher):
- """Set the watcher for child processes."""
- raise NotImplementedError
-
-
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
"""Default policy implementation for accessing the event loop.
return get_event_loop_policy().new_event_loop()
-def get_child_watcher():
- """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
- return get_event_loop_policy().get_child_watcher()
-
-
-def set_child_watcher(watcher):
- """Equivalent to calling
- get_event_loop_policy().set_child_watcher(watcher)."""
- return get_event_loop_policy().set_child_watcher(watcher)
-
-
# Alias pure-Python implementations for testing purposes.
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
extra=None, **kwargs):
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
- watcher = events.get_child_watcher()
+ watcher = events.get_event_loop_policy()._watcher
with watcher:
if not watcher.is_active():
return True
-class BaseChildWatcher(AbstractChildWatcher):
-
- def __init__(self):
- self._loop = None
- self._callbacks = {}
-
- def close(self):
- self.attach_loop(None)
-
- def is_active(self):
- return self._loop is not None and self._loop.is_running()
-
- def _do_waitpid(self, expected_pid):
- raise NotImplementedError()
-
- def _do_waitpid_all(self):
- raise NotImplementedError()
-
- def attach_loop(self, loop):
- assert loop is None or isinstance(loop, events.AbstractEventLoop)
-
- if self._loop is not None and loop is None and self._callbacks:
- warnings.warn(
- 'A loop is being detached '
- 'from a child watcher with pending handlers',
- RuntimeWarning)
-
- if self._loop is not None:
- self._loop.remove_signal_handler(signal.SIGCHLD)
-
- self._loop = loop
- if loop is not None:
- loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
-
- # Prevent a race condition in case a child terminated
- # during the switch.
- self._do_waitpid_all()
-
- def _sig_chld(self):
- try:
- self._do_waitpid_all()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
- # self._loop should always be available here
- # as '_sig_chld' is added as a signal handler
- # in 'attach_loop'
- self._loop.call_exception_handler({
- 'message': 'Unknown exception in SIGCHLD handler',
- 'exception': exc,
- })
-
-
class ThreadedChildWatcher(AbstractChildWatcher):
"""Threaded child watcher implementation.
def __init__(self):
super().__init__()
- self._watcher = None
-
- def _init_watcher(self):
- with events._lock:
- if self._watcher is None: # pragma: no branch
- if can_use_pidfd():
- self._watcher = PidfdChildWatcher()
- else:
- self._watcher = ThreadedChildWatcher()
+ if can_use_pidfd():
+ self._watcher = PidfdChildWatcher()
+ else:
+ self._watcher = ThreadedChildWatcher()
def set_event_loop(self, loop):
"""Set the event loop.
threading.current_thread() is threading.main_thread()):
self._watcher.attach_loop(loop)
- def get_child_watcher(self):
- """Get the watcher for child processes.
-
- If not yet set, a ThreadedChildWatcher object is automatically created.
- """
- if self._watcher is None:
- self._init_watcher()
-
- warnings._deprecated("get_child_watcher",
- "{name!r} is deprecated as of Python 3.12 and will be "
- "removed in Python {remove}.", remove=(3, 14))
- return self._watcher
-
- def set_child_watcher(self, watcher):
- """Set the watcher for child processes."""
-
- assert watcher is None or isinstance(watcher, AbstractChildWatcher)
-
- if self._watcher is not None:
- self._watcher.close()
-
- self._watcher = watcher
- warnings._deprecated("set_child_watcher",
- "{name!r} is deprecated as of Python 3.12 and will be "
- "removed in Python {remove}.", remove=(3, 14))
-
-
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
EventLoop = SelectorEventLoop
class UnixEventLoopTestsMixin(EventLoopTestsMixin):
def setUp(self):
super().setUp()
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- watcher = asyncio.ThreadedChildWatcher()
- watcher.attach_loop(self.loop)
- asyncio.set_child_watcher(watcher)
+ watcher = asyncio.ThreadedChildWatcher()
+ watcher.attach_loop(self.loop)
+ policy = asyncio.get_event_loop_policy()
+ policy._watcher = watcher
def tearDown(self):
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.set_child_watcher(None)
+ policy = asyncio.get_event_loop_policy()
+ policy._watcher = None
super().tearDown()
self.assertRaises(NotImplementedError, policy.get_event_loop)
self.assertRaises(NotImplementedError, policy.set_event_loop, object())
self.assertRaises(NotImplementedError, policy.new_event_loop)
- self.assertRaises(NotImplementedError, policy.get_child_watcher)
- self.assertRaises(NotImplementedError, policy.set_child_watcher,
- object())
def test_get_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
def tearDown(self):
try:
if sys.platform != 'win32':
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.set_child_watcher(None)
+ policy = asyncio.get_event_loop_policy()
+ policy._watcher = None
super().tearDown()
finally:
watcher = self._get_watcher()
watcher.attach_loop(self.loop)
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- policy.set_child_watcher(watcher)
+ policy._watcher = watcher
def tearDown(self):
super().tearDown()
policy = asyncio.get_event_loop_policy()
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- watcher = policy.get_child_watcher()
- policy.set_child_watcher(None)
+ watcher = policy._watcher
+ policy._watcher = None
watcher.attach_loop(None)
watcher.close()
return unix_events.PidfdChildWatcher()
- class GenericWatcherTests(test_utils.TestCase):
-
- def test_create_subprocess_fails_with_inactive_watcher(self):
- watcher = mock.create_autospec(asyncio.AbstractChildWatcher)
- watcher.is_active.return_value = False
-
- async def execute():
- asyncio.set_child_watcher(watcher)
-
- with self.assertRaises(RuntimeError):
- await subprocess.create_subprocess_exec(
- os_helper.FakePath(sys.executable), '-c', 'pass')
-
- watcher.add_child_handler.assert_not_called()
-
- with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- self.assertIsNone(runner.run(execute()))
- self.assertListEqual(watcher.mock_calls, [
- mock.call.__enter__(),
- mock.call.is_active(),
- mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY),
- ], watcher.mock_calls)
-
-
- @unittest.skipUnless(
- unix_events.can_use_pidfd(),
- "operating system does not support pidfds",
- )
- def test_create_subprocess_with_pidfd(self):
- async def in_thread():
- proc = await asyncio.create_subprocess_exec(
- *PROGRAM_CAT,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- )
- stdout, stderr = await proc.communicate(b"some data")
- return proc.returncode, stdout
-
- async def main():
- # asyncio.Runner did not call asyncio.set_event_loop()
- with warnings.catch_warnings():
- warnings.simplefilter('error', DeprecationWarning)
- # get_event_loop() raises DeprecationWarning if
- # set_event_loop() was never called and RuntimeError if
- # it was called at least once.
- with self.assertRaises((RuntimeError, DeprecationWarning)):
- asyncio.get_event_loop_policy().get_event_loop()
- return await asyncio.to_thread(asyncio.run, in_thread())
- with self.assertWarns(DeprecationWarning):
- asyncio.set_child_watcher(asyncio.PidfdChildWatcher())
- try:
- with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
- returncode, stdout = runner.run(main())
- self.assertEqual(returncode, 0)
- self.assertEqual(stdout, b'some data')
- finally:
- with self.assertWarns(DeprecationWarning):
- asyncio.set_child_watcher(None)
else:
# Windows
class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
NotImplementedError, watcher.__exit__, f, f, f)
-class PolicyTests(unittest.TestCase):
-
- def create_policy(self):
- return asyncio.DefaultEventLoopPolicy()
-
- @mock.patch('asyncio.unix_events.can_use_pidfd')
- def test_get_default_child_watcher(self, m_can_use_pidfd):
- m_can_use_pidfd.return_value = False
- policy = self.create_policy()
- self.assertIsNone(policy._watcher)
- with self.assertWarns(DeprecationWarning):
- watcher = policy.get_child_watcher()
- self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
-
- self.assertIs(policy._watcher, watcher)
- with self.assertWarns(DeprecationWarning):
- self.assertIs(watcher, policy.get_child_watcher())
-
- m_can_use_pidfd.return_value = True
- policy = self.create_policy()
- self.assertIsNone(policy._watcher)
- with self.assertWarns(DeprecationWarning):
- watcher = policy.get_child_watcher()
- self.assertIsInstance(watcher, asyncio.PidfdChildWatcher)
-
- self.assertIs(policy._watcher, watcher)
- with self.assertWarns(DeprecationWarning):
- self.assertIs(watcher, policy.get_child_watcher())
-
class TestFunctional(unittest.TestCase):
def setUp(self):
policy = support.maybe_get_event_loop_policy()
if policy is not None:
try:
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- watcher = policy.get_child_watcher()
- except NotImplementedError:
+ watcher = policy._watcher
+ except AttributeError:
# watcher is not implemented by EventLoopPolicy, e.g. Windows
pass
else: