]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-99357: Close the event loop when it is no longer used in test_uncancel_structured_...
authorXiao Chen <chenxiao_7@163.com>
Sat, 12 Nov 2022 20:16:44 +0000 (04:16 +0800)
committerGitHub <noreply@github.com>
Sat, 12 Nov 2022 20:16:44 +0000 (12:16 -0800)
Lib/test/test_asyncio/test_tasks.py

index 0e38e6a92e52b74804807ddb77867feaf8dc5766..d8ba2f4e2a742a3b3e7dfe412889d53e45b503a8 100644 (file)
@@ -635,27 +635,30 @@ class BaseTaskTests:
             await asyncio.sleep(0)
             return timed_out, structured_block_finished, outer_code_reached
 
-        # Test which timed out.
-        t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
-        timed_out, structured_block_finished, outer_code_reached = (
-            loop.run_until_complete(t1)
-        )
-        self.assertTrue(timed_out)
-        self.assertFalse(structured_block_finished)  # it was cancelled
-        self.assertTrue(outer_code_reached)  # task got uncancelled after leaving
-                                             # the structured block and continued until
-                                             # completion
-        self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
-
-        # Test which did not time out.
-        t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
-        timed_out, structured_block_finished, outer_code_reached = (
-            loop.run_until_complete(t2)
-        )
-        self.assertFalse(timed_out)
-        self.assertTrue(structured_block_finished)
-        self.assertTrue(outer_code_reached)
-        self.assertEqual(t2.cancelling(), 0)
+        try:
+            # Test which timed out.
+            t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
+            timed_out, structured_block_finished, outer_code_reached = (
+                loop.run_until_complete(t1)
+            )
+            self.assertTrue(timed_out)
+            self.assertFalse(structured_block_finished)  # it was cancelled
+            self.assertTrue(outer_code_reached)  # task got uncancelled after leaving
+                                                 # the structured block and continued until
+                                                 # completion
+            self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
+
+            # Test which did not time out.
+            t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
+            timed_out, structured_block_finished, outer_code_reached = (
+                loop.run_until_complete(t2)
+            )
+            self.assertFalse(timed_out)
+            self.assertTrue(structured_block_finished)
+            self.assertTrue(outer_code_reached)
+            self.assertEqual(t2.cancelling(), 0)
+        finally:
+            loop.close()
 
     def test_cancel(self):