From: Mike Bayer Date: Wed, 16 Aug 2023 19:38:51 +0000 (-0400) Subject: place asyncpg do_ping within a transaction if one not begun already X-Git-Tag: rel_2_0_21~34^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e5c881488e1f06f46ee3ddcfb513431efce3cd7b;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git place asyncpg do_ping within a transaction if one not begun already Fixed regression which appeared in 2.0 due to :ticket:`8491` where the revised "ping" used for PostgreSQL dialects when the :paramref:`_sa.create_engine.pool_pre_ping` parameter is in use would interfere with the use of asyncpg with PGBouncer "transaction" mode, as the multiple PostgreSQL commands emitted by asnycpg could be broken out among multiple connections leading to errors, due to the lack of any transaction around this newly revised "ping". The ping is now invoked within a transaction, in the same way that is implicit with all other backends that are based on the pep-249 DBAPI; this guarantees that the series of PG commands sent by asyncpg for this command are invoked on the same backend connection without it jumping to a different connection mid-command. The transaction is not used if the asyncpg dialect is used in "AUTOCOMMIT" mode, which remains incompatible with pgbouncer transaction mode. Fixes: #10226 Change-Id: I93156ad7c353a865d93e5296bd7bc137f0350f3c --- diff --git a/doc/build/changelog/unreleased_20/10226.rst b/doc/build/changelog/unreleased_20/10226.rst new file mode 100644 index 0000000000..420ef350f9 --- /dev/null +++ b/doc/build/changelog/unreleased_20/10226.rst @@ -0,0 +1,18 @@ +.. change:: + :tags: bug, postgresql + :tickets: 10226 + + Fixed regression which appeared in 2.0 due to :ticket:`8491` where the + revised "ping" used for PostgreSQL dialects when the + :paramref:`_sa.create_engine.pool_pre_ping` parameter is in use would + interfere with the use of asyncpg with PGBouncer "transaction" mode, as the + multiple PostgreSQL commands emitted by asnycpg could be broken out among + multiple connections leading to errors, due to the lack of any transaction + around this newly revised "ping". The ping is now invoked within a + transaction, in the same way that is implicit with all other backends that + are based on the pep-249 DBAPI; this guarantees that the series of PG + commands sent by asyncpg for this command are invoked on the same backend + connection without it jumping to a different connection mid-command. The + transaction is not used if the asyncpg dialect is used in "AUTOCOMMIT" + mode, which remains incompatible with pgbouncer transaction mode. + diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 1f6c0dcbe4..1554879a5c 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -818,10 +818,23 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): def ping(self): try: - _ = self.await_(self._connection.fetchrow(";")) + _ = self.await_(self._async_ping()) except Exception as error: self._handle_exception(error) + async def _async_ping(self): + if self._transaction is None and self.isolation_level != "autocommit": + # create a tranasction explicitly to support pgbouncer + # transaction mode. See #10226 + tr = self._connection.transaction() + await tr.start() + try: + await self._connection.fetchrow(";") + finally: + await tr.rollback() + else: + await self._connection.fetchrow(";") + def set_isolation_level(self, level): if self._started: self.rollback() diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index d94a2e2f60..b86ac55fdb 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -1,3 +1,4 @@ +import asyncio import dataclasses import datetime import logging @@ -1138,6 +1139,50 @@ class MiscBackendTest( dbapi_conn = conn.connection.dbapi_connection eq_(dbapi_conn.autocommit, autocommit) + @testing.only_on("+asyncpg") + @testing.combinations((True,), (False,), argnames="autocommit") + def test_asyncpg_transactional_ping(self, testing_engine, autocommit): + """test #10226""" + + engine = testing_engine( + options={ + "isolation_level": "AUTOCOMMIT" + if autocommit + else "SERIALIZABLE", + "pool_pre_ping": True, + } + ) + conn = engine.connect() + dbapi_conn = conn.connection.dbapi_connection + conn.close() + + future = asyncio.Future() + future.set_result(None) + + rollback = mock.Mock(return_value=future) + transaction = mock.Mock( + return_value=mock.Mock( + start=mock.Mock(return_value=future), + rollback=rollback, + ) + ) + mock_asyncpg_connection = mock.Mock( + fetchrow=mock.Mock(return_value=future), transaction=transaction + ) + + with mock.patch.object( + dbapi_conn, "_connection", mock_asyncpg_connection + ): + conn = engine.connect() + conn.close() + + if autocommit: + eq_(transaction.mock_calls, []) + eq_(rollback.mock_calls, []) + else: + eq_(transaction.mock_calls, [mock.call()]) + eq_(rollback.mock_calls, [mock.call()]) + def test_deferrable_flag_engine(self): engine = engines.testing_engine( options={