Task = tasks._PyTask
def setUp(self):
+ self._all_tasks = asyncio.all_tasks
self._current_task = asyncio.current_task
asyncio.current_task = asyncio.tasks.current_task = asyncio.tasks._py_current_task
+ asyncio.all_tasks = asyncio.tasks.all_tasks = asyncio.tasks._py_all_tasks
return super().setUp()
def tearDown(self):
asyncio.current_task = asyncio.tasks.current_task = self._current_task
+ asyncio.all_tasks = asyncio.tasks.all_tasks = self._all_tasks
return super().tearDown()
def setUp(self):
self._current_task = asyncio.current_task
+ self._all_tasks = asyncio.all_tasks
asyncio.current_task = asyncio.tasks.current_task = asyncio.tasks._c_current_task
+ asyncio.all_tasks = asyncio.tasks.all_tasks = asyncio.tasks._c_all_tasks
return super().setUp()
def tearDown(self):
asyncio.current_task = asyncio.tasks.current_task = self._current_task
+ asyncio.all_tasks = asyncio.tasks.all_tasks = self._all_tasks
return super().tearDown()
for _ in range(100):
tasks.add(tg.create_task(coro()))
- all_tasks = self.all_tasks(loop)
+ all_tasks = asyncio.all_tasks(loop)
self.assertEqual(len(all_tasks), 101)
for task in all_tasks:
for i in range(1000):
with lock:
asyncio.create_task(coro())
- tasks = self.all_tasks(loop)
+ tasks = asyncio.all_tasks(loop)
done.wait()
runner = threading.Thread(target=lambda: asyncio.run(main()))
def check():
started.wait()
with lock:
- self.assertSetEqual(tasks & self.all_tasks(loop), tasks)
+ self.assertSetEqual(tasks & asyncio.all_tasks(loop), tasks)
threads = [threading.Thread(target=check) for _ in range(10)]
runner.start()
class TestPyFreeThreading(TestFreeThreading, TestCase):
- all_tasks = staticmethod(asyncio.tasks._py_all_tasks)
def setUp(self):
self._old_current_task = asyncio.current_task
asyncio.current_task = asyncio.tasks.current_task = asyncio.tasks._py_current_task
+ self._old_all_tasks = asyncio.all_tasks
+ asyncio.all_tasks = asyncio.tasks.all_tasks = asyncio.tasks._py_all_tasks
+ self._old_Task = asyncio.Task
+ asyncio.Task = asyncio.tasks.Task = asyncio.tasks._PyTask
+ self._old_Future = asyncio.Future
+ asyncio.Future = asyncio.futures.Future = asyncio.futures._PyFuture
return super().setUp()
def tearDown(self):
asyncio.current_task = asyncio.tasks.current_task = self._old_current_task
+ asyncio.all_tasks = asyncio.tasks.all_tasks = self._old_all_tasks
+ asyncio.Task = asyncio.tasks.Task = self._old_Task
+ asyncio.Future = asyncio.tasks.Future = self._old_Future
return super().tearDown()
def factory(self, loop, coro, **kwargs):
@unittest.skipUnless(hasattr(asyncio.tasks, "_c_all_tasks"), "requires _asyncio")
class TestCFreeThreading(TestFreeThreading, TestCase):
- all_tasks = staticmethod(getattr(asyncio.tasks, "_c_all_tasks", None))
def setUp(self):
self._old_current_task = asyncio.current_task
asyncio.current_task = asyncio.tasks.current_task = asyncio.tasks._c_current_task
+ self._old_all_tasks = asyncio.all_tasks
+ asyncio.all_tasks = asyncio.tasks.all_tasks = asyncio.tasks._c_all_tasks
+ self._old_Task = asyncio.Task
+ asyncio.Task = asyncio.tasks.Task = asyncio.tasks._CTask
+ self._old_Future = asyncio.Future
+ asyncio.Future = asyncio.futures.Future = asyncio.futures._CFuture
return super().setUp()
def tearDown(self):
asyncio.current_task = asyncio.tasks.current_task = self._old_current_task
+ asyncio.all_tasks = asyncio.tasks.all_tasks = self._old_all_tasks
+ asyncio.Task = asyncio.tasks.Task = self._old_Task
+ asyncio.Future = asyncio.futures.Future = self._old_Future
return super().tearDown()