]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-104812: Run Pending Calls in any Thread (gh-104813)
authorEric Snow <ericsnowcurrently@gmail.com>
Tue, 13 Jun 2023 21:02:19 +0000 (15:02 -0600)
committerGitHub <noreply@github.com>
Tue, 13 Jun 2023 21:02:19 +0000 (15:02 -0600)
For a while now, pending calls only run in the main thread (in the main interpreter).  This PR changes things to allow any thread run a pending call, unless the pending call was explicitly added for the main thread to run.

16 files changed:
Include/cpython/ceval.h
Include/internal/pycore_ceval.h
Include/internal/pycore_ceval_state.h
Include/internal/pycore_pystate.h
Lib/test/support/threading_helper.py
Lib/test/test_capi/test_misc.py
Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst [new file with mode: 0644]
Modules/_queuemodule.c
Modules/_testinternalcapi.c
Modules/_threadmodule.c
Modules/signalmodule.c
Python/ceval.c
Python/ceval_gil.c
Python/pylifecycle.c
Python/pystate.c
Tools/c-analyzer/cpython/ignored.tsv

index 0fbbee10c2edce0bc4faaa8860be7f7bab7e4ebc..a9616bd6a4f5186b1e1d8408639e5729f04ab6b1 100644 (file)
@@ -22,6 +22,8 @@ PyAPI_FUNC(PyObject *) _PyEval_EvalFrameDefault(PyThreadState *tstate, struct _P
 PyAPI_FUNC(void) _PyEval_SetSwitchInterval(unsigned long microseconds);
 PyAPI_FUNC(unsigned long) _PyEval_GetSwitchInterval(void);
 
+PyAPI_FUNC(int) _PyEval_MakePendingCalls(PyThreadState *);
+
 PyAPI_FUNC(Py_ssize_t) PyUnstable_Eval_RequestCodeExtraIndex(freefunc);
 // Old name -- remove when this API changes:
 _Py_DEPRECATED_EXTERNALLY(3.12) static inline Py_ssize_t
index ca2703781de4b088d44d60f8a124a5006f981220..9e9b523e7c222286a2177eb1e2d1238019f54486 100644 (file)
@@ -27,7 +27,8 @@ PyAPI_FUNC(void) _PyEval_SignalReceived(PyInterpreterState *interp);
 PyAPI_FUNC(int) _PyEval_AddPendingCall(
     PyInterpreterState *interp,
     int (*func)(void *),
-    void *arg);
+    void *arg,
+    int mainthreadonly);
 PyAPI_FUNC(void) _PyEval_SignalAsyncExc(PyInterpreterState *interp);
 #ifdef HAVE_FORK
 extern PyStatus _PyEval_ReInitThreads(PyThreadState *tstate);
