from sqlalchemy.util import asyncio
from sqlalchemy.util import await_fallback
from sqlalchemy.util import await_only
-from sqlalchemy.util import compat
from sqlalchemy.util import greenlet_spawn
from sqlalchemy.util import queue
is_true(run[0])
- def test_error_other_loop(self):
+ @async_test
+ async def test_error_other_loop(self):
run = [False]
def thread_go(q):
eq_(q.get(block=False), 1)
q.get(timeout=0.1)
- if compat.py310:
- # TODO: I don't really know what this means in 3.10
- with expect_raises(queue.Empty):
- asyncio.run(greenlet_spawn(go))
- else:
- with expect_raises_message(
- RuntimeError, "Task .* attached to a different loop"
- ):
- asyncio.run(greenlet_spawn(go))
+ with expect_raises_message(
+ RuntimeError, ".* to a different .*loop"
+ ):
+ asyncio.run(greenlet_spawn(go))
run[0] = True
q = queue.AsyncAdaptedQueue()
+
+ def prime():
+ with expect_raises(queue.Empty):
+ q.get(timeout=0.1)
+
+ await greenlet_spawn(prime)
q.put_nowait(1)
t = threading.Thread(target=thread_go, args=[q])
t.start()