import asyncio
import io
import unittest
+from unittest import mock
# To prevent a warning "test altered the execution environment"
self.assertTrue(stack_for_fut[1].startswith('* Future(id='))
+ async def test_capture_call_graph_positive_limit(self):
+ captured = None
+
+ async def c3():
+ nonlocal captured
+ captured = asyncio.capture_call_graph(limit=2)
+
+ async def c2():
+ await c3()
+
+ async def c1():
+ await c2()
+
+ await c1()
+ self.assertEqual(len(captured.call_stack), 2)
+
+ async def test_capture_call_graph_negative_limit(self):
+ captured = None
+
+ async def c3():
+ nonlocal captured
+ captured = asyncio.capture_call_graph(limit=-2)
+
+ async def c2():
+ await c3()
+
+ async def c1():
+ await c2()
+
+ await c1()
+ self.assertEqual(len(captured.call_stack), 2)
+
+ async def test_capture_call_graph_zero_limit(self):
+ captured = None
+
+ async def inner():
+ nonlocal captured
+ captured = asyncio.capture_call_graph(limit=0)
+
+ await inner()
+ self.assertEqual(captured.call_stack, ())
+
+ def test_capture_call_graph_outside_loop(self):
+ with self.assertRaises(RuntimeError):
+ asyncio.capture_call_graph()
+
+ def test_capture_call_graph_non_future(self):
+ with self.assertRaises(TypeError):
+ asyncio.capture_call_graph("not a future")
+
+ async def test_capture_call_graph_no_current_task(self):
+ results = []
+
+ def cb():
+ results.append(asyncio.capture_call_graph())
+ results.append(asyncio.format_call_graph())
+
+ loop = asyncio.get_running_loop()
+ loop.call_soon(cb)
+ await asyncio.sleep(0)
+
+ self.assertEqual(results, [None, ""])
+
+ async def test_capture_call_graph_current_task_not_future(self):
+ sentinel = object()
+ with mock.patch('asyncio.tasks.current_task', return_value=sentinel):
+ with self.assertRaises(TypeError):
+ asyncio.capture_call_graph(sentinel)
+
+ async def test_build_graph_for_future_positive_limit(self):
+ fut = asyncio.Future()
+ captured = None
+
+ async def deep():
+ await fut
+
+ async def mid():
+ await deep()
+
+ async def runner():
+ await mid()
+
+ async def main():
+ nonlocal captured
+ async with asyncio.TaskGroup() as g:
+ t = g.create_task(runner(), name='runner')
+ for _ in range(5):
+ await asyncio.sleep(0)
+ captured = asyncio.capture_call_graph(t, limit=2)
+ fut.set_result(None)
+
+ await main()
+ self.assertEqual(len(captured.call_stack), 2)
+
+ async def test_build_graph_for_future_negative_limit(self):
+ fut = asyncio.Future()
+ captured = None
+
+ async def deep():
+ await fut
+
+ async def mid():
+ await deep()
+
+ async def runner():
+ await mid()
+
+ async def main():
+ nonlocal captured
+ async with asyncio.TaskGroup() as g:
+ t = g.create_task(runner(), name='runner')
+ for _ in range(5):
+ await asyncio.sleep(0)
+ captured = asyncio.capture_call_graph(t, limit=-2)
+ fut.set_result(None)
+
+ await main()
+ self.assertEqual(len(captured.call_stack), 2)
+
+ async def test_format_call_graph_regular_generator(self):
+ output = []
+
+ def gen():
+ output.append(asyncio.format_call_graph())
+ yield
+
+ async def main():
+ for _ in gen():
+ pass
+
+ await main()
+ self.assertRegex(output[0], r'in generator [\w.<>]+\.gen\(\)')
+
@unittest.skipIf(
not hasattr(asyncio.futures, "_c_future_add_to_awaited_by"),