def test_async_gen_exception_12(self):
async def gen():
- await anext(me)
+ with self.assertWarnsRegex(RuntimeWarning,
+ f"coroutine method 'asend' of '{gen.__qualname__}' "
+ f"was never awaited"):
+ await anext(me)
yield 123
me = gen()
yield 123
with self.assertWarns(DeprecationWarning):
- gen().athrow(GeneratorExit, GeneratorExit(), None)
+ x = gen().athrow(GeneratorExit, GeneratorExit(), None)
+ with self.assertWarnsRegex(RuntimeWarning,
+ f"coroutine method 'athrow' of '{gen.__qualname__}' "
+ f"was never awaited"):
+ del x
+ gc_collect()
def test_async_gen_api_01(self):
async def gen():
self.assertIsInstance(message['exception'], ZeroDivisionError)
self.assertIn('unhandled exception during asyncio.run() shutdown',
message['message'])
+ with self.assertWarnsRegex(RuntimeWarning,
+ f"coroutine method 'aclose' of '{async_iterate.__qualname__}' "
+ f"was never awaited"):
+ del message, messages
+ gc_collect()
def test_async_gen_expression_01(self):
async def arange(n):
asyncio.run(main())
self.assertEqual([], messages)
+ with self.assertWarnsRegex(RuntimeWarning,
+ f"coroutine method 'aclose' of '{async_iterate.__qualname__}' "
+ f"was never awaited"):
+ gc_collect()
def test_async_gen_await_same_anext_coro_twice(self):
async def async_iterate():