import traceback
import weakref
-from .base import _ConnDialect
+from .base import _AsyncConnDialect
from .base import _ConnectionFairy
from .base import _ConnectionRecord
from .base import Pool
"""
+ _is_asyncio = False
_queue_class = sqla_queue.Queue
def __init__(
return self._pool.maxsize - self._pool.qsize() + self._overflow
-class _AsyncConnDialect(_ConnDialect):
- is_async = True
-
-
class AsyncAdaptedQueuePool(QueuePool):
_is_asyncio = True
_queue_class = sqla_queue.AsyncAdaptedQueue
"""
+ _is_asyncio = False
+
def __init__(self, creator, pool_size=5, **kw):
Pool.__init__(self, creator, **kw)
self._conn = threading.local()
from sqlalchemy import select
from sqlalchemy import testing
from sqlalchemy.engine import default
-from sqlalchemy.pool.impl import _AsyncConnDialect
+from sqlalchemy.pool.base import _AsyncConnDialect
+from sqlalchemy.pool.base import _ConnDialect
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_context_ok
from sqlalchemy.testing import assert_raises_message
if "use_lifo" in pool_args:
eq_(p1._pool.use_lifo, p2._pool.use_lifo)
+ @testing.combinations(
+ (pool.QueuePool, False),
+ (pool.AsyncAdaptedQueuePool, True),
+ (pool.FallbackAsyncAdaptedQueuePool, True),
+ (pool.NullPool, None),
+ (pool.SingletonThreadPool, False),
+ (pool.StaticPool, None),
+ (pool.AssertionPool, None),
+ )
+ def test_is_asyncio_from_dialect(self, pool_cls, is_async_king):
+ p = pool_cls(creator=object())
+ for is_async in (True, False):
+ if is_async:
+ p._dialect = _AsyncConnDialect()
+ else:
+ p._dialect = _ConnDialect
+ if is_async_king is None:
+ eq_(p._is_asyncio, is_async)
+ else:
+ eq_(p._is_asyncio, is_async_king)
+
+ @testing.combinations(
+ (pool.QueuePool, False),
+ (pool.AsyncAdaptedQueuePool, True),
+ (pool.FallbackAsyncAdaptedQueuePool, True),
+ (pool.NullPool, False),
+ (pool.SingletonThreadPool, False),
+ (pool.StaticPool, False),
+ (pool.AssertionPool, False),
+ )
+ def test_is_asyncio_from_dialect_cls(self, pool_cls, is_async):
+ eq_(pool_cls._is_asyncio, is_async)
+
class PoolDialectTest(PoolTestBase):
def _dialect(self):
is_false(async_engine == None)
- # NOTE: this test currently causes the test suite to hang; it previously
- # was not actually running the worker thread
- # as the testing_engine() fixture
- # was rejecting the "transfer_staticpool" keyword argument
@async_test
- async def temporarily_dont_test_no_attach_to_event_loop(
- self, testing_engine
- ):
+ async def test_no_attach_to_event_loop(self, testing_engine):
"""test #6409"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
- engine = testing_engine(asyncio=True, transfer_staticpool=True)
-
async def main():
tasks = [task() for _ in range(2)]
await asyncio.gather(*tasks)
+ await engine.dispose()
async def task():
async with engine.begin() as connection:
result.all()
try:
+ engine = testing_engine(
+ asyncio=True, transfer_staticpool=False
+ )
+
asyncio.run(main())
except Exception as err:
errs.append(err)