]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-105716: Support Background Threads in Subinterpreters Consistently (gh-109921)
authorEric Snow <ericsnowcurrently@gmail.com>
Mon, 2 Oct 2023 20:12:12 +0000 (14:12 -0600)
committerGitHub <noreply@github.com>
Mon, 2 Oct 2023 20:12:12 +0000 (20:12 +0000)
The existence of background threads running on a subinterpreter was preventing interpreters from getting properly destroyed, as well as impacting the ability to run the interpreter again. It also affected how we wait for non-daemon threads to finish.

We add PyInterpreterState.threads.main, with some internal C-API functions.

Include/cpython/pystate.h
Include/internal/pycore_interp.h
Include/internal/pycore_pystate.h
Lib/test/test_interpreters.py
Lib/test/test_threading.py
Lib/threading.py
Misc/NEWS.d/next/Core and Builtins/2023-09-26-14-00-25.gh-issue-105716.SUJkW1.rst [new file with mode: 0644]
Modules/_threadmodule.c
Modules/_xxsubinterpretersmodule.c
Modules/main.c
Python/pystate.c

index 5e184d0ca0944ba87f3c68c9384a66caa8262310..7e4c57efc7c00cddc5f6b712434ef2dd9bbdd8fc 100644 (file)
@@ -211,6 +211,7 @@ struct _ts {
  * if it is NULL. */
 PyAPI_FUNC(PyThreadState *) _PyThreadState_UncheckedGet(void);
 
+
 // Disable tracing and profiling.
 PyAPI_FUNC(void) PyThreadState_EnterTracing(PyThreadState *tstate);
 
index 0912bd175fe4f71fd7342c31761b4ebdc98060ba..ebf02281a7a2a6fe76e368cb2e5ab5940d8c1a87 100644 (file)
@@ -73,6 +73,8 @@ struct _is {
         uint64_t next_unique_id;
         /* The linked list of threads, newest first. */
         PyThreadState *head;
+        /* The thread currently executing in the __main__ module, if any. */
+        PyThreadState *main;
         /* Used in Modules/_threadmodule.c. */
         long count;
         /* Support for runtime thread stack size tuning.
index 2e568f8aeeb152cfa443bf4cd05c3c623e6b83ef..6a36dba3708e3844ff91459b86b4dc26387901ae 100644 (file)
@@ -44,6 +44,11 @@ _Py_IsMainInterpreterFinalizing(PyInterpreterState *interp)
             interp == &_PyRuntime._main_interpreter);
 }
 
+// Export for _xxsubinterpreters module.
+PyAPI_FUNC(int) _PyInterpreterState_SetRunningMain(PyInterpreterState *);
+PyAPI_FUNC(void) _PyInterpreterState_SetNotRunningMain(PyInterpreterState *);
+PyAPI_FUNC(int) _PyInterpreterState_IsRunningMain(PyInterpreterState *);
+
 
 static inline const PyConfig *
 _Py_GetMainConfig(void)
index 9cd71e519036c3121329c93344c8bb467f7d31c7..e62859a9c2b08ec97390081912ff708a3a3b25ff 100644 (file)
@@ -261,6 +261,16 @@ class TestInterpreterIsRunning(TestBase):
             self.assertTrue(interp.is_running())
         self.assertFalse(interp.is_running())
 
+    def test_finished(self):
+        r, w = os.pipe()
+        interp = interpreters.create()
+        interp.run(f"""if True:
+            import os
+            os.write({w}, b'x')
+            """)
+        self.assertFalse(interp.is_running())
+        self.assertEqual(os.read(r, 1), b'x')
+
     def test_from_subinterpreter(self):
         interp = interpreters.create()
         out = _run_output(interp, dedent(f"""
@@ -288,6 +298,31 @@ class TestInterpreterIsRunning(TestBase):
         with self.assertRaises(ValueError):
             interp.is_running()
 
+    def test_with_only_background_threads(self):
+        r_interp, w_interp = os.pipe()
+        r_thread, w_thread = os.pipe()
+
+        DONE = b'D'
+        FINISHED = b'F'
+
+        interp = interpreters.create()
+        interp.run(f"""if True:
+            import os
+            import threading
+
+            def task():
+                v = os.read({r_thread}, 1)
+                assert v == {DONE!r}
+                os.write({w_interp}, {FINISHED!r})
+            t = threading.Thread(target=task)
+            t.start()
+            """)
+        self.assertFalse(interp.is_running())
+
+        os.write(w_thread, DONE)
+        interp.run('t.join()')
+        self.assertEqual(os.read(r_interp, 1), FINISHED)
+
 
 class TestInterpreterClose(TestBase):
 
@@ -389,6 +424,37 @@ class TestInterpreterClose(TestBase):
                 interp.close()
             self.assertTrue(interp.is_running())
 
+    def test_subthreads_still_running(self):
+        r_interp, w_interp = os.pipe()
+        r_thread, w_thread = os.pipe()
+
+        FINISHED = b'F'
+
+        interp = interpreters.create()
+        interp.run(f"""if True:
+            import os
+            import threading
+            import time
+
+            done = False
+
+            def notify_fini():
+                global done
+                done = True
+                t.join()
+            threading._register_atexit(notify_fini)
+
+            def task():
+                while not done:
+                    time.sleep(0.1)
+                os.write({w_interp}, {FINISHED!r})
+            t = threading.Thread(target=task)
+            t.start()
+            """)
+        interp.close()
+
+        self.assertEqual(os.read(r_interp, 1), FINISHED)
+
 
 class TestInterpreterRun(TestBase):
 
@@ -465,6 +531,37 @@ class TestInterpreterRun(TestBase):
         with self.assertRaises(TypeError):
             interp.run(b'print("spam")')
 
+    def test_with_background_threads_still_running(self):
+        r_interp, w_interp = os.pipe()
+        r_thread, w_thread = os.pipe()
+
+        RAN = b'R'
+        DONE = b'D'
+        FINISHED = b'F'
+
+        interp = interpreters.create()
+        interp.run(f"""if True:
+            import os
+            import threading
+
+            def task():
+                v = os.read({r_thread}, 1)
+                assert v == {DONE!r}
+                os.write({w_interp}, {FINISHED!r})
+            t = threading.Thread(target=task)
+            t.start()
+            os.write({w_interp}, {RAN!r})
+            """)
+        interp.run(f"""if True:
+            os.write({w_interp}, {RAN!r})
+            """)
+
+        os.write(w_thread, DONE)
+        interp.run('t.join()')
+        self.assertEqual(os.read(r_interp, 1), RAN)
+        self.assertEqual(os.read(r_interp, 1), RAN)
+        self.assertEqual(os.read(r_interp, 1), FINISHED)
+
     # test_xxsubinterpreters covers the remaining Interpreter.run() behavior.
 
 
index 13bfacbac83f13b7f9b6abe21451d3096da3106b..f8b81942cf17323321880ecc70fa2a068f1bf3fc 100644 (file)
@@ -26,6 +26,11 @@ from unittest import mock
 from test import lock_tests
 from test import support
 
+try:
+    from test.support import interpreters
+except ModuleNotFoundError:
+    interpreters = None
+
 threading_helper.requires_working_threading(module=True)
 
 # Between fork() and exec(), only async-safe functions are allowed (issues
@@ -52,6 +57,12 @@ def skip_unless_reliable_fork(test):
     return test
 
 
+def requires_subinterpreters(meth):
+    """Decorator to skip a test if subinterpreters are not supported."""
+    return unittest.skipIf(interpreters is None,
+                           'subinterpreters required')(meth)
+
+
 def restore_default_excepthook(testcase):
     testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
     threading.excepthook = threading.__excepthook__
@@ -1311,6 +1322,44 @@ class SubinterpThreadingTests(BaseTestCase):
         # The thread was joined properly.
         self.assertEqual(os.read(r, 1), b"x")
 
+    @requires_subinterpreters
+    def test_threads_join_with_no_main(self):
+        r_interp, w_interp = self.pipe()
+
+        INTERP = b'I'
+        FINI = b'F'
+        DONE = b'D'
+
+        interp = interpreters.create()
+        interp.run(f"""if True:
+            import os
+            import threading
+            import time
+
+            done = False
+
+            def notify_fini():
+                global done
+                done = True
+                os.write({w_interp}, {FINI!r})
+                t.join()
+            threading._register_atexit(notify_fini)
+
+            def task():
+                while not done:
+                    time.sleep(0.1)
+                os.write({w_interp}, {DONE!r})
+            t = threading.Thread(target=task)
+            t.start()
+
+            os.write({w_interp}, {INTERP!r})
+            """)
+        interp.close()
+
+        self.assertEqual(os.read(r_interp, 1), INTERP)
+        self.assertEqual(os.read(r_interp, 1), FINI)
+        self.assertEqual(os.read(r_interp, 1), DONE)
+
     @cpython_only
     def test_daemon_threads_fatal_error(self):
         subinterp_code = f"""if 1:
index 31cefd2143a8c42094d10f87436b21c859d93304..0edfaf880f711a2db1f78cc3ed032b373fe61a97 100644 (file)
@@ -38,6 +38,7 @@ _daemon_threads_allowed = _thread.daemon_threads_allowed
 _allocate_lock = _thread.allocate_lock
 _set_sentinel = _thread._set_sentinel
 get_ident = _thread.get_ident
+_is_main_interpreter = _thread._is_main_interpreter
 try:
     get_native_id = _thread.get_native_id
     _HAVE_THREAD_NATIVE_ID = True
@@ -1574,7 +1575,7 @@ def _shutdown():
     # the main thread's tstate_lock - that won't happen until the interpreter
     # is nearly dead.  So we release it here.  Note that just calling _stop()
     # isn't enough:  other threads may already be waiting on _tstate_lock.
-    if _main_thread._is_stopped:
+    if _main_thread._is_stopped and _is_main_interpreter():
         # _shutdown() was already called
         return
 
@@ -1627,6 +1628,7 @@ def main_thread():
     In normal conditions, the main thread is the thread from which the
     Python interpreter was started.
     """
+    # XXX Figure this out for subinterpreters.  (See gh-75698.)
     return _main_thread
 
 # get thread-local implementation, either from the thread
diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-09-26-14-00-25.gh-issue-105716.SUJkW1.rst b/Misc/NEWS.d/next/Core and Builtins/2023-09-26-14-00-25.gh-issue-105716.SUJkW1.rst
new file mode 100644 (file)
index 0000000..b35550f
--- /dev/null
@@ -0,0 +1,3 @@
+Subinterpreters now correctly handle the case where they have threads
+running in the background.  Before, such threads would interfere with
+cleaning up and destroying them, as well as prevent running another script.
index e77e30dfe5e8216dbe966b3894e5da68572e2747..ee46b37d92e128c1348f06378d18d87baa22adbe 100644 (file)
@@ -1605,6 +1605,18 @@ PyDoc_STRVAR(excepthook_doc,
 \n\
 Handle uncaught Thread.run() exception.");
 
+static PyObject *
+thread__is_main_interpreter(PyObject *module, PyObject *Py_UNUSED(ignored))
+{
+    PyInterpreterState *interp = _PyInterpreterState_GET();
+    return PyBool_FromLong(_Py_IsMainInterpreter(interp));
+}
+
+PyDoc_STRVAR(thread__is_main_interpreter_doc,
+"_is_main_interpreter()\n\
+\n\
+Return True if the current interpreter is the main Python interpreter.");
+
 static PyMethodDef thread_methods[] = {
     {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
      METH_VARARGS, start_new_doc},
@@ -1634,8 +1646,10 @@ static PyMethodDef thread_methods[] = {
      METH_VARARGS, stack_size_doc},
     {"_set_sentinel",           thread__set_sentinel,
      METH_NOARGS, _set_sentinel_doc},
-    {"_excepthook",              thread_excepthook,
+    {"_excepthook",             thread_excepthook,
      METH_O, excepthook_doc},
+    {"_is_main_interpreter",    thread__is_main_interpreter,
+     METH_NOARGS, thread__is_main_interpreter_doc},
     {NULL,                      NULL}           /* sentinel */
 };
 
index 1ddf64909bf18ad9eb4508c2f0eccd0e3f8f3039..e1c7d4ab2fd78f94741ea80f70a20edd258e05c9 100644 (file)
@@ -8,6 +8,7 @@
 #include "Python.h"
 #include "pycore_initconfig.h"    // _PyErr_SetFromPyStatus()
 #include "pycore_pyerrors.h"      // _PyErr_ChainExceptions1()
+#include "pycore_pystate.h"       // _PyInterpreterState_SetRunningMain()
 #include "interpreteridobject.h"
 
 
@@ -358,41 +359,14 @@ exceptions_init(PyObject *mod)
 }
 
 static int
