import inspect as stdlib_inspect
from unittest.mock import patch
+import asyncpg
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import delete
async def run_test(subject, trans_on_subject, execute_on_subject):
async with subject.begin() as trans:
-
if begin_nested:
if not config.requirements.savepoints.enabled:
config.skip_test("savepoints not enabled")
@async_test
async def test_connection_info(self, async_engine):
-
async with async_engine.connect() as conn:
conn.info["foo"] = "bar"
@async_test
async def test_connection_eq_ne(self, async_engine):
-
async with async_engine.connect() as conn:
c2 = _async_engine.AsyncConnection(
async_engine, conn.sync_connection
@async_test
async def test_transaction_eq_ne(self, async_engine):
-
async with async_engine.connect() as conn:
t1 = await conn.begin()
"do_rollback",
mock.Mock(side_effect=Exception("can't run rollback")),
), mock.patch("sqlalchemy.util.warn") as m:
-
_finalize_fairy(
None, rec, pool, ref, echo, transaction_was_reset=False
)
@async_test
async def test_get_raw_connection(self, async_connection):
-
pooled = await async_connection.get_raw_connection()
is_(pooled, async_connection.sync_connection.connection)
@async_test
async def test_connection_not_started(self, async_engine):
-
conn = async_engine.connect()
testing.assert_raises_message(
asyncio_exc.AsyncContextNotStarted,
users = self.tables.users
async with async_engine.begin() as conn:
-
savepoint = await conn.begin_nested()
await conn.execute(delete(users))
await savepoint.rollback()
users = self.tables.users
async with async_engine.begin() as conn:
-
savepoint = await conn.begin_nested()
await conn.execute(delete(users))
await savepoint.commit()
@async_test
async def test_conn_transaction_not_started(self, async_engine):
-
async with async_engine.connect() as conn:
trans = conn.begin()
with expect_raises_message(
assert engine.echo is True
assert engine.dialect.is_async is True
+ @testing.only_on("postgresql+asyncpg")
+ @async_test
+ async def test_create_async_engine_async_creator_asyncpg(
+ self, async_engine
+ ):
+ async def async_creator():
+ conn = await asyncpg.connect(
+ "postgresql+asyncpg://scott:tiger@127.0.0.1:5432/test"
+ )
+ return conn
+
+ engine = create_async_engine(
+ testing.db.url,
+ async_creator=async_creator,
+ )
+ async with engine.connect() as conn:
+ result = await conn.execute(select(1))
+ eq_(result.scalar(), 1)
+
class AsyncCreatePoolTest(fixtures.TestBase):
@config.fixture
async def test_get_transaction(self, async_engine):
async with async_engine.connect() as conn:
async with conn.begin() as trans:
-
is_(trans.connection, conn)
is_(conn.get_transaction(), trans)
)
def test_regenerate_connection(self, connection):
-
async_connection = AsyncConnection._retrieve_proxy_for_target(
connection
)
eq_(len(ReversibleProxy._proxy_objects), 0)
def test_regen_conn_but_not_engine(self, async_engine):
-
with async_engine.sync_engine.connect() as sync_conn:
-
async_conn = AsyncConnection._retrieve_proxy_for_target(sync_conn)
async_conn2 = AsyncConnection._retrieve_proxy_for_target(sync_conn)