]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
GH-66285: fix forking in asyncio (#99769)
authorKumar Aditya <59607654+kumaraditya303@users.noreply.github.com>
Sun, 27 Nov 2022 05:54:48 +0000 (11:24 +0530)
committerGitHub <noreply@github.com>
Sun, 27 Nov 2022 05:54:48 +0000 (11:24 +0530)
Closes #66285

Lib/asyncio/events.py
Lib/test/test_asyncio/test_unix_events.py
Misc/NEWS.d/next/Library/2022-11-17-10-56-47.gh-issue-66285.KvjlaB.rst [new file with mode: 0644]

index a327ba54a323a8e2b04d984051ad41ebefbac8f7..2836bbcc463fe52f364cedf6f6c99dd4bc7fd7fa 100644 (file)
@@ -13,6 +13,7 @@ __all__ = (
 
 import contextvars
 import os
+import signal
 import socket
 import subprocess
 import sys
@@ -842,3 +843,13 @@ else:
     _c_get_running_loop = get_running_loop
     _c_get_event_loop = get_event_loop
     _c__get_event_loop = _get_event_loop
+
+
+if hasattr(os, 'fork'):
+    def on_fork():
+        # Reset the loop and wakeupfd in the forked child process.
+        if _event_loop_policy is not None:
+            _event_loop_policy._local = BaseDefaultEventLoopPolicy._Local()
+        signal.set_wakeup_fd(-1)
+
+    os.register_at_fork(after_in_child=on_fork)
index 93e8611f184d259c3217cff75cec992d56a0fae9..092edb215854b7e54e452314350ef63197fe131f 100644 (file)
@@ -3,6 +3,7 @@
 import contextlib
 import errno
 import io
+import multiprocessing
 import os
 import pathlib
 import signal
@@ -15,6 +16,8 @@ from unittest import mock
 import warnings
 from test.support import os_helper
 from test.support import socket_helper
+from test.support import wait_process
+from test.support import hashlib_helper
 
 if sys.platform == 'win32':
     raise unittest.SkipTest('UNIX only')
@@ -1867,5 +1870,100 @@ class TestFunctional(unittest.TestCase):
             wsock.close()
 
 
+@unittest.skipUnless(hasattr(os, 'fork'), 'requires os.fork()')
+class TestFork(unittest.IsolatedAsyncioTestCase):
+
+    async def test_fork_not_share_event_loop(self):
+        # The forked process should not share the event loop with the parent
+        loop = asyncio.get_running_loop()
+        r, w = os.pipe()
+        self.addCleanup(os.close, r)
+        self.addCleanup(os.close, w)
+        pid = os.fork()
+        if pid == 0:
+            # child
+            try:
+                loop = asyncio.get_event_loop_policy().get_event_loop()
+                os.write(w, str(id(loop)).encode())
+            finally:
+                os._exit(0)
+        else:
+            # parent
+            child_loop = int(os.read(r, 100).decode())
+            self.assertNotEqual(child_loop, id(loop))
+            wait_process(pid, exitcode=0)
+
+    @hashlib_helper.requires_hashdigest('md5')
+    def test_fork_signal_handling(self):
+        # Sending signal to the forked process should not affect the parent
+        # process
+        ctx = multiprocessing.get_context('fork')
+        manager = ctx.Manager()
+        self.addCleanup(manager.shutdown)
+        child_started = manager.Event()
+        child_handled = manager.Event()
+        parent_handled = manager.Event()
+
+        def child_main():
+            signal.signal(signal.SIGTERM, lambda *args: child_handled.set())
+            child_started.set()
+            time.sleep(1)
+
+        async def main():
+            loop = asyncio.get_running_loop()
+            loop.add_signal_handler(signal.SIGTERM, lambda *args: parent_handled.set())
+
+            process = ctx.Process(target=child_main)
+            process.start()
+            child_started.wait()
+            os.kill(process.pid, signal.SIGTERM)
+            process.join()
+
+            async def func():
+                await asyncio.sleep(0.1)
+                return 42
+
+            # Test parent's loop is still functional
+            self.assertEqual(await asyncio.create_task(func()), 42)
+
+        asyncio.run(main())
+
+        self.assertFalse(parent_handled.is_set())
+        self.assertTrue(child_handled.is_set())
+
+    @hashlib_helper.requires_hashdigest('md5')
+    def test_fork_asyncio_run(self):
+        ctx = multiprocessing.get_context('fork')
+        manager = ctx.Manager()
+        self.addCleanup(manager.shutdown)
+        result = manager.Value('i', 0)
+
+        async def child_main():
+            await asyncio.sleep(0.1)
+            result.value = 42
+
+        process = ctx.Process(target=lambda: asyncio.run(child_main()))
+        process.start()
+        process.join()
+
+        self.assertEqual(result.value, 42)
+
+    @hashlib_helper.requires_hashdigest('md5')
+    def test_fork_asyncio_subprocess(self):
+        ctx = multiprocessing.get_context('fork')
+        manager = ctx.Manager()
+        self.addCleanup(manager.shutdown)
+        result = manager.Value('i', 1)
+
+        async def child_main():
+            proc = await asyncio.create_subprocess_exec(sys.executable, '-c', 'pass')
+            result.value = await proc.wait()
+
+        process = ctx.Process(target=lambda: asyncio.run(child_main()))
+        process.start()
+        process.join()
+
+        self.assertEqual(result.value, 0)
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2022-11-17-10-56-47.gh-issue-66285.KvjlaB.rst b/Misc/NEWS.d/next/Library/2022-11-17-10-56-47.gh-issue-66285.KvjlaB.rst
new file mode 100644 (file)
index 0000000..ebd8217
--- /dev/null
@@ -0,0 +1 @@
+Fix :mod:`asyncio` to not share event loop and signal wakeupfd in forked processes. Patch by Kumar Aditya.