from .langhelpers import memoized_property
from .. import exc
-if compat.py37:
- try:
- from contextvars import copy_context as _copy_context
-
- # If greenlet.gr_context is present in current version of greenlet,
- # it will be set with a copy of the current context on creation.
- # Refs: https://github.com/python-greenlet/greenlet/pull/198
- getattr(greenlet.greenlet, "gr_context")
- except (ImportError, AttributeError):
- _copy_context = None
-else:
- _copy_context = None
+# If greenlet.gr_context is present in current version of greenlet,
+# it will be set with the current context on creation.
+# Refs: https://github.com/python-greenlet/greenlet/pull/198
+_has_gr_context = hasattr(greenlet.getcurrent(), "gr_context")
def is_exit_exception(e):
def __init__(self, fn, driver):
greenlet.greenlet.__init__(self, fn, driver)
self.driver = driver
- if _copy_context is not None:
- self.gr_context = _copy_context()
+ if _has_gr_context:
+ self.gr_context = driver.gr_context
def await_only(awaitable: Coroutine) -> Any:
"""Awaits an async function in a sync method.
The sync method must be inside a :func:`greenlet_spawn` context.
- :func:`await_` calls cannot be nested.
+ :func:`await_only` calls cannot be nested.
:param awaitable: The coroutine to call.
current = greenlet.getcurrent()
if not isinstance(current, _AsyncIoGreenlet):
raise exc.MissingGreenlet(
- "greenlet_spawn has not been called; can't call await_() here. "
- "Was IO attempted in an unexpected place?"
+ "greenlet_spawn has not been called; can't call await_only() "
+ "here. Was IO attempted in an unexpected place?"
)
# returns the control to the driver greenlet passing it
"""Awaits an async function in a sync method.
The sync method must be inside a :func:`greenlet_spawn` context.
- :func:`await_` calls cannot be nested.
+ :func:`await_fallback` calls cannot be nested.
:param awaitable: The coroutine to call.
if loop.is_running():
raise exc.MissingGreenlet(
"greenlet_spawn has not been called and asyncio event "
- "loop is already running; can't call await_() here. "
+ "loop is already running; can't call await_fallback() here. "
"Was IO attempted in an unexpected place?"
)
return loop.run_until_complete(awaitable)
) -> Any:
"""Runs a sync function ``fn`` in a new greenlet.
- The sync function can then use :func:`await_` to wait for async
+ The sync function can then use :func:`await_only` to wait for async
functions.
:param fn: The sync callable to call.
context = _AsyncIoGreenlet(fn, greenlet.getcurrent())
# runs the function synchronously in gl greenlet. If the execution
- # is interrupted by await_, context is not dead and result is a
+ # is interrupted by await_only, context is not dead and result is a
# coroutine to wait. If the context is dead the function has
# returned, and its result can be returned.
switch_occurred = False
while not context.dead:
switch_occurred = True
try:
- # wait for a coroutine from await_ and then return its
+ # wait for a coroutine from await_only and then return its
# result back to it.
value = await result
except BaseException:
+import asyncio
+import random
import threading
from sqlalchemy import exc
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_true
-from sqlalchemy.util import asyncio
from sqlalchemy.util import await_fallback
from sqlalchemy.util import await_only
from sqlalchemy.util import greenlet_spawn
to_await = run1()
with expect_raises_message(
exc.MissingGreenlet,
- r"greenlet_spawn has not been called; can't call await_\(\) here.",
+ "greenlet_spawn has not been called; "
+ r"can't call await_only\(\) here.",
):
await_only(to_await)
with expect_raises_message(
exc.InvalidRequestError,
- r"greenlet_spawn has not been called; can't call await_\(\) here.",
+ "greenlet_spawn has not been called; "
+ r"can't call await_only\(\) here.",
):
await greenlet_spawn(go)
import contextvars
var = contextvars.ContextVar("var")
- concurrency = 5
+ concurrency = 500
+ # NOTE: sleep here is not necessary. It's used to simulate IO
+ # ensuring that task are not run sequentially
async def async_inner(val):
+ await asyncio.sleep(random.uniform(0.005, 0.015))
eq_(val, var.get())
return var.get()
+ async def async_set(val):
+ await asyncio.sleep(random.uniform(0.005, 0.015))
+ var.set(val)
+
def inner(val):
retval = await_only(async_inner(val))
eq_(val, var.get())
eq_(retval, val)
+
+ # set the value in a sync function
+ newval = val + concurrency
+ var.set(newval)
+ syncset = await_only(async_inner(newval))
+ eq_(newval, var.get())
+ eq_(syncset, newval)
+
+ # set the value in an async function
+ retval = val + 2 * concurrency
+ await_only(async_set(retval))
+ eq_(var.get(), retval)
+ eq_(await_only(async_inner(retval)), retval)
+
return retval
async def task(val):
+ await asyncio.sleep(random.uniform(0.005, 0.015))
var.set(val)
+ await asyncio.sleep(random.uniform(0.005, 0.015))
return await greenlet_spawn(inner, val)
values = {
[task(i) for i in range(concurrency)]
)
}
- eq_(values, set(range(concurrency)))
+ eq_(values, set(range(concurrency * 2, concurrency * 3)))
@async_test
async def test_require_await(self):