from abc import ABCMeta, abstractmethod
import sys
-__all__ = ["Awaitable", "Coroutine", "AsyncIterable", "AsyncIterator",
+__all__ = ["Awaitable", "Coroutine",
+ "AsyncIterable", "AsyncIterator", "AsyncGenerator",
"Hashable", "Iterable", "Iterator", "Generator", "Reversible",
"Sized", "Container", "Callable", "Collection",
"Set", "MutableSet",
coroutine = type(_coro)
_coro.close() # Prevent ResourceWarning
del _coro
+## asynchronous generator ##
+async def _ag(): yield
+_ag = _ag()
+async_generator = type(_ag)
+del _ag
### ONE-TRICK PONIES ###
return NotImplemented
+class AsyncGenerator(AsyncIterator):
+
+ __slots__ = ()
+
+ async def __anext__(self):
+ """Return the next item from the asynchronous generator.
+ When exhausted, raise StopAsyncIteration.
+ """
+ return await self.asend(None)
+
+ @abstractmethod
+ async def asend(self, value):
+ """Send a value into the asynchronous generator.
+ Return next yielded value or raise StopAsyncIteration.
+ """
+ raise StopAsyncIteration
+
+ @abstractmethod
+ async def athrow(self, typ, val=None, tb=None):
+ """Raise an exception in the asynchronous generator.
+ Return next yielded value or raise StopAsyncIteration.
+ """
+ if val is None:
+ if tb is None:
+ raise typ
+ val = typ()
+ if tb is not None:
+ val = val.with_traceback(tb)
+ raise val
+
+ async def aclose(self):
+ """Raise GeneratorExit inside coroutine.
+ """
+ try:
+ await self.athrow(GeneratorExit)
+ except (GeneratorExit, StopAsyncIteration):
+ pass
+ else:
+ raise RuntimeError("asynchronous generator ignored GeneratorExit")
+
+ @classmethod
+ def __subclasshook__(cls, C):
+ if cls is AsyncGenerator:
+ return _check_methods(C, '__aiter__', '__anext__',
+ 'asend', 'athrow', 'aclose')
+ return NotImplemented
+
+
+AsyncGenerator.register(async_generator)
+
+
class Iterable(metaclass=ABCMeta):
__slots__ = ()
from collections import UserDict, UserString, UserList
from collections import ChainMap
from collections import deque
-from collections.abc import Awaitable, Coroutine, AsyncIterator, AsyncIterable
+from collections.abc import Awaitable, Coroutine
+from collections.abc import AsyncIterator, AsyncIterable, AsyncGenerator
from collections.abc import Hashable, Iterable, Iterator, Generator, Reversible
from collections.abc import Sized, Container, Callable, Collection
from collections.abc import Set, MutableSet
self.assertRaises(RuntimeError, IgnoreGeneratorExit().close)
+ def test_AsyncGenerator(self):
+ class NonAGen1:
+ def __aiter__(self): return self
+ def __anext__(self): return None
+ def aclose(self): pass
+ def athrow(self, typ, val=None, tb=None): pass
+
+ class NonAGen2:
+ def __aiter__(self): return self
+ def __anext__(self): return None
+ def aclose(self): pass
+ def asend(self, value): return value
+
+ class NonAGen3:
+ def aclose(self): pass
+ def asend(self, value): return value
+ def athrow(self, typ, val=None, tb=None): pass
+
+ non_samples = [
+ None, 42, 3.14, 1j, b"", "", (), [], {}, set(),
+ iter(()), iter([]), NonAGen1(), NonAGen2(), NonAGen3()]
+ for x in non_samples:
+ self.assertNotIsInstance(x, AsyncGenerator)
+ self.assertFalse(issubclass(type(x), AsyncGenerator), repr(type(x)))
+
+ class Gen:
+ def __aiter__(self): return self
+ async def __anext__(self): return None
+ async def aclose(self): pass
+ async def asend(self, value): return value
+ async def athrow(self, typ, val=None, tb=None): pass
+
+ class MinimalAGen(AsyncGenerator):
+ async def asend(self, value):
+ return value
+ async def athrow(self, typ, val=None, tb=None):
+ await super().athrow(typ, val, tb)
+
+ async def gen():
+ yield 1
+
+ samples = [gen(), Gen(), MinimalAGen()]
+ for x in samples:
+ self.assertIsInstance(x, AsyncIterator)
+ self.assertIsInstance(x, AsyncGenerator)
+ self.assertTrue(issubclass(type(x), AsyncGenerator), repr(type(x)))
+ self.validate_abstract_methods(AsyncGenerator, 'asend', 'athrow')
+
+ def run_async(coro):
+ result = None
+ while True:
+ try:
+ coro.send(None)
+ except StopIteration as ex:
+ result = ex.args[0] if ex.args else None
+ break
+ return result
+
+ # mixin tests
+ mgen = MinimalAGen()
+ self.assertIs(mgen, mgen.__aiter__())
+ self.assertIs(run_async(mgen.asend(None)), run_async(mgen.__anext__()))
+ self.assertEqual(2, run_async(mgen.asend(2)))
+ self.assertIsNone(run_async(mgen.aclose()))
+ with self.assertRaises(ValueError):
+ run_async(mgen.athrow(ValueError))
+
+ class FailOnClose(AsyncGenerator):
+ async def asend(self, value): return value
+ async def athrow(self, *args): raise ValueError
+
+ with self.assertRaises(ValueError):
+ run_async(FailOnClose().aclose())
+
+ class IgnoreGeneratorExit(AsyncGenerator):
+ async def asend(self, value): return value
+ async def athrow(self, *args): pass
+
+ with self.assertRaises(RuntimeError):
+ run_async(IgnoreGeneratorExit().aclose())
+
def test_Sized(self):
non_samples = [None, 42, 3.14, 1j,
_test_gen(),