self.loop.run_until_complete(run())
+ def test_async_gen_asyncio_anext_tuple_no_exceptions(self):
+ # StopAsyncIteration exceptions should be cleared.
+ # See: https://github.com/python/cpython/issues/128078.
+
+ async def foo():
+ if False:
+ yield (1, 2)
+
+ async def run():
+ it = foo().__aiter__()
+ with self.assertRaises(StopAsyncIteration):
+ await it.__anext__()
+ res = await anext(it, ('a', 'b'))
+ self.assertEqual(res, ('a', 'b'))
+
+ self.loop.run_until_complete(run())
+
def test_async_gen_asyncio_anext_stopiteration(self):
async def foo():
try:
return result;
}
if (PyErr_ExceptionMatches(PyExc_StopAsyncIteration)) {
+ PyErr_Clear();
_PyGen_SetStopIterationValue(obj->default_value);
}
return NULL;
* exception we replace it with a `StopIteration(default)`, as if
* it was the return value of `__anext__()` coroutine.
*/
+ PyErr_Clear();
_PyGen_SetStopIterationValue(obj->default_value);
}
return NULL;