]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
GH-107803: double linked list implementation for asyncio tasks (GH-107804)
authorKumar Aditya <kumaraditya@python.org>
Sat, 22 Jun 2024 17:58:35 +0000 (23:28 +0530)
committerGitHub <noreply@github.com>
Sat, 22 Jun 2024 17:58:35 +0000 (10:58 -0700)
* linked list

* add tail optmiization to linked list

* wip

* wip

* wip

* more fixes

* finally it works

* add tests

* remove weakreflist

* add some comments

* reduce code duplication in _asynciomodule.c

* address some review comments

* add invariants about the state of the linked list

* add better explanation

* clinic regen

* reorder branches for better branch prediction

* Update Modules/_asynciomodule.c

* Apply suggestions from code review

Co-authored-by: Itamar Oren <itamarost@gmail.com>
* fix capturing of eager tasks

* add comment to task finalization

* fix tests and couple c implmentation to c task

improved linked-list logic and more comments

* fix test

---------

Co-authored-by: Itamar Oren <itamarost@gmail.com>
Include/internal/pycore_global_objects_fini_generated.h
Include/internal/pycore_global_strings.h
Include/internal/pycore_runtime_init_generated.h
Include/internal/pycore_unicodeobject_generated.h
Lib/asyncio/tasks.py
Lib/test/test_asyncio/test_tasks.py
Modules/_asynciomodule.c
Modules/clinic/_asynciomodule.c.h

index 62e10e2325b8fbec3c1a583e6b40c38634c550da..c0840f9eb7eca21d91aaf3785c0f4b0efdfe4526 100644 (file)
@@ -899,6 +899,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) {
     _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(displayhook));
     _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dklen));
     _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(doc));
+    _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(done));
     _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dont_inherit));
     _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dst));
     _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dst_dir_fd));
index 0e3d1a5a9a9c764c359ffc86dbaba88849bca44c..51735a8a726e114cee7c9e6c479e2130cd90c961 100644 (file)
@@ -388,6 +388,7 @@ struct _Py_global_strings {
         STRUCT_FOR_ID(displayhook)
         STRUCT_FOR_ID(dklen)
         STRUCT_FOR_ID(doc)
+        STRUCT_FOR_ID(done)
         STRUCT_FOR_ID(dont_inherit)
         STRUCT_FOR_ID(dst)
         STRUCT_FOR_ID(dst_dir_fd)
index 5b8f29146287a3a874afca0d3114e804773feb62..c5be67c6d80b9d10651562eae32cf96d10bd465a 100644 (file)
@@ -897,6 +897,7 @@ extern "C" {
     INIT_ID(displayhook), \
     INIT_ID(dklen), \
     INIT_ID(doc), \
+    INIT_ID(done), \
     INIT_ID(dont_inherit), \
     INIT_ID(dst), \
     INIT_ID(dst_dir_fd), \
index 7fa7bb79e21dc69655e7f46c131515cd24481d19..0e0ad6518771e98ab937d2c535abcdb44fd42cf4 100644 (file)
@@ -1352,6 +1352,10 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) {
     _PyUnicode_InternStatic(interp, &string);
     assert(_PyUnicode_CheckConsistency(string, 1));
     assert(PyUnicode_GET_LENGTH(string) != 1);
+    string = &_Py_ID(done);
+    _PyUnicode_InternStatic(interp, &string);
+    assert(_PyUnicode_CheckConsistency(string, 1));
+    assert(PyUnicode_GET_LENGTH(string) != 1);
     string = &_Py_ID(dont_inherit);
     _PyUnicode_InternStatic(interp, &string);
     assert(_PyUnicode_CheckConsistency(string, 1));
index dadcb5b5f36bd787af05b347d078e6118bbdc804..cd869931e01409d2a9fb84214550a56eebc99043 100644 (file)
@@ -1097,14 +1097,14 @@ _py_unregister_eager_task = _unregister_eager_task
 _py_enter_task = _enter_task
 _py_leave_task = _leave_task
 _py_swap_current_task = _swap_current_task
-
+_py_all_tasks = all_tasks
 
 try:
     from _asyncio import (_register_task, _register_eager_task,
                           _unregister_task, _unregister_eager_task,
                           _enter_task, _leave_task, _swap_current_task,
                           _scheduled_tasks, _eager_tasks, _current_tasks,
-                          current_task)
+                          current_task, all_tasks)
 except ImportError:
     pass
 else:
@@ -1116,3 +1116,4 @@ else:
     _c_enter_task = _enter_task
     _c_leave_task = _leave_task
     _c_swap_current_task = _swap_current_task
+    _c_all_tasks = all_tasks
index cc0d7f52a1bf4cddced1ddc626a45794cc2a2a41..9b22fb942c6339fe33dae2ad3c573619233e8b68 100644 (file)
@@ -86,6 +86,7 @@ class BaseTaskTests:
 
     Task = None
     Future = None
+    all_tasks = None
 
     def new_task(self, loop, coro, name='TestTask', context=None):
         return self.__class__.Task(coro, loop=loop, name=name, context=context)
@@ -2267,7 +2268,7 @@ class BaseTaskTests:
         coro = kill_me(self.loop)
         task = asyncio.ensure_future(coro, loop=self.loop)
 
-        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})
+        self.assertEqual(self.all_tasks(loop=self.loop), {task})
 
         asyncio.set_event_loop(None)
 
