from sqlalchemy import Column
from sqlalchemy import exc
+from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import MetaData
+from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy.dialects.postgresql import ENUM
-from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.testing import async_test
-from sqlalchemy.testing import engines
+from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
__requires__ = ("async_dialect",)
__only_on__ = "postgresql+asyncpg"
- @testing.fixture
- def async_engine(self):
- return create_async_engine(testing.db.url)
-
- @testing.fixture()
- def metadata(self):
- # TODO: remove when Iae6ab95938a7e92b6d42086aec534af27b5577d3
- # merges
-
- from sqlalchemy.testing import util as testing_util
- from sqlalchemy.sql import schema
-
- metadata = schema.MetaData()
-
- try:
- yield metadata
- finally:
- testing_util.drop_all_tables_from_metadata(metadata, testing.db)
-
@async_test
async def test_detect_stale_ddl_cache_raise_recover(
- self, metadata, async_engine
+ self, metadata, async_testing_engine
):
async def async_setup(engine, strlen):
metadata.clear()
Column("name", String),
)
- await async_setup(async_engine, 30)
+ first_engine = async_testing_engine()
+ second_engine = async_testing_engine()
- second_engine = engines.testing_engine(asyncio=True)
+ await async_setup(first_engine, 30)
async with second_engine.connect() as conn:
result = await conn.execute(
rows = result.fetchall()
assert len(rows) >= 29
- await async_setup(async_engine, 20)
+ await async_setup(first_engine, 20)
async with second_engine.connect() as conn:
with testing.expect_raises_message(
@async_test
async def test_detect_stale_type_cache_raise_recover(
- self, metadata, async_engine
+ self, metadata, async_testing_engine
):
async def async_setup(engine, enums):
metadata = MetaData()
),
)
- await async_setup(async_engine, ("beans", "means", "keens"))
-
- second_engine = engines.testing_engine(
- asyncio=True,
- options={"connect_args": {"prepared_statement_cache_size": 0}},
+ first_engine = async_testing_engine()
+ second_engine = async_testing_engine(
+ options={"connect_args": {"prepared_statement_cache_size": 0}}
)
+ await async_setup(first_engine, ("beans", "means", "keens"))
+
async with second_engine.connect() as conn:
await conn.execute(
t1.insert(),
],
)
- await async_setup(async_engine, ("faux", "beau", "flow"))
+ await async_setup(first_engine, ("faux", "beau", "flow"))
async with second_engine.connect() as conn:
with testing.expect_raises_message(
for i in range(10)
],
)
+
+ @async_test
+ async def test_failed_commit_recover(self, metadata, async_testing_engine):
+
+ Table("t1", metadata, Column("id", Integer, primary_key=True))
+
+ t2 = Table(
+ "t2",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column(
+ "t1_id",
+ Integer,
+ ForeignKey("t1.id", deferrable=True, initially="deferred"),
+ ),
+ )
+
+ engine = async_testing_engine()
+
+ async with engine.connect() as conn:
+ await conn.run_sync(metadata.create_all)
+
+ await conn.execute(t2.insert().values(id=1, t1_id=2))
+
+ with testing.expect_raises_message(
+ exc.IntegrityError, 'insert or update on table "t2"'
+ ):
+ await conn.commit()
+
+ await conn.rollback()
+
+ eq_((await conn.execute(select(1))).scalar(), 1)
+
+ @async_test
+ async def test_rollback_twice_no_problem(
+ self, metadata, async_testing_engine
+ ):
+
+ engine = async_testing_engine()
+
+ async with engine.connect() as conn:
+
+ trans = await conn.begin()
+
+ await trans.rollback()
+
+ await conn.rollback()
+
+ @async_test
+ async def test_closed_during_execute(self, metadata, async_testing_engine):
+
+ engine = async_testing_engine()
+
+ async with engine.connect() as conn:
+ await conn.begin()
+
+ with testing.expect_raises_message(
+ exc.DBAPIError, "connection was closed"
+ ):
+ await conn.exec_driver_sql(
+ "select pg_terminate_backend(pg_backend_pid())"
+ )
+
+ @async_test
+ async def test_failed_rollback_recover(
+ self, metadata, async_testing_engine
+ ):
+
+ engine = async_testing_engine()
+
+ async with engine.connect() as conn:
+ await conn.begin()
+
+ (await conn.execute(select(1))).scalar()
+
+ raw_connection = await conn.get_raw_connection()
+ # close the asyncpg transaction directly
+ await raw_connection._transaction.rollback()
+
+ with testing.expect_raises_message(
+ exc.InterfaceError, "already rolled back"
+ ):
+ await conn.rollback()
+
+ # recovers no problem
+
+ await conn.begin()
+ await conn.rollback()