index 95d1fa16ba40dc92f0d358e4e121e442cd53ecc2..e56e43c6e0c6a7f247de227910e4f1f5e0e10f35 100644 (file)
@@ -13,6 +13,24 @@ extern "C" {
 #include "pycore_gil.h"             // struct _gil_runtime_state
 
 
+struct _pending_calls {
+    int busy;
+    PyThread_type_lock lock;
+    /* Request for running pending calls. */
+    _Py_atomic_int calls_to_do;
+    /* Request for looking at the `async_exc` field of the current
+       thread state.
+       Guarded by the GIL. */
+    int async_exc;
+#define NPENDINGCALLS 32
+    struct _pending_call {
+        int (*func)(void *);
+        void *arg;
+    } calls[NPENDINGCALLS];
+    int first;
+    int last;
+};
+
 typedef enum {
     PERF_STATUS_FAILED = -1,  // Perf trampoline is in an invalid state
     PERF_STATUS_NO_INIT = 0,  // Perf trampoline is not initialized
@@ -49,6 +67,8 @@ struct _ceval_runtime_state {
        the main thread of the main interpreter can handle signals: see
        _Py_ThreadCanHandleSignals(). */
     _Py_atomic_int signals_pending;
+    /* Pending calls to be made only on the main thread. */
+    struct _pending_calls pending_mainthread;
 };
 
 #ifdef PY_HAVE_PERF_TRAMPOLINE
@@ -62,24 +82,6 @@ struct _ceval_runtime_state {
 #endif
 
 
-struct _pending_calls {
-    int busy;
-    PyThread_type_lock lock;
-    /* Request for running pending calls. */
-    _Py_atomic_int calls_to_do;
-    /* Request for looking at the `async_exc` field of the current
-       thread state.
-       Guarded by the GIL. */
-    int async_exc;
-#define NPENDINGCALLS 32
-    struct {
-        int (*func)(void *);
-        void *arg;
-    } calls[NPENDINGCALLS];
-    int first;
-    int last;
-};
-
 struct _ceval_state {
     /* This single variable consolidates all requests to break out of
        the fast path in the eval loop. */
index daa40cf4bcd855452d6af06742cac83b45107cd3..43652c4405ec1a481889f73d314ac45a7bc0619d 100644 (file)
@@ -60,14 +60,6 @@ _Py_ThreadCanHandleSignals(PyInterpreterState *interp)
 }
 
 
-/* Only execute pending calls on the main thread. */
-static inline int
-_Py_ThreadCanHandlePendingCalls(void)
-{
-    return _Py_IsMainThread();
-}
-
-
 /* Variable and static inline functions for in-line access to current thread
    and interpreter state */
 
index b9973c8bf5c914ddf9d14af54b8346229ee3a8c1..7f16050f32b9d192de2c740f3107b86332eb2b2c 100644 (file)
@@ -115,7 +115,11 @@ def join_thread(thread, timeout=None):
 
 @contextlib.contextmanager
 def start_threads(threads, unlock=None):
-    import faulthandler
+    try:
+        import faulthandler
+    except ImportError:
+        # It isn't supported on subinterpreters yet.
+        faulthandler = None
     threads = list(threads)
     started = []
     try:
@@ -147,7 +151,8 @@ def start_threads(threads, unlock=None):
         finally:
             started = [t for t in started if t.is_alive()]
             if started:
-                faulthandler.dump_traceback(sys.stdout)
+                if faulthandler is not None:
+                    faulthandler.dump_traceback(sys.stdout)
                 raise AssertionError('Unable to join %d threads' % len(started))
 
 
index 04a0f8f46cd61e5e147cd1e77e8ad924e9b751c0..58e1a83da5c146854ea16876842d12c100dc90ab 100644 (file)
@@ -2,17 +2,20 @@
 # these are all functions _testcapi exports whose name begins with 'test_'.
 
 import _thread
-from collections import OrderedDict
+from collections import OrderedDict, deque
 import contextlib
 import importlib.machinery
 import importlib.util
+import json
 import os
 import pickle
+import queue
 import random
 import sys
 import textwrap
 import threading
 import time
+import types
 import unittest
 import warnings
 import weakref
@@ -36,6 +39,10 @@ try:
     import _testsinglephase
 except ImportError:
     _testsinglephase = None
+try:
+    import _xxsubinterpreters as _interpreters
+except ModuleNotFoundError:
+    _interpreters = None
 
 # Skip this test if the _testcapi module isn't available.
 _testcapi = import_helper.import_module('_testcapi')
@@ -47,6 +54,12 @@ def decode_stderr(err):
     return err.decode('utf-8', 'replace').replace('\r', '')
 
 
+def requires_subinterpreters(meth):
+    """Decorator to skip a test if subinterpreters are not supported."""
+    return unittest.skipIf(_interpreters is None,
+                           'subinterpreters required')(meth)
+
+
 def testfunction(self):
     """some doc"""
     return self
@@ -1259,6 +1272,10 @@ class TestHeapTypeRelative(unittest.TestCase):
 
 class TestPendingCalls(unittest.TestCase):
 
+    # See the comment in ceval.c (at the "handle_eval_breaker" label)
+    # about when pending calls get run.  This is especially relevant
+    # here for creating deterministic tests.
+
     def pendingcalls_submit(self, l, n):
         def callback():
             #this function can be interrupted by thread switching so let's
@@ -1341,6 +1358,388 @@ class TestPendingCalls(unittest.TestCase):
         gen = genf()
         self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
 
+    class PendingTask(types.SimpleNamespace):
+
+        _add_pending = _testinternalcapi.pending_threadfunc
+
+        def __init__(self, req, taskid=None, notify_done=None):
+            self.id = taskid
+            self.req = req
+            self.notify_done = notify_done
+
+            self.creator_tid = threading.get_ident()
+            self.requester_tid = None
+            self.runner_tid = None
+            self.result = None
+
+        def run(self):
+            assert self.result is None
+            self.runner_tid = threading.get_ident()
+            self._run()
+            if self.notify_done is not None:
+                self.notify_done()
+
+        def _run(self):
+            self.result = self.req
+
+        def run_in_pending_call(self, worker_tids):
+            assert self._add_pending is _testinternalcapi.pending_threadfunc
+            self.requester_tid = threading.get_ident()
+            def callback():
+                assert self.result is None
+                # It can be tricky to control which thread handles
+                # the eval breaker, so we take a naive approach to
+                # make sure.
+                if threading.get_ident() not in worker_tids:
+                    self._add_pending(callback, ensure_added=True)
+                    return
+                self.run()
+            self._add_pending(callback, ensure_added=True)
+
+        def create_thread(self, worker_tids):
+            return threading.Thread(
+                target=self.run_in_pending_call,
+                args=(worker_tids,),
+            )
+
+        def wait_for_result(self):
+            while self.result is None:
+                time.sleep(0.01)
+
+    def test_subthreads_can_handle_pending_calls(self):
+        payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!'
+
+        task = self.PendingTask(payload)
+        def do_the_work():
+            tid = threading.get_ident()
+            t = task.create_thread({tid})
+            with threading_helper.start_threads([t]):
+                task.wait_for_result()
+        t = threading.Thread(target=do_the_work)
+        with threading_helper.start_threads([t]):
+            pass
+
+        self.assertEqual(task.result, payload)
+
+    def test_many_subthreads_can_handle_pending_calls(self):
+        main_tid = threading.get_ident()
+        self.assertEqual(threading.main_thread().ident, main_tid)
+
+        # We can't use queue.Queue since it isn't reentrant relative
+        # to pending calls.
+        _queue = deque()
+        _active = deque()
+        _done_lock = threading.Lock()
+        def queue_put(task):
+            _queue.append(task)
+            _active.append(True)
+        def queue_get():
+            try:
+                task = _queue.popleft()
+            except IndexError:
+                raise queue.Empty
+            return task
+        def queue_task_done():
+            _active.pop()
+            if not _active:
+                try:
+                    _done_lock.release()
+                except RuntimeError:
+                    assert not _done_lock.locked()
+        def queue_empty():
+            return not _queue
+        def queue_join():
+            _done_lock.acquire()
+            _done_lock.release()
+
+        tasks = []
+        for i in range(20):
+            task = self.PendingTask(
+                req=f'request {i}',
+                taskid=i,
+                notify_done=queue_task_done,
+            )
+            tasks.append(task)
+            queue_put(task)
+        # This will be released once all the tasks have finished.
+        _done_lock.acquire()
+
+        def add_tasks(worker_tids):
+            while True:
+                if done:
+                    return
+                try:
+                    task = queue_get()
+                except queue.Empty:
+                    break
+                task.run_in_pending_call(worker_tids)
+
+        done = False
+        def run_tasks():
+            while not queue_empty():
+                if done:
+                    return
+                time.sleep(0.01)
+            # Give the worker a chance to handle any remaining pending calls.
+            while not done:
+                time.sleep(0.01)
+
+        # Start the workers and wait for them to finish.
+        worker_threads = [threading.Thread(target=run_tasks)
+                          for _ in range(3)]
+        with threading_helper.start_threads(worker_threads):
+            try:
+                # Add a pending call for each task.
+                worker_tids = [t.ident for t in worker_threads]
+                threads = [threading.Thread(target=add_tasks, args=(worker_tids,))
+                           for _ in range(3)]
+                with threading_helper.start_threads(threads):
+                    try:
+                        pass
+                    except BaseException:
+                        done = True
+                        raise  # re-raise
+                # Wait for the pending calls to finish.
+                queue_join()
+                # Notify the workers that they can stop.
+                done = True
+            except BaseException:
+                done = True
+                raise  # re-raise
+        runner_tids = [t.runner_tid for t in tasks]
+
+        self.assertNotIn(main_tid, runner_tids)
+        for task in tasks:
+            with self.subTest(f'task {task.id}'):
+                self.assertNotEqual(task.requester_tid, main_tid)
+                self.assertNotEqual(task.requester_tid, task.runner_tid)
+                self.assertNotIn(task.requester_tid, runner_tids)
+
+    @requires_subinterpreters
+    def test_isolated_subinterpreter(self):
+        # We exercise the most important permutations.
+
+        # This test relies on pending calls getting called
+        # (eval breaker tripped) at each loop iteration
+        # and at each call.
+
+        maxtext = 250
+        main_interpid = 0
+        interpid = _interpreters.create()
+        _interpreters.run_string(interpid, f"""if True:
+            import json
+            import os
+            import threading
+            import time
+            import _testinternalcapi
+            from test.support import threading_helper
+            """)
+
+        def create_pipe():
+            r, w = os.pipe()
+            self.addCleanup(lambda: os.close(r))
+            self.addCleanup(lambda: os.close(w))
+            return r, w
+
+        with self.subTest('add in main, run in subinterpreter'):
+            r_ready, w_ready = create_pipe()
+            r_done, w_done= create_pipe()
+            timeout = time.time() + 30  # seconds
+
+            def do_work():
+                _interpreters.run_string(interpid, f"""if True:
+                    # Wait until this interp has handled the pending call.
+                    waiting = False
+                    done = False
+                    def wait(os_read=os.read):
+                        global done, waiting
+                        waiting = True
+                        os_read({r_done}, 1)
+                        done = True
+                    t = threading.Thread(target=wait)
+                    with threading_helper.start_threads([t]):
+                        while not waiting:
+                            pass
+                        os.write({w_ready}, b'\\0')
+                        # Loop to trigger the eval breaker.
+                        while not done:
+                            time.sleep(0.01)
+                            if time.time() > {timeout}:
+                                raise Exception('timed out!')
+                    """)
+            t = threading.Thread(target=do_work)
+            with threading_helper.start_threads([t]):
+                os.read(r_ready, 1)
+                # Add the pending call and wait for it to finish.
+                actual = _testinternalcapi.pending_identify(interpid)
+                # Signal the subinterpreter to stop.
+                os.write(w_done, b'\0')
+
+            self.assertEqual(actual, int(interpid))
+
+        with self.subTest('add in main, run in subinterpreter sub-thread'):
+            r_ready, w_ready = create_pipe()
+            r_done, w_done= create_pipe()
+            timeout = time.time() + 30  # seconds
+
+            def do_work():
+                _interpreters.run_string(interpid, f"""if True:
+                    waiting = False
+                    done = False
+                    def subthread():
+                        while not waiting:
+                            pass
+                        os.write({w_ready}, b'\\0')
+                        # Loop to trigger the eval breaker.
+                        while not done:
+                            time.sleep(0.01)
+                            if time.time() > {timeout}:
+                                raise Exception('timed out!')
+                    t = threading.Thread(target=subthread)
+                    with threading_helper.start_threads([t]):
+                        # Wait until this interp has handled the pending call.
+                        waiting = True
+                        os.read({r_done}, 1)
+                        done = True
+                    """)
+            t = threading.Thread(target=do_work)
+            with threading_helper.start_threads([t]):
+                os.read(r_ready, 1)
+                # Add the pending call and wait for it to finish.
+                actual = _testinternalcapi.pending_identify(interpid)
+                # Signal the subinterpreter to stop.
+                os.write(w_done, b'\0')
+
+            self.assertEqual(actual, int(interpid))
+
+        with self.subTest('add in subinterpreter, run in main'):
+            r_ready, w_ready = create_pipe()
+            r_done, w_done= create_pipe()
+            r_data, w_data= create_pipe()
+            timeout = time.time() + 30  # seconds
+
+            def add_job():
+                os.read(r_ready, 1)
+                _interpreters.run_string(interpid, f"""if True:
+                    # Add the pending call and wait for it to finish.
+                    actual = _testinternalcapi.pending_identify({main_interpid})
+                    # Signal the subinterpreter to stop.
+                    os.write({w_done}, b'\\0')
+                    os.write({w_data}, actual.to_bytes(1, 'little'))
+                    """)
+            # Wait until this interp has handled the pending call.
+            waiting = False
+            done = False
+            def wait(os_read=os.read):
+                nonlocal done, waiting
+                waiting = True
+                os_read(r_done, 1)
+                done = True
+            t1 = threading.Thread(target=add_job)
+            t2 = threading.Thread(target=wait)
+            with threading_helper.start_threads([t1, t2]):
+                while not waiting:
+                    pass
+                os.write(w_ready, b'\0')
+                # Loop to trigger the eval breaker.
+                while not done:
+                    time.sleep(0.01)
+                    if time.time() > timeout:
+                        raise Exception('timed out!')
+                text = os.read(r_data, 1)
+            actual = int.from_bytes(text, 'little')
+
+            self.assertEqual(actual, int(main_interpid))
+
+        with self.subTest('add in subinterpreter, run in sub-thread'):
+            r_ready, w_ready = create_pipe()
+            r_done, w_done= create_pipe()
+            r_data, w_data= create_pipe()
+            timeout = time.time() + 30  # seconds
+
+            def add_job():
+                os.read(r_ready, 1)
+                _interpreters.run_string(interpid, f"""if True:
+                    # Add the pending call and wait for it to finish.
+                    actual = _testinternalcapi.pending_identify({main_interpid})
+                    # Signal the subinterpreter to stop.
+                    os.write({w_done}, b'\\0')
+                    os.write({w_data}, actual.to_bytes(1, 'little'))
+                    """)
+            # Wait until this interp has handled the pending call.
+            waiting = False
+            done = False
+            def wait(os_read=os.read):
+                nonlocal done, waiting
+                waiting = True
+                os_read(r_done, 1)
+                done = True
+            def subthread():
+                while not waiting:
+                    pass
+                os.write(w_ready, b'\0')
+                # Loop to trigger the eval breaker.
+                while not done:
+                    time.sleep(0.01)
+                    if time.time() > timeout:
+                        raise Exception('timed out!')
+            t1 = threading.Thread(target=add_job)
+            t2 = threading.Thread(target=wait)
+            t3 = threading.Thread(target=subthread)
+            with threading_helper.start_threads([t1, t2, t3]):
+                pass
+            text = os.read(r_data, 1)
+            actual = int.from_bytes(text, 'little')
+
+            self.assertEqual(actual, int(main_interpid))
+
+        # XXX We can't use the rest until gh-105716 is fixed.
+        return
+
+        with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'):
+            r_ready, w_ready = create_pipe()
+            r_done, w_done= create_pipe()
+            r_data, w_data= create_pipe()
+            timeout = time.time() + 30  # seconds
+
+            def do_work():
+                _interpreters.run_string(interpid, f"""if True:
+                    waiting = False
+                    done = False
+                    def subthread():
+                        while not waiting:
+                            pass
+                        os.write({w_ready}, b'\\0')
+                        # Loop to trigger the eval breaker.
+                        while not done:
+                            time.sleep(0.01)
+                            if time.time() > {timeout}:
+                                raise Exception('timed out!')
+                    t = threading.Thread(target=subthread)
+                    with threading_helper.start_threads([t]):
+                        # Wait until this interp has handled the pending call.
+                        waiting = True
+                        os.read({r_done}, 1)
+                        done = True
+                    """)
+            t = threading.Thread(target=do_work)
+            #with threading_helper.start_threads([t]):
+            t.start()
+            if True:
+                os.read(r_ready, 1)
+                _interpreters.run_string(interpid, f"""if True:
+                    # Add the pending call and wait for it to finish.
+                    actual = _testinternalcapi.pending_identify({interpid})
+                    # Signal the subinterpreter to stop.
+                    os.write({w_done}, b'\\0')
+                    os.write({w_data}, actual.to_bytes(1, 'little'))
+                    """)
+            t.join()
+            text = os.read(r_data, 1)
+            actual = int.from_bytes(text, 'little')
+
+            self.assertEqual(actual, int(interpid))
+
 
 class SubinterpreterTest(unittest.TestCase):
 
diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst b/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst
new file mode 100644 (file)
index 0000000..da29a8c
--- /dev/null
@@ -0,0 +1,9 @@
+The "pending call" machinery now works for all interpreters, not just the
+main interpreter, and runs in all threads, not just the main thread. Some
+calls are still only done in the main thread, ergo in the main interpreter.
+This change does not affect signal handling nor the existing public C-API
+(``Py_AddPendingCall()``), which both still only target the main thread.
+The new functionality is meant strictly for internal use for now, since
+consequences of its use are not well understood yet outside some very
+restricted cases.  This change brings the capability in line with the
+intention when the state was made per-interpreter several years ago.
index d36a911a57c02ca953335457e7a0993276eec9bb..db5be842b8a35c3571d58d246f3d5d639d49a467 100644 (file)
@@ -210,6 +210,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
     PyObject *item;
     PyLockStatus r;
     PY_TIMEOUT_T microseconds;
+    PyThreadState *tstate = PyThreadState_Get();
 
     if (block == 0) {
         /* Non-blocking */
@@ -253,7 +254,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
             Py_END_ALLOW_THREADS
         }
 
-        if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) {
+        if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
             return NULL;
         }
         if (r == PY_LOCK_FAILURE) {
index b43dc7fbf3236c3394fe7798633188d0d697a1a8..3de32a32750ebc82b55bb81839d163f4dc8b785d 100644 (file)
 
 #include "Python.h"
 #include "frameobject.h"
+#include "interpreteridobject.h"  // _PyInterpreterID_LookUp()
 #include "pycore_atomic_funcs.h" // _Py_atomic_int_get()
 #include "pycore_bitutils.h"     // _Py_bswap32()
 #include "pycore_compile.h"      // _PyCompile_CodeGen, _PyCompile_OptimizeCfg, _PyCompile_Assemble
+#include "pycore_ceval.h"        // _PyEval_AddPendingCall
 #include "pycore_fileutils.h"    // _Py_normpath
 #include "pycore_frame.h"        // _PyInterpreterFrame
 #include "pycore_gc.h"           // PyGC_Head
 #include "pycore_hashtable.h"    // _Py_hashtable_new()
 #include "pycore_initconfig.h"   // _Py_GetConfigsAsDict()
-#include "pycore_pathconfig.h"   // _PyPathConfig_ClearGlobal()
 #include "pycore_interp.h"       // _PyInterpreterState_GetConfigCopy()
+#include "pycore_pathconfig.h"   // _PyPathConfig_ClearGlobal()
 #include "pycore_pyerrors.h"     // _Py_UTF8_Edit_Cost()
 #include "pycore_pystate.h"      // _PyThreadState_GET()
 #include "osdefs.h"              // MAXPATHLEN
@@ -838,6 +840,120 @@ set_optimizer(PyObject *self, PyObject *opt)
     Py_RETURN_NONE;
 }
 
+
+static int _pending_callback(void *arg)
+{
+    /* we assume the argument is callable object to which we own a reference */
+    PyObject *callable = (PyObject *)arg;
+    PyObject *r = PyObject_CallNoArgs(callable);
+    Py_DECREF(callable);
+    Py_XDECREF(r);
+    return r != NULL ? 0 : -1;
+}
+
+/* The following requests n callbacks to _pending_callback.  It can be
+ * run from any python thread.
+ */
+static PyObject *
+pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
+{
+    PyObject *callable;
+    int ensure_added = 0;
+    static char *kwlist[] = {"", "ensure_added", NULL};
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs,
+                                     "O|$p:pending_threadfunc", kwlist,
+                                     &callable, &ensure_added))
+    {
+        return NULL;
+    }
+    PyInterpreterState *interp = PyInterpreterState_Get();
+
+    /* create the reference for the callbackwhile we hold the lock */
+    Py_INCREF(callable);
+
+    int r;
+    Py_BEGIN_ALLOW_THREADS
+    r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
+    Py_END_ALLOW_THREADS
+    if (r < 0) {
+        /* unsuccessful add */
+        if (!ensure_added) {
+            Py_DECREF(callable);
+            Py_RETURN_FALSE;
+        }
+        do {
+            Py_BEGIN_ALLOW_THREADS
+            r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
+            Py_END_ALLOW_THREADS
+        } while (r < 0);
+    }
+
+    Py_RETURN_TRUE;
+}
+
+
+static struct {
+    int64_t interpid;
+} pending_identify_result;
+
+static int
+_pending_identify_callback(void *arg)
+{
+    PyThread_type_lock mutex = (PyThread_type_lock)arg;
+    assert(pending_identify_result.interpid == -1);
+    PyThreadState *tstate = PyThreadState_Get();
+    pending_identify_result.interpid = PyInterpreterState_GetID(tstate->interp);
+    PyThread_release_lock(mutex);
+    return 0;
+}
+
+static PyObject *
+pending_identify(PyObject *self, PyObject *args)
+{
+    PyObject *interpid;
+    if (!PyArg_ParseTuple(args, "O:pending_identify", &interpid)) {
+        return NULL;
+    }
+    PyInterpreterState *interp = _PyInterpreterID_LookUp(interpid);
+    if (interp == NULL) {
+        if (!PyErr_Occurred()) {
+            PyErr_SetString(PyExc_ValueError, "interpreter not found");
+        }
+        return NULL;
+    }
+
+    pending_identify_result.interpid = -1;
+
+    PyThread_type_lock mutex = PyThread_allocate_lock();
+    if (mutex == NULL) {
+        return NULL;
+    }
+    PyThread_acquire_lock(mutex, WAIT_LOCK);
+    /* It gets released in _pending_identify_callback(). */
+
+    int r;
+    do {
+        Py_BEGIN_ALLOW_THREADS
+        r = _PyEval_AddPendingCall(interp,
+                                   &_pending_identify_callback, (void *)mutex,
+                                   0);
+        Py_END_ALLOW_THREADS
+    } while (r < 0);
+
+    /* Wait for the pending call to complete. */
+    PyThread_acquire_lock(mutex, WAIT_LOCK);
+    PyThread_release_lock(mutex);
+    PyThread_free_lock(mutex);
+
+    PyObject *res = PyLong_FromLongLong(pending_identify_result.interpid);
+    pending_identify_result.interpid = -1;
+    if (res == NULL) {
+        return NULL;
+    }
+    return res;
+}
+
+
 static PyMethodDef module_functions[] = {
     {"get_configs", get_configs, METH_NOARGS},
     {"get_recursion_depth", get_recursion_depth, METH_NOARGS},
@@ -868,6 +984,10 @@ static PyMethodDef module_functions[] = {
     {"iframe_getlasti", iframe_getlasti, METH_O, NULL},
     {"set_optimizer", set_optimizer,  METH_O, NULL},
     {"get_counter_optimizer", get_counter_optimizer, METH_NOARGS, NULL},
+    {"pending_threadfunc", _PyCFunction_CAST(pending_threadfunc),
+     METH_VARARGS | METH_KEYWORDS},
+//    {"pending_fd_identify", pending_fd_identify, METH_VARARGS, NULL},
+    {"pending_identify", pending_identify, METH_VARARGS, NULL},
     {NULL, NULL} /* sentinel */
 };
 
index b6f878e07526dbca3d00bb59b41b7e5507f3faaa..c553d039462af0545769ececb9f6b8d38d2abf40 100644 (file)
@@ -81,6 +81,7 @@ lock_dealloc(lockobject *self)
 static PyLockStatus
 acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
 {
+    PyThreadState *tstate = _PyThreadState_GET();
     _PyTime_t endtime = 0;
     if (timeout > 0) {
         endtime = _PyDeadline_Init(timeout);
@@ -103,7 +104,7 @@ acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
             /* Run signal handlers if we were interrupted.  Propagate
              * exceptions from signal handlers, such as KeyboardInterrupt, by
              * passing up PY_LOCK_INTR.  */
-            if (Py_MakePendingCalls() < 0) {
+            if (_PyEval_MakePendingCalls(tstate) < 0) {
                 return PY_LOCK_INTR;
             }
 
index 2350236ad46b250f239439be60bbbb93097edc29..00ea4343735dabf63c3131a2d5dc9546dae9ab29 100644 (file)
@@ -314,7 +314,8 @@ trip_signal(int sig_num)
                        still use it for this exceptional case. */
                     _PyEval_AddPendingCall(interp,
                                            report_wakeup_send_error,
-                                           (void *)(intptr_t) last_error);
+                                           (void *)(intptr_t) last_error,
+                                           1);
                 }
             }
         }