@@ -2282,7 +2283,7 @@ class BaseTaskTests:
         # no more reference to kill_me() task: the task is destroyed by the GC
         support.gc_collect()
 
-        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())
+        self.assertEqual(self.all_tasks(loop=self.loop), set())
 
         mock_handler.assert_called_with(self.loop, {
             'message': 'Task was destroyed but it is pending!',
@@ -2431,7 +2432,7 @@ class BaseTaskTests:
         message = m_log.error.call_args[0][0]
         self.assertIn('Task was destroyed but it is pending', message)
 
-        self.assertEqual(asyncio.all_tasks(self.loop), set())
+        self.assertEqual(self.all_tasks(self.loop), set())
 
     def test_create_task_with_noncoroutine(self):
         with self.assertRaisesRegex(TypeError,
@@ -2731,6 +2732,7 @@ def add_subclass_tests(cls):
     # Add patched Task & Future back to the test case
     cls.Task = Task
     cls.Future = Future
+    cls.all_tasks = tasks.all_tasks
 
     # Add an extra unit-test
     cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture
@@ -2804,6 +2806,7 @@ class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
 
     Task = getattr(tasks, '_CTask', None)
     Future = getattr(futures, '_CFuture', None)
+    all_tasks = getattr(tasks, '_c_all_tasks', None)
 
     @support.refcount_test
     def test_refleaks_in_task___init__(self):
@@ -2835,6 +2838,7 @@ class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
 
     Task = getattr(tasks, '_CTask', None)
     Future = getattr(futures, '_CFuture', None)
+    all_tasks = getattr(tasks, '_c_all_tasks', None)
 
 
 @unittest.skipUnless(hasattr(tasks, '_CTask'),
@@ -2844,6 +2848,7 @@ class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
 
     Task = getattr(tasks, '_CTask', None)
     Future = futures._PyFuture
+    all_tasks = getattr(tasks, '_c_all_tasks', None)
 
 
 @unittest.skipUnless(hasattr(futures, '_CFuture'),
@@ -2853,6 +2858,7 @@ class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
 
     Future = getattr(futures, '_CFuture', None)
     Task = tasks._PyTask
+    all_tasks = tasks._py_all_tasks
 
 
 @unittest.skipUnless(hasattr(tasks, '_CTask'),
@@ -2861,6 +2867,7 @@ class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
 
     Task = getattr(tasks, '_CTask', None)
     Future = futures._PyFuture
+    all_tasks = getattr(tasks, '_c_all_tasks', None)
 
 
 @unittest.skipUnless(hasattr(futures, '_CFuture'),
@@ -2869,6 +2876,7 @@ class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
 
     Task = tasks._PyTask
     Future = getattr(futures, '_CFuture', None)
+    all_tasks = staticmethod(tasks._py_all_tasks)
 
 
 class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
@@ -2876,6 +2884,7 @@ class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
 
     Task = tasks._PyTask
     Future = futures._PyFuture
+    all_tasks = staticmethod(tasks._py_all_tasks)
 
 
 @add_subclass_tests
@@ -2915,6 +2924,7 @@ class BaseTaskIntrospectionTests:
     _unregister_task = None
     _enter_task = None
     _leave_task = None
+    all_tasks = None
 
     def test__register_task_1(self):
         class TaskLike:
@@ -2928,9 +2938,9 @@ class BaseTaskIntrospectionTests:
         task = TaskLike()
         loop = mock.Mock()
 
-        self.assertEqual(asyncio.all_tasks(loop), set())
+        self.assertEqual(self.all_tasks(loop), set())
         self._register_task(task)
-        self.assertEqual(asyncio.all_tasks(loop), {task})
+        self.assertEqual(self.all_tasks(loop), {task})
         self._unregister_task(task)
 
     def test__register_task_2(self):
@@ -2944,9 +2954,9 @@ class BaseTaskIntrospectionTests:
         task = TaskLike()
         loop = mock.Mock()
 
-        self.assertEqual(asyncio.all_tasks(loop), set())
+        self.assertEqual(self.all_tasks(loop), set())
         self._register_task(task)
-        self.assertEqual(asyncio.all_tasks(loop), {task})
+        self.assertEqual(self.all_tasks(loop), {task})
         self._unregister_task(task)
 
     def test__register_task_3(self):
@@ -2960,9 +2970,9 @@ class BaseTaskIntrospectionTests:
         task = TaskLike()
         loop = mock.Mock()
 
-        self.assertEqual(asyncio.all_tasks(loop), set())
+        self.assertEqual(self.all_tasks(loop), set())
         self._register_task(task)
-        self.assertEqual(asyncio.all_tasks(loop), set())
+        self.assertEqual(self.all_tasks(loop), set())
         self._unregister_task(task)
 
     def test__enter_task(self):
@@ -3013,13 +3023,13 @@ class BaseTaskIntrospectionTests:
         task.get_loop = lambda: loop
         self._register_task(task)
         self._unregister_task(task)
-        self.assertEqual(asyncio.all_tasks(loop), set())
+        self.assertEqual(self.all_tasks(loop), set())
 
     def test__unregister_task_not_registered(self):
         task = mock.Mock()
         loop = mock.Mock()
         self._unregister_task(task)
-        self.assertEqual(asyncio.all_tasks(loop), set())
+        self.assertEqual(self.all_tasks(loop), set())
 
 
 class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
@@ -3027,6 +3037,7 @@ class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
     _unregister_task = staticmethod(tasks._py_unregister_task)
     _enter_task = staticmethod(tasks._py_enter_task)
     _leave_task = staticmethod(tasks._py_leave_task)
+    all_tasks = staticmethod(tasks._py_all_tasks)
 
 
 @unittest.skipUnless(hasattr(tasks, '_c_register_task'),
@@ -3037,6 +3048,7 @@ class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
         _unregister_task = staticmethod(tasks._c_unregister_task)
         _enter_task = staticmethod(tasks._c_enter_task)
         _leave_task = staticmethod(tasks._c_leave_task)
+        all_tasks = staticmethod(tasks._c_all_tasks)
     else:
         _register_task = _unregister_task = _enter_task = _leave_task = None
 
index a26714f9755df5f0cd750e00a3124ab0b42f2ae8..4672fc7f60efd62349045d2a17667e75a583b252 100644 (file)
@@ -19,6 +19,60 @@ module _asyncio
 [clinic start generated code]*/
 /*[clinic end generated code: output=da39a3ee5e6b4b0d input=8fd17862aa989c69]*/
 
+typedef enum {
+    STATE_PENDING,
+    STATE_CANCELLED,
+    STATE_FINISHED
+} fut_state;
+
+#define FutureObj_HEAD(prefix)                                              \
+    PyObject_HEAD                                                           \
+    PyObject *prefix##_loop;                                                \
+    PyObject *prefix##_callback0;                                           \
+    PyObject *prefix##_context0;                                            \
+    PyObject *prefix##_callbacks;                                           \
+    PyObject *prefix##_exception;                                           \
+    PyObject *prefix##_exception_tb;                                        \
+    PyObject *prefix##_result;                                              \
+    PyObject *prefix##_source_tb;                                           \
+    PyObject *prefix##_cancel_msg;                                          \
+    PyObject *prefix##_cancelled_exc;                                       \
+    fut_state prefix##_state;                                               \
+    /* These bitfields need to be at the end of the struct
+       so that these and bitfields from TaskObj are contiguous.
+    */                                                                      \
+    unsigned prefix##_log_tb: 1;                                            \
+    unsigned prefix##_blocking: 1;
+
+typedef struct {
+    FutureObj_HEAD(fut)
+} FutureObj;
+
+typedef struct TaskObj {
+    FutureObj_HEAD(task)
+    unsigned task_must_cancel: 1;
+    unsigned task_log_destroy_pending: 1;
+    int task_num_cancels_requested;
+    PyObject *task_fut_waiter;
+    PyObject *task_coro;
+    PyObject *task_name;
+    PyObject *task_context;
+    struct TaskObj *next;
+    struct TaskObj *prev;
+} TaskObj;
+
+typedef struct {
+    PyObject_HEAD
+    TaskObj *sw_task;
+    PyObject *sw_arg;
+} TaskStepMethWrapper;
+
+
+#define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType)
+#define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType)
+
+#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType)
+#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType)
 
 #define FI_FREELIST_MAXLEN 255
 
@@ -38,8 +92,9 @@ typedef struct {
        all running event loops.  {EventLoop: Task} */
     PyObject *current_tasks;
 
-    /* WeakSet containing all tasks scheduled to run on event loops. */
-    PyObject *scheduled_tasks;
+    /* WeakSet containing scheduled 3rd party tasks which don't
+       inherit from native asyncio.Task */
+    PyObject *non_asyncio_tasks;
 
     /* Set containing all eagerly executing tasks. */
     PyObject *eager_tasks;
@@ -76,6 +131,51 @@ typedef struct {
 
     futureiterobject *fi_freelist;
     Py_ssize_t fi_freelist_len;
+
+    /* Linked-list of all tasks which are instances of asyncio.Task or subclasses
+       of it. Third party tasks implementations which don't inherit from
+       asyncio.Task are tracked separately using the 'non_asyncio_tasks' WeakSet.
+       `tail` is used as a sentinel to mark the end of the linked-list. It avoids one
+       branch in checking for empty list when adding a new task, the list is
+       initialized with `head` pointing to `tail` to mark an empty list.
+
+       Invariants:
+        * When the list is empty:
+        - asyncio_tasks.head == &asyncio_tasks.tail
+        - asyncio_tasks.head->prev == NULL
+        - asyncio_tasks.head->next == NULL
+
+        * After adding the first task 'task1':
+        - asyncio_tasks.head == task1
+        - task1->next == &asyncio_tasks.tail
+        - task1->prev == NULL
+        - asyncio_tasks.tail.prev == task1
+
+        * After adding a second task 'task2':
+        - asyncio_tasks.head == task2
+        - task2->next == task1
+        - task2->prev == NULL
+        - task1->prev == task2
+        - asyncio_tasks.tail.prev == task1
+
+        * After removing task 'task1':
+        - asyncio_tasks.head == task2
+        - task2->next == &asyncio_tasks.tail
+        - task2->prev == NULL
+        - asyncio_tasks.tail.prev == task2
+
+        * After removing task 'task2', the list is empty:
+        - asyncio_tasks.head == &asyncio_tasks.tail
+        - asyncio_tasks.head->prev == NULL
+        - asyncio_tasks.tail.prev == NULL
+        - asyncio_tasks.tail.next == NULL
+    */
+
+    struct {
+        TaskObj tail;
+        TaskObj *head;
+    } asyncio_tasks;
+
 } asyncio_state;
 
 static inline asyncio_state *
@@ -105,59 +205,6 @@ get_asyncio_state_by_def(PyObject *self)
     return get_asyncio_state(mod);
 }
 
-typedef enum {
-    STATE_PENDING,
-    STATE_CANCELLED,
-    STATE_FINISHED
-} fut_state;
-
-#define FutureObj_HEAD(prefix)                                              \
-    PyObject_HEAD                                                           \
-    PyObject *prefix##_loop;                                                \
-    PyObject *prefix##_callback0;                                           \
-    PyObject *prefix##_context0;                                            \
-    PyObject *prefix##_callbacks;                                           \
-    PyObject *prefix##_exception;                                           \
-    PyObject *prefix##_exception_tb;                                        \
-    PyObject *prefix##_result;                                              \
-    PyObject *prefix##_source_tb;                                           \
-    PyObject *prefix##_cancel_msg;                                          \
-    PyObject *prefix##_cancelled_exc;                                       \
-    fut_state prefix##_state;                                               \
-    /* These bitfields need to be at the end of the struct
-       so that these and bitfields from TaskObj are contiguous.
-    */                                                                      \
-    unsigned prefix##_log_tb: 1;                                            \
-    unsigned prefix##_blocking: 1;
-
-typedef struct {
-    FutureObj_HEAD(fut)
-} FutureObj;
-
-typedef struct {
-    FutureObj_HEAD(task)
-    unsigned task_must_cancel: 1;
-    unsigned task_log_destroy_pending: 1;
-    int task_num_cancels_requested;
-    PyObject *task_fut_waiter;
-    PyObject *task_coro;
-    PyObject *task_name;
-    PyObject *task_context;
-} TaskObj;
-
-typedef struct {
-    PyObject_HEAD
-    TaskObj *sw_task;
-    PyObject *sw_arg;
-} TaskStepMethWrapper;
-
-
-#define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType)
-#define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType)
-
-#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType)
-#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType)
-
 #include "clinic/_asynciomodule.c.h"
 
 
@@ -1967,16 +2014,21 @@ static  PyMethodDef TaskWakeupDef = {
 
 /* ----- Task introspection helpers */
 
-static int
-register_task(asyncio_state *state, PyObject *task)
+static void
+register_task(asyncio_state *state, TaskObj *task)
 {
-    PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks,
-                                                 &_Py_ID(add), task);
-    if (res == NULL) {
-        return -1;
+    assert(Task_Check(state, task));
+    assert(task != &state->asyncio_tasks.tail);
+    if (task->next != NULL) {
+        // already registered
+        return;
     }
-    Py_DECREF(res);
-    return 0;
+    assert(task->prev == NULL);
+    assert(state->asyncio_tasks.head != NULL);
+
+    task->next = state->asyncio_tasks.head;
+    state->asyncio_tasks.head->prev = task;
+    state->asyncio_tasks.head = task;
 }
 
 static int
@@ -1985,16 +2037,27 @@ register_eager_task(asyncio_state *state, PyObject *task)
     return PySet_Add(state->eager_tasks, task);
 }
 
-static int
-unregister_task(asyncio_state *state, PyObject *task)
-{
-    PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks,
-                                     &_Py_ID(discard), task);
-    if (res == NULL) {
-        return -1;
+static void
+unregister_task(asyncio_state *state, TaskObj *task)
+{
+    assert(Task_Check(state, task));
+    assert(task != &state->asyncio_tasks.tail);
+    if (task->next == NULL) {
+        // not registered
+        assert(task->prev == NULL);
+        assert(state->asyncio_tasks.head != task);
+        return;
     }
-    Py_DECREF(res);
-    return 0;
+    task->next->prev = task->prev;
+    if (task->prev == NULL) {
+        assert(state->asyncio_tasks.head == task);
+        state->asyncio_tasks.head = task->next;
+    } else {
+        task->prev->next = task->next;
+    }
+    task->next = NULL;
+    task->prev = NULL;
+    assert(state->asyncio_tasks.head != task);
 }
 
 static int
@@ -2178,7 +2241,8 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
     if (task_call_step_soon(state, self, NULL)) {
         return -1;
     }
-    return register_task(state, (PyObject*)self);
+    register_task(state, self);
+    return 0;
 }
 
 static int
@@ -2586,6 +2650,15 @@ _asyncio_Task_set_name(TaskObj *self, PyObject *value)
 static void
 TaskObj_finalize(TaskObj *task)
 {
+    asyncio_state *state = get_asyncio_state_by_def((PyObject *)task);
+    // Unregister the task from the linked list of tasks.
+    // Since task is a native task, we directly call the
+    // unregister_task function. Third party event loops
+    // should use the asyncio._unregister_task function.
+    // See https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support
+
+    unregister_task(state, task);
+
     PyObject *context;
     PyObject *message = NULL;
     PyObject *func;
@@ -3197,9 +3270,7 @@ task_eager_start(asyncio_state *state, TaskObj *task)
     }
 
     if (task->task_state == STATE_PENDING) {
-        if (register_task(state, (PyObject *)task) == -1) {
-            retval = -1;
-        }
+        register_task(state, task);
     } else {
         // This seems to really help performance on pyperformance benchmarks
         Py_CLEAR(task->task_coro);
@@ -3365,9 +3436,20 @@ _asyncio__register_task_impl(PyObject *module, PyObject *task)
 /*[clinic end generated code: output=8672dadd69a7d4e2 input=21075aaea14dfbad]*/
 {
     asyncio_state *state = get_asyncio_state(module);
-    if (register_task(state, task) < 0) {
+    if (Task_Check(state, task)) {
+        // task is an asyncio.Task instance or subclass, use efficient
+        // linked-list implementation.
+        register_task(state, (TaskObj *)task);
+        Py_RETURN_NONE;
+    }
+    // As task does not inherit from asyncio.Task, fallback to less efficient
+    // weakset implementation.
+    PyObject *res = PyObject_CallMethodOneArg(state->non_asyncio_tasks,
+                                              &_Py_ID(add), task);
+    if (res == NULL) {
         return NULL;
     }
+    Py_DECREF(res);
     Py_RETURN_NONE;
 }
 
@@ -3408,9 +3490,16 @@ _asyncio__unregister_task_impl(PyObject *module, PyObject *task)
 /*[clinic end generated code: output=6e5585706d568a46 input=28fb98c3975f7bdc]*/
 {
     asyncio_state *state = get_asyncio_state(module);
-    if (unregister_task(state, task) < 0) {
+    if (Task_Check(state, task)) {
+        unregister_task(state, (TaskObj *)task);
+        Py_RETURN_NONE;
+    }
+    PyObject *res = PyObject_CallMethodOneArg(state->non_asyncio_tasks,
+                                              &_Py_ID(discard), task);
+    if (res == NULL) {
         return NULL;
     }
+    Py_DECREF(res);
     Py_RETURN_NONE;
 }
 
@@ -3541,8 +3630,115 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
 }
 
 
+static inline int
+add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
+{
+    PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
+    if (done == NULL) {
+        return -1;
+    }
+    if (Py_IsTrue(done)) {
+        return 0;
+    }
+    Py_DECREF(done);
+    PyObject *task_loop = get_future_loop(state, task);
+    if (task_loop == NULL) {
+        return -1;
+    }
+    if (task_loop == loop) {
+        if (PySet_Add(tasks, task) < 0) {
+            Py_DECREF(task_loop);
+            return -1;
+        }
+    }
+    Py_DECREF(task_loop);
+    return 0;
+}
+
 /*********************** Module **************************/
 
+/*[clinic input]
+_asyncio.all_tasks
+
+    loop: object = None
+
+Return a set of all tasks for the loop.
+
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
+/*[clinic end generated code: output=0e107cbb7f72aa7b input=43a1b423c2d95bfa]*/
+{
+
+    asyncio_state *state = get_asyncio_state(module);
+    PyObject *tasks = PySet_New(NULL);
+    if (tasks == NULL) {
+        return NULL;
+    }
+    if (loop == Py_None) {
+        loop = _asyncio_get_running_loop_impl(module);
+        if (loop == NULL) {
+            Py_DECREF(tasks);
+            return NULL;
+        }
+    } else {
+        Py_INCREF(loop);
+    }
+    // First add eager tasks to the set so that we don't miss
+    // any tasks which graduates from eager to non-eager
+    PyObject *eager_iter = PyObject_GetIter(state->eager_tasks);
+    if (eager_iter == NULL) {
+        Py_DECREF(tasks);
+        Py_DECREF(loop);
+        return NULL;
+    }
+    PyObject *item;
+    while ((item = PyIter_Next(eager_iter)) != NULL) {
+        if (add_one_task(state, tasks, item, loop) < 0) {
+            Py_DECREF(tasks);
+            Py_DECREF(loop);
+            Py_DECREF(item);
+            Py_DECREF(eager_iter);
+            return NULL;
+        }
+        Py_DECREF(item);
+    }
+    Py_DECREF(eager_iter);
+    TaskObj *head = state->asyncio_tasks.head;
+    assert(head != NULL);
+    assert(head->prev == NULL);
+    TaskObj *tail = &state->asyncio_tasks.tail;
+    while (head != tail)
+    {
+        if (add_one_task(state, tasks, (PyObject *)head, loop) < 0) {
+            Py_DECREF(tasks);
+            Py_DECREF(loop);
+            return NULL;
+        }
+        head = head->next;
+        assert(head != NULL);
+    }
+    PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
+    if (scheduled_iter == NULL) {
+        Py_DECREF(tasks);
+        Py_DECREF(loop);
+        return NULL;
+    }
+    while ((item = PyIter_Next(scheduled_iter)) != NULL) {
+        if (add_one_task(state, tasks, item, loop) < 0) {
+            Py_DECREF(tasks);
+            Py_DECREF(loop);
+            Py_DECREF(item);
+            Py_DECREF(scheduled_iter);
+            return NULL;
+        }
+        Py_DECREF(item);
+    }
+    Py_DECREF(scheduled_iter);
+    Py_DECREF(loop);
+    return tasks;
+}
 
 static void
 module_free_freelists(asyncio_state *state)
@@ -3584,7 +3780,7 @@ module_traverse(PyObject *mod, visitproc visit, void *arg)
     Py_VISIT(state->asyncio_InvalidStateError);
     Py_VISIT(state->asyncio_CancelledError);
 
-    Py_VISIT(state->scheduled_tasks);
+    Py_VISIT(state->non_asyncio_tasks);
     Py_VISIT(state->eager_tasks);
     Py_VISIT(state->current_tasks);
     Py_VISIT(state->iscoroutine_typecache);
@@ -3622,7 +3818,7 @@ module_clear(PyObject *mod)
     Py_CLEAR(state->asyncio_InvalidStateError);
     Py_CLEAR(state->asyncio_CancelledError);
 
-    Py_CLEAR(state->scheduled_tasks);
+    Py_CLEAR(state->non_asyncio_tasks);
     Py_CLEAR(state->eager_tasks);
     Py_CLEAR(state->current_tasks);
     Py_CLEAR(state->iscoroutine_typecache);
@@ -3703,9 +3899,9 @@ module_init(asyncio_state *state)
     PyObject *weak_set;
     WITH_MOD("weakref")
     GET_MOD_ATTR(weak_set, "WeakSet");
-    state->scheduled_tasks = PyObject_CallNoArgs(weak_set);
+    state->non_asyncio_tasks = PyObject_CallNoArgs(weak_set);
     Py_CLEAR(weak_set);
-    if (state->scheduled_tasks == NULL) {
+    if (state->non_asyncio_tasks == NULL) {
         goto fail;
     }
 
@@ -3740,6 +3936,7 @@ static PyMethodDef asyncio_methods[] = {
     _ASYNCIO__ENTER_TASK_METHODDEF
     _ASYNCIO__LEAVE_TASK_METHODDEF
     _ASYNCIO__SWAP_CURRENT_TASK_METHODDEF
+    _ASYNCIO_ALL_TASKS_METHODDEF
     {NULL, NULL}
 };
 
@@ -3747,6 +3944,7 @@ static int
 module_exec(PyObject *mod)
 {
     asyncio_state *state = get_asyncio_state(mod);
+    state->asyncio_tasks.head = &state->asyncio_tasks.tail;
 
 #define CREATE_TYPE(m, tp, spec, base)                                  \
     do {                                                                \
@@ -3776,7 +3974,7 @@ module_exec(PyObject *mod)
         return -1;
     }
 
-    if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->scheduled_tasks) < 0) {
+    if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->non_asyncio_tasks) < 0) {
         return -1;
     }
 
index 6a9c8ff6d8fdd9ee00c58cbb135763cc411b523a..d619a124ccead51350b3de9bd0e182a0769dae6d 100644 (file)
@@ -1487,4 +1487,64 @@ skip_optional_pos:
 exit:
     return return_value;
 }
-/*[clinic end generated code: output=b26155080c82c472 input=a9049054013a1b77]*/
+
+PyDoc_STRVAR(_asyncio_all_tasks__doc__,
+"all_tasks($module, /, loop=None)\n"
+"--\n"
+"\n"
+"Return a set of all tasks for the loop.");
+
+#define _ASYNCIO_ALL_TASKS_METHODDEF    \
+    {"all_tasks", _PyCFunction_CAST(_asyncio_all_tasks), METH_FASTCALL|METH_KEYWORDS, _asyncio_all_tasks__doc__},
+
+static PyObject *
+_asyncio_all_tasks_impl(PyObject *module, PyObject *loop);
+
+static PyObject *
+_asyncio_all_tasks(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+    PyObject *return_value = NULL;
+    #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
+
+    #define NUM_KEYWORDS 1
+    static struct {
+        PyGC_Head _this_is_not_used;
+        PyObject_VAR_HEAD
+        PyObject *ob_item[NUM_KEYWORDS];
+    } _kwtuple = {
+        .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
+        .ob_item = { &_Py_ID(loop), },
+    };
+    #undef NUM_KEYWORDS
+    #define KWTUPLE (&_kwtuple.ob_base.ob_base)
+
+    #else  // !Py_BUILD_CORE
+    #  define KWTUPLE NULL
+    #endif  // !Py_BUILD_CORE
+
+    static const char * const _keywords[] = {"loop", NULL};
+    static _PyArg_Parser _parser = {
+        .keywords = _keywords,
+        .fname = "all_tasks",
+        .kwtuple = KWTUPLE,
+    };
+    #undef KWTUPLE
+    PyObject *argsbuf[1];
+    Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0;
+    PyObject *loop = Py_None;
+
+    args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 0, 1, 0, argsbuf);
+    if (!args) {
+        goto exit;
+    }
+    if (!noptargs) {
+        goto skip_optional_pos;
+    }
+    loop = args[0];
+skip_optional_pos:
+    return_value = _asyncio_all_tasks_impl(module, loop);
+
+exit:
+    return return_value;
+}
+/*[clinic end generated code: output=ffe9b71bc65888b3 input=a9049054013a1b77]*/