]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
GH-110829: Ensure Thread.join() joins the OS thread (#110848)
authorAntoine Pitrou <antoine@python.org>
Sat, 4 Nov 2023 13:59:24 +0000 (14:59 +0100)
committerGitHub <noreply@github.com>
Sat, 4 Nov 2023 13:59:24 +0000 (13:59 +0000)
Joining a thread now ensures the underlying OS thread has exited. This is required for safer fork() in multi-threaded processes.

---------

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
14 files changed:
Include/cpython/pthread_stubs.h
Include/internal/pycore_pythread.h
Lib/test/_test_multiprocessing.py
Lib/test/audit-tests.py
Lib/test/test_audit.py
Lib/test/test_concurrent_futures/test_process_pool.py
Lib/test/test_thread.py
Lib/test/test_threading.py
Lib/threading.py
Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst [new file with mode: 0644]
Modules/_threadmodule.c
Python/thread_nt.h
Python/thread_pthread.h
Python/thread_pthread_stubs.h

index 5246968ea05476d50099d4972fca12d8eaef1150..e542eaa5bff0cf0322441f18e05da6dba35ab417 100644 (file)
@@ -83,6 +83,7 @@ PyAPI_FUNC(int) pthread_create(pthread_t *restrict thread,
                                void *(*start_routine)(void *),
                                void *restrict arg);
 PyAPI_FUNC(int) pthread_detach(pthread_t thread);
+PyAPI_FUNC(int) pthread_join(pthread_t thread, void** value_ptr);
 PyAPI_FUNC(pthread_t) pthread_self(void);
 PyAPI_FUNC(int) pthread_exit(void *retval) __attribute__ ((__noreturn__));
 PyAPI_FUNC(int) pthread_attr_init(pthread_attr_t *attr);
index d31ffc781305349ff019f686fb8108a8fca9422a..9c9a09f60f34418bbd4257bcb6540295df110ab3 100644 (file)
@@ -106,6 +106,48 @@ PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed_with_retries(
     PyThread_type_lock,
     PY_TIMEOUT_T microseconds);
 
+typedef unsigned long long PyThread_ident_t;
+typedef Py_uintptr_t PyThread_handle_t;
+
+#define PY_FORMAT_THREAD_IDENT_T "llu"
+#define Py_PARSE_THREAD_IDENT_T "K"
+
+PyAPI_FUNC(PyThread_ident_t) PyThread_get_thread_ident_ex(void);
+
+/* Thread joining APIs.
+ *
+ * These APIs have a strict contract:
+ *  - Either PyThread_join_thread or PyThread_detach_thread must be called
+ *    exactly once with the given handle.
+ *  - Calling neither PyThread_join_thread nor PyThread_detach_thread results
+ *    in a resource leak until the end of the process.
+ *  - Any other usage, such as calling both PyThread_join_thread and
+ *    PyThread_detach_thread, or calling them more than once (including
+ *    simultaneously), results in undefined behavior.
+ */
+PyAPI_FUNC(int) PyThread_start_joinable_thread(void (*func)(void *),
+                                               void *arg,
+                                               PyThread_ident_t* ident,
+                                               PyThread_handle_t* handle);
+/*
+ * Join a thread started with `PyThread_start_joinable_thread`.
+ * This function cannot be interrupted. It returns 0 on success,
+ * a non-zero value on failure.
+ */
+PyAPI_FUNC(int) PyThread_join_thread(PyThread_handle_t);
+/*
+ * Detach a thread started with `PyThread_start_joinable_thread`, such
+ * that its resources are relased as soon as it exits.
+ * This function cannot be interrupted. It returns 0 on success,
+ * a non-zero value on failure.
+ */
+PyAPI_FUNC(int) PyThread_detach_thread(PyThread_handle_t);
+
+/*
+ * Obtain the new thread ident and handle in a forked child process.
+ */
+PyAPI_FUNC(void) PyThread_update_thread_after_fork(PyThread_ident_t* ident,
+                                                   PyThread_handle_t* handle);
 
 #ifdef __cplusplus
 }
index bf87a3e8d6ffd8260dd1f9c126159e11041c8b95..ec003d8dc4314de3693529742546b3358845e82b 100644 (file)
@@ -2693,6 +2693,9 @@ class _TestPool(BaseTestCase):
                 p.join()
 
     def test_terminate(self):
+        if self.TYPE == 'threads':
+            self.skipTest("Threads cannot be terminated")
+
         # Simulate slow tasks which take "forever" to complete
         p = self.Pool(3)
         args = [support.LONG_TIMEOUT for i in range(10_000)]
index 89f407de4b0d9c52692fbc355c1816539a35559e..ce4a11b119c9007c64ac0f4c779b3434c05afc54 100644 (file)
@@ -455,6 +455,9 @@ def test_threading():
     i = _thread.start_new_thread(test_func(), ())
     lock.acquire()
 
+    handle = _thread.start_joinable_thread(test_func())
+    handle.join()
+
 
 def test_threading_abort():
     # Ensures that aborting PyThreadState_New raises the correct exception
index 47e5832d311bd1c391837fbf6254125c4dcc4318..cd0a4e2264865d374c7895bd3b48942ec5f31cd8 100644 (file)
@@ -209,6 +209,8 @@ class AuditTest(unittest.TestCase):
         expected = [
             ("_thread.start_new_thread", "(<test_func>, (), None)"),
             ("test.test_func", "()"),
+            ("_thread.start_joinable_thread", "(<test_func>,)"),
+            ("test.test_func", "()"),
         ]
 
         self.assertEqual(actual, expected)
index c73c2da1a010882a40de4a13489e5c1950f28a8b..3e61b0c9387c6fa7ba6ec14825bfad853d7d0fd1 100644 (file)
@@ -194,11 +194,11 @@ class ProcessPoolExecutorTest(ExecutorTest):
 
         context = self.get_context()
 
