PyObject *asyncio_mod;
PyObject *context_kwname;
- /* Dictionary containing tasks that are currently active in
- all running event loops. {EventLoop: Task} */
- PyObject *current_tasks;
-
/* WeakSet containing scheduled 3rd party tasks which don't
inherit from native asyncio.Task */
PyObject *non_asyncio_tasks;
static PyObject * task_wakeup(TaskObj *, PyObject *);
static PyObject * task_step(asyncio_state *, TaskObj *, PyObject *);
static int task_eager_start(asyncio_state *state, TaskObj *task);
-static inline void clear_ts_asyncio_running_task(PyObject *loop);
-static inline void set_ts_asyncio_running_task(PyObject *loop, PyObject *task);
/* ----- Task._step wrapper */
}
static int
-enter_task(asyncio_state *state, PyObject *loop, PyObject *task)
+enter_task(PyObject *loop, PyObject *task)
{
- PyObject *item;
- int res = PyDict_SetDefaultRef(state->current_tasks, loop, task, &item);
- if (res < 0) {
+ _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
+
+ if (ts->asyncio_running_loop != loop) {
+ PyErr_Format(PyExc_RuntimeError, "loop %R is not the running loop", loop);
return -1;
}
- else if (res == 1) {
+
+ if (ts->asyncio_running_task != NULL) {
PyErr_Format(
PyExc_RuntimeError,
"Cannot enter into task %R while another " \
"task %R is being executed.",
- task, item, NULL);
- Py_DECREF(item);
+ task, ts->asyncio_running_task, NULL);
return -1;
}
- assert(task == item);
- Py_CLEAR(item);
- set_ts_asyncio_running_task(loop, task);
+ ts->asyncio_running_task = Py_NewRef(task);
return 0;
}
static int
-err_leave_task(PyObject *item, PyObject *task)
+leave_task(PyObject *loop, PyObject *task)
{
- PyErr_Format(
- PyExc_RuntimeError,
- "Leaving task %R does not match the current task %R.",
- task, item);
- return -1;
-}
-
-static int
-leave_task_predicate(PyObject *item, void *task)
-{
- if (item != task) {
- return err_leave_task(item, (PyObject *)task);
- }
- return 1;
-}
+ _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
-static int
-leave_task(asyncio_state *state, PyObject *loop, PyObject *task)
-{
- int res = _PyDict_DelItemIf(state->current_tasks, loop,
- leave_task_predicate, task);
- if (res == 0) {
- // task was not found
- return err_leave_task(Py_None, task);
+ if (ts->asyncio_running_loop != loop) {
+ PyErr_Format(PyExc_RuntimeError, "loop %R is not the running loop", loop);
+ return -1;
}
- clear_ts_asyncio_running_task(loop);
- return res;
-}
-static PyObject *
-swap_current_task_lock_held(PyDictObject *current_tasks, PyObject *loop,
- Py_hash_t hash, PyObject *task)
-{
- PyObject *prev_task;
- if (_PyDict_GetItemRef_KnownHash_LockHeld(current_tasks, loop, hash, &prev_task) < 0) {
- return NULL;
- }
- if (_PyDict_SetItem_KnownHash_LockHeld(current_tasks, loop, task, hash) < 0) {
- Py_XDECREF(prev_task);
- return NULL;
- }
- if (prev_task == NULL) {
- Py_RETURN_NONE;
+ if (ts->asyncio_running_task != task) {
+ PyErr_Format(
+ PyExc_RuntimeError,
+ "Invalid attempt to leave task %R while " \
+ "task %R is entered.",
+ task, ts->asyncio_running_task ? ts->asyncio_running_task : Py_None, NULL);
+ return -1;
}
- return prev_task;
+ Py_CLEAR(ts->asyncio_running_task);
+ return 0;
}
static PyObject *
-swap_current_task(asyncio_state *state, PyObject *loop, PyObject *task)
+swap_current_task(PyObject *loop, PyObject *task)
{
- PyObject *prev_task;
-
- clear_ts_asyncio_running_task(loop);
- if (task == Py_None) {
- if (PyDict_Pop(state->current_tasks, loop, &prev_task) < 0) {
- return NULL;
- }
- if (prev_task == NULL) {
- Py_RETURN_NONE;
- }
- return prev_task;
- }
+ _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
- Py_hash_t hash = PyObject_Hash(loop);
- if (hash == -1) {
+ if (ts->asyncio_running_loop != loop) {
+ PyErr_Format(PyExc_RuntimeError, "loop %R is not the running loop", loop);
return NULL;
}
- PyDictObject *current_tasks = (PyDictObject *)state->current_tasks;
- Py_BEGIN_CRITICAL_SECTION(current_tasks);
- prev_task = swap_current_task_lock_held(current_tasks, loop, hash, task);
- Py_END_CRITICAL_SECTION();
- set_ts_asyncio_running_task(loop, task);
- return prev_task;
-}
-
-static inline void
-set_ts_asyncio_running_task(PyObject *loop, PyObject *task)
-{
- // We want to enable debuggers and profilers to be able to quickly
- // introspect the asyncio running state from another process.
- // When we do that, we need to essentially traverse the address space
- // of a Python process and understand what every Python thread in it is
- // currently doing, mainly:
- //
- // * current frame
- // * current asyncio task
- //
- // A naive solution would be to require profilers and debuggers to
- // find the current task in the "_asynciomodule" module state, but
- // unfortunately that would require a lot of complicated remote
- // memory reads and logic, as Python's dict is a notoriously complex
- // and ever-changing data structure.
- //
- // So the easier solution is to put a strong reference to the currently
- // running `asyncio.Task` on the current thread state (the current loop
- // is also stored there.)
- _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
- if (ts->asyncio_running_loop == loop) {
- // Protect from a situation when someone calls this method
- // from another thread. This shouldn't ever happen though,
- // as `enter_task` and `leave_task` can either be called by:
- //
- // - `asyncio.Task` itself, in `Task.__step()`. That method
- // can only be called by the event loop itself.
- //
- // - third-party Task "from scratch" implementations, that
- // our `capture_call_graph` API doesn't support anyway.
- //
- // That said, we still want to make sure we don't end up in
- // a broken state, so we check that we're in the correct thread
- // by comparing the *loop* argument to the event loop running
- // in the current thread. If they match we know we're in the
- // right thread, as asyncio event loops don't change threads.
- assert(ts->asyncio_running_task == NULL);
+ /* transfer ownership to avoid redundant ref counting */
+ PyObject *prev_task = ts->asyncio_running_task;
+ if (task != Py_None) {
ts->asyncio_running_task = Py_NewRef(task);
+ } else {
+ ts->asyncio_running_task = NULL;
}
-}
-
-static inline void
-clear_ts_asyncio_running_task(PyObject *loop)
-{
- // See comment in set_ts_asyncio_running_task() for details.
- _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
- if (ts->asyncio_running_loop == NULL || ts->asyncio_running_loop == loop) {
- Py_CLEAR(ts->asyncio_running_task);
+ if (prev_task == NULL) {
+ Py_RETURN_NONE;
}
+ return prev_task;
}
/* ----- Task */
{
PyObject *res;
- if (enter_task(state, task->task_loop, (PyObject*)task) < 0) {
+ if (enter_task(task->task_loop, (PyObject*)task) < 0) {
return NULL;
}
if (res == NULL) {
PyObject *exc = PyErr_GetRaisedException();
- leave_task(state, task->task_loop, (PyObject*)task);
+ leave_task(task->task_loop, (PyObject*)task);
_PyErr_ChainExceptions1(exc);
return NULL;
}
else {
- if (leave_task(state, task->task_loop, (PyObject*)task) < 0) {
+ if (leave_task(task->task_loop, (PyObject*)task) < 0) {
Py_DECREF(res);
return NULL;
}
task_eager_start(asyncio_state *state, TaskObj *task)
{
assert(task != NULL);
- PyObject *prevtask = swap_current_task(state, task->task_loop, (PyObject *)task);
+ PyObject *prevtask = swap_current_task(task->task_loop, (PyObject *)task);
if (prevtask == NULL) {
return -1;
}
Py_DECREF(stepres);
}
- PyObject *curtask = swap_current_task(state, task->task_loop, prevtask);
+ PyObject *curtask = swap_current_task(task->task_loop, prevtask);
Py_DECREF(prevtask);
if (curtask == NULL) {
retval = -1;
_asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task)
/*[clinic end generated code: output=a22611c858035b73 input=de1b06dca70d8737]*/
{
- asyncio_state *state = get_asyncio_state(module);
- if (enter_task(state, loop, task) < 0) {
+ if (enter_task(loop, task) < 0) {
return NULL;
}
Py_RETURN_NONE;
_asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task)
/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/
{
- asyncio_state *state = get_asyncio_state(module);
- if (leave_task(state, loop, task) < 0) {
+ if (leave_task(loop, task) < 0) {
return NULL;
}
Py_RETURN_NONE;
PyObject *task)
/*[clinic end generated code: output=9f88de958df74c7e input=c9c72208d3d38b6c]*/
{
- return swap_current_task(get_asyncio_state(module), loop, task);
+ return swap_current_task(loop, task);
}
_asyncio_current_task_impl(PyObject *module, PyObject *loop)
/*[clinic end generated code: output=fe15ac331a7f981a input=58910f61a5627112]*/
{
- PyObject *ret;
- asyncio_state *state = get_asyncio_state(module);
-
if (loop == Py_None) {
loop = _asyncio_get_running_loop_impl(module);
if (loop == NULL) {
Py_INCREF(loop);
}
- int rc = PyDict_GetItemRef(state->current_tasks, loop, &ret);
- Py_DECREF(loop);
- if (rc == 0) {
+ _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
+ // Fast path for the current running loop of current thread
+ // no locking or stop the world pause is required
+ if (ts->asyncio_running_loop == loop) {
+ if (ts->asyncio_running_task != NULL) {
+ Py_DECREF(loop);
+ return Py_NewRef(ts->asyncio_running_task);
+ }
+ Py_DECREF(loop);
Py_RETURN_NONE;
}
+
+ PyObject *ret = Py_None;
+ // Stop the world and traverse the per-thread current tasks
+ // and return the task if the loop matches
+ PyInterpreterState *interp = ts->base.interp;
+ _PyEval_StopTheWorld(interp);
+ _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
+ ts = (_PyThreadStateImpl *)p;
+ if (ts->asyncio_running_loop == loop) {
+ if (ts->asyncio_running_task != NULL) {
+ ret = Py_NewRef(ts->asyncio_running_task);
+ }
+ goto exit;
+ }
+ }
+exit:
+ _Py_FOR_EACH_TSTATE_END(interp);
+ _PyEval_StartTheWorld(interp);
+ Py_DECREF(loop);
return ret;
}
Py_VISIT(state->non_asyncio_tasks);
Py_VISIT(state->eager_tasks);
- Py_VISIT(state->current_tasks);
Py_VISIT(state->iscoroutine_typecache);
Py_VISIT(state->context_kwname);
Py_CLEAR(state->non_asyncio_tasks);
Py_CLEAR(state->eager_tasks);
- Py_CLEAR(state->current_tasks);
Py_CLEAR(state->iscoroutine_typecache);
Py_CLEAR(state->context_kwname);
goto fail;
}
- state->current_tasks = PyDict_New();
- if (state->current_tasks == NULL) {
- goto fail;
- }
-
state->iscoroutine_typecache = PySet_New(NULL);
if (state->iscoroutine_typecache == NULL) {
goto fail;
return -1;
}
- if (PyModule_AddObjectRef(mod, "_current_tasks", state->current_tasks) < 0) {
- return -1;
- }
-
-
return 0;
}