pass
-Current State of Generators and Coroutines
-------------------------------------------
+Current State of Generators, Coroutines, and Asynchronous Generators
+--------------------------------------------------------------------
When implementing coroutine schedulers and for other advanced uses of
generators, it is useful to determine whether a generator is currently
.. versionadded:: 3.5
+.. function:: getasyncgenstate(agen)
+
+ Get current state of an asynchronous generator object. The function is
+ intended to be used with asynchronous iterator objects created by
+ :keyword:`async def` functions which use the :keyword:`yield` statement,
+ but will accept any asynchronous generator-like object that has
+ ``ag_running`` and ``ag_frame`` attributes.
+
+ Possible states are:
+ * AGEN_CREATED: Waiting to start execution.
+ * AGEN_RUNNING: Currently being executed by the interpreter.
+ * AGEN_SUSPENDED: Currently suspended at a yield expression.
+ * AGEN_CLOSED: Execution has completed.
+
+ .. versionadded:: 3.12
+
The current internal state of the generator can also be queried. This is
mostly useful for testing purposes, to ensure that internal state is being
updated as expected:
.. versionadded:: 3.5
+.. function:: getasyncgenlocals(agen)
+
+ This function is analogous to :func:`~inspect.getgeneratorlocals`, but
+ works for asynchronous generator objects created by :keyword:`async def`
+ functions which use the :keyword:`yield` statement.
+
+ .. versionadded:: 3.12
+
.. _inspect-module-co-flags:
'Yury Selivanov <yselivanov@sprymix.com>')
__all__ = [
+ "AGEN_CLOSED",
+ "AGEN_CREATED",
+ "AGEN_RUNNING",
+ "AGEN_SUSPENDED",
"ArgInfo",
"Arguments",
"Attribute",
"getabsfile",
"getargs",
"getargvalues",
+ "getasyncgenlocals",
+ "getasyncgenstate",
"getattr_static",
"getblock",
"getcallargs",
return {}
+# ----------------------------------- asynchronous generator introspection
+
+AGEN_CREATED = 'AGEN_CREATED'
+AGEN_RUNNING = 'AGEN_RUNNING'
+AGEN_SUSPENDED = 'AGEN_SUSPENDED'
+AGEN_CLOSED = 'AGEN_CLOSED'
+
+
+def getasyncgenstate(agen):
+ """Get current state of an asynchronous generator object.
+
+ Possible states are:
+ AGEN_CREATED: Waiting to start execution.
+ AGEN_RUNNING: Currently being executed by the interpreter.
+ AGEN_SUSPENDED: Currently suspended at a yield expression.
+ AGEN_CLOSED: Execution has completed.
+ """
+ if agen.ag_running:
+ return AGEN_RUNNING
+ if agen.ag_suspended:
+ return AGEN_SUSPENDED
+ if agen.ag_frame is None:
+ return AGEN_CLOSED
+ return AGEN_CREATED
+
+
+def getasyncgenlocals(agen):
+ """
+ Get the mapping of asynchronous generator local variables to their current
+ values.
+
+ A dict is returned, with the keys the local variable names and values the
+ bound values."""
+
+ if not isasyncgen(agen):
+ raise TypeError(f"{agen!r} is not a Python async generator")
+
+ frame = getattr(agen, "ag_frame", None)
+ if frame is not None:
+ return agen.ag_frame.f_locals
+ else:
+ return {}
+
+
###############################################################################
### Function Signature Object (PEP 362)
###############################################################################
+import asyncio
import builtins
import collections
import datetime
git = mod.StupidGit()
+def tearDownModule():
+ asyncio.set_event_loop_policy(None)
+
+
def signatures_with_lexicographic_keyword_only_parameters():
"""
Yields a whole bunch of functions with only keyword-only parameters,
{'a': None, 'gencoro': gencoro, 'b': 'spam'})
+class TestGetAsyncGenState(unittest.IsolatedAsyncioTestCase):
+
+ def setUp(self):
+ async def number_asyncgen():
+ for number in range(5):
+ yield number
+ self.asyncgen = number_asyncgen()
+
+ async def asyncTearDown(self):
+ await self.asyncgen.aclose()
+
+ def _asyncgenstate(self):
+ return inspect.getasyncgenstate(self.asyncgen)
+
+ def test_created(self):
+ self.assertEqual(self._asyncgenstate(), inspect.AGEN_CREATED)
+
+ async def test_suspended(self):
+ value = await anext(self.asyncgen)
+ self.assertEqual(self._asyncgenstate(), inspect.AGEN_SUSPENDED)
+ self.assertEqual(value, 0)
+
+ async def test_closed_after_exhaustion(self):
+ countdown = 7
+ with self.assertRaises(StopAsyncIteration):
+ while countdown := countdown - 1:
+ await anext(self.asyncgen)
+ self.assertEqual(countdown, 1)
+ self.assertEqual(self._asyncgenstate(), inspect.AGEN_CLOSED)
+
+ async def test_closed_after_immediate_exception(self):
+ with self.assertRaises(RuntimeError):
+ await self.asyncgen.athrow(RuntimeError)
+ self.assertEqual(self._asyncgenstate(), inspect.AGEN_CLOSED)
+
+ async def test_running(self):
+ async def running_check_asyncgen():
+ for number in range(5):
+ self.assertEqual(self._asyncgenstate(), inspect.AGEN_RUNNING)
+ yield number
+ self.assertEqual(self._asyncgenstate(), inspect.AGEN_RUNNING)
+ self.asyncgen = running_check_asyncgen()
+ # Running up to the first yield
+ await anext(self.asyncgen)
+ self.assertEqual(self._asyncgenstate(), inspect.AGEN_SUSPENDED)
+ # Running after the first yield
+ await anext(self.asyncgen)
+ self.assertEqual(self._asyncgenstate(), inspect.AGEN_SUSPENDED)
+
+ def test_easy_debugging(self):
+ # repr() and str() of a asyncgen state should contain the state name
+ names = 'AGEN_CREATED AGEN_RUNNING AGEN_SUSPENDED AGEN_CLOSED'.split()
+ for name in names:
+ state = getattr(inspect, name)
+ self.assertIn(name, repr(state))
+ self.assertIn(name, str(state))
+
+ async def test_getasyncgenlocals(self):
+ async def each(lst, a=None):
+ b=(1, 2, 3)
+ for v in lst:
+ if v == 3:
+ c = 12
+ yield v
+
+ numbers = each([1, 2, 3])
+ self.assertEqual(inspect.getasyncgenlocals(numbers),
+ {'a': None, 'lst': [1, 2, 3]})
+ await anext(numbers)
+ self.assertEqual(inspect.getasyncgenlocals(numbers),
+ {'a': None, 'lst': [1, 2, 3], 'v': 1,
+ 'b': (1, 2, 3)})
+ await anext(numbers)
+ self.assertEqual(inspect.getasyncgenlocals(numbers),
+ {'a': None, 'lst': [1, 2, 3], 'v': 2,
+ 'b': (1, 2, 3)})
+ await anext(numbers)
+ self.assertEqual(inspect.getasyncgenlocals(numbers),
+ {'a': None, 'lst': [1, 2, 3], 'v': 3,
+ 'b': (1, 2, 3), 'c': 12})
+ with self.assertRaises(StopAsyncIteration):
+ await anext(numbers)
+ self.assertEqual(inspect.getasyncgenlocals(numbers), {})
+
+ async def test_getasyncgenlocals_empty(self):
+ async def yield_one():
+ yield 1
+ one = yield_one()
+ self.assertEqual(inspect.getasyncgenlocals(one), {})
+ await anext(one)
+ self.assertEqual(inspect.getasyncgenlocals(one), {})
+ with self.assertRaises(StopAsyncIteration):
+ await anext(one)
+ self.assertEqual(inspect.getasyncgenlocals(one), {})
+
+ def test_getasyncgenlocals_error(self):
+ self.assertRaises(TypeError, inspect.getasyncgenlocals, 1)
+ self.assertRaises(TypeError, inspect.getasyncgenlocals, lambda x: True)
+ self.assertRaises(TypeError, inspect.getasyncgenlocals, set)
+ self.assertRaises(TypeError, inspect.getasyncgenlocals, (2,3))
+
+
class MySignature(inspect.Signature):
# Top-level to make it picklable;
# used in test_signature_object_pickle