assert engine.echo is True
assert engine.dialect.is_async is True
- @testing.only_on("postgresql+asyncpg")
- @async_test
- async def test_create_async_engine_async_creator_asyncpg(
- self, async_engine
- ):
- async def async_creator():
- conn = await asyncpg.connect(
- "postgresql+asyncpg://scott:tiger@127.0.0.1:5432/test"
- )
- return conn
-
- engine = create_async_engine(
- testing.db.url,
- async_creator=async_creator,
- )
- async with engine.connect() as conn:
- result = await conn.execute(select(1))
- eq_(result.scalar(), 1)
-
class AsyncCreatePoolTest(fixtures.TestBase):
@config.fixture