@@ -333,7 +334,8 @@ trip_signal(int sig_num)
                        still use it for this exceptional case. */
                     _PyEval_AddPendingCall(interp,
                                            report_wakeup_write_error,
-                                           (void *)(intptr_t)errno);
+                                           (void *)(intptr_t)errno,
+                                           1);
                 }
             }
         }
index e81b6beedfcee13636501d534a0944a61bb35633..b91f94d2873963298a9400a5c0fd65a35069421c 100644 (file)
@@ -758,6 +758,61 @@ handle_eval_breaker:
      * We need to do reasonably frequently, but not too frequently.
      * All loops should include a check of the eval breaker.
      * We also check on return from any builtin function.
+     *
+     * ## More Details ###
+     *
+     * The eval loop (this function) normally executes the instructions
+     * of a code object sequentially.  However, the runtime supports a
+     * number of out-of-band execution scenarios that may pause that
+     * sequential execution long enough to do that out-of-band work
+     * in the current thread using the current PyThreadState.
+     *
+     * The scenarios include:
+     *
+     *  - cyclic garbage collection
+     *  - GIL drop requests
+     *  - "async" exceptions
+     *  - "pending calls"  (some only in the main thread)
+     *  - signal handling (only in the main thread)
+     *
+     * When the need for one of the above is detected, the eval loop
+     * pauses long enough to handle the detected case.  Then, if doing
+     * so didn't trigger an exception, the eval loop resumes executing
+     * the sequential instructions.
+     *
+     * To make this work, the eval loop periodically checks if any
+     * of the above needs to happen.  The individual checks can be
+     * expensive if computed each time, so a while back we switched
+     * to using pre-computed, per-interpreter variables for the checks,
+     * and later consolidated that to a single "eval breaker" variable
+     * (now a PyInterpreterState field).
+     *
+     * For the longest time, the eval breaker check would happen
+     * frequently, every 5 or so times through the loop, regardless
+     * of what instruction ran last or what would run next.  Then, in
+     * early 2021 (gh-18334, commit 4958f5d), we switched to checking
+     * the eval breaker less frequently, by hard-coding the check to
+     * specific places in the eval loop (e.g. certain instructions).
+     * The intent then was to check after returning from calls
+     * and on the back edges of loops.
+     *
+     * In addition to being more efficient, that approach keeps
+     * the eval loop from running arbitrary code between instructions
+     * that don't handle that well.  (See gh-74174.)
+     *
+     * Currently, the eval breaker check happens here at the
+     * "handle_eval_breaker" label.  Some instructions come here
+     * explicitly (goto) and some indirectly.  Notably, the check
+     * happens on back edges in the control flow graph, which
+     * pretty much applies to all loops and most calls.
+     * (See bytecodes.c for exact information.)
+     *
+     * One consequence of this approach is that it might not be obvious
+     * how to force any specific thread to pick up the eval breaker,
+     * or for any specific thread to not pick it up.  Mostly this
+     * involves judicious uses of locks and careful ordering of code,
+     * while avoiding code that might trigger the eval breaker
+     * until so desired.
      */
     if (_Py_HandlePending(tstate) != 0) {
         goto error;
index 723cf0f4df94d05f3aa02218323f7916f5e04c08..bb1279f46cf9f7f7194f95e0c5ad8a413550124d 100644 (file)
@@ -68,8 +68,9 @@ COMPUTE_EVAL_BREAKER(PyInterpreterState *interp,
         _Py_atomic_load_relaxed_int32(&ceval2->gil_drop_request)
         | (_Py_atomic_load_relaxed_int32(&ceval->signals_pending)
            && _Py_ThreadCanHandleSignals(interp))
-        | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do)
-           && _Py_ThreadCanHandlePendingCalls())
+        | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do))
+        | (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)
+           &&_Py_atomic_load_relaxed_int32(&ceval->pending_mainthread.calls_to_do))
         | ceval2->pending.async_exc
         | _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled));
 }
