self.assertEqual(out.strip(), b"OK")
self.assertIn(b"can't create new thread at interpreter shutdown", err)
+ def test_start_new_thread_failed(self):
+ # gh-109746: if Python fails to start newly created thread
+ # due to failure of underlying PyThread_start_new_thread() call,
+ # its state should be removed from interpreter' thread states list
+ # to avoid its double cleanup
+ try:
+ from resource import setrlimit, RLIMIT_NPROC
+ except ImportError as err:
+ self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD
+ code = """if 1:
+ import resource
+ import _thread
+
+ def f():
+ print("shouldn't be printed")
+
+ limits = resource.getrlimit(resource.RLIMIT_NPROC)
+ [_, hard] = limits
+ resource.setrlimit(resource.RLIMIT_NPROC, (0, hard))
+
+ try:
+ _thread.start_new_thread(f, ())
+ except RuntimeError:
+ print('ok')
+ else:
+ print('skip')
+ """
+ _, out, err = assert_python_ok("-u", "-c", code)
+ out = out.strip()
+ if out == b'skip':
+ self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
+ self.assertEqual(out, b'ok')
+ self.assertEqual(err, b'')
+
+
class ThreadJoinOnShutdown(BaseTestCase):
def _run_and_join(self, script):
if (ident == PYTHREAD_INVALID_THREAD_ID) {
PyErr_SetString(ThreadError, "can't start new thread");
PyThreadState_Clear(boot->tstate);
+ PyThreadState_Delete(boot->tstate);
thread_bootstate_free(boot, 1);
return NULL;
}