PyObject *task_name;
PyObject *task_context;
struct llist_node task_node;
+#ifdef Py_GIL_DISABLED
+ // thread id of the thread where this task was created
+ uintptr_t task_tid;
+#endif
} TaskObj;
typedef struct {
|| PyObject_TypeCheck(obj, state->FutureType) \
|| PyObject_TypeCheck(obj, state->TaskType))
-#ifdef Py_GIL_DISABLED
-# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
-# define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION()
-#else
-# define ASYNCIO_STATE_LOCK(state) ((void)state)
-# define ASYNCIO_STATE_UNLOCK(state) ((void)state)
-#endif
-
typedef struct _Py_AsyncioModuleDebugOffsets {
struct _asyncio_task_object {
uint64_t size;
/* State of the _asyncio module */
typedef struct {
-#ifdef Py_GIL_DISABLED
- PyMutex mutex;
-#endif
PyTypeObject *FutureIterType;
PyTypeObject *TaskStepMethWrapper_Type;
PyTypeObject *FutureType;
/* Counter for autogenerated Task names */
uint64_t task_name_counter;
- /* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
- or subclasses of it. Third party tasks implementations which don't inherit from
- `asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet.
- */
- struct llist_node asyncio_tasks_head;
} asyncio_state;
static inline asyncio_state *
static void
register_task(asyncio_state *state, TaskObj *task)
{
- ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next != NULL) {
// already registered
assert(task->task_node.prev != NULL);
- goto exit;
+ return;
}
- llist_insert_tail(&state->asyncio_tasks_head, &task->task_node);
-exit:
- ASYNCIO_STATE_UNLOCK(state);
+ _PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET();
+ struct llist_node *head = &tstate->asyncio_tasks_head;
+ llist_insert_tail(head, &task->task_node);
}
static int
return PySet_Add(state->eager_tasks, task);
}
-static void
-unregister_task(asyncio_state *state, TaskObj *task)
+static inline void
+unregister_task_safe(TaskObj *task)
{
- ASYNCIO_STATE_LOCK(state);
- assert(Task_Check(state, task));
if (task->task_node.next == NULL) {
// not registered
assert(task->task_node.prev == NULL);
- goto exit;
+ return;
}
llist_remove(&task->task_node);
-exit:
- ASYNCIO_STATE_UNLOCK(state);
+}
+
+static void
+unregister_task(asyncio_state *state, TaskObj *task)
+{
+ assert(Task_Check(state, task));
+#ifdef Py_GIL_DISABLED
+ // check if we are in the same thread
+ // if so, we can avoid locking
+ if (task->task_tid == _Py_ThreadId()) {
+ unregister_task_safe(task);
+ }
+ else {
+ // we are in a different thread
+ // stop the world then check and remove the task
+ PyThreadState *tstate = _PyThreadState_GET();
+ _PyEval_StopTheWorld(tstate->interp);
+ unregister_task_safe(task);
+ _PyEval_StartTheWorld(tstate->interp);
+ }
+#else
+ unregister_task_safe(task);
+#endif
}
static int
}
Py_CLEAR(self->task_fut_waiter);
+#ifdef Py_GIL_DISABLED
+ self->task_tid = _Py_ThreadId();
+#endif
self->task_must_cancel = 0;
self->task_log_destroy_pending = 1;
self->task_num_cancels_requested = 0;
static inline int
add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
{
+ assert(PySet_CheckExact(tasks));
PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
if (done == NULL) {
return -1;
return 0;
}
+static inline int
+add_tasks_llist(struct llist_node *head, PyListObject *tasks)
+{
+ struct llist_node *node;
+ llist_for_each_safe(node, head) {
+ TaskObj *task = llist_data(node, TaskObj, task_node);
+ // The linked list holds borrowed references to task
+ // as such it is possible that the task is concurrently
+ // deallocated while added to this list.
+ // To protect against concurrent deallocations,
+ // we first try to incref the task which would fail
+ // if it is concurrently getting deallocated in another thread,
+ // otherwise it gets added to the list.
+ if (_Py_TryIncref((PyObject *)task)) {
+ if (_PyList_AppendTakeRef(tasks, (PyObject *)task) < 0) {
+ // do not call any escaping calls here while the world is stopped.
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
+
+static inline int
+add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks)
+{
+#ifdef Py_GIL_DISABLED
+ assert(interp->stoptheworld.world_stopped);
+#endif
+ // Start traversing from interpreter's linked list
+ struct llist_node *head = &interp->asyncio_tasks_head;
+
+ if (add_tasks_llist(head, tasks) < 0) {
+ return -1;
+ }
+
+ int ret = 0;
+ // traverse the task lists of thread states
+ _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
+ _PyThreadStateImpl *ts = (_PyThreadStateImpl *)p;
+ head = &ts->asyncio_tasks_head;
+ if (add_tasks_llist(head, tasks) < 0) {
+ ret = -1;
+ goto exit;
+ }
+ }
+exit:
+ _Py_FOR_EACH_TSTATE_END(interp);
+ return ret;
+}
+
/*********************** Module **************************/
/*[clinic input]
Py_DECREF(loop);
return NULL;
}
- int err = 0;
- ASYNCIO_STATE_LOCK(state);
- struct llist_node *node;
-
- llist_for_each_safe(node, &state->asyncio_tasks_head) {
- TaskObj *task = llist_data(node, TaskObj, task_node);
- // The linked list holds borrowed references to task
- // as such it is possible that the task is concurrently
- // deallocated while added to this list.
- // To protect against concurrent deallocations,
- // we first try to incref the task which would fail
- // if it is concurrently getting deallocated in another thread,
- // otherwise it gets added to the list.
- if (_Py_TryIncref((PyObject *)task)) {
- if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
- Py_DECREF(tasks);
- Py_DECREF(loop);
- err = 1;
- break;
- }
- }
- }
- ASYNCIO_STATE_UNLOCK(state);
- if (err) {
+ PyInterpreterState *interp = PyInterpreterState_Get();
+ // Stop the world and traverse the per-thread linked list
+ // of asyncio tasks for every thread, as well as the
+ // interpreter's linked list, and add them to `tasks`.
+ // The interpreter linked list is used for any lingering tasks
+ // whose thread state has been deallocated while the task was
+ // still alive. This can happen if a task is referenced by
+ // a different thread, in which case the task is moved to
+ // the interpreter's linked list from the thread's linked
+ // list before deallocation. See PyThreadState_Clear.
+ //
+ // The stop-the-world pause is required so that no thread
+ // modifies its linked list while being iterated here
+ // in parallel. This design allows for lock-free
+ // register_task/unregister_task for loops running in parallel
+ // in different threads (the general case).
+ _PyEval_StopTheWorld(interp);
+ int ret = add_tasks_interp(interp, (PyListObject *)tasks);
+ _PyEval_StartTheWorld(interp);
+ if (ret < 0) {
+ // call any escaping calls after starting the world to avoid any deadlocks.
+ Py_DECREF(tasks);
+ Py_DECREF(loop);
return NULL;
}
PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
{
asyncio_state *state = get_asyncio_state(mod);
- llist_init(&state->asyncio_tasks_head);
#define CREATE_TYPE(m, tp, spec, base) \
do { \