The existence of background threads running on a subinterpreter was preventing interpreters from getting properly destroyed, as well as impacting the ability to run the interpreter again. It also affected how we wait for non-daemon threads to finish.
We add PyInterpreterState.threads.main, with some internal C-API functions.
* if it is NULL. */
PyAPI_FUNC(PyThreadState *) _PyThreadState_UncheckedGet(void);
+
// Disable tracing and profiling.
PyAPI_FUNC(void) PyThreadState_EnterTracing(PyThreadState *tstate);
uint64_t next_unique_id;
/* The linked list of threads, newest first. */
PyThreadState *head;
+ /* The thread currently executing in the __main__ module, if any. */
+ PyThreadState *main;
/* Used in Modules/_threadmodule.c. */
long count;
/* Support for runtime thread stack size tuning.
interp == &_PyRuntime._main_interpreter);
}
+// Export for _xxsubinterpreters module.
+PyAPI_FUNC(int) _PyInterpreterState_SetRunningMain(PyInterpreterState *);
+PyAPI_FUNC(void) _PyInterpreterState_SetNotRunningMain(PyInterpreterState *);
+PyAPI_FUNC(int) _PyInterpreterState_IsRunningMain(PyInterpreterState *);
+
static inline const PyConfig *
_Py_GetMainConfig(void)
self.assertTrue(interp.is_running())
self.assertFalse(interp.is_running())
+ def test_finished(self):
+ r, w = os.pipe()
+ interp = interpreters.create()
+ interp.run(f"""if True:
+ import os
+ os.write({w}, b'x')
+ """)
+ self.assertFalse(interp.is_running())
+ self.assertEqual(os.read(r, 1), b'x')
+
def test_from_subinterpreter(self):
interp = interpreters.create()
out = _run_output(interp, dedent(f"""
with self.assertRaises(ValueError):
interp.is_running()
+ def test_with_only_background_threads(self):
+ r_interp, w_interp = os.pipe()
+ r_thread, w_thread = os.pipe()
+
+ DONE = b'D'
+ FINISHED = b'F'
+
+ interp = interpreters.create()
+ interp.run(f"""if True:
+ import os
+ import threading
+
+ def task():
+ v = os.read({r_thread}, 1)
+ assert v == {DONE!r}
+ os.write({w_interp}, {FINISHED!r})
+ t = threading.Thread(target=task)
+ t.start()
+ """)
+ self.assertFalse(interp.is_running())
+
+ os.write(w_thread, DONE)
+ interp.run('t.join()')
+ self.assertEqual(os.read(r_interp, 1), FINISHED)
+
class TestInterpreterClose(TestBase):
interp.close()
self.assertTrue(interp.is_running())
+ def test_subthreads_still_running(self):
+ r_interp, w_interp = os.pipe()
+ r_thread, w_thread = os.pipe()
+
+ FINISHED = b'F'
+
+ interp = interpreters.create()
+ interp.run(f"""if True:
+ import os
+ import threading
+ import time
+
+ done = False
+
+ def notify_fini():
+ global done
+ done = True
+ t.join()
+ threading._register_atexit(notify_fini)
+
+ def task():
+ while not done:
+ time.sleep(0.1)
+ os.write({w_interp}, {FINISHED!r})
+ t = threading.Thread(target=task)
+ t.start()
+ """)
+ interp.close()
+
+ self.assertEqual(os.read(r_interp, 1), FINISHED)
+
class TestInterpreterRun(TestBase):
with self.assertRaises(TypeError):
interp.run(b'print("spam")')
+ def test_with_background_threads_still_running(self):
+ r_interp, w_interp = os.pipe()
+ r_thread, w_thread = os.pipe()
+
+ RAN = b'R'
+ DONE = b'D'
+ FINISHED = b'F'
+
+ interp = interpreters.create()
+ interp.run(f"""if True:
+ import os
+ import threading
+
+ def task():
+ v = os.read({r_thread}, 1)
+ assert v == {DONE!r}
+ os.write({w_interp}, {FINISHED!r})
+ t = threading.Thread(target=task)
+ t.start()
+ os.write({w_interp}, {RAN!r})
+ """)
+ interp.run(f"""if True:
+ os.write({w_interp}, {RAN!r})
+ """)
+
+ os.write(w_thread, DONE)
+ interp.run('t.join()')
+ self.assertEqual(os.read(r_interp, 1), RAN)
+ self.assertEqual(os.read(r_interp, 1), RAN)
+ self.assertEqual(os.read(r_interp, 1), FINISHED)
+
# test_xxsubinterpreters covers the remaining Interpreter.run() behavior.
from test import lock_tests
from test import support
+try:
+ from test.support import interpreters
+except ModuleNotFoundError:
+ interpreters = None
+
threading_helper.requires_working_threading(module=True)
# Between fork() and exec(), only async-safe functions are allowed (issues
return test
+def requires_subinterpreters(meth):
+ """Decorator to skip a test if subinterpreters are not supported."""
+ return unittest.skipIf(interpreters is None,
+ 'subinterpreters required')(meth)
+
+
def restore_default_excepthook(testcase):
testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
threading.excepthook = threading.__excepthook__
# The thread was joined properly.
self.assertEqual(os.read(r, 1), b"x")
+ @requires_subinterpreters
+ def test_threads_join_with_no_main(self):
+ r_interp, w_interp = self.pipe()
+
+ INTERP = b'I'
+ FINI = b'F'
+ DONE = b'D'
+
+ interp = interpreters.create()
+ interp.run(f"""if True:
+ import os
+ import threading
+ import time
+
+ done = False
+
+ def notify_fini():
+ global done
+ done = True
+ os.write({w_interp}, {FINI!r})
+ t.join()
+ threading._register_atexit(notify_fini)
+
+ def task():
+ while not done:
+ time.sleep(0.1)
+ os.write({w_interp}, {DONE!r})
+ t = threading.Thread(target=task)
+ t.start()
+
+ os.write({w_interp}, {INTERP!r})
+ """)
+ interp.close()
+
+ self.assertEqual(os.read(r_interp, 1), INTERP)
+ self.assertEqual(os.read(r_interp, 1), FINI)
+ self.assertEqual(os.read(r_interp, 1), DONE)
+
@cpython_only
def test_daemon_threads_fatal_error(self):
subinterp_code = f"""if 1:
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
+_is_main_interpreter = _thread._is_main_interpreter
try:
get_native_id = _thread.get_native_id
_HAVE_THREAD_NATIVE_ID = True
# the main thread's tstate_lock - that won't happen until the interpreter
# is nearly dead. So we release it here. Note that just calling _stop()
# isn't enough: other threads may already be waiting on _tstate_lock.
- if _main_thread._is_stopped:
+ if _main_thread._is_stopped and _is_main_interpreter():
# _shutdown() was already called
return
In normal conditions, the main thread is the thread from which the
Python interpreter was started.
"""
+ # XXX Figure this out for subinterpreters. (See gh-75698.)
return _main_thread
# get thread-local implementation, either from the thread
--- /dev/null
+Subinterpreters now correctly handle the case where they have threads
+running in the background. Before, such threads would interfere with
+cleaning up and destroying them, as well as prevent running another script.
\n\
Handle uncaught Thread.run() exception.");
+static PyObject *
+thread__is_main_interpreter(PyObject *module, PyObject *Py_UNUSED(ignored))
+{
+ PyInterpreterState *interp = _PyInterpreterState_GET();
+ return PyBool_FromLong(_Py_IsMainInterpreter(interp));
+}
+
+PyDoc_STRVAR(thread__is_main_interpreter_doc,
+"_is_main_interpreter()\n\
+\n\
+Return True if the current interpreter is the main Python interpreter.");
+
static PyMethodDef thread_methods[] = {
{"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
METH_VARARGS, stack_size_doc},
{"_set_sentinel", thread__set_sentinel,
METH_NOARGS, _set_sentinel_doc},
- {"_excepthook", thread_excepthook,
+ {"_excepthook", thread_excepthook,
METH_O, excepthook_doc},
+ {"_is_main_interpreter", thread__is_main_interpreter,
+ METH_NOARGS, thread__is_main_interpreter_doc},
{NULL, NULL} /* sentinel */
};
#include "Python.h"
#include "pycore_initconfig.h" // _PyErr_SetFromPyStatus()
#include "pycore_pyerrors.h" // _PyErr_ChainExceptions1()
+#include "pycore_pystate.h" // _PyInterpreterState_SetRunningMain()
#include "interpreteridobject.h"
}
static int
-_is_running(PyInterpreterState *interp)
-{
- PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
- if (PyThreadState_Next(tstate) != NULL) {
- PyErr_SetString(PyExc_RuntimeError,
- "interpreter has more than one thread");
- return -1;
- }
-
- assert(!PyErr_Occurred());
- struct _PyInterpreterFrame *frame = tstate->current_frame;
- if (frame == NULL) {
- return 0;
- }
- return 1;
-}
-
-static int
-_ensure_not_running(PyInterpreterState *interp)
+_run_script(PyInterpreterState *interp, const char *codestr,
+ _sharedns *shared, _sharedexception *sharedexc)
{
- int is_running = _is_running(interp);
- if (is_running < 0) {
+ if (_PyInterpreterState_SetRunningMain(interp) < 0) {
+ // We skip going through the shared exception.
return -1;
}
- if (is_running) {
- PyErr_Format(PyExc_RuntimeError, "interpreter already running");
- return -1;
- }
- return 0;
-}
-static int
-_run_script(PyInterpreterState *interp, const char *codestr,
- _sharedns *shared, _sharedexception *sharedexc)
-{
PyObject *excval = NULL;
PyObject *main_mod = PyUnstable_InterpreterState_GetMainModule(interp);
if (main_mod == NULL) {
else {
Py_DECREF(result); // We throw away the result.
}
+ _PyInterpreterState_SetNotRunningMain(interp);
*sharedexc = no_exception;
return 0;
}
Py_XDECREF(excval);
assert(!PyErr_Occurred());
+ _PyInterpreterState_SetNotRunningMain(interp);
return -1;
}
_run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
const char *codestr, PyObject *shareables)
{
- if (_ensure_not_running(interp) < 0) {
- return -1;
- }
module_state *state = get_module_state(mod);
_sharedns *shared = _get_shared_ns(shareables);
// Switch to interpreter.
PyThreadState *save_tstate = NULL;
if (interp != PyInterpreterState_Get()) {
- // XXX Using the "head" thread isn't strictly correct.
+ // XXX gh-109860: Using the "head" thread isn't strictly correct.
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
+ assert(tstate != NULL);
+ // Hack (until gh-109860): The interpreter's initial thread state
+ // is least likely to break.
+ while(tstate->next != NULL) {
+ tstate = tstate->next;
+ }
+ // We must do this check before switching interpreters, so any
+ // exception gets raised in the right one.
+ // XXX gh-109860: Drop this redundant check once we stop
+ // re-using tstates that might already be in use.
+ if (_PyInterpreterState_IsRunningMain(interp)) {
+ PyErr_SetString(PyExc_RuntimeError,
+ "interpreter already running");
+ if (shared != NULL) {
+ _sharedns_free(shared);
+ }
+ return -1;
+ }
// XXX Possible GILState issues?
save_tstate = PyThreadState_Swap(tstate);
}
_sharedexception_apply(&exc, state->RunFailedError);
}
else if (result != 0) {
- // We were unable to allocate a shared exception.
- PyErr_NoMemory();
+ if (!PyErr_Occurred()) {
+ // We were unable to allocate a shared exception.
+ PyErr_NoMemory();
+ }
}
if (shared != NULL) {
// Ensure the interpreter isn't running.
/* XXX We *could* support destroying a running interpreter but
aren't going to worry about it for now. */
- if (_ensure_not_running(interp) < 0) {
+ if (_PyInterpreterState_IsRunningMain(interp)) {
+ PyErr_Format(PyExc_RuntimeError, "interpreter running");
return NULL;
}
// Destroy the interpreter.
+ // XXX gh-109860: Using the "head" thread isn't strictly correct.
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
+ assert(tstate != NULL);
+ // Hack (until gh-109860): The interpreter's initial thread state
+ // is least likely to break.
+ while(tstate->next != NULL) {
+ tstate = tstate->next;
+ }
// XXX Possible GILState issues?
PyThreadState *save_tstate = PyThreadState_Swap(tstate);
Py_EndInterpreter(tstate);
if (interp == NULL) {
return NULL;
}
- int is_running = _is_running(interp);
- if (is_running < 0) {
- return NULL;
- }
- if (is_running) {
+ if (_PyInterpreterState_IsRunningMain(interp)) {
Py_RETURN_TRUE;
}
Py_RETURN_FALSE;
\n\
Return whether or not the identified interpreter is running.");
+
static PyMethodDef module_functions[] = {
{"create", _PyCFunction_CAST(interp_create),
METH_VARARGS | METH_KEYWORDS, create_doc},
pymain_header(config);
+ _PyInterpreterState_SetRunningMain(interp);
+ assert(!PyErr_Occurred());
+
if (config->run_command) {
*exitcode = pymain_run_command(config->run_command);
}
*exitcode = pymain_exit_err_print();
done:
+ _PyInterpreterState_SetNotRunningMain(interp);
Py_XDECREF(main_importer_path);
}
#endif
+int
+_PyInterpreterState_SetRunningMain(PyInterpreterState *interp)
+{
+ if (interp->threads.main != NULL) {
+ PyErr_SetString(PyExc_RuntimeError,
+ "interpreter already running");
+ return -1;
+ }
+ PyThreadState *tstate = current_fast_get(&_PyRuntime);
+ _Py_EnsureTstateNotNULL(tstate);
+ if (tstate->interp != interp) {
+ PyErr_SetString(PyExc_RuntimeError,
+ "current tstate has wrong interpreter");
+ return -1;
+ }
+ interp->threads.main = tstate;
+ return 0;
+}
+
+void
+_PyInterpreterState_SetNotRunningMain(PyInterpreterState *interp)
+{
+ assert(interp->threads.main == current_fast_get(&_PyRuntime));
+ interp->threads.main = NULL;
+}
+
+int
+_PyInterpreterState_IsRunningMain(PyInterpreterState *interp)
+{
+ return (interp->threads.main != NULL);
+}
+
+
//----------
// accessors
//----------
}
+/*************/
+/* Other API */
+/*************/
+
_PyFrameEvalFunction
_PyInterpreterState_GetEvalFrameFunc(PyInterpreterState *interp)
{