r"will now invalidate all prepared caches in response "
r"to this exception\)",
):
-
result = await conn.execute(
t1.select()
.where(t1.c.name.like("some name%"))
@async_test
async def test_failed_commit_recover(self, metadata, async_testing_engine):
-
Table("t1", metadata, Column("id", Integer, primary_key=True))
t2 = Table(
async def test_rollback_twice_no_problem(
self, metadata, async_testing_engine
):
-
engine = async_testing_engine()
async with engine.connect() as conn:
-
trans = await conn.begin()
await trans.rollback()
@async_test
async def test_closed_during_execute(self, metadata, async_testing_engine):
-
engine = async_testing_engine()
async with engine.connect() as conn:
async def test_failed_rollback_recover(
self, metadata, async_testing_engine
):
-
engine = async_testing_engine()
async with engine.connect() as conn:
async with engine.begin() as conn:
await conn.execute(select(1))
assert len(cache) > 0
+
+ @async_test
+ async def test_async_creator(self, metadata, async_testing_engine):
+ print(testing.db.url)
+
+ async def async_creator():
+ import asyncpg
+
+ conn = await asyncpg.connect(
+ "postgresql://scott:tiger@127.0.0.1:5432/test"
+ )
+ return conn
+
+ engine = async_testing_engine(
+ options={"async_creator": async_creator},
+ )
+ async with engine.connect() as conn:
+ result = await conn.execute(select(1))
+ eq_(result.scalar(), 1)