--- /dev/null
+.. change::
+ :tags: bug, postgresql
+ :tickets: 10226
+
+ Fixed regression which appeared in 2.0 due to :ticket:`8491` where the
+ revised "ping" used for PostgreSQL dialects when the
+ :paramref:`_sa.create_engine.pool_pre_ping` parameter is in use would
+ interfere with the use of asyncpg with PGBouncer "transaction" mode, as the
+ multiple PostgreSQL commands emitted by asnycpg could be broken out among
+ multiple connections leading to errors, due to the lack of any transaction
+ around this newly revised "ping". The ping is now invoked within a
+ transaction, in the same way that is implicit with all other backends that
+ are based on the pep-249 DBAPI; this guarantees that the series of PG
+ commands sent by asyncpg for this command are invoked on the same backend
+ connection without it jumping to a different connection mid-command. The
+ transaction is not used if the asyncpg dialect is used in "AUTOCOMMIT"
+ mode, which remains incompatible with pgbouncer transaction mode.
+
def ping(self):
try:
- _ = self.await_(self._connection.fetchrow(";"))
+ _ = self.await_(self._async_ping())
except Exception as error:
self._handle_exception(error)
+ async def _async_ping(self):
+ if self._transaction is None and self.isolation_level != "autocommit":
+ # create a tranasction explicitly to support pgbouncer
+ # transaction mode. See #10226
+ tr = self._connection.transaction()
+ await tr.start()
+ try:
+ await self._connection.fetchrow(";")
+ finally:
+ await tr.rollback()
+ else:
+ await self._connection.fetchrow(";")
+
def set_isolation_level(self, level):
if self._started:
self.rollback()
+import asyncio
import dataclasses
import datetime
import logging
dbapi_conn = conn.connection.dbapi_connection
eq_(dbapi_conn.autocommit, autocommit)
+ @testing.only_on("+asyncpg")
+ @testing.combinations((True,), (False,), argnames="autocommit")
+ def test_asyncpg_transactional_ping(self, testing_engine, autocommit):
+ """test #10226"""
+
+ engine = testing_engine(
+ options={
+ "isolation_level": "AUTOCOMMIT"
+ if autocommit
+ else "SERIALIZABLE",
+ "pool_pre_ping": True,
+ }
+ )
+ conn = engine.connect()
+ dbapi_conn = conn.connection.dbapi_connection
+ conn.close()
+
+ future = asyncio.Future()
+ future.set_result(None)
+
+ rollback = mock.Mock(return_value=future)
+ transaction = mock.Mock(
+ return_value=mock.Mock(
+ start=mock.Mock(return_value=future),
+ rollback=rollback,
+ )
+ )
+ mock_asyncpg_connection = mock.Mock(
+ fetchrow=mock.Mock(return_value=future), transaction=transaction
+ )
+
+ with mock.patch.object(
+ dbapi_conn, "_connection", mock_asyncpg_connection
+ ):
+ conn = engine.connect()
+ conn.close()
+
+ if autocommit:
+ eq_(transaction.mock_calls, [])
+ eq_(rollback.mock_calls, [])
+ else:
+ eq_(transaction.mock_calls, [mock.call()])
+ eq_(rollback.mock_calls, [mock.call()])
+
def test_deferrable_flag_engine(self):
engine = engines.testing_engine(
options={