def _task_get_stack(task, limit):
frames = []
- try:
- # 'async def' coroutines
+ if hasattr(task._coro, 'cr_frame'):
+ # case 1: 'async def' coroutines
f = task._coro.cr_frame
- except AttributeError:
+ elif hasattr(task._coro, 'gi_frame'):
+ # case 2: legacy coroutines
f = task._coro.gi_frame
+ elif hasattr(task._coro, 'ag_frame'):
+ # case 3: async generators
+ f = task._coro.ag_frame
+ else:
+ # case 4: unknown objects
+ f = None
if f is not None:
while f is not None:
if limit is not None:
self.loop.run_until_complete(run())
+ def test_async_gen_aclose_compatible_with_get_stack(self):
+ async def async_generator():
+ yield object()
+
+ async def run():
+ ag = async_generator()
+ asyncio.create_task(ag.aclose())
+ tasks = asyncio.all_tasks()
+ for task in tasks:
+ # No AttributeError raised
+ task.get_stack()
+
+ self.loop.run_until_complete(run())
+
+
if __name__ == "__main__":
unittest.main()