@@ -95,11 +96,11 @@ RESET_GIL_DROP_REQUEST(PyInterpreterState *interp)
 
 
 static inline void
-SIGNAL_PENDING_CALLS(PyInterpreterState *interp)
+SIGNAL_PENDING_CALLS(struct _pending_calls *pending, PyInterpreterState *interp)
 {
     struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
     struct _ceval_state *ceval2 = &interp->ceval;
-    _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 1);
+    _Py_atomic_store_relaxed(&pending->calls_to_do, 1);
     COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
 }
 
@@ -109,6 +110,9 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp)
 {
     struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
     struct _ceval_state *ceval2 = &interp->ceval;
+    if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
+        _Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0);
+    }
     _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0);
     COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
 }
@@ -803,19 +807,31 @@ _push_pending_call(struct _pending_calls *pending,
     return 0;
 }
 
-/* Pop one item off the queue while holding the lock. */
-static void
-_pop_pending_call(struct _pending_calls *pending,
-                  int (**func)(void *), void **arg)
+static int
+_next_pending_call(struct _pending_calls *pending,
+                   int (**func)(void *), void **arg)
 {
     int i = pending->first;
     if (i == pending->last) {
-        return; /* Queue empty */
+        /* Queue empty */
+        assert(pending->calls[i].func == NULL);
+        return -1;
     }
-
     *func = pending->calls[i].func;
     *arg = pending->calls[i].arg;
-    pending->first = (i + 1) % NPENDINGCALLS;
+    return i;
+}
+
+/* Pop one item off the queue while holding the lock. */
+static void
+_pop_pending_call(struct _pending_calls *pending,
+                  int (**func)(void *), void **arg)
+{
+    int i = _next_pending_call(pending, func, arg);
+    if (i >= 0) {
+        pending->calls[i] = (struct _pending_call){0};
+        pending->first = (i + 1) % NPENDINGCALLS;
+    }
 }
 
 /* This implementation is thread-safe.  It allows
@@ -825,9 +841,16 @@ _pop_pending_call(struct _pending_calls *pending,
 
 int
 _PyEval_AddPendingCall(PyInterpreterState *interp,
-                       int (*func)(void *), void *arg)
+                       int (*func)(void *), void *arg,
+                       int mainthreadonly)
 {
+    assert(!mainthreadonly || _Py_IsMainInterpreter(interp));
     struct _pending_calls *pending = &interp->ceval.pending;
+    if (mainthreadonly) {
+        /* The main thread only exists in the main interpreter. */
+        assert(_Py_IsMainInterpreter(interp));
+        pending = &_PyRuntime.ceval.pending_mainthread;
+    }
     /* Ensure that _PyEval_InitState() was called
        and that _PyEval_FiniState() is not called yet. */
     assert(pending->lock != NULL);
