# want them to affect the rest of the tests.
script_helper.assert_python_ok("-c", textwrap.dedent(source))
+ @threading_helper.requires_working_threading()
+ def test_thread_created_in_atexit(self):
+ source = """if True:
+ import atexit
+ import threading
+ import time
+
+
+ def run():
+ print(24)
+ time.sleep(1)
+ print(42)
+
+ @atexit.register
+ def start_thread():
+ threading.Thread(target=run).start()
+ """
+ return_code, stdout, stderr = script_helper.assert_python_ok("-c", source)
+ self.assertEqual(return_code, 0)
+ self.assertEqual(stdout, f"24{os.linesep}42{os.linesep}".encode("utf-8"))
+ self.assertEqual(stderr, b"")
+
+ @threading_helper.requires_working_threading()
+ @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
+ def test_thread_created_in_atexit_subinterpreter(self):
+ try:
+ from concurrent import interpreters
+ except ImportError:
+ self.skipTest("subinterpreters are not available")
+
+ read, write = os.pipe()
+ source = f"""if True:
+ import atexit
+ import threading
+ import time
+ import os
+
+ def run():
+ os.write({write}, b'spanish')
+ time.sleep(1)
+ os.write({write}, b'inquisition')
+
+ @atexit.register
+ def start_thread():
+ threading.Thread(target=run).start()
+ """
+ interp = interpreters.create()
+ try:
+ interp.exec(source)
+
+ # Close the interpreter to invoke atexit callbacks
+ interp.close()
+ self.assertEqual(os.read(read, 100), b"spanishinquisition")
+ finally:
+ os.close(read)
+ os.close(write)
@support.cpython_only
class SubinterpreterTest(unittest.TestCase):
from test import support
from test.support import MISSING_C_DOCSTRINGS
from test.support import import_helper
+from test.support import script_helper
from test.support import threading_helper
from test.support import warnings_helper
from test.support import requires_limited_api
self.assertEqual(actual, int(interpid))
+ @threading_helper.requires_working_threading()
+ def test_pending_call_creates_thread(self):
+ source = """
+ import _testinternalcapi
+ import threading
+ import time
+
+
+ def output():
+ print(24)
+ time.sleep(1)
+ print(42)
+
+
+ def callback():
+ threading.Thread(target=output).start()
+
+
+ def create_pending_call():
+ time.sleep(1)
+ _testinternalcapi.simple_pending_call(callback)
+
+
+ threading.Thread(target=create_pending_call).start()
+ """
+ return_code, stdout, stderr = script_helper.assert_python_ok('-c', textwrap.dedent(source))
+ self.assertEqual(return_code, 0)
+ self.assertEqual(stdout, f"24{os.linesep}42{os.linesep}".encode("utf-8"))
+ self.assertEqual(stderr, b"")
+
class SubinterpreterTest(unittest.TestCase):
subinterp_attr_id = os.read(r, 100)
self.assertEqual(main_attr_id, subinterp_attr_id)
+ @threading_helper.requires_working_threading()
+ @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
+ @requires_subinterpreters
+ def test_pending_call_creates_thread_subinterpreter(self):
+ interpreters = import_helper.import_module("concurrent.interpreters")
+ r, w = os.pipe()
+ source = f"""if True:
+ import _testinternalcapi
+ import threading
+ import time
+ import os
+
+
+ def output():
+ time.sleep(1)
+ os.write({w}, b"x")
+
+
+ def callback():
+ threading.Thread(target=output).start()
+
+
+ def create_pending_call():
+ time.sleep(1)
+ _testinternalcapi.simple_pending_call(callback)
+
+
+ threading.Thread(target=create_pending_call).start()
+ """
+ interp = interpreters.create()
+ interp.exec(source)
+ interp.close()
+ data = os.read(r, 1)
+ self.assertEqual(data, b"x")
+
@requires_subinterpreters
class InterpreterConfigTests(unittest.TestCase):
return main_tstate;
}
+#ifdef Py_GIL_DISABLED
+#define ASSERT_WORLD_STOPPED(interp) assert(interp->runtime->stoptheworld.world_stopped)
+#else
+#define ASSERT_WORLD_STOPPED(interp)
+#endif
+
+static int
+interp_has_threads(PyInterpreterState *interp)
+{
+ /* This needs to check for non-daemon threads only, otherwise we get stuck
+ * in an infinite loop. */
+ assert(interp != NULL);
+ ASSERT_WORLD_STOPPED(interp);
+ assert(interp->threads.head != NULL);
+ if (interp->threads.head->next == NULL) {
+ // No other threads active, easy way out.
+ return 0;
+ }
+
+ // We don't have to worry about locking this because the
+ // world is stopped.
+ _Py_FOR_EACH_TSTATE_UNLOCKED(interp, tstate) {
+ if (tstate->_whence == _PyThreadState_WHENCE_THREADING) {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+static int
+interp_has_pending_calls(PyInterpreterState *interp)
+{
+ assert(interp != NULL);
+ ASSERT_WORLD_STOPPED(interp);
+ return interp->ceval.pending.npending != 0;
+}
+
+static int
+interp_has_atexit_callbacks(PyInterpreterState *interp)
+{
+ assert(interp != NULL);
+ assert(interp->atexit.callbacks != NULL);
+ ASSERT_WORLD_STOPPED(interp);
+ assert(PyList_CheckExact(interp->atexit.callbacks));
+ return PyList_GET_SIZE(interp->atexit.callbacks) != 0;
+}
+
+static int
+runtime_has_subinterpreters(_PyRuntimeState *runtime)
+{
+ assert(runtime != NULL);
+ HEAD_LOCK(runtime);
+ PyInterpreterState *interp = runtime->interpreters.head;
+ HEAD_UNLOCK(runtime);
+ return interp->next != NULL;
+}
+
+static void
+make_pre_finalization_calls(PyThreadState *tstate, int subinterpreters)
+{
+ assert(tstate != NULL);
+ PyInterpreterState *interp = tstate->interp;
+ /* Each of these functions can start one another, e.g. a pending call
+ * could start a thread or vice versa. To ensure that we properly clean
+ * call everything, we run these in a loop until none of them run anything. */
+ for (;;) {
+ assert(!interp->runtime->stoptheworld.world_stopped);
+
+ // Wrap up existing "threading"-module-created, non-daemon threads.
+ wait_for_thread_shutdown(tstate);
+
+ // Make any remaining pending calls.
+ _Py_FinishPendingCalls(tstate);
+
+ /* The interpreter is still entirely intact at this point, and the
+ * exit funcs may be relying on that. In particular, if some thread
+ * or exit func is still waiting to do an import, the import machinery
+ * expects Py_IsInitialized() to return true. So don't say the
+ * runtime is uninitialized until after the exit funcs have run.
+ * Note that Threading.py uses an exit func to do a join on all the
+ * threads created thru it, so this also protects pending imports in
+ * the threads created via Threading.
+ */
+
+ _PyAtExit_Call(tstate->interp);
+
+ if (subinterpreters) {
+ /* Clean up any lingering subinterpreters.
+
+ Two preconditions need to be met here:
+
+ - This has to happen before _PyRuntimeState_SetFinalizing is
+ called, or else threads might get prematurely blocked.
+ - The world must not be stopped, as finalizers can run.
+ */
+ finalize_subinterpreters();
+ }
+
+
+ /* Stop the world to prevent other threads from creating threads or
+ * atexit callbacks. On the default build, this is simply locked by
+ * the GIL. For pending calls, we acquire the dedicated mutex, because
+ * Py_AddPendingCall() can be called without an attached thread state.
+ */
+
+ PyMutex_Lock(&interp->ceval.pending.mutex);
+ // XXX Why does _PyThreadState_DeleteList() rely on all interpreters
+ // being stopped?
+ _PyEval_StopTheWorldAll(interp->runtime);
+ int has_subinterpreters = subinterpreters
+ ? runtime_has_subinterpreters(interp->runtime)
+ : 0;
+ int should_continue = (interp_has_threads(interp)
+ || interp_has_atexit_callbacks(interp)
+ || interp_has_pending_calls(interp)
+ || has_subinterpreters);
+ if (!should_continue) {
+ break;
+ }
+ _PyEval_StartTheWorldAll(interp->runtime);
+ PyMutex_Unlock(&interp->ceval.pending.mutex);
+ }
+ assert(PyMutex_IsLocked(&interp->ceval.pending.mutex));
+ ASSERT_WORLD_STOPPED(interp);
+}
+
static int
_Py_Finalize(_PyRuntimeState *runtime)
{
// Block some operations.
tstate->interp->finalizing = 1;
- // Wrap up existing "threading"-module-created, non-daemon threads.
- wait_for_thread_shutdown(tstate);
-
- // Make any remaining pending calls.
- _Py_FinishPendingCalls(tstate);
-
- /* The interpreter is still entirely intact at this point, and the
- * exit funcs may be relying on that. In particular, if some thread
- * or exit func is still waiting to do an import, the import machinery
- * expects Py_IsInitialized() to return true. So don't say the
- * runtime is uninitialized until after the exit funcs have run.
- * Note that Threading.py uses an exit func to do a join on all the
- * threads created thru it, so this also protects pending imports in
- * the threads created via Threading.
- */
-
- _PyAtExit_Call(tstate->interp);
-
- /* Clean up any lingering subinterpreters.
-
- Two preconditions need to be met here:
-
- - This has to happen before _PyRuntimeState_SetFinalizing is
- called, or else threads might get prematurely blocked.
- - The world must not be stopped, as finalizers can run.
- */
- finalize_subinterpreters();
+ // This call stops the world and takes the pending calls lock.
+ make_pre_finalization_calls(tstate, /*subinterpreters=*/1);
assert(_PyThreadState_GET() == tstate);
#endif
/* Ensure that remaining threads are detached */
- _PyEval_StopTheWorldAll(runtime);
+ ASSERT_WORLD_STOPPED(tstate->interp);
/* Remaining daemon threads will be trapped in PyThread_hang_thread
when they attempt to take the GIL (ex: PyEval_RestoreThread()). */
_PyThreadState_SetShuttingDown(p);
}
_PyEval_StartTheWorldAll(runtime);
+ PyMutex_Unlock(&tstate->interp->ceval.pending.mutex);
/* Clear frames of other threads to call objects destructors. Destructors
will be called in the current Python thread. Since
}
interp->finalizing = 1;
- // Wrap up existing "threading"-module-created, non-daemon threads.
- wait_for_thread_shutdown(tstate);
-
- // Make any remaining pending calls.
- _Py_FinishPendingCalls(tstate);
+ // This call stops the world and takes the pending calls lock.
+ make_pre_finalization_calls(tstate, /*subinterpreters=*/0);
- _PyAtExit_Call(tstate->interp);
- _PyRuntimeState *runtime = interp->runtime;
- _PyEval_StopTheWorldAll(runtime);
+ ASSERT_WORLD_STOPPED(interp);
/* Remaining daemon threads will automatically exit
when they attempt to take the GIL (ex: PyEval_RestoreThread()). */
_PyInterpreterState_SetFinalizing(interp, tstate);
_PyThreadState_SetShuttingDown(p);
}
- _PyEval_StartTheWorldAll(runtime);
+ _PyEval_StartTheWorldAll(interp->runtime);
+ PyMutex_Unlock(&interp->ceval.pending.mutex);
_PyThreadState_DeleteList(list, /*is_after_fork=*/0);
// XXX Call something like _PyImport_Disable() here?