import asyncio
import code
import concurrent.futures
+import contextvars
import inspect
import sys
import threading
super().__init__(locals)
self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
self.loop = loop
+ self.context = contextvars.copy_context()
def runcode(self, code):
future = concurrent.futures.Future()
return
try:
- repl_future = self.loop.create_task(coro)
+ repl_future = self.loop.create_task(coro, context=self.context)
futures._chain_future(repl_future, future)
except BaseException as exc:
future.set_exception(exc)
- loop.call_soon_threadsafe(callback)
+ loop.call_soon_threadsafe(callback, context=self.context)
try:
return future.result()
self.assertEqual(traceback_lines, expected_lines)
+class TestAsyncioREPLContextVars(unittest.TestCase):
+ def test_toplevel_contextvars_sync(self):
+ user_input = dedent("""\
+ from contextvars import ContextVar
+ var = ContextVar("var", default="failed")
+ var.set("ok")
+ """)
+ p = spawn_repl("-m", "asyncio")
+ p.stdin.write(user_input)
+ user_input2 = dedent("""
+ print(f"toplevel contextvar test: {var.get()}")
+ """)
+ p.stdin.write(user_input2)
+ output = kill_python(p)
+ self.assertEqual(p.returncode, 0)
+ expected = "toplevel contextvar test: ok"
+ self.assertIn(expected, output, expected)
+
+ def test_toplevel_contextvars_async(self):
+ user_input = dedent("""\
+ from contextvars import ContextVar
+ var = ContextVar('var', default='failed')
+ """)
+ p = spawn_repl("-m", "asyncio")
+ p.stdin.write(user_input+"\n")
+ user_input2 = "async def set_var(): var.set('ok')\n"
+ p.stdin.write(user_input2+"\n")
+ user_input3 = "await set_var()\n"
+ p.stdin.write(user_input3+"\n")
+ user_input4 = "print(f'toplevel contextvar test: {var.get()}')\n"
+ p.stdin.write(user_input4+"\n")
+ output = kill_python(p)
+ self.assertEqual(p.returncode, 0)
+ expected = "toplevel contextvar test: ok"
+ self.assertIn(expected, output, expected)
+
+
if __name__ == "__main__":
unittest.main()