-        # gh-109047: Mock the threading.start_new_thread() function to inject
+        # gh-109047: Mock the threading.start_joinable_thread() function to inject
         # RuntimeError: simulate the error raised during Python finalization.
         # Block the second creation: create _ExecutorManagerThread, but block
         # QueueFeederThread.
-        orig_start_new_thread = threading._start_new_thread
+        orig_start_new_thread = threading._start_joinable_thread
         nthread = 0
         def mock_start_new_thread(func, *args):
             nonlocal nthread
@@ -208,7 +208,7 @@ class ProcessPoolExecutorTest(ExecutorTest):
             nthread += 1
             return orig_start_new_thread(func, *args)
 
-        with support.swap_attr(threading, '_start_new_thread',
+        with support.swap_attr(threading, '_start_joinable_thread',
                                mock_start_new_thread):
             executor = self.executor_type(max_workers=2, mp_context=context)
             with executor:
index 831aaf5b6a794f75f88307687e720963f00a57fe..931cb4b797e0b21e80b2cfe5669b1d0872162680 100644 (file)
@@ -160,6 +160,132 @@ class ThreadRunningTests(BasicThreadTest):
                              f"Exception ignored in thread started by {task!r}")
             self.assertIsNotNone(cm.unraisable.exc_traceback)
 
+    def test_join_thread(self):
+        finished = []
+
+        def task():
+            time.sleep(0.05)
+            finished.append(thread.get_ident())
+
+        with threading_helper.wait_threads_exit():
+            handle = thread.start_joinable_thread(task)
+            handle.join()
+            self.assertEqual(len(finished), 1)
+            self.assertEqual(handle.ident, finished[0])
+
+    def test_join_thread_already_exited(self):
+        def task():
+            pass
+
+        with threading_helper.wait_threads_exit():
+            handle = thread.start_joinable_thread(task)
+            time.sleep(0.05)
+            handle.join()
+
+    def test_join_several_times(self):
+        def task():
+            pass
+
+        with threading_helper.wait_threads_exit():
+            handle = thread.start_joinable_thread(task)
+            handle.join()
+            with self.assertRaisesRegex(ValueError, "not joinable"):
+                handle.join()
+
+    def test_joinable_not_joined(self):
+        handle_destroyed = thread.allocate_lock()
+        handle_destroyed.acquire()
+
+        def task():
+            handle_destroyed.acquire()
+
+        with threading_helper.wait_threads_exit():
+            handle = thread.start_joinable_thread(task)
+            del handle
+            handle_destroyed.release()
+
+    def test_join_from_self(self):
+        errors = []
+        handles = []
+        start_joinable_thread_returned = thread.allocate_lock()
+        start_joinable_thread_returned.acquire()
+        task_tried_to_join = thread.allocate_lock()
+        task_tried_to_join.acquire()
+
+        def task():
+            start_joinable_thread_returned.acquire()
+            try:
+                handles[0].join()
+            except Exception as e:
+                errors.append(e)
+            finally:
+                task_tried_to_join.release()
+
+        with threading_helper.wait_threads_exit():
+            handle = thread.start_joinable_thread(task)
+            handles.append(handle)
+            start_joinable_thread_returned.release()
+            # Can still join after joining failed in other thread
+            task_tried_to_join.acquire()
+            handle.join()
+
+        assert len(errors) == 1
+        with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
+            raise errors[0]
+
+    def test_detach_from_self(self):
+        errors = []
+        handles = []
+        start_joinable_thread_returned = thread.allocate_lock()
+        start_joinable_thread_returned.acquire()
+        thread_detached = thread.allocate_lock()
+        thread_detached.acquire()
+
+        def task():
+            start_joinable_thread_returned.acquire()
+            try:
+                handles[0].detach()
+            except Exception as e:
+                errors.append(e)
+            finally:
+                thread_detached.release()
+
+        with threading_helper.wait_threads_exit():
+            handle = thread.start_joinable_thread(task)
+            handles.append(handle)
+            start_joinable_thread_returned.release()
+            thread_detached.acquire()
+            with self.assertRaisesRegex(ValueError, "not joinable"):
+                handle.join()
+
+        assert len(errors) == 0
+
+    def test_detach_then_join(self):
+        lock = thread.allocate_lock()
+        lock.acquire()
+
+        def task():
+            lock.acquire()
+
+        with threading_helper.wait_threads_exit():
+            handle = thread.start_joinable_thread(task)
+            # detach() returns even though the thread is blocked on lock
+            handle.detach()
+            # join() then cannot be called anymore
+            with self.assertRaisesRegex(ValueError, "not joinable"):
+                handle.join()
+            lock.release()
+
+    def test_join_then_detach(self):
+        def task():
+            pass
+
+        with threading_helper.wait_threads_exit():
+            handle = thread.start_joinable_thread(task)
+            handle.join()
+            with self.assertRaisesRegex(ValueError, "not joinable"):
+                handle.detach()
+
 
 class Barrier:
     def __init__(self, num_threads):
index 00a64372b394dc4d4964acecb6e8b30b6f6fa985..146e2dbc0fc3963f26eda55723d71cfb6723cc85 100644 (file)
@@ -376,8 +376,8 @@ class ThreadTests(BaseTestCase):
         # Issue 7481: Failure to start thread should cleanup the limbo map.
         def fail_new_thread(*args):
             raise threading.ThreadError()
-        _start_new_thread = threading._start_new_thread
-        threading._start_new_thread = fail_new_thread
+        _start_joinable_thread = threading._start_joinable_thread
+        threading._start_joinable_thread = fail_new_thread
         try:
             t = threading.Thread(target=lambda: None)
             self.assertRaises(threading.ThreadError, t.start)
@@ -385,7 +385,7 @@ class ThreadTests(BaseTestCase):
                 t in threading._limbo,
                 "Failed to cleanup _limbo map on failure of Thread.start().")
         finally:
-            threading._start_new_thread = _start_new_thread
+            threading._start_joinable_thread = _start_joinable_thread
 
     def test_finalize_running_thread(self):
         # Issue 1402: the PyGILState_Ensure / _Release functions may be called
@@ -482,6 +482,47 @@ class ThreadTests(BaseTestCase):
         finally:
             sys.setswitchinterval(old_interval)
 
+    def test_join_from_multiple_threads(self):
+        # Thread.join() should be thread-safe
+        errors = []
+
+        def worker():
+            time.sleep(0.005)
+
+        def joiner(thread):
+            try:
+                thread.join()
+            except Exception as e:
+                errors.append(e)
+
+        for N in range(2, 20):
+            threads = [threading.Thread(target=worker)]
+            for i in range(N):
+                threads.append(threading.Thread(target=joiner,
+                                                args=(threads[0],)))
+            for t in threads:
+                t.start()
+            time.sleep(0.01)
+            for t in threads:
+                t.join()
+            if errors:
+                raise errors[0]
+
+    def test_join_with_timeout(self):
+        lock = _thread.allocate_lock()
+        lock.acquire()
+
+        def worker():
+            lock.acquire()
+
+        thread = threading.Thread(target=worker)
+        thread.start()
+        thread.join(timeout=0.01)
+        assert thread.is_alive()
+        lock.release()
+        thread.join()
+        assert not thread.is_alive()
+
     def test_no_refcycle_through_target(self):
         class RunSelfFunction(object):
             def __init__(self, should_raise):
index 41c3a9ff93856fd486494cbf3fb78e0bcd5852b6..85aff58968082d875ea8130241f8b1561dbcf9dd 100644 (file)
@@ -5,6 +5,7 @@ import sys as _sys
 import _thread
 import functools
 import warnings
+import _weakref
 
 from time import monotonic as _time
 from _weakrefset import WeakSet
@@ -33,7 +34,7 @@ __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
            'setprofile_all_threads','settrace_all_threads']
 
 # Rename some stuff so "from threading import *" is safe
-_start_new_thread = _thread.start_new_thread
+_start_joinable_thread = _thread.start_joinable_thread
 _daemon_threads_allowed = _thread.daemon_threads_allowed
 _allocate_lock = _thread.allocate_lock
 _set_sentinel = _thread._set_sentinel
@@ -589,7 +590,7 @@ class Event:
         return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>"
 
     def _at_fork_reinit(self):
-        # Private method called by Thread._reset_internal_locks()
+        # Private method called by Thread._after_fork()
         self._cond._at_fork_reinit()
 
     def is_set(self):
@@ -924,6 +925,8 @@ class Thread:
         if _HAVE_THREAD_NATIVE_ID:
             self._native_id = None
         self._tstate_lock = None
+        self._join_lock = None
+        self._handle = None
         self._started = Event()
         self._is_stopped = False
         self._initialized = True
@@ -933,22 +936,32 @@ class Thread:
         # For debugging and _after_fork()
         _dangling.add(self)
 
-    def _reset_internal_locks(self, is_alive):
-        # private!  Called by _after_fork() to reset our internal locks as
-        # they may be in an invalid state leading to a deadlock or crash.
+    def _after_fork(self, new_ident=None):
+        # Private!  Called by threading._after_fork().
         self._started._at_fork_reinit()
-        if is_alive:
+        if new_ident is not None:
+            # This thread is alive.
+            self._ident = new_ident
+            if self._handle is not None:
+                self._handle.after_fork_alive()
+                assert self._handle.ident == new_ident
             # bpo-42350: If the fork happens when the thread is already stopped
             # (ex: after threading._shutdown() has been called), _tstate_lock
             # is None. Do nothing in this case.
             if self._tstate_lock is not None:
                 self._tstate_lock._at_fork_reinit()
                 self._tstate_lock.acquire()
+            if self._join_lock is not None:
+                self._join_lock._at_fork_reinit()
         else:
-            # The thread isn't alive after fork: it doesn't have a tstate
+            # This thread isn't alive after fork: it doesn't have a tstate
             # anymore.
             self._is_stopped = True
             self._tstate_lock = None
+            self._join_lock = None
+            if self._handle is not None:
+                self._handle.after_fork_dead()
+                self._handle = None
 
     def __repr__(self):
         assert self._initialized, "Thread.__init__() was not called"
@@ -980,15 +993,18 @@ class Thread:
         if self._started.is_set():
             raise RuntimeError("threads can only be started once")
 
+        self._join_lock = _allocate_lock()
+
         with _active_limbo_lock:
             _limbo[self] = self
         try:
-            _start_new_thread(self._bootstrap, ())
+            # Start joinable thread
+            self._handle = _start_joinable_thread(self._bootstrap)
         except Exception:
             with _active_limbo_lock:
                 del _limbo[self]
             raise