-_is_running(PyInterpreterState *interp)
-{
-    PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
-    if (PyThreadState_Next(tstate) != NULL) {
-        PyErr_SetString(PyExc_RuntimeError,
-                        "interpreter has more than one thread");
-        return -1;
-    }
-
-    assert(!PyErr_Occurred());
-    struct _PyInterpreterFrame *frame = tstate->current_frame;
-    if (frame == NULL) {
-        return 0;
-    }
-    return 1;
-}
-
-static int
-_ensure_not_running(PyInterpreterState *interp)
+_run_script(PyInterpreterState *interp, const char *codestr,
+            _sharedns *shared, _sharedexception *sharedexc)
 {
-    int is_running = _is_running(interp);
-    if (is_running < 0) {
+    if (_PyInterpreterState_SetRunningMain(interp) < 0) {
+        // We skip going through the shared exception.
         return -1;
     }
-    if (is_running) {
-        PyErr_Format(PyExc_RuntimeError, "interpreter already running");
-        return -1;
-    }
-    return 0;
-}
 
-static int
-_run_script(PyInterpreterState *interp, const char *codestr,
-            _sharedns *shared, _sharedexception *sharedexc)
-{
     PyObject *excval = NULL;
     PyObject *main_mod = PyUnstable_InterpreterState_GetMainModule(interp);
     if (main_mod == NULL) {
@@ -422,6 +396,7 @@ _run_script(PyInterpreterState *interp, const char *codestr,
     else {
         Py_DECREF(result);  // We throw away the result.
     }
+    _PyInterpreterState_SetNotRunningMain(interp);
 
     *sharedexc = no_exception;
     return 0;
@@ -437,6 +412,7 @@ error:
     }
     Py_XDECREF(excval);
     assert(!PyErr_Occurred());
+    _PyInterpreterState_SetNotRunningMain(interp);
     return -1;
 }
 
@@ -444,9 +420,6 @@ static int
 _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
                            const char *codestr, PyObject *shareables)
 {
-    if (_ensure_not_running(interp) < 0) {
-        return -1;
-    }
     module_state *state = get_module_state(mod);
 
     _sharedns *shared = _get_shared_ns(shareables);
@@ -457,8 +430,26 @@ _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
     // Switch to interpreter.
     PyThreadState *save_tstate = NULL;
     if (interp != PyInterpreterState_Get()) {
-        // XXX Using the "head" thread isn't strictly correct.
+        // XXX gh-109860: Using the "head" thread isn't strictly correct.
         PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
+        assert(tstate != NULL);
+        // Hack (until gh-109860): The interpreter's initial thread state
+        // is least likely to break.
+        while(tstate->next != NULL) {
+            tstate = tstate->next;
+        }
+        // We must do this check before switching interpreters, so any
+        // exception gets raised in the right one.
+        // XXX gh-109860: Drop this redundant check once we stop
+        // re-using tstates that might already be in use.
+        if (_PyInterpreterState_IsRunningMain(interp)) {
+            PyErr_SetString(PyExc_RuntimeError,
+                            "interpreter already running");
+            if (shared != NULL) {
+                _sharedns_free(shared);
+            }
+            return -1;
+        }
         // XXX Possible GILState issues?
         save_tstate = PyThreadState_Swap(tstate);
     }
@@ -478,8 +469,10 @@ _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
         _sharedexception_apply(&exc, state->RunFailedError);
     }
     else if (result != 0) {
-        // We were unable to allocate a shared exception.
-        PyErr_NoMemory();
+        if (!PyErr_Occurred()) {
+            // We were unable to allocate a shared exception.
+            PyErr_NoMemory();
+        }
     }
 
     if (shared != NULL) {
@@ -574,12 +567,20 @@ interp_destroy(PyObject *self, PyObject *args, PyObject *kwds)
     // Ensure the interpreter isn't running.
     /* XXX We *could* support destroying a running interpreter but
        aren't going to worry about it for now. */
-    if (_ensure_not_running(interp) < 0) {
+    if (_PyInterpreterState_IsRunningMain(interp)) {
+        PyErr_Format(PyExc_RuntimeError, "interpreter running");
         return NULL;
     }
 
     // Destroy the interpreter.
+    // XXX gh-109860: Using the "head" thread isn't strictly correct.
     PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
+    assert(tstate != NULL);
+    // Hack (until gh-109860): The interpreter's initial thread state
+    // is least likely to break.
+    while(tstate->next != NULL) {
+        tstate = tstate->next;
+    }
     // XXX Possible GILState issues?
     PyThreadState *save_tstate = PyThreadState_Swap(tstate);
     Py_EndInterpreter(tstate);
@@ -748,11 +749,7 @@ interp_is_running(PyObject *self, PyObject *args, PyObject *kwds)
     if (interp == NULL) {
         return NULL;
     }
-    int is_running = _is_running(interp);
-    if (is_running < 0) {
-        return NULL;
-    }
-    if (is_running) {
+    if (_PyInterpreterState_IsRunningMain(interp)) {
         Py_RETURN_TRUE;
     }
     Py_RETURN_FALSE;
@@ -763,6 +760,7 @@ PyDoc_STRVAR(is_running_doc,
 \n\
 Return whether or not the identified interpreter is running.");
 
+
 static PyMethodDef module_functions[] = {
     {"create",                    _PyCFunction_CAST(interp_create),
      METH_VARARGS | METH_KEYWORDS, create_doc},
index 8184bedca027a3c2e0592e0de1542eb0f6dc4666..b5ee34d0141daf89400a28c648a616671e0bc44d 100644 (file)
@@ -612,6 +612,9 @@ pymain_run_python(int *exitcode)
 
     pymain_header(config);
 
+    _PyInterpreterState_SetRunningMain(interp);
+    assert(!PyErr_Occurred());
+
     if (config->run_command) {
         *exitcode = pymain_run_command(config->run_command);
     }
@@ -635,6 +638,7 @@ error:
     *exitcode = pymain_exit_err_print();
 
 done:
+    _PyInterpreterState_SetNotRunningMain(interp);
     Py_XDECREF(main_importer_path);
 }
 
index 01aa2552e56f0de9f84a2ebe7e8561c19902143d..fe056bf4687026723ae12f59a2c5ebe5fa645cbf 100644 (file)
@@ -1091,6 +1091,39 @@ _PyInterpreterState_DeleteExceptMain(_PyRuntimeState *runtime)
 #endif
 
 
+int
+_PyInterpreterState_SetRunningMain(PyInterpreterState *interp)
+{
+    if (interp->threads.main != NULL) {
+        PyErr_SetString(PyExc_RuntimeError,
+                        "interpreter already running");
+        return -1;
+    }
+    PyThreadState *tstate = current_fast_get(&_PyRuntime);
+    _Py_EnsureTstateNotNULL(tstate);
+    if (tstate->interp != interp) {
+        PyErr_SetString(PyExc_RuntimeError,
+                        "current tstate has wrong interpreter");
+        return -1;
+    }
+    interp->threads.main = tstate;
+    return 0;
+}
+
+void
+_PyInterpreterState_SetNotRunningMain(PyInterpreterState *interp)
+{
+    assert(interp->threads.main == current_fast_get(&_PyRuntime));
+    interp->threads.main = NULL;
+}
+
+int
+_PyInterpreterState_IsRunningMain(PyInterpreterState *interp)
+{
+    return (interp->threads.main != NULL);
+}
+
+
 //----------
 // accessors
 //----------
@@ -2801,6 +2834,10 @@ _register_builtins_for_crossinterpreter_data(struct _xidregistry *xidregistry)
 }
 
 
+/*************/
+/* Other API */
+/*************/
+
 _PyFrameEvalFunction
 _PyInterpreterState_GetEvalFrameFunc(PyInterpreterState *interp)
 {