]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-129874: improve `test_tasks` in asyncio to use correct internal functions (#129890)
authorKumar Aditya <kumaraditya@python.org>
Sun, 9 Feb 2025 13:02:11 +0000 (18:32 +0530)
committerGitHub <noreply@github.com>
Sun, 9 Feb 2025 13:02:11 +0000 (13:02 +0000)
Lib/test/test_asyncio/test_tasks.py

index 881a7aeeb2fbe59faab015bba65c36c675b24fb8..5a28d4c185bf962994a43c5483b2910f76931269 100644 (file)
@@ -546,7 +546,7 @@ class BaseTaskTests:
             try:
                 await asyncio.sleep(10)
             except asyncio.CancelledError:
-                asyncio.current_task().uncancel()
+                self.current_task().uncancel()
                 await asyncio.sleep(10)
 
         try:
@@ -598,7 +598,7 @@ class BaseTaskTests:
         loop = asyncio.new_event_loop()
 
         async def make_request_with_timeout(*, sleep: float, timeout: float):
-            task = asyncio.current_task()
+            task = self.current_task()
             loop = task.get_loop()
 
             timed_out = False
@@ -1987,41 +1987,41 @@ class BaseTaskTests:
         self.assertIsNone(t2.result())
 
     def test_current_task(self):
-        self.assertIsNone(asyncio.current_task(loop=self.loop))
+        self.assertIsNone(self.current_task(loop=self.loop))
 
         async def coro(loop):
-            self.assertIs(asyncio.current_task(), task)
+            self.assertIs(self.current_task(), task)
 
-            self.assertIs(asyncio.current_task(None), task)
-            self.assertIs(asyncio.current_task(), task)
+            self.assertIs(self.current_task(None), task)
+            self.assertIs(self.current_task(), task)
 
         task = self.new_task(self.loop, coro(self.loop))
         self.loop.run_until_complete(task)
-        self.assertIsNone(asyncio.current_task(loop=self.loop))
+        self.assertIsNone(self.current_task(loop=self.loop))
 
     def test_current_task_with_interleaving_tasks(self):
-        self.assertIsNone(asyncio.current_task(loop=self.loop))
+        self.assertIsNone(self.current_task(loop=self.loop))
 
         fut1 = self.new_future(self.loop)
         fut2 = self.new_future(self.loop)
 
         async def coro1(loop):
-            self.assertTrue(asyncio.current_task() is task1)
+            self.assertTrue(self.current_task() is task1)
             await fut1
-            self.assertTrue(asyncio.current_task() is task1)
+            self.assertTrue(self.current_task() is task1)
             fut2.set_result(True)
 
         async def coro2(loop):
-            self.assertTrue(asyncio.current_task() is task2)
+            self.assertTrue(self.current_task() is task2)
             fut1.set_result(True)
             await fut2
-            self.assertTrue(asyncio.current_task() is task2)
+            self.assertTrue(self.current_task() is task2)
 
         task1 = self.new_task(self.loop, coro1(self.loop))
         task2 = self.new_task(self.loop, coro2(self.loop))
 
         self.loop.run_until_complete(asyncio.wait((task1, task2)))
-        self.assertIsNone(asyncio.current_task(loop=self.loop))
+        self.assertIsNone(self.current_task(loop=self.loop))
 
     # Some thorough tests for cancellation propagation through
     # coroutines, tasks and wait().
@@ -2833,6 +2833,7 @@ class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
     Task = getattr(tasks, '_CTask', None)
     Future = getattr(futures, '_CFuture', None)
     all_tasks = getattr(tasks, '_c_all_tasks', None)
+    current_task = staticmethod(getattr(tasks, '_c_current_task', None))
 
     @support.refcount_test
     def test_refleaks_in_task___init__(self):
@@ -2865,6 +2866,7 @@ class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
     Task = getattr(tasks, '_CTask', None)
     Future = getattr(futures, '_CFuture', None)
     all_tasks = getattr(tasks, '_c_all_tasks', None)
+    current_task = staticmethod(getattr(tasks, '_c_current_task', None))
 
 
 @unittest.skipUnless(hasattr(tasks, '_CTask'),
@@ -2875,6 +2877,7 @@ class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
     Task = getattr(tasks, '_CTask', None)
     Future = futures._PyFuture
     all_tasks = getattr(tasks, '_c_all_tasks', None)
+    current_task = staticmethod(getattr(tasks, '_c_current_task', None))
 
 
 @unittest.skipUnless(hasattr(futures, '_CFuture'),
@@ -2885,6 +2888,7 @@ class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
     Future = getattr(futures, '_CFuture', None)
     Task = tasks._PyTask
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 
 @unittest.skipUnless(hasattr(tasks, '_CTask'),
@@ -2894,6 +2898,7 @@ class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
     Task = getattr(tasks, '_CTask', None)
     Future = futures._PyFuture
     all_tasks = getattr(tasks, '_c_all_tasks', None)
+    current_task = staticmethod(getattr(tasks, '_c_current_task', None))
 
 
 @unittest.skipUnless(hasattr(futures, '_CFuture'),
@@ -2903,6 +2908,7 @@ class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
     Task = tasks._PyTask
     Future = getattr(futures, '_CFuture', None)
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 
 class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
@@ -2911,6 +2917,7 @@ class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
     Task = tasks._PyTask
     Future = futures._PyFuture
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 
 @add_subclass_tests
@@ -2918,6 +2925,7 @@ class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
     Task = tasks._PyTask
     Future = futures._PyFuture
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 @unittest.skipUnless(hasattr(tasks, '_CTask'),
                      'requires the C _asyncio module')
@@ -3004,44 +3012,60 @@ class BaseTaskIntrospectionTests:
     def test__enter_task(self):
         task = mock.Mock()
         loop = mock.Mock()
-        self.assertIsNone(asyncio.current_task(loop))
+        # _enter_task is called by Task.__step while the loop
+        # is running, so set the loop as the running loop
+        # for a more realistic test.
+        asyncio._set_running_loop(loop)
+        self.assertIsNone(self.current_task(loop))
         self._enter_task(loop, task)
-        self.assertIs(asyncio.current_task(loop), task)
+        self.assertIs(self.current_task(loop), task)
         self._leave_task(loop, task)
+        asyncio._set_running_loop(None)
 
     def test__enter_task_failure(self):
         task1 = mock.Mock()
         task2 = mock.Mock()
         loop = mock.Mock()
+        asyncio._set_running_loop(loop)
         self._enter_task(loop, task1)
         with self.assertRaises(RuntimeError):
             self._enter_task(loop, task2)
-        self.assertIs(asyncio.current_task(loop), task1)
+        self.assertIs(self.current_task(loop), task1)
         self._leave_task(loop, task1)
+        asyncio._set_running_loop(None)
 
     def test__leave_task(self):
         task = mock.Mock()
         loop = mock.Mock()
+        asyncio._set_running_loop(loop)
         self._enter_task(loop, task)
         self._leave_task(loop, task)
-        self.assertIsNone(asyncio.current_task(loop))
+        self.assertIsNone(self.current_task(loop))
+        asyncio._set_running_loop(None)
 
     def test__leave_task_failure1(self):
         task1 = mock.Mock()
         task2 = mock.Mock()
         loop = mock.Mock()
+        # _leave_task is called by Task.__step while the loop
+        # is running, so set the loop as the running loop
+        # for a more realistic test.
+        asyncio._set_running_loop(loop)
         self._enter_task(loop, task1)
         with self.assertRaises(RuntimeError):
             self._leave_task(loop, task2)
-        self.assertIs(asyncio.current_task(loop), task1)
+        self.assertIs(self.current_task(loop), task1)
         self._leave_task(loop, task1)
+        asyncio._set_running_loop(None)
 
     def test__leave_task_failure2(self):
         task = mock.Mock()
         loop = mock.Mock()
+        asyncio._set_running_loop(loop)
         with self.assertRaises(RuntimeError):
             self._leave_task(loop, task)
-        self.assertIsNone(asyncio.current_task(loop))
+        self.assertIsNone(self.current_task(loop))
+        asyncio._set_running_loop(None)
 
     def test__unregister_task(self):
         task = mock.Mock()
@@ -3064,6 +3088,7 @@ class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
     _enter_task = staticmethod(tasks._py_enter_task)
     _leave_task = staticmethod(tasks._py_leave_task)
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 
 @unittest.skipUnless(hasattr(tasks, '_c_register_task'),
@@ -3075,6 +3100,7 @@ class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
         _enter_task = staticmethod(tasks._c_enter_task)
         _leave_task = staticmethod(tasks._c_leave_task)
         all_tasks = staticmethod(tasks._c_all_tasks)
+        current_task = staticmethod(tasks._c_current_task)
     else:
         _register_task = _unregister_task = _enter_task = _leave_task = None