@@ -837,39 +860,17 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
     PyThread_release_lock(pending->lock);
 
     /* signal main loop */
-    SIGNAL_PENDING_CALLS(interp);
+    SIGNAL_PENDING_CALLS(pending, interp);
     return result;
 }
 
 int
 Py_AddPendingCall(int (*func)(void *), void *arg)
 {
-    /* Best-effort to support subinterpreters and calls with the GIL released.
-
-       First attempt _PyThreadState_GET() since it supports subinterpreters.
-
-       If the GIL is released, _PyThreadState_GET() returns NULL . In this
-       case, use PyGILState_GetThisThreadState() which works even if the GIL
-       is released.
-
-       Sadly, PyGILState_GetThisThreadState() doesn't support subinterpreters:
-       see bpo-10915 and bpo-15751.
-
-       Py_AddPendingCall() doesn't require the caller to hold the GIL. */
-    PyThreadState *tstate = _PyThreadState_GET();
-    if (tstate == NULL) {
-        tstate = PyGILState_GetThisThreadState();
-    }
-
-    PyInterpreterState *interp;
-    if (tstate != NULL) {
-        interp = tstate->interp;
-    }
-    else {
-        /* Last resort: use the main interpreter */
-        interp = _PyInterpreterState_Main();
-    }
-    return _PyEval_AddPendingCall(interp, func, arg);
+    /* Legacy users of this API will continue to target the main thread
+       (of the main interpreter). */
+    PyInterpreterState *interp = _PyInterpreterState_Main();
+    return _PyEval_AddPendingCall(interp, func, arg, 1);
 }
 
 static int
