]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-110693: Pending Calls Machinery Cleanups (gh-118296)
authorEric Snow <ericsnowcurrently@gmail.com>
Fri, 26 Apr 2024 01:05:51 +0000 (19:05 -0600)
committerGitHub <noreply@github.com>
Fri, 26 Apr 2024 01:05:51 +0000 (01:05 +0000)
This does some cleanup in preparation for later changes.

Include/internal/pycore_ceval.h
Include/internal/pycore_ceval_state.h
Include/internal/pycore_runtime_init.h
Lib/test/test_capi/test_misc.py
Modules/_testcapimodule.c
Modules/_testinternalcapi.c
Python/ceval_gil.c

index 8d88b5c1d15cb845b367cbf086eec960517665c4..cfb88c3f4c8e1565f005f1775417e4ecba73ca34 100644 (file)
@@ -48,8 +48,12 @@ extern void _PyEval_SignalReceived(void);
 #define _Py_PENDING_MAINTHREADONLY 1
 #define _Py_PENDING_RAWFREE 2
 
+typedef int _Py_add_pending_call_result;
+#define _Py_ADD_PENDING_SUCCESS 0
+#define _Py_ADD_PENDING_FULL -1
+
 // Export for '_testinternalcapi' shared extension
-PyAPI_FUNC(int) _PyEval_AddPendingCall(
+PyAPI_FUNC(_Py_add_pending_call_result) _PyEval_AddPendingCall(
     PyInterpreterState *interp,
     _Py_pending_call_func func,
     void *arg,
index 168295534e036cac8395e3b163bd1f37a1f7859f..1831f58899b7458e0030e658f47786b63c5112e7 100644 (file)
@@ -14,28 +14,56 @@ extern "C" {
 
 typedef int (*_Py_pending_call_func)(void *);
 
+struct _pending_call {
+    _Py_pending_call_func func;
+    void *arg;
+    int flags;
+};
+
+#define PENDINGCALLSARRAYSIZE 32
+
+#define MAXPENDINGCALLS PENDINGCALLSARRAYSIZE
+/* For interpreter-level pending calls, we want to avoid spending too
+   much time on pending calls in any one thread, so we apply a limit. */
+#if MAXPENDINGCALLS > 100
+#  define MAXPENDINGCALLSLOOP 100
+#else
+#  define MAXPENDINGCALLSLOOP MAXPENDINGCALLS
+#endif
+
+#define MAXPENDINGCALLS_MAIN PENDINGCALLSARRAYSIZE
+/* For the main thread, we want to make sure all pending calls are
+   run at once, for the sake of prompt signal handling.  This is
+   unlikely to cause any problems since there should be very few
+   pending calls for the main thread. */
+#define MAXPENDINGCALLSLOOP_MAIN 0
+
 struct _pending_calls {
     int busy;
     PyMutex mutex;
     /* Request for running pending calls. */
-    int32_t calls_to_do;
-#define NPENDINGCALLS 32
-    struct _pending_call {
-        _Py_pending_call_func func;
-        void *arg;
-        int flags;
-    } calls[NPENDINGCALLS];
+    int32_t npending;
+    /* The maximum allowed number of pending calls.
+       If the queue fills up to this point then _PyEval_AddPendingCall()
+       will return _Py_ADD_PENDING_FULL. */
+    int32_t max;
+    /* We don't want a flood of pending calls to interrupt any one thread
+       for too long, so we keep a limit on the number handled per pass.
+       A value of 0 means there is no limit (other than the maximum
+       size of the list of pending calls). */
+    int32_t maxloop;
+    struct _pending_call calls[PENDINGCALLSARRAYSIZE];
     int first;
-    int last;
+    int next;
 };
 
+
 typedef enum {
     PERF_STATUS_FAILED = -1,  // Perf trampoline is in an invalid state
     PERF_STATUS_NO_INIT = 0,  // Perf trampoline is not initialized
     PERF_STATUS_OK = 1,       // Perf trampoline is ready to be executed
 } perf_status_t;
 
-
 #ifdef PY_HAVE_PERF_TRAMPOLINE
 struct code_arena_st;
 
@@ -48,6 +76,7 @@ struct trampoline_api_st {
 };
 #endif
 
+
 struct _ceval_runtime_state {
     struct {
 #ifdef PY_HAVE_PERF_TRAMPOLINE
@@ -62,10 +91,15 @@ struct _ceval_runtime_state {
 #endif
     } perf;
     /* Pending calls to be made only on the main thread. */
+    // The signal machinery falls back on this
+    // so it must be especially stable and efficient.
+    // For example, we use a preallocated array
+    // for the list of pending calls.
     struct _pending_calls pending_mainthread;
     PyMutex sys_trace_profile_mutex;
 };
 
+
 #ifdef PY_HAVE_PERF_TRAMPOLINE
 # define _PyEval_RUNTIME_PERF_INIT \
     { \
index 33c7a9dadfd2a1e03b3e668b3d55c3c9f347e405..41331df8320a9c9aaf4823ef8c28816f7f3188e9 100644 (file)
@@ -114,6 +114,10 @@ extern PyTypeObject _PyExc_MemoryError;
         .autoTSSkey = Py_tss_NEEDS_INIT, \
         .parser = _parser_runtime_state_INIT, \
         .ceval = { \
+            .pending_mainthread = { \
+                .max = MAXPENDINGCALLS_MAIN, \
+                .maxloop = MAXPENDINGCALLSLOOP_MAIN, \
+            }, \
             .perf = _PyEval_RUNTIME_PERF_INIT, \
         }, \
         .gilstate = { \
@@ -166,6 +170,10 @@ extern PyTypeObject _PyExc_MemoryError;
         .imports = IMPORTS_INIT, \
         .ceval = { \
             .recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \
+            .pending = { \
+                .max = MAXPENDINGCALLS, \
+                .maxloop = MAXPENDINGCALLSLOOP, \
+            }, \
         }, \
         .gc = { \
             .enabled = 1, \
index 0701eafb7c36e0f37fc6060bff209e19a71ae9ce..49d1056f05046715334e9336da6d36e8282b4f5c 100644 (file)
@@ -1172,6 +1172,12 @@ class CAPITest(unittest.TestCase):
         self.assertEqual(get_type_fullyqualname(MyType), 'my_qualname')
 
 
+    def test_gen_get_code(self):
+        def genf(): yield
+        gen = genf()
+        self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
+
+
 @requires_limited_api
 class TestHeapTypeRelative(unittest.TestCase):
     """Test API for extending opaque types (PEP 697)"""
@@ -1452,7 +1458,7 @@ class TestPendingCalls(unittest.TestCase):
     # about when pending calls get run.  This is especially relevant
     # here for creating deterministic tests.
 
-    def pendingcalls_submit(self, l, n):
+    def main_pendingcalls_submit(self, l, n):
         def callback():
             #this function can be interrupted by thread switching so let's
             #use an atomic operation
@@ -1467,12 +1473,27 @@ class TestPendingCalls(unittest.TestCase):
                 if _testcapi._pending_threadfunc(callback):
                     break
 
-    def pendingcalls_wait(self, l, n, context = None):
+    def pendingcalls_submit(self, l, n, *, main=True, ensure=False):
+        def callback():
+            #this function can be interrupted by thread switching so let's
+            #use an atomic operation
+            l.append(None)
+
+        if main:
+            return _testcapi._pending_threadfunc(callback, n,
+                                                 blocking=False,
+                                                 ensure_added=ensure)
+        else:
+            return _testinternalcapi.pending_threadfunc(callback, n,
+                                                        blocking=False,
+                                                        ensure_added=ensure)
+
+    def pendingcalls_wait(self, l, numadded, context = None):
         #now, stick around until l[0] has grown to 10
         count = 0
-        while len(l) != n:
+        while len(l) != numadded:
             #this busy loop is where we expect to be interrupted to
-            #run our callbacks.  Note that callbacks are only run on the
+            #run our callbacks.  Note that some callbacks are only run on the
             #main thread
             if False and support.verbose:
                 print("(%i)"%(len(l),),)
@@ -1482,12 +1503,12 @@ class TestPendingCalls(unittest.TestCase):
                 continue
             count += 1
             self.assertTrue(count < 10000,
-                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
+                "timeout waiting for %i callbacks, got %i"%(numadded, len(l)))
         if False and support.verbose:
             print("(%i)"%(len(l),))
 
     @threading_helper.requires_working_threading()
-    def test_pendingcalls_threaded(self):
+    def test_main_pendingcalls_threaded(self):
 
         #do every callback on a separate thread
         n = 32 #total callbacks
@@ -1501,15 +1522,15 @@ class TestPendingCalls(unittest.TestCase):
         context.lock = threading.Lock()
         context.event = threading.Event()
 
-        threads = [threading.Thread(target=self.pendingcalls_thread,
+        threads = [threading.Thread(target=self.main_pendingcalls_thread,
                                     args=(context,))
                    for i in range(context.nThreads)]
         with threading_helper.start_threads(threads):
             self.pendingcalls_wait(context.l, n, context)
 
-    def pendingcalls_thread(self, context):
+    def main_pendingcalls_thread(self, context):
         try:
-            self.pendingcalls_submit(context.l, context.n)
+            self.main_pendingcalls_submit(context.l, context.n)
         finally:
             with context.lock:
                 context.nFinished += 1
@@ -1519,20 +1540,54 @@ class TestPendingCalls(unittest.TestCase):
             if nFinished == context.nThreads:
                 context.event.set()
 
-    def test_pendingcalls_non_threaded(self):
+    def test_main_pendingcalls_non_threaded(self):
         #again, just using the main thread, likely they will all be dispatched at
         #once.  It is ok to ask for too many, because we loop until we find a slot.
         #the loop can be interrupted to dispatch.
         #there are only 32 dispatch slots, so we go for twice that!
         l = []
         n = 64
-        self.pendingcalls_submit(l, n)
+        self.main_pendingcalls_submit(l, n)
         self.pendingcalls_wait(l, n)
 
-    def test_gen_get_code(self):
-        def genf(): yield
-        gen = genf()
-        self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
+    def test_max_pending(self):
+        with self.subTest('main-only'):
+            maxpending = 32
+
+            l = []
+            added = self.pendingcalls_submit(l, 1, main=True)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, 1)
+
+            l = []
+            added = self.pendingcalls_submit(l, maxpending, main=True)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, maxpending)
+
+            l = []
+            added = self.pendingcalls_submit(l, maxpending+1, main=True)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, maxpending)
+
+        with self.subTest('not main-only'):
+            # Per-interpreter pending calls has the same low limit
+            # on how many may be pending at a time.
+            maxpending = 32
+
+            l = []
+            added = self.pendingcalls_submit(l, 1, main=False)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, 1)
+
+            l = []
+            added = self.pendingcalls_submit(l, maxpending, main=False)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, maxpending)
+
+            l = []
+            added = self.pendingcalls_submit(l, maxpending+1, main=False)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, maxpending)
 
     class PendingTask(types.SimpleNamespace):
 
index 034a30fa47ed30f43a3728f1b8fdfd753728688c..0bdd252efdabc750f31366725ad5ddf9e9c4eaeb 100644 (file)
@@ -819,25 +819,55 @@ static int _pending_callback(void *arg)
  * run from any python thread.
  */
 static PyObject *
-pending_threadfunc(PyObject *self, PyObject *arg)
+pending_threadfunc(PyObject *self, PyObject *arg, PyObject *kwargs)
 {
+    static char *kwlist[] = {"callback", "num",
+                             "blocking", "ensure_added", NULL};
     PyObject *callable;
-    int r;
-    if (PyArg_ParseTuple(arg, "O", &callable) == 0)
+    unsigned int num = 1;
+    int blocking = 0;
+    int ensure_added = 0;
+    if (!PyArg_ParseTupleAndKeywords(arg, kwargs,
+                                     "O|I$pp:_pending_threadfunc", kwlist,
+                                     &callable, &num, &blocking, &ensure_added))
+    {
         return NULL;
+    }
 
     /* create the reference for the callbackwhile we hold the lock */
-    Py_INCREF(callable);
+    for (unsigned int i = 0; i < num; i++) {
+        Py_INCREF(callable);
+    }
 
-    Py_BEGIN_ALLOW_THREADS
-    r = Py_AddPendingCall(&_pending_callback, callable);
-    Py_END_ALLOW_THREADS
+    PyThreadState *save_tstate = NULL;
+    if (!blocking) {
+        save_tstate = PyEval_SaveThread();
+    }
+
+    unsigned int num_added = 0;
+    for (; num_added < num; num_added++) {
+        if (ensure_added) {
+            int r;
+            do {
+                r = Py_AddPendingCall(&_pending_callback, callable);
+            } while (r < 0);
+        }
+        else {
+            if (Py_AddPendingCall(&_pending_callback, callable) < 0) {
+                break;
+            }
+        }
+    }
+
+    if (!blocking) {
+        PyEval_RestoreThread(save_tstate);
+    }
 
-    if (r<0) {
+    for (unsigned int i = num_added; i < num; i++) {
         Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */
-        Py_RETURN_FALSE;
     }
-    Py_RETURN_TRUE;
+    /* The callable is decref'ed above in each added _pending_callback(). */
+    return PyLong_FromUnsignedLong((unsigned long)num_added);
 }
 
 /* Test PyOS_string_to_double. */
@@ -3232,7 +3262,8 @@ static PyMethodDef TestMethods[] = {
     {"_spawn_pthread_waiter",   spawn_pthread_waiter,            METH_NOARGS},
     {"_end_spawned_pthread",    end_spawned_pthread,             METH_NOARGS},
 #endif
-    {"_pending_threadfunc",     pending_threadfunc,              METH_VARARGS},
+    {"_pending_threadfunc",     _PyCFunction_CAST(pending_threadfunc),
+     METH_VARARGS|METH_KEYWORDS},
 #ifdef HAVE_GETTIMEOFDAY
     {"profile_int",             profile_int,                     METH_NOARGS},
 #endif
index cc9e1403f87ecd0faf126bc058f0d102e39c547d..b0bba3422a50a0abe2654c3ef8d91a436f85be2b 100644 (file)
@@ -1062,37 +1062,56 @@ static PyObject *
 pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
 {
     PyObject *callable;
+    unsigned int num = 1;
+    int blocking = 0;
     int ensure_added = 0;
-    static char *kwlist[] = {"", "ensure_added", NULL};
+    static char *kwlist[] = {"callback", "num",
+                             "blocking", "ensure_added", NULL};
     if (!PyArg_ParseTupleAndKeywords(args, kwargs,
-                                     "O|$p:pending_threadfunc", kwlist,
-                                     &callable, &ensure_added))
+                                     "O|I$pp:pending_threadfunc", kwlist,
+                                     &callable, &num, &blocking, &ensure_added))
     {
         return NULL;
     }
     PyInterpreterState *interp = _PyInterpreterState_GET();
 
     /* create the reference for the callbackwhile we hold the lock */
-    Py_INCREF(callable);
+    for (unsigned int i = 0; i < num; i++) {
+        Py_INCREF(callable);
+    }
 
-    int r;
-    Py_BEGIN_ALLOW_THREADS
-    r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
-    Py_END_ALLOW_THREADS
-    if (r < 0) {
-        /* unsuccessful add */
-        if (!ensure_added) {
-            Py_DECREF(callable);
-            Py_RETURN_FALSE;
+    PyThreadState *save_tstate = NULL;
+    if (!blocking) {
+        save_tstate = PyEval_SaveThread();
+    }
+
+    unsigned int num_added = 0;
+    for (; num_added < num; num_added++) {
+        if (ensure_added) {
+            _Py_add_pending_call_result r;
+            do {
+                r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
+                assert(r == _Py_ADD_PENDING_SUCCESS
+                       || r == _Py_ADD_PENDING_FULL);
+            } while (r == _Py_ADD_PENDING_FULL);
+        }
+        else {
+            if (_PyEval_AddPendingCall(interp, &_pending_callback, callable, 0) < 0) {
+                break;
+            }
         }
-        do {
-            Py_BEGIN_ALLOW_THREADS
-            r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
-            Py_END_ALLOW_THREADS
-        } while (r < 0);
     }
 
-    Py_RETURN_TRUE;
+    if (!blocking) {
+        PyEval_RestoreThread(save_tstate);
+    }
+
+    for (unsigned int i = num_added; i < num; i++) {
+        Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */
+    }
+
+    /* The callable is decref'ed in _pending_callback() above. */
+    return PyLong_FromUnsignedLong((unsigned long)num_added);
 }
 
 
@@ -1135,14 +1154,16 @@ pending_identify(PyObject *self, PyObject *args)
     PyThread_acquire_lock(mutex, WAIT_LOCK);
     /* It gets released in _pending_identify_callback(). */
 
-    int r;
+    _Py_add_pending_call_result r;
     do {
         Py_BEGIN_ALLOW_THREADS
         r = _PyEval_AddPendingCall(interp,
                                    &_pending_identify_callback, (void *)mutex,
                                    0);
         Py_END_ALLOW_THREADS
-    } while (r < 0);
+        assert(r == _Py_ADD_PENDING_SUCCESS
+               || r == _Py_ADD_PENDING_FULL);
+    } while (r == _Py_ADD_PENDING_FULL);
 
     /* Wait for the pending call to complete. */
     PyThread_acquire_lock(mutex, WAIT_LOCK);
index c0819d8ab1d8d03a3de01c876057e1ea4a62a358..c3c2c54b199c59c594a3eb6f158cef6ad17bd73b 100644 (file)
@@ -84,15 +84,15 @@ update_eval_breaker_for_thread(PyInterpreterState *interp, PyThreadState *tstate
     return;
 #endif
 
-    int32_t calls_to_do = _Py_atomic_load_int32_relaxed(
-        &interp->ceval.pending.calls_to_do);
-    if (calls_to_do) {
+    int32_t npending = _Py_atomic_load_int32_relaxed(
+        &interp->ceval.pending.npending);
+    if (npending) {
         _Py_set_eval_breaker_bit(tstate, _PY_CALLS_TO_DO_BIT);
     }
     else if (_Py_IsMainThread()) {
-        calls_to_do = _Py_atomic_load_int32_relaxed(
-            &_PyRuntime.ceval.pending_mainthread.calls_to_do);
-        if (calls_to_do) {
+        npending = _Py_atomic_load_int32_relaxed(
+            &_PyRuntime.ceval.pending_mainthread.npending);
+        if (npending) {
             _Py_set_eval_breaker_bit(tstate, _PY_CALLS_TO_DO_BIT);
         }
     }
@@ -624,6 +624,34 @@ PyEval_RestoreThread(PyThreadState *tstate)
 }
 
 
+void
+_PyEval_SignalReceived(void)
+{
+    _Py_set_eval_breaker_bit(_PyRuntime.main_tstate, _PY_SIGNALS_PENDING_BIT);
+}
+
+
+#ifndef Py_GIL_DISABLED
+static void
+signal_active_thread(PyInterpreterState *interp, uintptr_t bit)
+{
+    struct _gil_runtime_state *gil = interp->ceval.gil;
+
+    // If a thread from the targeted interpreter is holding the GIL, signal
+    // that thread. Otherwise, the next thread to run from the targeted
+    // interpreter will have its bit set as part of taking the GIL.
+    MUTEX_LOCK(gil->mutex);
+    if (_Py_atomic_load_int_relaxed(&gil->locked)) {
+        PyThreadState *holder = (PyThreadState*)_Py_atomic_load_ptr_relaxed(&gil->last_holder);
+        if (holder->interp == interp) {
+            _Py_set_eval_breaker_bit(holder, bit);
+        }
+    }
+    MUTEX_UNLOCK(gil->mutex);
+}
+#endif
+
+
 /* Mechanism whereby asynchronously executing callbacks (e.g. UNIX
    signal handlers or Mac I/O completion routines) can schedule calls
    to a function to be called synchronously.
@@ -646,29 +674,31 @@ PyEval_RestoreThread(PyThreadState *tstate)
    threadstate.
 */
 
-void
-_PyEval_SignalReceived(void)
-{
-    _Py_set_eval_breaker_bit(_PyRuntime.main_tstate, _PY_SIGNALS_PENDING_BIT);
-}
-
 /* Push one item onto the queue while holding the lock. */
 static int
 _push_pending_call(struct _pending_calls *pending,
                    _Py_pending_call_func func, void *arg, int flags)
 {
-    int i = pending->last;
-    int j = (i + 1) % NPENDINGCALLS;
-    if (j == pending->first) {
-        return -1; /* Queue full */
+    if (pending->npending == pending->max) {
+        return _Py_ADD_PENDING_FULL;
     }
+    assert(pending->npending < pending->max);
+
+    int i = pending->next;
+    assert(pending->calls[i].func == NULL);
+
     pending->calls[i].func = func;
     pending->calls[i].arg = arg;
     pending->calls[i].flags = flags;
-    pending->last = j;
-    assert(pending->calls_to_do < NPENDINGCALLS);
-    _Py_atomic_add_int32(&pending->calls_to_do, 1);
-    return 0;
+
+    assert(pending->npending < PENDINGCALLSARRAYSIZE);
+    _Py_atomic_add_int32(&pending->npending, 1);
+
+    pending->next = (i + 1) % PENDINGCALLSARRAYSIZE;
+    assert(pending->next != pending->first
+            || pending->npending == pending->max);
+
+    return _Py_ADD_PENDING_SUCCESS;
 }
 
 static int
@@ -676,8 +706,9 @@ _next_pending_call(struct _pending_calls *pending,
                    int (**func)(void *), void **arg, int *flags)
 {
     int i = pending->first;
-    if (i == pending->last) {
+    if (pending->npending == 0) {
         /* Queue empty */
+        assert(i == pending->next);
         assert(pending->calls[i].func == NULL);
         return -1;
     }
@@ -695,38 +726,18 @@ _pop_pending_call(struct _pending_calls *pending,
     int i = _next_pending_call(pending, func, arg, flags);
     if (i >= 0) {
         pending->calls[i] = (struct _pending_call){0};
-        pending->first = (i + 1) % NPENDINGCALLS;
-        assert(pending->calls_to_do > 0);
-        _Py_atomic_add_int32(&pending->calls_to_do, -1);
+        pending->first = (i + 1) % PENDINGCALLSARRAYSIZE;
+        assert(pending->npending > 0);
+        _Py_atomic_add_int32(&pending->npending, -1);
     }
 }
 
-#ifndef Py_GIL_DISABLED
-static void
-signal_active_thread(PyInterpreterState *interp, uintptr_t bit)
-{
-    struct _gil_runtime_state *gil = interp->ceval.gil;
-
-    // If a thread from the targeted interpreter is holding the GIL, signal
-    // that thread. Otherwise, the next thread to run from the targeted
-    // interpreter will have its bit set as part of taking the GIL.
-    MUTEX_LOCK(gil->mutex);
-    if (_Py_atomic_load_int_relaxed(&gil->locked)) {
-        PyThreadState *holder = (PyThreadState*)_Py_atomic_load_ptr_relaxed(&gil->last_holder);
-        if (holder->interp == interp) {
-            _Py_set_eval_breaker_bit(holder, bit);
-        }
-    }
-    MUTEX_UNLOCK(gil->mutex);
-}
-#endif
-
 /* This implementation is thread-safe.  It allows
    scheduling to be made from any thread, and even from an executing
    callback.
  */
 
-int
+_Py_add_pending_call_result
 _PyEval_AddPendingCall(PyInterpreterState *interp,
                        _Py_pending_call_func func, void *arg, int flags)
 {
@@ -739,7 +750,8 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
     }
 
     PyMutex_Lock(&pending->mutex);
-    int result = _push_pending_call(pending, func, arg, flags);
+    _Py_add_pending_call_result result =
+        _push_pending_call(pending, func, arg, flags);
     PyMutex_Unlock(&pending->mutex);
 
     if (main_only) {
@@ -762,7 +774,15 @@ Py_AddPendingCall(_Py_pending_call_func func, void *arg)
     /* Legacy users of this API will continue to target the main thread
        (of the main interpreter). */
     PyInterpreterState *interp = _PyInterpreterState_Main();
-    return _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
+    _Py_add_pending_call_result r =
+        _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
+    if (r == _Py_ADD_PENDING_FULL) {
+        return -1;
+    }
+    else {
+        assert(r == _Py_ADD_PENDING_SUCCESS);
+        return 0;
+    }
 }
 
 static int
@@ -782,10 +802,21 @@ handle_signals(PyThreadState *tstate)
 }
 
 static int
-_make_pending_calls(struct _pending_calls *pending)
+_make_pending_calls(struct _pending_calls *pending, int32_t *p_npending)
 {
+    int res = 0;
+    int32_t npending = -1;
+
+    assert(sizeof(pending->max) <= sizeof(size_t)
+            && ((size_t)pending->max) <= Py_ARRAY_LENGTH(pending->calls));
+    int32_t maxloop = pending->maxloop;
+    if (maxloop == 0) {
+        maxloop = pending->max;
+    }
+    assert(maxloop > 0 && maxloop <= pending->max);
+
     /* perform a bounded number of calls, in case of recursion */
-    for (int i=0; i<NPENDINGCALLS; i++) {
+    for (int i=0; i<maxloop; i++) {
         _Py_pending_call_func func = NULL;
         void *arg = NULL;
         int flags = 0;
@@ -793,21 +824,29 @@ _make_pending_calls(struct _pending_calls *pending)
         /* pop one item off the queue while holding the lock */
         PyMutex_Lock(&pending->mutex);
         _pop_pending_call(pending, &func, &arg, &flags);
+        npending = pending->npending;
         PyMutex_Unlock(&pending->mutex);
 
-        /* having released the lock, perform the callback */
+        /* Check if there are any more pending calls. */
         if (func == NULL) {
+            assert(npending == 0);
             break;
         }
-        int res = func(arg);
+
+        /* having released the lock, perform the callback */
+        res = func(arg);
         if ((flags & _Py_PENDING_RAWFREE) && arg != NULL) {
             PyMem_RawFree(arg);
         }
         if (res != 0) {
-            return -1;
+            res = -1;
+            goto finally;
         }
     }
-    return 0;
+
+finally:
+    *p_npending = npending;
+    return res;
 }
 
 static void
@@ -861,26 +900,36 @@ make_pending_calls(PyThreadState *tstate)
        added in-between re-signals */
     unsignal_pending_calls(tstate, interp);
 
-    if (_make_pending_calls(pending) != 0) {
+    int32_t npending;
+    if (_make_pending_calls(pending, &npending) != 0) {
         pending->busy = 0;
         /* There might not be more calls to make, but we play it safe. */
         signal_pending_calls(tstate, interp);
         return -1;
     }
+    if (npending > 0) {
+        /* We hit pending->maxloop. */
+        signal_pending_calls(tstate, interp);
+    }
 
     if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
-        if (_make_pending_calls(pending_main) != 0) {
+        if (_make_pending_calls(pending_main, &npending) != 0) {
             pending->busy = 0;
             /* There might not be more calls to make, but we play it safe. */
             signal_pending_calls(tstate, interp);
             return -1;
         }
+        if (npending > 0) {
+            /* We hit pending_main->maxloop. */
+            signal_pending_calls(tstate, interp);
+        }
     }
 
     pending->busy = 0;
     return 0;
 }
 
+
 void
 _Py_set_eval_breaker_bit_all(PyInterpreterState *interp, uintptr_t bit)
 {