import contextlib
import os
import pickle
+import signal
import sys
from textwrap import dedent
import threading
from test.support import os_helper
from test.support import script_helper
from test.support import import_helper
-from test.support.script_helper import assert_python_ok
+from test.support.script_helper import assert_python_ok, spawn_python
# Raise SkipTest if subinterpreters not supported.
_interpreters = import_helper.import_module('_interpreters')
from concurrent import interpreters
self.assertIn(b"remaining subinterpreters", stdout)
self.assertNotIn(b"Traceback", stdout)
+ @support.requires_subprocess()
+ @unittest.skipIf(os.name == 'nt', "signals don't work well on windows")
+ def test_keyboard_interrupt_in_thread_running_interp(self):
+ import subprocess
+ source = f"""if True:
+ from concurrent import interpreters
+ from threading import Thread
+
+ def test():
+ import time
+ print('a', flush=True, end='')
+ time.sleep(10)
+
+ interp = interpreters.create()
+ interp.call_in_thread(test)
+ """
+
+ with spawn_python("-c", source, stderr=subprocess.PIPE) as proc:
+ self.assertEqual(proc.stdout.read(1), b'a')
+ proc.send_signal(signal.SIGINT)
+ proc.stderr.flush()
+ error = proc.stderr.read()
+ self.assertIn(b"KeyboardInterrupt", error)
+ retcode = proc.wait()
+ self.assertEqual(retcode, 0)
class TestInterpreterIsRunning(TestBase):
{
assert(tstate->_status.initialized && !tstate->_status.cleared);
assert(current_fast_get()->interp == tstate->interp);
- assert(!_PyThreadState_IsRunningMain(tstate));
+ // GH-126016: In the _interpreters module, KeyboardInterrupt exceptions
+ // during PyEval_EvalCode() are sent to finalization, which doesn't let us
+ // mark threads as "not running main". So, for now this assertion is
+ // disabled.
+ // XXX assert(!_PyThreadState_IsRunningMain(tstate));
// XXX assert(!tstate->_status.bound || tstate->_status.unbound);
tstate->_status.finalizing = 1; // just in case