from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
+from sqlalchemy.dialects.postgresql import asyncpg as asyncpg_dialect
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.testing import async_test
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
+from sqlalchemy.util import greenlet_spawn
class AsyncPgTest(fixtures.TestBase):
],
)
+ @testing.combinations(
+ None,
+ "read committed",
+ "repeatable read",
+ "serializable",
+ argnames="isolation_level",
+ )
+ @async_test
+ async def test_honor_server_level_iso_setting(
+ self, async_testing_engine, isolation_level
+ ):
+ """test for #12159"""
+
+ engine = async_testing_engine()
+
+ arg, kw = engine.dialect.create_connect_args(engine.url)
+
+ # 1. create an asyncpg.connection directly, set a session level
+ # isolation level on it (this is similar to server default isolation
+ # level)
+ raw_asyncpg_conn = await engine.dialect.dbapi.asyncpg.connect(
+ *arg, **kw
+ )
+
+ if isolation_level:
+ await raw_asyncpg_conn.execute(
+ f"set SESSION CHARACTERISTICS AS TRANSACTION "
+ f"isolation level {isolation_level}"
+ )
+
+ # 2. fetch it, confirm the setting took and matches
+ raw_iso_level = (
+ await raw_asyncpg_conn.fetchrow("show transaction isolation level")
+ )[0]
+ if isolation_level:
+ eq_(raw_iso_level, isolation_level.lower())
+
+ # 3.build our pep-249 wrapper around asyncpg.connection
+ dbapi_conn = asyncpg_dialect.AsyncAdapt_asyncpg_connection(
+ engine.dialect.dbapi,
+ raw_asyncpg_conn,
+ )
+
+ # 4. show the isolation level inside of a query. this will
+ # call asyncpg.connection.transaction() in order to run the
+ # statement.
+ cursor = await greenlet_spawn(dbapi_conn.cursor)
+ await greenlet_spawn(
+ cursor.execute, "show transaction isolation level"
+ )
+ row = cursor.fetchone()
+
+ # 5. see that the raw iso level is maintained
+ eq_(row[0], raw_iso_level)
+
+ await greenlet_spawn(dbapi_conn.close)
+
@testing.variation("trans", ["commit", "rollback"])
@async_test
async def test_dont_reset_open_transaction(