]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.11] gh-99357: Close the event loop when it is no longer used in test_uncancel_stru...
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>
Mon, 14 Nov 2022 18:52:10 +0000 (10:52 -0800)
committerGitHub <noreply@github.com>
Mon, 14 Nov 2022 18:52:10 +0000 (10:52 -0800)
(cherry picked from commit 99972dc7450f1266e39202012827f4f3c995b0ca)

Co-authored-by: Xiao Chen <chenxiao_7@163.com>
Lib/test/test_asyncio/test_tasks.py

index 7585768073e6ab0937d0d35e233505dd5c0db4b7..6b875cfb4dcb2cb9481b4a595960a05be737bf65 100644 (file)
@@ -639,27 +639,30 @@ class BaseTaskTests:
             await asyncio.sleep(0)
             return timed_out, structured_block_finished, outer_code_reached
 
-        # Test which timed out.
-        t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
-        timed_out, structured_block_finished, outer_code_reached = (
-            loop.run_until_complete(t1)
-        )
-        self.assertTrue(timed_out)
-        self.assertFalse(structured_block_finished)  # it was cancelled
-        self.assertTrue(outer_code_reached)  # task got uncancelled after leaving
-                                             # the structured block and continued until
-                                             # completion
-        self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
-
-        # Test which did not time out.
-        t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
-        timed_out, structured_block_finished, outer_code_reached = (
-            loop.run_until_complete(t2)
-        )
-        self.assertFalse(timed_out)
-        self.assertTrue(structured_block_finished)
-        self.assertTrue(outer_code_reached)
-        self.assertEqual(t2.cancelling(), 0)
+        try:
+            # Test which timed out.
+            t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
+            timed_out, structured_block_finished, outer_code_reached = (
+                loop.run_until_complete(t1)
+            )
+            self.assertTrue(timed_out)
+            self.assertFalse(structured_block_finished)  # it was cancelled
+            self.assertTrue(outer_code_reached)  # task got uncancelled after leaving
+                                                 # the structured block and continued until
+                                                 # completion
+            self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
+
+            # Test which did not time out.
+            t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
+            timed_out, structured_block_finished, outer_code_reached = (
+                loop.run_until_complete(t2)
+            )
+            self.assertFalse(timed_out)
+            self.assertTrue(structured_block_finished)
+            self.assertTrue(outer_code_reached)
+            self.assertEqual(t2.cancelling(), 0)
+        finally:
+            loop.close()
 
     def test_cancel(self):