import asyncio
+import threading
import unittest
from threading import Thread
from unittest import TestCase
with threading_helper.start_threads(threads):
pass
+ def test_all_tasks_different_thread(self) -> None:
+ loop = None
+ started = threading.Event()
+
+ async def coro():
+ await asyncio.sleep(0.01)
+
+ lock = threading.Lock()
+ tasks = set()
+
+ async def main():
+ nonlocal tasks, loop
+ loop = asyncio.get_running_loop()
+ started.set()
+ for i in range(1000):
+ with lock:
+ asyncio.create_task(coro())
+ tasks = self.all_tasks(loop)
+
+ runner = threading.Thread(target=lambda: asyncio.run(main()))
+
+ def check():
+ started.wait()
+ with lock:
+ self.assertSetEqual(tasks & self.all_tasks(loop), tasks)
+
+ threads = [threading.Thread(target=check) for _ in range(10)]
+ threads.append(runner)
+
+ with threading_helper.start_threads(threads):
+ pass
+
def test_run_coroutine_threadsafe(self) -> None:
results = []
#include "pycore_llist.h" // struct llist_node
#include "pycore_modsupport.h" // _PyArg_CheckPositional()
#include "pycore_moduleobject.h" // _PyModule_GetState()
+#include "pycore_object.h" // _PyObject_SetMaybeWeakref
#include "pycore_pyerrors.h" // _PyErr_ClearExcState()
#include "pycore_pylifecycle.h" // _Py_IsInterpreterFinalizing()
#include "pycore_pystate.h" // _PyThreadState_GET()
if (task_call_step_soon(state, self, NULL)) {
return -1;
}
+#ifdef Py_GIL_DISABLED
+ // This is required so that _Py_TryIncref(self)
+ // works correctly in non-owning threads.
+ _PyObject_SetMaybeWeakref((PyObject *)self);
+#endif
register_task(state, self);
return 0;
}