@@ -889,27 +890,24 @@ handle_signals(PyThreadState *tstate)
     return 0;
 }
 
-static int
-make_pending_calls(PyInterpreterState *interp)
+static inline int
+maybe_has_pending_calls(PyInterpreterState *interp)
 {
-    /* only execute pending calls on main thread */
-    if (!_Py_ThreadCanHandlePendingCalls()) {
-        return 0;
+    struct _pending_calls *pending = &interp->ceval.pending;
+    if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) {
+        return 1;
     }
-
-    /* don't perform recursive pending calls */
-    if (interp->ceval.pending.busy) {
+    if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(interp)) {
         return 0;
     }
-    interp->ceval.pending.busy = 1;
-
-    /* unsignal before starting to call callbacks, so that any callback
-       added in-between re-signals */
-    UNSIGNAL_PENDING_CALLS(interp);
-    int res = 0;
+    pending = &_PyRuntime.ceval.pending_mainthread;
+    return _Py_atomic_load_relaxed_int32(&pending->calls_to_do);
+}
 
+static int
+_make_pending_calls(struct _pending_calls *pending)
+{
     /* perform a bounded number of calls, in case of recursion */
-    struct _pending_calls *pending = &interp->ceval.pending;
     for (int i=0; i<NPENDINGCALLS; i++) {
         int (*func)(void *) = NULL;
         void *arg = NULL;
@@ -923,19 +921,61 @@ make_pending_calls(PyInterpreterState *interp)
         if (func == NULL) {
             break;
         }
-        res = func(arg);
-        if (res) {
-            goto error;
+        if (func(arg) != 0) {
+            return -1;
         }
     }
+    return 0;
+}
+
+static int
+make_pending_calls(PyInterpreterState *interp)
+{
+    struct _pending_calls *pending = &interp->ceval.pending;
+    struct _pending_calls *pending_main = &_PyRuntime.ceval.pending_mainthread;
+
+    /* Only one thread (per interpreter) may run the pending calls
+       at once.  In the same way, we don't do recursive pending calls. */
+    PyThread_acquire_lock(pending->lock, WAIT_LOCK);
+    if (pending->busy) {
+        /* A pending call was added after another thread was already
+           handling the pending calls (and had already "unsignaled").
+           Once that thread is done, it may have taken care of all the
+           pending calls, or there might be some still waiting.
+           Regardless, this interpreter's pending calls will stay
+           "signaled" until that first thread has finished.  At that
+           point the next thread to trip the eval breaker will take
+           care of any remaining pending calls.  Until then, though,
+           all the interpreter's threads will be tripping the eval
+           breaker every time it's checked. */
+        PyThread_release_lock(pending->lock);
+        return 0;
+    }
+    pending->busy = 1;
+    PyThread_release_lock(pending->lock);
+
+    /* unsignal before starting to call callbacks, so that any callback
+       added in-between re-signals */
+    UNSIGNAL_PENDING_CALLS(interp);
+
+    if (_make_pending_calls(pending) != 0) {
+        pending->busy = 0;
+        /* There might not be more calls to make, but we play it safe. */
+        SIGNAL_PENDING_CALLS(pending, interp);
+        return -1;
+    }
 
-    interp->ceval.pending.busy = 0;
-    return res;
+    if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
+        if (_make_pending_calls(pending_main) != 0) {
+            pending->busy = 0;
+            /* There might not be more calls to make, but we play it safe. */
+            SIGNAL_PENDING_CALLS(pending_main, interp);
+            return -1;
+        }
+    }
 
-error:
-    interp->ceval.pending.busy = 0;
-    SIGNAL_PENDING_CALLS(interp);
-    return res;
+    pending->busy = 0;
+    return 0;
 }
 
 void
