rv = ns['f']()
self.assertEqual(rv, tuple(expected))
+ def test_compile_top_level_await_no_coro(self):
+ """Make sure top level non-await codes get the correct coroutine flags.
+ """
+ modes = ('single', 'exec')
+ code_samples = [
+ '''def f():pass\n''',
+ '''[x for x in l]'''
+ ]
+ for mode, code_sample in product(modes, code_samples):
+ source = dedent(code_sample)
+ co = compile(source,
+ '?',
+ mode,
+ flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT)
+
+ self.assertNotEqual(co.co_flags & CO_COROUTINE, CO_COROUTINE,
+ msg=f"source={source} mode={mode}")
+
+
def test_compile_top_level_await(self):
"""Test whether code some top level await can be compiled.
comprehension_ty outermost;
PyObject *qualname = NULL;
int is_async_generator = 0;
+ int top_level_await = IS_TOP_LEVEL_AWAIT(c);
+
- if (IS_TOP_LEVEL_AWAIT(c)) {
- c->u->u_ste->ste_coroutine = 1;
- }
int is_async_function = c->u->u_ste->ste_coroutine;
outermost = (comprehension_ty) asdl_seq_GET(generators, 0);
is_async_generator = c->u->u_ste->ste_coroutine;
- if (is_async_generator && !is_async_function && type != COMP_GENEXP) {
+ if (is_async_generator && !is_async_function && type != COMP_GENEXP && !top_level_await) {
compiler_error(c, "asynchronous comprehension outside of "
"an asynchronous function");
goto error_in_scope;
qualname = c->u->u_qualname;
Py_INCREF(qualname);
compiler_exit_scope(c);
+ if (top_level_await && is_async_generator){
+ c->u->u_ste->ste_coroutine = 1;
+ }
if (co == NULL)
goto error;