class AsyncAdaptedQueue:
- await_ = await_fallback
+ await_ = staticmethod(await_fallback)
def __init__(self, maxsize=0, use_lifo=False):
if use_lifo:
+import asyncio
+
from sqlalchemy import Column
from sqlalchemy import delete
from sqlalchemy import exc
trans.rollback(),
)
+ @async_test
+ async def test_pool_exhausted(self, async_engine):
+ engine = create_async_engine(
+ testing.db.url, pool_size=1, max_overflow=0, pool_timeout=0.1,
+ )
+ async with engine.connect():
+ await assert_raises_message_async(
+ asyncio.TimeoutError, "", engine.connect(),
+ )
+
class AsyncResultTest(EngineFixture):
@testing.combinations(