-        self._started.wait()
+        self._started.wait()  # Will set ident and native_id
 
     def run(self):
         """Method representing the thread's activity.
@@ -1144,6 +1160,22 @@ class Thread:
             # historically .join(timeout=x) for x<0 has acted as if timeout=0
             self._wait_for_tstate_lock(timeout=max(timeout, 0))
 
+        if self._is_stopped:
+            self._join_os_thread()
+
+    def _join_os_thread(self):
+        join_lock = self._join_lock
+        if join_lock is None:
+            return
+        with join_lock:
+            # Calling join() multiple times would raise an exception
+            # in one of the callers.
+            if self._handle is not None:
+                self._handle.join()
+                self._handle = None
+                # No need to keep this around
+                self._join_lock = None
+
     def _wait_for_tstate_lock(self, block=True, timeout=-1):
         # Issue #18808: wait for the thread state to be gone.
         # At the end of the thread's life, after all knowledge of the thread
@@ -1223,7 +1255,10 @@ class Thread:
         if self._is_stopped or not self._started.is_set():
             return False
         self._wait_for_tstate_lock(False)
-        return not self._is_stopped
+        if not self._is_stopped:
+            return True
+        self._join_os_thread()
+        return False
 
     @property
     def daemon(self):
@@ -1679,15 +1714,13 @@ def _after_fork():
             # Any lock/condition variable may be currently locked or in an
             # invalid state, so we reinitialize them.
             if thread is current:
-                # There is only one active thread. We reset the ident to
-                # its new value since it can have changed.
-                thread._reset_internal_locks(True)
+                # This is the one and only active thread.
                 ident = get_ident()
-                thread._ident = ident
+                thread._after_fork(new_ident=ident)
                 new_active[ident] = thread
             else:
                 # All the others are already stopped.
-                thread._reset_internal_locks(False)
+                thread._after_fork()
                 thread._stop()
 
         _limbo.clear()
diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst b/Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst
new file mode 100644 (file)
index 0000000..f4fa61d
--- /dev/null
@@ -0,0 +1 @@
+Joining a thread now ensures the underlying OS thread has exited. This is required for safer fork() in multi-threaded processes.
index 9eecebddb723a42447d9e72e88b84ae7d0f358a4..88ca9032b5e6798c6c7bf5d5dc817757818f83f7 100644 (file)
 // Forward declarations
 static struct PyModuleDef thread_module;
 
-
+// Module state
 typedef struct {
     PyTypeObject *excepthook_type;
     PyTypeObject *lock_type;
     PyTypeObject *local_type;
     PyTypeObject *local_dummy_type;
+    PyTypeObject *thread_handle_type;
 } thread_module_state;
 
 static inline thread_module_state*
@@ -38,6 +39,145 @@ get_thread_state(PyObject *module)
     return (thread_module_state *)state;
 }
 
+// _ThreadHandle type
+
+typedef struct {
+    PyObject_HEAD
+    PyThread_ident_t ident;
+    PyThread_handle_t handle;
+    char joinable;
+} ThreadHandleObject;
+
+static ThreadHandleObject*
+new_thread_handle(thread_module_state* state)
+{
+    ThreadHandleObject* self = PyObject_New(ThreadHandleObject, state->thread_handle_type);
+    if (self == NULL) {
+        return NULL;
+    }
+    self->ident = 0;
+    self->handle = 0;
+    self->joinable = 0;
+    return self;
+}
+
+static void
+ThreadHandle_dealloc(ThreadHandleObject *self)
+{
+    PyObject *tp = (PyObject *) Py_TYPE(self);
+    if (self->joinable) {
+        int ret = PyThread_detach_thread(self->handle);
+        if (ret) {
+            PyErr_SetString(ThreadError, "Failed detaching thread");
+            PyErr_WriteUnraisable(tp);
+        }
+    }
+    PyObject_Free(self);
+    Py_DECREF(tp);
+}
+
+static PyObject *
+ThreadHandle_repr(ThreadHandleObject *self)
+{
+    return PyUnicode_FromFormat("<%s object: ident=%" PY_FORMAT_THREAD_IDENT_T ">",
+                                Py_TYPE(self)->tp_name, self->ident);
+}
+
+static PyObject *
+ThreadHandle_get_ident(ThreadHandleObject *self, void *ignored)
+{
+    return PyLong_FromUnsignedLongLong(self->ident);
+}
+
+
+static PyObject *
+ThreadHandle_after_fork_alive(ThreadHandleObject *self, void* ignored)
+{
+    PyThread_update_thread_after_fork(&self->ident, &self->handle);
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+ThreadHandle_after_fork_dead(ThreadHandleObject *self, void* ignored)
+{
+    // Disallow calls to detach() and join() as they could crash.
+    self->joinable = 0;
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+ThreadHandle_detach(ThreadHandleObject *self, void* ignored)
+{
+    if (!self->joinable) {
+        PyErr_SetString(PyExc_ValueError,
+                        "the thread is not joinable and thus cannot be detached");
+        return NULL;
+    }
+    self->joinable = 0;
+    // This is typically short so no need to release the GIL
+    int ret = PyThread_detach_thread(self->handle);
+    if (ret) {
+        PyErr_SetString(ThreadError, "Failed detaching thread");
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+ThreadHandle_join(ThreadHandleObject *self, void* ignored)
+{
+    if (!self->joinable) {
+        PyErr_SetString(PyExc_ValueError, "the thread is not joinable");
+        return NULL;
+    }
+    if (self->ident == PyThread_get_thread_ident_ex()) {
+        // PyThread_join_thread() would deadlock or error out.
+        PyErr_SetString(ThreadError, "Cannot join current thread");
+        return NULL;
+    }
+    // Before actually joining, we must first mark the thread as non-joinable,
+    // as joining several times simultaneously or sequentially is undefined behavior.
+    self->joinable = 0;
+    int ret;
+    Py_BEGIN_ALLOW_THREADS
+    ret = PyThread_join_thread(self->handle);
+    Py_END_ALLOW_THREADS
+    if (ret) {
+        PyErr_SetString(ThreadError, "Failed joining thread");
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+static PyGetSetDef ThreadHandle_getsetlist[] = {
+    {"ident", (getter)ThreadHandle_get_ident, NULL, NULL},
+    {0},
+};
+
+static PyMethodDef ThreadHandle_methods[] =
+{
+    {"after_fork_alive", (PyCFunction)ThreadHandle_after_fork_alive, METH_NOARGS},
+    {"after_fork_dead", (PyCFunction)ThreadHandle_after_fork_dead, METH_NOARGS},
+    {"detach", (PyCFunction)ThreadHandle_detach, METH_NOARGS},
+    {"join", (PyCFunction)ThreadHandle_join, METH_NOARGS},
+    {0, 0}
+};
+
+static PyType_Slot ThreadHandle_Type_slots[] = {
+    {Py_tp_dealloc, (destructor)ThreadHandle_dealloc},
+    {Py_tp_repr, (reprfunc)ThreadHandle_repr},
+    {Py_tp_getset, ThreadHandle_getsetlist},
+    {Py_tp_methods, ThreadHandle_methods},
+    {0, 0}
+};
+
+static PyType_Spec ThreadHandle_Type_spec = {
+    "_thread._ThreadHandle",
+    sizeof(ThreadHandleObject),
+    0,
+    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_DISALLOW_INSTANTIATION,
+    ThreadHandle_Type_slots,
+};
 
 /* Lock objects */
 
@@ -274,7 +414,7 @@ static PyType_Spec lock_type_spec = {
 typedef struct {
     PyObject_HEAD
     PyThread_type_lock rlock_lock;
-    unsigned long rlock_owner;
+    PyThread_ident_t rlock_owner;
     unsigned long rlock_count;
     PyObject *in_weakreflist;
 } rlockobject;
@@ -311,13 +451,13 @@ static PyObject *
 rlock_acquire(rlockobject *self, PyObject *args, PyObject *kwds)
 {
     _PyTime_t timeout;
-    unsigned long tid;
+    PyThread_ident_t tid;
     PyLockStatus r = PY_LOCK_ACQUIRED;
 
     if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
         return NULL;
 
-    tid = PyThread_get_thread_ident();
+    tid = PyThread_get_thread_ident_ex();
     if (self->rlock_count > 0 && tid == self->rlock_owner) {
         unsigned long count = self->rlock_count + 1;
         if (count <= self->rlock_count) {
@@ -360,7 +500,7 @@ the lock is taken and its internal counter initialized to 1.");
 static PyObject *
 rlock_release(rlockobject *self, PyObject *Py_UNUSED(ignored))
 {
-    unsigned long tid = PyThread_get_thread_ident();
+    PyThread_ident_t tid = PyThread_get_thread_ident_ex();
 
     if (self->rlock_count == 0 || self->rlock_owner != tid) {
         PyErr_SetString(PyExc_RuntimeError,
@@ -389,11 +529,12 @@ to be available for other threads.");
 static PyObject *
 rlock_acquire_restore(rlockobject *self, PyObject *args)
 {
-    unsigned long owner;
+    PyThread_ident_t owner;
     unsigned long count;
     int r = 1;
 
-    if (!PyArg_ParseTuple(args, "(kk):_acquire_restore", &count, &owner))
+    if (!PyArg_ParseTuple(args, "(k" Py_PARSE_THREAD_IDENT_T "):_acquire_restore",
+            &count, &owner))
         return NULL;
 
     if (!PyThread_acquire_lock(self->rlock_lock, 0)) {
@@ -419,7 +560,7 @@ For internal use by `threading.Condition`.");
 static PyObject *
 rlock_release_save(rlockobject *self, PyObject *Py_UNUSED(ignored))
 {
-    unsigned long owner;
+    PyThread_ident_t owner;
     unsigned long count;
 
     if (self->rlock_count == 0) {
@@ -433,7 +574,7 @@ rlock_release_save(rlockobject *self, PyObject *Py_UNUSED(ignored))
     self->rlock_count = 0;
     self->rlock_owner = 0;
     PyThread_release_lock(self->rlock_lock);
-    return Py_BuildValue("kk", count, owner);
+    return Py_BuildValue("k" Py_PARSE_THREAD_IDENT_T, count, owner);
 }
 
 PyDoc_STRVAR(rlock_release_save_doc,
@@ -444,7 +585,7 @@ For internal use by `threading.Condition`.");
 static PyObject *
 rlock_recursion_count(rlockobject *self, PyObject *Py_UNUSED(ignored))
 {
-    unsigned long tid = PyThread_get_thread_ident();
+    PyThread_ident_t tid = PyThread_get_thread_ident_ex();
     return PyLong_FromUnsignedLong(
         self->rlock_owner == tid ? self->rlock_count : 0UL);
 }
@@ -457,7 +598,7 @@ For internal use by reentrancy checks.");
 static PyObject *
 rlock_is_owned(rlockobject *self, PyObject *Py_UNUSED(ignored))
 {
-    unsigned long tid = PyThread_get_thread_ident();
+    PyThread_ident_t tid = PyThread_get_thread_ident_ex();
 
     if (self->rlock_count > 0 && self->rlock_owner == tid) {
         Py_RETURN_TRUE;
@@ -493,7 +634,8 @@ rlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
 static PyObject *
 rlock_repr(rlockobject *self)
 {
-    return PyUnicode_FromFormat("<%s %s object owner=%ld count=%lu at %p>",
+    return PyUnicode_FromFormat(
+        "<%s %s object owner=%" PY_FORMAT_THREAD_IDENT_T " count=%lu at %p>",
         self->rlock_count ? "locked" : "unlocked",
         Py_TYPE(self)->tp_name, self->rlock_owner,
         self->rlock_count, self);
@@ -1109,10 +1251,66 @@ PyDoc_STRVAR(daemon_threads_allowed_doc,
 Return True if daemon threads are allowed in the current interpreter,\n\
 and False otherwise.\n");
 
+static int
+do_start_new_thread(thread_module_state* state,
+                    PyObject *func, PyObject* args, PyObject* kwargs,
+                    int joinable,
+                    PyThread_ident_t* ident, PyThread_handle_t* handle)
+{
+    PyInterpreterState *interp = _PyInterpreterState_GET();
+    if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) {
+        PyErr_SetString(PyExc_RuntimeError,
+                        "thread is not supported for isolated subinterpreters");
+        return -1;
+    }
+    if (interp->finalizing) {
+        PyErr_SetString(PyExc_RuntimeError,
+                        "can't create new thread at interpreter shutdown");
+        return -1;
+    }
+
+    // gh-109795: Use PyMem_RawMalloc() instead of PyMem_Malloc(),
+    // because it should be possible to call thread_bootstate_free()
+    // without holding the GIL.
+    struct bootstate *boot = PyMem_RawMalloc(sizeof(struct bootstate));
+    if (boot == NULL) {
+        PyErr_NoMemory();
+        return -1;
+    }
+    boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING);
+    if (boot->tstate == NULL) {
+        PyMem_RawFree(boot);
+        if (!PyErr_Occurred()) {
+            PyErr_NoMemory();
+        }
+        return -1;
+    }
+    boot->func = Py_NewRef(func);
+    boot->args = Py_NewRef(args);
+    boot->kwargs = Py_XNewRef(kwargs);
+
+    int err;
+    if (joinable) {
+        err = PyThread_start_joinable_thread(thread_run, (void*) boot, ident, handle);
+    } else {
+        *handle = 0;
+        *ident = PyThread_start_new_thread(thread_run, (void*) boot);
+        err = (*ident == PYTHREAD_INVALID_THREAD_ID);
+    }
+    if (err) {
+        PyErr_SetString(ThreadError, "can't start new thread");
+        PyThreadState_Clear(boot->tstate);
+        thread_bootstate_free(boot, 1);
+        return -1;
+    }
+    return 0;
+}
+
 static PyObject *
-thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
+thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs)
 {
     PyObject *func, *args, *kwargs = NULL;
+    thread_module_state *state = get_thread_state(module);
 
     if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                            &func, &args, &kwargs))
@@ -1138,57 +1336,73 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
         return NULL;
     }
 
-    PyInterpreterState *interp = _PyInterpreterState_GET();
-    if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) {
-        PyErr_SetString(PyExc_RuntimeError,
-                        "thread is not supported for isolated subinterpreters");
+    PyThread_ident_t ident = 0;
+    PyThread_handle_t handle;
+    if (do_start_new_thread(state, func, args, kwargs, /*joinable=*/ 0,
+                            &ident, &handle)) {
         return NULL;
     }
-    if (interp->finalizing) {
-        PyErr_SetString(PyExc_RuntimeError,
-                        "can't create new thread at interpreter shutdown");
+    return PyLong_FromUnsignedLongLong(ident);
+}
+
+PyDoc_STRVAR(start_new_doc,
+"start_new_thread(function, args[, kwargs])\n\
+(start_new() is an obsolete synonym)\n\
+\n\
+Start a new thread and return its identifier.\n\
+\n\
+The thread will call the function with positional arguments from the\n\
+tuple args and keyword arguments taken from the optional dictionary\n\
+kwargs.  The thread exits when the function returns; the return value\n\
+is ignored.  The thread will also exit when the function raises an\n\
+unhandled exception; a stack trace will be printed unless the exception\n\
+is SystemExit.\n");
+
+static PyObject *
+thread_PyThread_start_joinable_thread(PyObject *module, PyObject *func)
+{
+    thread_module_state *state = get_thread_state(module);
+
+    if (!PyCallable_Check(func)) {
+        PyErr_SetString(PyExc_TypeError,
+                        "thread function must be callable");
         return NULL;
     }
 
-    // gh-109795: Use PyMem_RawMalloc() instead of PyMem_Malloc(),
-    // because it should be possible to call thread_bootstate_free()
-    // without holding the GIL.
-    struct bootstate *boot = PyMem_RawMalloc(sizeof(struct bootstate));
-    if (boot == NULL) {
-        return PyErr_NoMemory();
-    }
-    boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING);
-    if (boot->tstate == NULL) {
-        PyMem_RawFree(boot);
-        if (!PyErr_Occurred()) {
-            return PyErr_NoMemory();
-        }
+    if (PySys_Audit("_thread.start_joinable_thread", "O", func) < 0) {
         return NULL;
     }
-    boot->func = Py_NewRef(func);
-    boot->args = Py_NewRef(args);
-    boot->kwargs = Py_XNewRef(kwargs);
 
-    unsigned long ident = PyThread_start_new_thread(thread_run, (void*) boot);
-    if (ident == PYTHREAD_INVALID_THREAD_ID) {
-        PyErr_SetString(ThreadError, "can't start new thread");
-        PyThreadState_Clear(boot->tstate);
-        thread_bootstate_free(boot, 1);
+    PyObject* args = PyTuple_New(0);
+    if (args == NULL) {
+        return NULL;
+    }
+    ThreadHandleObject* hobj = new_thread_handle(state);
+    if (hobj == NULL) {
+        Py_DECREF(args);
+        return NULL;
+    }
+    if (do_start_new_thread(state, func, args, /*kwargs=*/ NULL, /*joinable=*/ 1,
+                            &hobj->ident, &hobj->handle)) {
+        Py_DECREF(args);
+        Py_DECREF(hobj);
         return NULL;
     }
-    return PyLong_FromUnsignedLong(ident);
+    Py_DECREF(args);
+    hobj->joinable = 1;
+    return (PyObject*) hobj;
 }
 
-PyDoc_STRVAR(start_new_doc,
-"start_new_thread(function, args[, kwargs])\n\
-(start_new() is an obsolete synonym)\n\
+PyDoc_STRVAR(start_joinable_doc,
+"start_joinable_thread(function)\n\
+\n\
+*For internal use only*: start a new thread.\n\
 \n\
-Start a new thread and return its identifier.  The thread will call the\n\
-function with positional arguments from the tuple args and keyword arguments\n\
-taken from the optional dictionary kwargs.  The thread exits when the\n\
-function returns; the return value is ignored.  The thread will also exit\n\
-when the function raises an unhandled exception; a stack trace will be\n\
-printed unless the exception is SystemExit.\n");
+Like start_new_thread(), this starts a new thread calling the given function.\n\
+Unlike start_new_thread(), this returns a handle object with methods to join\n\
+or detach the given thread.\n\
+This function is not for third-party code, please use the\n\
+`threading` module instead.\n");
 
 static PyObject *
 thread_PyThread_exit_thread(PyObject *self, PyObject *Py_UNUSED(ignored))
@@ -1248,12 +1462,12 @@ information about locks.");
 static PyObject *
 thread_get_ident(PyObject *self, PyObject *Py_UNUSED(ignored))
 {
-    unsigned long ident = PyThread_get_thread_ident();
+    PyThread_ident_t ident = PyThread_get_thread_ident_ex();
     if (ident == PYTHREAD_INVALID_THREAD_ID) {
         PyErr_SetString(ThreadError, "no current thread ident");
         return NULL;
     }
-    return PyLong_FromUnsignedLong(ident);
+    return PyLong_FromUnsignedLongLong(ident);
 }
 
 PyDoc_STRVAR(get_ident_doc,
@@ -1440,8 +1654,8 @@ thread_excepthook_file(PyObject *file, PyObject *exc_type, PyObject *exc_value,
         Py_DECREF(name);
     }
     else {
-        unsigned long ident = PyThread_get_thread_ident();
-        PyObject *str = PyUnicode_FromFormat("%lu", ident);
+        PyThread_ident_t ident = PyThread_get_thread_ident_ex();
+        PyObject *str = PyUnicode_FromFormat("%" PY_FORMAT_THREAD_IDENT_T, ident);
         if (str != NULL) {
             if (PyFile_WriteObject(str, file, Py_PRINT_RAW) < 0) {
                 Py_DECREF(str);
@@ -1574,6 +1788,8 @@ static PyMethodDef thread_methods[] = {
      METH_VARARGS, start_new_doc},
     {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
      METH_VARARGS, start_new_doc},
+    {"start_joinable_thread",   (PyCFunction)thread_PyThread_start_joinable_thread,
+     METH_O, start_joinable_doc},
     {"daemon_threads_allowed",  (PyCFunction)thread_daemon_threads_allowed,
      METH_NOARGS, daemon_threads_allowed_doc},
     {"allocate_lock",           thread_PyThread_allocate_lock,
@@ -1617,6 +1833,15 @@ thread_module_exec(PyObject *module)
     // Initialize the C thread library
     PyThread_init_thread();
 
+    // _ThreadHandle
+    state->thread_handle_type = (PyTypeObject *)PyType_FromSpec(&ThreadHandle_Type_spec);
+    if (state->thread_handle_type == NULL) {
+        return -1;
+    }
+    if (PyDict_SetItemString(d, "_ThreadHandle", (PyObject *)state->thread_handle_type) < 0) {
+        return -1;
+    }
+
     // Lock
     state->lock_type = (PyTypeObject *)PyType_FromSpec(&lock_type_spec);
     if (state->lock_type == NULL) {
@@ -1690,6 +1915,7 @@ thread_module_traverse(PyObject *module, visitproc visit, void *arg)
     Py_VISIT(state->lock_type);
     Py_VISIT(state->local_type);
     Py_VISIT(state->local_dummy_type);
+    Py_VISIT(state->thread_handle_type);
     return 0;
 }
 
@@ -1701,6 +1927,7 @@ thread_module_clear(PyObject *module)
     Py_CLEAR(state->lock_type);
     Py_CLEAR(state->local_type);
     Py_CLEAR(state->local_dummy_type);
+    Py_CLEAR(state->thread_handle_type);
     return 0;
 }
 
index 26f441bd6d3c569da03069d0267a4c966fc6cc97..14b9cddc24c0ec344cfb5890e0a53c45cf1a4668 100644 (file)
@@ -182,9 +182,9 @@ bootstrap(void *call)
     return 0;
 }
 
-unsigned long
-PyThread_start_new_thread(void (*func)(void *), void *arg)
-{
+int
+PyThread_start_joinable_thread(void (*func)(void *), void *arg,
+                               PyThread_ident_t* ident, PyThread_handle_t* handle) {
     HANDLE hThread;
     unsigned threadID;
     callobj *obj;
@@ -194,7 +194,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg)
 
     obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj));
     if (!obj)
-        return PYTHREAD_INVALID_THREAD_ID;
+        return -1;
     obj->func = func;
     obj->arg = arg;
     PyThreadState *tstate = _PyThreadState_GET();
@@ -207,22 +207,51 @@ PyThread_start_new_thread(void (*func)(void *), void *arg)
         /* I've seen errno == EAGAIN here, which means "there are
          * too many threads".
          */
-        int e = errno;
-        threadID = (unsigned)-1;
         HeapFree(GetProcessHeap(), 0, obj);
+        return -1;
     }
-    else {
-        CloseHandle(hThread);
+    *ident = threadID;
+    // The cast is safe since HANDLE is pointer-sized
+    *handle = (PyThread_handle_t) hThread;
+    return 0;
+}
+
+unsigned long
+PyThread_start_new_thread(void (*func)(void *), void *arg) {
+    PyThread_handle_t handle;
+    PyThread_ident_t ident;
+    if (PyThread_start_joinable_thread(func, arg, &ident, &handle)) {
+        return PYTHREAD_INVALID_THREAD_ID;
     }
-    return threadID;
+    CloseHandle((HANDLE) handle);
+    // The cast is safe since the ident is really an unsigned int
+    return (unsigned long) ident;
+}
+
+int
+PyThread_join_thread(PyThread_handle_t handle) {
+    HANDLE hThread = (HANDLE) handle;
+    int errored = (WaitForSingleObject(hThread, INFINITE) != WAIT_OBJECT_0);
+    CloseHandle(hThread);
+    return errored;
+}
+
+int
+PyThread_detach_thread(PyThread_handle_t handle) {
+    HANDLE hThread = (HANDLE) handle;
+    return (CloseHandle(hThread) == 0);
+}
+
+void
+PyThread_update_thread_after_fork(PyThread_ident_t* ident, PyThread_handle_t* handle) {
 }
 
 /*
  * Return the thread Id instead of a handle. The Id is said to uniquely identify the
  * thread in the system
  */
-unsigned long
-PyThread_get_thread_ident(void)
+PyThread_ident_t
+PyThread_get_thread_ident_ex(void)
 {
     if (!initialized)
         PyThread_init_thread();
@@ -230,6 +259,13 @@ PyThread_get_thread_ident(void)
     return GetCurrentThreadId();
 }
 
+unsigned long
+PyThread_get_thread_ident(void)
+{
+    return (unsigned long) PyThread_get_thread_ident_ex();
+}
+
+
 #ifdef PY_HAVE_THREAD_NATIVE_ID
 /*
  * Return the native Thread ID (TID) of the calling thread.
index 76a1f7763f23b9fb61402613794f5f71a926b7e5..a8df5449714a8141b5a7f9e939acbec694366aa5 100644 (file)
@@ -235,8 +235,8 @@ pythread_wrapper(void *arg)
     return NULL;
 }
 
-unsigned long
-PyThread_start_new_thread(void (*func)(void *), void *arg)
+static int
+do_start_joinable_thread(void (*func)(void *), void *arg, pthread_t* out_id)
 {
     pthread_t th;
     int status;
@@ -252,7 +252,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg)
 
 #if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
     if (pthread_attr_init(&attrs) != 0)
-        return PYTHREAD_INVALID_THREAD_ID;
+        return -1;
 #endif
 #if defined(THREAD_STACK_SIZE)
     PyThreadState *tstate = _PyThreadState_GET();
@@ -261,7 +261,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg)
     if (tss != 0) {
         if (pthread_attr_setstacksize(&attrs, tss) != 0) {
             pthread_attr_destroy(&attrs);
-            return PYTHREAD_INVALID_THREAD_ID;
+            return -1;
         }
     }
 #endif
@@ -272,7 +272,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg)
     pythread_callback *callback = PyMem_RawMalloc(sizeof(pythread_callback));
 
     if (callback == NULL) {
-      return PYTHREAD_INVALID_THREAD_ID;
+      return -1;
     }
 
     callback->func = func;
@@ -292,11 +292,34 @@ PyThread_start_new_thread(void (*func)(void *), void *arg)
 
     if (status != 0) {
         PyMem_RawFree(callback);
-        return PYTHREAD_INVALID_THREAD_ID;
+        return -1;
     }
+    *out_id = th;
+    return 0;
+}
 
-    pthread_detach(th);
+int
+PyThread_start_joinable_thread(void (*func)(void *), void *arg,
+                               PyThread_ident_t* ident, PyThread_handle_t* handle) {
+    pthread_t th = (pthread_t) 0;
+    if (do_start_joinable_thread(func, arg, &th)) {
+        return -1;
+    }
+    *ident = (PyThread_ident_t) th;
+    *handle = (PyThread_handle_t) th;
+    assert(th == (pthread_t) *ident);
+    assert(th == (pthread_t) *handle);
+    return 0;
+}
 
+unsigned long
+PyThread_start_new_thread(void (*func)(void *), void *arg)
+{
+    pthread_t th = (pthread_t) 0;
+    if (do_start_joinable_thread(func, arg, &th)) {
+        return PYTHREAD_INVALID_THREAD_ID;
+    }
+    pthread_detach(th);
 #if SIZEOF_PTHREAD_T <= SIZEOF_LONG
     return (unsigned long) th;
 #else
@@ -304,20 +327,46 @@ PyThread_start_new_thread(void (*func)(void *), void *arg)
 #endif
 }
 
+int
+PyThread_join_thread(PyThread_handle_t th) {
+    return pthread_join((pthread_t) th, NULL);
+}
+
+int
+PyThread_detach_thread(PyThread_handle_t th) {
+    return pthread_detach((pthread_t) th);
+}
+
+void
+PyThread_update_thread_after_fork(PyThread_ident_t* ident, PyThread_handle_t* handle) {
+    // The thread id might have been updated in the forked child
+    pthread_t th = pthread_self();
+    *ident = (PyThread_ident_t) th;
+    *handle = (PyThread_handle_t) th;
+    assert(th == (pthread_t) *ident);
+    assert(th == (pthread_t) *handle);
+}
+
 /* XXX This implementation is considered (to quote Tim Peters) "inherently
    hosed" because:
      - It does not guarantee the promise that a non-zero integer is returned.
      - The cast to unsigned long is inherently unsafe.
      - It is not clear that the 'volatile' (for AIX?) are any longer necessary.
 */
-unsigned long
-PyThread_get_thread_ident(void)
-{
+PyThread_ident_t
+PyThread_get_thread_ident_ex(void) {
     volatile pthread_t threadid;
     if (!initialized)
         PyThread_init_thread();
     threadid = pthread_self();
-    return (unsigned long) threadid;
+    assert(threadid == (pthread_t) (PyThread_ident_t) threadid);
+    return (PyThread_ident_t) threadid;
+}
+
+unsigned long
+PyThread_get_thread_ident(void)
+{
+    return (unsigned long) PyThread_get_thread_ident_ex();
 }
 
 #ifdef PY_HAVE_THREAD_NATIVE_ID
index 48bad36ec449ab4cafa58a7ed4f05a48a590dac9..4741e594e52e656c4280a56646da04c6fd342328 100644 (file)
@@ -94,6 +94,15 @@ pthread_detach(pthread_t thread)
     return 0;
 }
 
+int
+pthread_join(pthread_t thread, void** value_ptr)
+{
+    if (value_ptr) {
+        *value_ptr = NULL;
+    }
+    return 0;
+}
+
 PyAPI_FUNC(pthread_t) pthread_self(void)
 {
     return 0;