Task = None
Future = None
+ all_tasks = None
def new_task(self, loop, coro, name='TestTask', context=None):
return self.__class__.Task(coro, loop=loop, name=name, context=context)
coro = kill_me(self.loop)
task = asyncio.ensure_future(coro, loop=self.loop)
- self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})
+ self.assertEqual(self.all_tasks(loop=self.loop), {task})
asyncio.set_event_loop(None)
# no more reference to kill_me() task: the task is destroyed by the GC
support.gc_collect()
- self.assertEqual(asyncio.all_tasks(loop=self.loop), set())
+ self.assertEqual(self.all_tasks(loop=self.loop), set())
mock_handler.assert_called_with(self.loop, {
'message': 'Task was destroyed but it is pending!',
message = m_log.error.call_args[0][0]
self.assertIn('Task was destroyed but it is pending', message)
- self.assertEqual(asyncio.all_tasks(self.loop), set())
+ self.assertEqual(self.all_tasks(self.loop), set())
def test_create_task_with_noncoroutine(self):
with self.assertRaisesRegex(TypeError,
# Add patched Task & Future back to the test case
cls.Task = Task
cls.Future = Future
+ cls.all_tasks = tasks.all_tasks
# Add an extra unit-test
cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture
Task = getattr(tasks, '_CTask', None)
Future = getattr(futures, '_CFuture', None)
+ all_tasks = getattr(tasks, '_c_all_tasks', None)
@support.refcount_test
def test_refleaks_in_task___init__(self):
Task = getattr(tasks, '_CTask', None)
Future = getattr(futures, '_CFuture', None)
+ all_tasks = getattr(tasks, '_c_all_tasks', None)
@unittest.skipUnless(hasattr(tasks, '_CTask'),
Task = getattr(tasks, '_CTask', None)
Future = futures._PyFuture
+ all_tasks = getattr(tasks, '_c_all_tasks', None)
@unittest.skipUnless(hasattr(futures, '_CFuture'),
Future = getattr(futures, '_CFuture', None)
Task = tasks._PyTask
+ all_tasks = tasks._py_all_tasks
@unittest.skipUnless(hasattr(tasks, '_CTask'),
Task = getattr(tasks, '_CTask', None)
Future = futures._PyFuture
+ all_tasks = getattr(tasks, '_c_all_tasks', None)
@unittest.skipUnless(hasattr(futures, '_CFuture'),
Task = tasks._PyTask
Future = getattr(futures, '_CFuture', None)
+ all_tasks = staticmethod(tasks._py_all_tasks)
class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
Task = tasks._PyTask
Future = futures._PyFuture
+ all_tasks = staticmethod(tasks._py_all_tasks)
@add_subclass_tests
_unregister_task = None
_enter_task = None
_leave_task = None
+ all_tasks = None
def test__register_task_1(self):
class TaskLike:
task = TaskLike()
loop = mock.Mock()
- self.assertEqual(asyncio.all_tasks(loop), set())
+ self.assertEqual(self.all_tasks(loop), set())
self._register_task(task)
- self.assertEqual(asyncio.all_tasks(loop), {task})
+ self.assertEqual(self.all_tasks(loop), {task})
self._unregister_task(task)
def test__register_task_2(self):
task = TaskLike()
loop = mock.Mock()
- self.assertEqual(asyncio.all_tasks(loop), set())
+ self.assertEqual(self.all_tasks(loop), set())
self._register_task(task)
- self.assertEqual(asyncio.all_tasks(loop), {task})
+ self.assertEqual(self.all_tasks(loop), {task})
self._unregister_task(task)
def test__register_task_3(self):
task = TaskLike()
loop = mock.Mock()
- self.assertEqual(asyncio.all_tasks(loop), set())
+ self.assertEqual(self.all_tasks(loop), set())
self._register_task(task)
- self.assertEqual(asyncio.all_tasks(loop), set())
+ self.assertEqual(self.all_tasks(loop), set())
self._unregister_task(task)
def test__enter_task(self):
task.get_loop = lambda: loop
self._register_task(task)
self._unregister_task(task)
- self.assertEqual(asyncio.all_tasks(loop), set())
+ self.assertEqual(self.all_tasks(loop), set())
def test__unregister_task_not_registered(self):
task = mock.Mock()
loop = mock.Mock()
self._unregister_task(task)
- self.assertEqual(asyncio.all_tasks(loop), set())
+ self.assertEqual(self.all_tasks(loop), set())
class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
_unregister_task = staticmethod(tasks._py_unregister_task)
_enter_task = staticmethod(tasks._py_enter_task)
_leave_task = staticmethod(tasks._py_leave_task)
+ all_tasks = staticmethod(tasks._py_all_tasks)
@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
_unregister_task = staticmethod(tasks._c_unregister_task)
_enter_task = staticmethod(tasks._c_enter_task)
_leave_task = staticmethod(tasks._c_leave_task)
+ all_tasks = staticmethod(tasks._c_all_tasks)
else:
_register_task = _unregister_task = _enter_task = _leave_task = None
[clinic start generated code]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=8fd17862aa989c69]*/
+typedef enum {
+ STATE_PENDING,
+ STATE_CANCELLED,
+ STATE_FINISHED
+} fut_state;
+
+#define FutureObj_HEAD(prefix) \
+ PyObject_HEAD \
+ PyObject *prefix##_loop; \
+ PyObject *prefix##_callback0; \
+ PyObject *prefix##_context0; \
+ PyObject *prefix##_callbacks; \
+ PyObject *prefix##_exception; \
+ PyObject *prefix##_exception_tb; \
+ PyObject *prefix##_result; \
+ PyObject *prefix##_source_tb; \
+ PyObject *prefix##_cancel_msg; \
+ PyObject *prefix##_cancelled_exc; \
+ fut_state prefix##_state; \
+ /* These bitfields need to be at the end of the struct
+ so that these and bitfields from TaskObj are contiguous.
+ */ \
+ unsigned prefix##_log_tb: 1; \
+ unsigned prefix##_blocking: 1;
+
+typedef struct {
+ FutureObj_HEAD(fut)
+} FutureObj;
+
+typedef struct TaskObj {
+ FutureObj_HEAD(task)
+ unsigned task_must_cancel: 1;
+ unsigned task_log_destroy_pending: 1;
+ int task_num_cancels_requested;
+ PyObject *task_fut_waiter;
+ PyObject *task_coro;
+ PyObject *task_name;
+ PyObject *task_context;
+ struct TaskObj *next;
+ struct TaskObj *prev;
+} TaskObj;
+
+typedef struct {
+ PyObject_HEAD
+ TaskObj *sw_task;
+ PyObject *sw_arg;
+} TaskStepMethWrapper;
+
+
+#define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType)
+#define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType)
+
+#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType)
+#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType)
#define FI_FREELIST_MAXLEN 255
all running event loops. {EventLoop: Task} */
PyObject *current_tasks;
- /* WeakSet containing all tasks scheduled to run on event loops. */
- PyObject *scheduled_tasks;
+ /* WeakSet containing scheduled 3rd party tasks which don't
+ inherit from native asyncio.Task */
+ PyObject *non_asyncio_tasks;
/* Set containing all eagerly executing tasks. */
PyObject *eager_tasks;
futureiterobject *fi_freelist;
Py_ssize_t fi_freelist_len;
+
+ /* Linked-list of all tasks which are instances of asyncio.Task or subclasses
+ of it. Third party tasks implementations which don't inherit from
+ asyncio.Task are tracked separately using the 'non_asyncio_tasks' WeakSet.
+ `tail` is used as a sentinel to mark the end of the linked-list. It avoids one
+ branch in checking for empty list when adding a new task, the list is
+ initialized with `head` pointing to `tail` to mark an empty list.
+
+ Invariants:
+ * When the list is empty:
+ - asyncio_tasks.head == &asyncio_tasks.tail
+ - asyncio_tasks.head->prev == NULL
+ - asyncio_tasks.head->next == NULL
+
+ * After adding the first task 'task1':
+ - asyncio_tasks.head == task1
+ - task1->next == &asyncio_tasks.tail
+ - task1->prev == NULL
+ - asyncio_tasks.tail.prev == task1
+
+ * After adding a second task 'task2':
+ - asyncio_tasks.head == task2
+ - task2->next == task1
+ - task2->prev == NULL
+ - task1->prev == task2
+ - asyncio_tasks.tail.prev == task1
+
+ * After removing task 'task1':
+ - asyncio_tasks.head == task2
+ - task2->next == &asyncio_tasks.tail
+ - task2->prev == NULL
+ - asyncio_tasks.tail.prev == task2
+
+ * After removing task 'task2', the list is empty:
+ - asyncio_tasks.head == &asyncio_tasks.tail
+ - asyncio_tasks.head->prev == NULL
+ - asyncio_tasks.tail.prev == NULL
+ - asyncio_tasks.tail.next == NULL
+ */
+
+ struct {
+ TaskObj tail;
+ TaskObj *head;
+ } asyncio_tasks;
+
} asyncio_state;
static inline asyncio_state *
return get_asyncio_state(mod);
}
-typedef enum {
- STATE_PENDING,
- STATE_CANCELLED,
- STATE_FINISHED
-} fut_state;
-
-#define FutureObj_HEAD(prefix) \
- PyObject_HEAD \
- PyObject *prefix##_loop; \
- PyObject *prefix##_callback0; \
- PyObject *prefix##_context0; \
- PyObject *prefix##_callbacks; \
- PyObject *prefix##_exception; \
- PyObject *prefix##_exception_tb; \
- PyObject *prefix##_result; \
- PyObject *prefix##_source_tb; \
- PyObject *prefix##_cancel_msg; \
- PyObject *prefix##_cancelled_exc; \
- fut_state prefix##_state; \
- /* These bitfields need to be at the end of the struct
- so that these and bitfields from TaskObj are contiguous.
- */ \
- unsigned prefix##_log_tb: 1; \
- unsigned prefix##_blocking: 1;
-
-typedef struct {
- FutureObj_HEAD(fut)
-} FutureObj;
-
-typedef struct {
- FutureObj_HEAD(task)
- unsigned task_must_cancel: 1;
- unsigned task_log_destroy_pending: 1;
- int task_num_cancels_requested;
- PyObject *task_fut_waiter;
- PyObject *task_coro;
- PyObject *task_name;
- PyObject *task_context;
-} TaskObj;
-
-typedef struct {
- PyObject_HEAD
- TaskObj *sw_task;
- PyObject *sw_arg;
-} TaskStepMethWrapper;
-
-
-#define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType)
-#define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType)
-
-#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType)
-#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType)
-
#include "clinic/_asynciomodule.c.h"
/* ----- Task introspection helpers */
-static int
-register_task(asyncio_state *state, PyObject *task)
+static void
+register_task(asyncio_state *state, TaskObj *task)
{
- PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks,
- &_Py_ID(add), task);
- if (res == NULL) {
- return -1;
+ assert(Task_Check(state, task));
+ assert(task != &state->asyncio_tasks.tail);
+ if (task->next != NULL) {
+ // already registered
+ return;
}
- Py_DECREF(res);
- return 0;
+ assert(task->prev == NULL);
+ assert(state->asyncio_tasks.head != NULL);
+
+ task->next = state->asyncio_tasks.head;
+ state->asyncio_tasks.head->prev = task;
+ state->asyncio_tasks.head = task;
}
static int
return PySet_Add(state->eager_tasks, task);
}
-static int
-unregister_task(asyncio_state *state, PyObject *task)
-{
- PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks,
- &_Py_ID(discard), task);
- if (res == NULL) {
- return -1;
+static void
+unregister_task(asyncio_state *state, TaskObj *task)
+{
+ assert(Task_Check(state, task));
+ assert(task != &state->asyncio_tasks.tail);
+ if (task->next == NULL) {
+ // not registered
+ assert(task->prev == NULL);
+ assert(state->asyncio_tasks.head != task);
+ return;
}
- Py_DECREF(res);
- return 0;
+ task->next->prev = task->prev;
+ if (task->prev == NULL) {
+ assert(state->asyncio_tasks.head == task);
+ state->asyncio_tasks.head = task->next;
+ } else {
+ task->prev->next = task->next;
+ }
+ task->next = NULL;
+ task->prev = NULL;
+ assert(state->asyncio_tasks.head != task);
}
static int
if (task_call_step_soon(state, self, NULL)) {
return -1;
}
- return register_task(state, (PyObject*)self);
+ register_task(state, self);
+ return 0;
}
static int
static void
TaskObj_finalize(TaskObj *task)
{
+ asyncio_state *state = get_asyncio_state_by_def((PyObject *)task);
+ // Unregister the task from the linked list of tasks.
+ // Since task is a native task, we directly call the
+ // unregister_task function. Third party event loops
+ // should use the asyncio._unregister_task function.
+ // See https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support
+
+ unregister_task(state, task);
+
PyObject *context;
PyObject *message = NULL;
PyObject *func;
}
if (task->task_state == STATE_PENDING) {
- if (register_task(state, (PyObject *)task) == -1) {
- retval = -1;
- }
+ register_task(state, task);
} else {
// This seems to really help performance on pyperformance benchmarks
Py_CLEAR(task->task_coro);
/*[clinic end generated code: output=8672dadd69a7d4e2 input=21075aaea14dfbad]*/
{
asyncio_state *state = get_asyncio_state(module);
- if (register_task(state, task) < 0) {
+ if (Task_Check(state, task)) {
+ // task is an asyncio.Task instance or subclass, use efficient
+ // linked-list implementation.
+ register_task(state, (TaskObj *)task);
+ Py_RETURN_NONE;
+ }
+ // As task does not inherit from asyncio.Task, fallback to less efficient
+ // weakset implementation.
+ PyObject *res = PyObject_CallMethodOneArg(state->non_asyncio_tasks,
+ &_Py_ID(add), task);
+ if (res == NULL) {
return NULL;
}
+ Py_DECREF(res);
Py_RETURN_NONE;
}
/*[clinic end generated code: output=6e5585706d568a46 input=28fb98c3975f7bdc]*/
{
asyncio_state *state = get_asyncio_state(module);
- if (unregister_task(state, task) < 0) {
+ if (Task_Check(state, task)) {
+ unregister_task(state, (TaskObj *)task);
+ Py_RETURN_NONE;
+ }
+ PyObject *res = PyObject_CallMethodOneArg(state->non_asyncio_tasks,
+ &_Py_ID(discard), task);
+ if (res == NULL) {
return NULL;
}
+ Py_DECREF(res);
Py_RETURN_NONE;
}
}
+static inline int
+add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
+{
+ PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
+ if (done == NULL) {
+ return -1;
+ }
+ if (Py_IsTrue(done)) {
+ return 0;
+ }
+ Py_DECREF(done);
+ PyObject *task_loop = get_future_loop(state, task);
+ if (task_loop == NULL) {
+ return -1;
+ }
+ if (task_loop == loop) {
+ if (PySet_Add(tasks, task) < 0) {
+ Py_DECREF(task_loop);
+ return -1;
+ }
+ }
+ Py_DECREF(task_loop);
+ return 0;
+}
+
/*********************** Module **************************/
+/*[clinic input]
+_asyncio.all_tasks
+
+ loop: object = None
+
+Return a set of all tasks for the loop.
+
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
+/*[clinic end generated code: output=0e107cbb7f72aa7b input=43a1b423c2d95bfa]*/
+{
+
+ asyncio_state *state = get_asyncio_state(module);
+ PyObject *tasks = PySet_New(NULL);
+ if (tasks == NULL) {
+ return NULL;
+ }
+ if (loop == Py_None) {
+ loop = _asyncio_get_running_loop_impl(module);
+ if (loop == NULL) {
+ Py_DECREF(tasks);
+ return NULL;
+ }
+ } else {
+ Py_INCREF(loop);
+ }
+ // First add eager tasks to the set so that we don't miss
+ // any tasks which graduates from eager to non-eager
+ PyObject *eager_iter = PyObject_GetIter(state->eager_tasks);
+ if (eager_iter == NULL) {
+ Py_DECREF(tasks);
+ Py_DECREF(loop);
+ return NULL;
+ }
+ PyObject *item;
+ while ((item = PyIter_Next(eager_iter)) != NULL) {
+ if (add_one_task(state, tasks, item, loop) < 0) {
+ Py_DECREF(tasks);
+ Py_DECREF(loop);
+ Py_DECREF(item);
+ Py_DECREF(eager_iter);
+ return NULL;
+ }
+ Py_DECREF(item);
+ }
+ Py_DECREF(eager_iter);
+ TaskObj *head = state->asyncio_tasks.head;
+ assert(head != NULL);
+ assert(head->prev == NULL);
+ TaskObj *tail = &state->asyncio_tasks.tail;
+ while (head != tail)
+ {
+ if (add_one_task(state, tasks, (PyObject *)head, loop) < 0) {
+ Py_DECREF(tasks);
+ Py_DECREF(loop);
+ return NULL;
+ }
+ head = head->next;
+ assert(head != NULL);
+ }
+ PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
+ if (scheduled_iter == NULL) {
+ Py_DECREF(tasks);
+ Py_DECREF(loop);
+ return NULL;
+ }
+ while ((item = PyIter_Next(scheduled_iter)) != NULL) {
+ if (add_one_task(state, tasks, item, loop) < 0) {
+ Py_DECREF(tasks);
+ Py_DECREF(loop);
+ Py_DECREF(item);
+ Py_DECREF(scheduled_iter);
+ return NULL;
+ }
+ Py_DECREF(item);
+ }
+ Py_DECREF(scheduled_iter);
+ Py_DECREF(loop);
+ return tasks;
+}
static void
module_free_freelists(asyncio_state *state)
Py_VISIT(state->asyncio_InvalidStateError);
Py_VISIT(state->asyncio_CancelledError);
- Py_VISIT(state->scheduled_tasks);
+ Py_VISIT(state->non_asyncio_tasks);
Py_VISIT(state->eager_tasks);
Py_VISIT(state->current_tasks);
Py_VISIT(state->iscoroutine_typecache);
Py_CLEAR(state->asyncio_InvalidStateError);
Py_CLEAR(state->asyncio_CancelledError);
- Py_CLEAR(state->scheduled_tasks);
+ Py_CLEAR(state->non_asyncio_tasks);
Py_CLEAR(state->eager_tasks);
Py_CLEAR(state->current_tasks);
Py_CLEAR(state->iscoroutine_typecache);
PyObject *weak_set;
WITH_MOD("weakref")
GET_MOD_ATTR(weak_set, "WeakSet");
- state->scheduled_tasks = PyObject_CallNoArgs(weak_set);
+ state->non_asyncio_tasks = PyObject_CallNoArgs(weak_set);
Py_CLEAR(weak_set);
- if (state->scheduled_tasks == NULL) {
+ if (state->non_asyncio_tasks == NULL) {
goto fail;
}
_ASYNCIO__ENTER_TASK_METHODDEF
_ASYNCIO__LEAVE_TASK_METHODDEF
_ASYNCIO__SWAP_CURRENT_TASK_METHODDEF
+ _ASYNCIO_ALL_TASKS_METHODDEF
{NULL, NULL}
};
module_exec(PyObject *mod)
{
asyncio_state *state = get_asyncio_state(mod);
+ state->asyncio_tasks.head = &state->asyncio_tasks.tail;
#define CREATE_TYPE(m, tp, spec, base) \
do { \
return -1;
}
- if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->scheduled_tasks) < 0) {
+ if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->non_asyncio_tasks) < 0) {
return -1;
}