@@ -944,12 +984,6 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
     assert(PyGILState_Check());
     assert(is_tstate_valid(tstate));
 
-    struct _pending_calls *pending = &tstate->interp->ceval.pending;
-
-    if (!_Py_atomic_load_relaxed_int32(&(pending->calls_to_do))) {
-        return;
-    }
-
     if (make_pending_calls(tstate->interp) < 0) {
         PyObject *exc = _PyErr_GetRaisedException(tstate);
         PyErr_BadInternalCall();
@@ -958,6 +992,29 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
     }
 }
 
+int
+_PyEval_MakePendingCalls(PyThreadState *tstate)
+{
+    int res;
+
+    if (_Py_IsMainThread() && _Py_IsMainInterpreter(tstate->interp)) {
+        /* Python signal handler doesn't really queue a callback:
+           it only signals that a signal was received,
+           see _PyEval_SignalReceived(). */
+        res = handle_signals(tstate);
+        if (res != 0) {
+            return res;
+        }
+    }
+
+    res = make_pending_calls(tstate->interp);
+    if (res != 0) {
+        return res;
+    }
+
+    return 0;
+}
+
 /* Py_MakePendingCalls() is a simple wrapper for the sake
    of backward-compatibility. */
 int
@@ -968,19 +1025,11 @@ Py_MakePendingCalls(void)
     PyThreadState *tstate = _PyThreadState_GET();
     assert(is_tstate_valid(tstate));
 
