]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-128002: use per threads tasks linked list in asyncio (#128869)
authorKumar Aditya <kumaraditya@python.org>
Thu, 6 Feb 2025 18:51:07 +0000 (00:21 +0530)
committerGitHub <noreply@github.com>
Thu, 6 Feb 2025 18:51:07 +0000 (19:51 +0100)
Co-authored-by: Ɓukasz Langa <lukasz@langa.pl>
Include/internal/pycore_interp.h
Include/internal/pycore_lock.h
Include/internal/pycore_pystate.h
Include/internal/pycore_tstate.h
Lib/test/test_asyncio/test_free_threading.py
Modules/_asynciomodule.c
Python/pystate.c

index 6f00eca8de05afba63fb87fd9fdda97e6897dd63..7fdfc7903477de39303cc7c2a86d00961c67ddf1 100644 (file)
@@ -227,6 +227,13 @@ struct _is {
     PyMutex weakref_locks[NUM_WEAKREF_LIST_LOCKS];
     _PyIndexPool tlbc_indices;
 #endif
+    // Per-interpreter list of tasks, any lingering tasks from thread
+    // states gets added here and removed from the corresponding
+    // thread state's list.
+    struct llist_node asyncio_tasks_head;
+    // `asyncio_tasks_lock` is used when tasks are moved
+    // from thread's list to interpreter's list.
+    PyMutex asyncio_tasks_lock;
 
     // Per-interpreter state for the obmalloc allocator.  For the main
     // interpreter and for all interpreters that don't have their
index 8bcb23a6ce9f9d04650c1241ffc2b198dcd635fc..7484b05d7f2446a00a7d6a1270cba0aeb60eb8d5 100644 (file)
@@ -52,7 +52,7 @@ typedef enum _PyLockFlags {
 
 // Lock a mutex with an optional timeout and additional options. See
 // _PyLockFlags for details.
-extern PyLockStatus
+extern PyAPI_FUNC(PyLockStatus)
 _PyMutex_LockTimed(PyMutex *m, PyTime_t timeout_ns, _PyLockFlags flags);
 
 // Lock a mutex with additional options. See _PyLockFlags for details.
index ff3b222b1578104346ab47def90b78ecb0043e27..9ec59e60f609ab585fb193c28ac7682d43effec8 100644 (file)
@@ -182,8 +182,8 @@ extern void _PyEval_StartTheWorldAll(_PyRuntimeState *runtime);
 // Perform a stop-the-world pause for threads in the specified interpreter.
 //
 // NOTE: This is a no-op outside of Py_GIL_DISABLED builds.
-extern void _PyEval_StopTheWorld(PyInterpreterState *interp);
-extern void _PyEval_StartTheWorld(PyInterpreterState *interp);
+extern PyAPI_FUNC(void) _PyEval_StopTheWorld(PyInterpreterState *interp);
+extern PyAPI_FUNC(void) _PyEval_StartTheWorld(PyInterpreterState *interp);
 
 
 static inline void
index 74e1452763e56c4143d5122532090ddc494e72cc..932623f54c426071507cbae4f63793770ab8fc43 100644 (file)
@@ -24,9 +24,14 @@ typedef struct _PyThreadStateImpl {
     PyObject *asyncio_running_loop; // Strong reference
     PyObject *asyncio_running_task; // Strong reference
 
+    /* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
+       or subclasses of it used in `asyncio.all_tasks`.
+    */
+    struct llist_node asyncio_tasks_head;
     struct _qsbr_thread_state *qsbr;  // only used by free-threaded build
     struct llist_node mem_free_queue; // delayed free queue
 
+
 #ifdef Py_GIL_DISABLED
     struct _gc_thread_state gc;
     struct _mimalloc_thread_state mimalloc;
index 6da398e77e77979cc166561427c0f32e57df6790..d0221d87062c5b4687afcab11a60a511dcb95281 100644 (file)
@@ -3,7 +3,8 @@ import threading
 import unittest
 from threading import Thread
 from unittest import TestCase
-
+import weakref
+from test import support
 from test.support import threading_helper
 
 threading_helper.requires_working_threading(module=True)
@@ -95,6 +96,22 @@ class TestFreeThreading:
         done.set()
         runner.join()
 
+    def test_task_different_thread_finalized(self) -> None:
+        task = None
+        async def func():
+            nonlocal task
+            task = asyncio.current_task()
+
+        thread = Thread(target=lambda: asyncio.run(func()))
+        thread.start()
+        thread.join()
+        wr = weakref.ref(task)
+        del thread
+        del task
+        # task finalization in different thread shouldn't crash
+        support.gc_collect()
+        self.assertIsNone(wr())
+
     def test_run_coroutine_threadsafe(self) -> None:
         results = []
 
index 09ab8f13fe1f5d510e767d905688be2d67363bb2..656c03a98d73b20d45a19f588ceb9715f336b704 100644 (file)
@@ -67,6 +67,10 @@ typedef struct TaskObj {
     PyObject *task_name;
     PyObject *task_context;
     struct llist_node task_node;
+#ifdef Py_GIL_DISABLED
+    // thread id of the thread where this task was created
+    uintptr_t task_tid;
+#endif
 } TaskObj;
 
 typedef struct {
@@ -94,14 +98,6 @@ typedef struct {
      || PyObject_TypeCheck(obj, state->FutureType)      \
      || PyObject_TypeCheck(obj, state->TaskType))
 
-#ifdef Py_GIL_DISABLED
-#   define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
-#   define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION()
-#else
-#   define ASYNCIO_STATE_LOCK(state) ((void)state)
-#   define ASYNCIO_STATE_UNLOCK(state) ((void)state)
-#endif
-
 typedef struct _Py_AsyncioModuleDebugOffsets {
     struct _asyncio_task_object {
         uint64_t size;
@@ -135,9 +131,6 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets AsyncioDebug)
 
 /* State of the _asyncio module */
 typedef struct {
-#ifdef Py_GIL_DISABLED
-    PyMutex mutex;
-#endif
     PyTypeObject *FutureIterType;
     PyTypeObject *TaskStepMethWrapper_Type;
     PyTypeObject *FutureType;
@@ -184,11 +177,6 @@ typedef struct {
     /* Counter for autogenerated Task names */
     uint64_t task_name_counter;
 
-    /* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
-       or subclasses of it. Third party tasks implementations which don't inherit from
-       `asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet.
-    */
-    struct llist_node asyncio_tasks_head;
 } asyncio_state;
 
 static inline asyncio_state *
@@ -2179,16 +2167,15 @@ static  PyMethodDef TaskWakeupDef = {
 static void
 register_task(asyncio_state *state, TaskObj *task)
 {
-    ASYNCIO_STATE_LOCK(state);
     assert(Task_Check(state, task));
     if (task->task_node.next != NULL) {
         // already registered
         assert(task->task_node.prev != NULL);
-        goto exit;
+        return;
     }
-    llist_insert_tail(&state->asyncio_tasks_head, &task->task_node);
-exit:
-    ASYNCIO_STATE_UNLOCK(state);
+    _PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET();
+    struct llist_node *head = &tstate->asyncio_tasks_head;
+    llist_insert_tail(head, &task->task_node);
 }
 
 static int
@@ -2197,19 +2184,38 @@ register_eager_task(asyncio_state *state, PyObject *task)
     return PySet_Add(state->eager_tasks, task);
 }
 
-static void
-unregister_task(asyncio_state *state, TaskObj *task)
+static inline void
+unregister_task_safe(TaskObj *task)
 {
-    ASYNCIO_STATE_LOCK(state);
-    assert(Task_Check(state, task));
     if (task->task_node.next == NULL) {
         // not registered
         assert(task->task_node.prev == NULL);
-        goto exit;
+        return;
     }
     llist_remove(&task->task_node);
-exit:
-    ASYNCIO_STATE_UNLOCK(state);
+}
+
+static void
+unregister_task(asyncio_state *state, TaskObj *task)
+{
+    assert(Task_Check(state, task));
+#ifdef Py_GIL_DISABLED
+    // check if we are in the same thread
+    // if so, we can avoid locking
+    if (task->task_tid == _Py_ThreadId()) {
+        unregister_task_safe(task);
+    }
+    else {
+        // we are in a different thread
+        // stop the world then check and remove the task
+        PyThreadState *tstate = _PyThreadState_GET();
+        _PyEval_StopTheWorld(tstate->interp);
+        unregister_task_safe(task);
+        _PyEval_StartTheWorld(tstate->interp);
+    }
+#else
+    unregister_task_safe(task);
+#endif
 }
 
 static int
@@ -2423,6 +2429,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
     }
 
     Py_CLEAR(self->task_fut_waiter);
+#ifdef Py_GIL_DISABLED
+    self->task_tid = _Py_ThreadId();
+#endif
     self->task_must_cancel = 0;
     self->task_log_destroy_pending = 1;
     self->task_num_cancels_requested = 0;
@@ -3981,6 +3990,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
 static inline int
 add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
 {
+    assert(PySet_CheckExact(tasks));
     PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
     if (done == NULL) {
         return -1;
@@ -4003,6 +4013,57 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo
     return 0;
 }
 
+static inline int
+add_tasks_llist(struct llist_node *head, PyListObject *tasks)
+{
+    struct llist_node *node;
+    llist_for_each_safe(node, head) {
+        TaskObj *task = llist_data(node, TaskObj, task_node);
+        // The linked list holds borrowed references to task
+        // as such it is possible that the task is concurrently
+        // deallocated while added to this list.
+        // To protect against concurrent deallocations,
+        // we first try to incref the task which would fail
+        // if it is concurrently getting deallocated in another thread,
+        // otherwise it gets added to the list.
+        if (_Py_TryIncref((PyObject *)task)) {
+            if (_PyList_AppendTakeRef(tasks, (PyObject *)task) < 0) {
+                // do not call any escaping calls here while the world is stopped.
+                return -1;
+            }
+        }
+    }
+    return 0;
+}
+
+static inline int
+add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks)
+{
+#ifdef Py_GIL_DISABLED
+    assert(interp->stoptheworld.world_stopped);
+#endif
+    // Start traversing from interpreter's linked list
+    struct llist_node *head = &interp->asyncio_tasks_head;
+
+    if (add_tasks_llist(head, tasks) < 0) {
+        return -1;
+    }
+
+    int ret = 0;
+    // traverse the task lists of thread states
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
+        _PyThreadStateImpl *ts = (_PyThreadStateImpl *)p;
+        head = &ts->asyncio_tasks_head;
+        if (add_tasks_llist(head, tasks) < 0) {
+            ret = -1;
+            goto exit;
+        }
+    }
+exit:
+    _Py_FOR_EACH_TSTATE_END(interp);
+    return ret;
+}
+
 /*********************** Module **************************/
 
 /*[clinic input]
@@ -4041,30 +4102,29 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
         Py_DECREF(loop);
         return NULL;
     }
-    int err = 0;
-    ASYNCIO_STATE_LOCK(state);
-    struct llist_node *node;
-
-    llist_for_each_safe(node, &state->asyncio_tasks_head) {
-        TaskObj *task = llist_data(node, TaskObj, task_node);
-        // The linked list holds borrowed references to task
-        // as such it is possible that the task is concurrently
-        // deallocated while added to this list.
-        // To protect against concurrent deallocations,
-        // we first try to incref the task which would fail
-        // if it is concurrently getting deallocated in another thread,
-        // otherwise it gets added to the list.
-        if (_Py_TryIncref((PyObject *)task)) {
-            if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
-                Py_DECREF(tasks);
-                Py_DECREF(loop);
-                err = 1;
-                break;
-            }
-        }
-    }
-    ASYNCIO_STATE_UNLOCK(state);
-    if (err) {
+    PyInterpreterState *interp = PyInterpreterState_Get();
+    // Stop the world and traverse the per-thread linked list
+    // of asyncio tasks for every thread, as well as the
+    // interpreter's linked list, and add them to `tasks`.
+    // The interpreter linked list is used for any lingering tasks
+    // whose thread state has been deallocated while the task was
+    // still alive. This can happen if a task is referenced by
+    // a different thread, in which case the task is moved to
+    // the interpreter's linked list from the thread's linked
+    // list before deallocation. See PyThreadState_Clear.
+    //
+    // The stop-the-world pause is required so that no thread
+    // modifies its linked list while being iterated here
+    // in parallel. This design allows for lock-free
+    // register_task/unregister_task for loops running in parallel
+    // in different threads (the general case).
+    _PyEval_StopTheWorld(interp);
+    int ret = add_tasks_interp(interp, (PyListObject *)tasks);
+    _PyEval_StartTheWorld(interp);
+    if (ret < 0) {
+        // call any escaping calls after starting the world to avoid any deadlocks.
+        Py_DECREF(tasks);
+        Py_DECREF(loop);
         return NULL;
     }
     PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
@@ -4348,7 +4408,6 @@ module_exec(PyObject *mod)
 {
     asyncio_state *state = get_asyncio_state(mod);
 
-    llist_init(&state->asyncio_tasks_head);
 
 #define CREATE_TYPE(m, tp, spec, base)                                  \
     do {                                                                \
index e6770ef40df7404f6e59c2f12c9eec3c6ff19913..89a652850e936379036289537514c85e26bf093d 100644 (file)
@@ -643,6 +643,8 @@ init_interpreter(PyInterpreterState *interp,
     _Py_brc_init_state(interp);
 #endif
     llist_init(&interp->mem_free_queue.head);
+    llist_init(&interp->asyncio_tasks_head);
+    interp->asyncio_tasks_lock = (PyMutex){0};
     for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
         interp->monitors.tools[i] = 0;
     }
@@ -1512,7 +1514,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
     tstate->delete_later = NULL;
 
     llist_init(&_tstate->mem_free_queue);
-
+    llist_init(&_tstate->asyncio_tasks_head);
     if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
         // Start in the suspended state if there is an ongoing stop-the-world.
         tstate->state = _Py_THREAD_SUSPENDED;
@@ -1692,6 +1694,14 @@ PyThreadState_Clear(PyThreadState *tstate)
     Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop);
     Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task);
 
+
+    PyMutex_Lock(&tstate->interp->asyncio_tasks_lock);
+    // merge any lingering tasks from thread state to interpreter's
+    // tasks list
+    llist_concat(&tstate->interp->asyncio_tasks_head,
+                 &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head);
+    PyMutex_Unlock(&tstate->interp->asyncio_tasks_lock);
+
     Py_CLEAR(tstate->dict);
     Py_CLEAR(tstate->async_exc);