-    /* Python signal handler doesn't really queue a callback: it only signals
-       that a signal was received, see _PyEval_SignalReceived(). */
-    int res = handle_signals(tstate);
-    if (res != 0) {
-        return res;
-    }
-
-    res = make_pending_calls(tstate->interp);
-    if (res != 0) {
-        return res;
+    /* Only execute pending calls on the main thread. */
+    if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(tstate->interp)) {
+        return 0;
     }
-
-    return 0;
+    return _PyEval_MakePendingCalls(tstate);
 }
 
 void
@@ -1020,7 +1069,7 @@ _Py_HandlePending(PyThreadState *tstate)
     }
 
     /* Pending calls */
-    if (_Py_atomic_load_relaxed_int32(&interp_ceval_state->pending.calls_to_do)) {
+    if (maybe_has_pending_calls(tstate->interp)) {
         if (make_pending_calls(tstate->interp) != 0) {
             return -1;
         }
index 9ae03d4d783117f6a1f030d648bdccf7feb87f17..d45b739c279737b085745e4b35310ed11e1bcde9 100644 (file)
@@ -2152,6 +2152,9 @@ Py_EndInterpreter(PyThreadState *tstate)
     // Wrap up existing "threading"-module-created, non-daemon threads.
     wait_for_thread_shutdown(tstate);
 
+    // Make any remaining pending calls.
+    _Py_FinishPendingCalls(tstate);
+
     _PyAtExit_Call(tstate->interp);
 
     if (tstate != interp->threads.head || tstate->next != NULL) {
index fb95825ae0977651ad2bc48e2efba355f3e2a62f..52d6b29d78605ac79a65e2c3515dee04d432c46c 100644 (file)
@@ -380,7 +380,7 @@ _Py_COMP_DIAG_IGNORE_DEPR_DECLS
 static const _PyRuntimeState initial = _PyRuntimeState_INIT(_PyRuntime);
 _Py_COMP_DIAG_POP
 
-#define NUMLOCKS 8
+#define NUMLOCKS 9
 #define LOCKS_INIT(runtime) \
     { \
         &(runtime)->interpreters.mutex, \
@@ -388,6 +388,7 @@ _Py_COMP_DIAG_POP
         &(runtime)->getargs.mutex, \
         &(runtime)->unicode_state.ids.lock, \
         &(runtime)->imports.extensions.mutex, \
+        &(runtime)->ceval.pending_mainthread.lock, \
         &(runtime)->atexit.mutex, \
         &(runtime)->audit_hooks.mutex, \
         &(runtime)->allocators.mutex, \
index 607976f5afdc689618bc14e6e18814d880fc7aa3..87d9b39c16113be2c51b7ee97f94405aeb562956 100644 (file)
@@ -517,6 +517,7 @@ Modules/_testcapimodule.c   -       g_type_watchers_installed       -
 Modules/_testimportmultiple.c  -       _barmodule      -
 Modules/_testimportmultiple.c  -       _foomodule      -
 Modules/_testimportmultiple.c  -       _testimportmultiple     -
+Modules/_testinternalcapi.c    -       pending_identify_result -
 Modules/_testmultiphase.c      -       Example_Type_slots      -
 Modules/_testmultiphase.c      -       Example_Type_spec       -
 Modules/_testmultiphase.c      -       Example_methods -