Fixed critical issue in the asyncpg driver where a rollback or commit that
fails specifically for the ``MissingGreenlet`` condition or any other error
that is not raised by asyncpg itself would discard the asyncpg transaction
in any case, even though the transaction were still idle, leaving to a
server side condition with an idle transaction that then goes back into the
connection pool. The flags for "transaction closed" are now not reset for
errors that are raised outside of asyncpg itself. When asyncpg itself
raises an error for ``.commit()`` or ``.rollback()``, asyncpg does then
discard of this transaction.
Fixes: #11819
Change-Id: I12f0532788b03ea63fb47a7af21e07c37effb070
(cherry picked from commit
a1f220cb4d1a04412a53200f454fbfc706e136b3)
(cherry picked from commit
ca69db7e1ff6dabbbd57b1bca3387d0321da19a5)
--- /dev/null
+.. change::
+ :tags: bug, postgresql
+ :tickets: 11819
+ :versions: 2.0.33, 1.4.54
+
+ Fixed critical issue in the asyncpg driver where a rollback or commit that
+ fails specifically for the ``MissingGreenlet`` condition or any other error
+ that is not raised by asyncpg itself would discard the asyncpg transaction
+ in any case, even though the transaction were still idle, leaving to a
+ server side condition with an idle transaction that then goes back into the
+ connection pool. The flags for "transaction closed" are now not reset for
+ errors that are raised outside of asyncpg itself. When asyncpg itself
+ raises an error for ``.commit()`` or ``.rollback()``, asyncpg does then
+ discard of this transaction.
else:
return AsyncAdapt_asyncpg_cursor(self)
+ async def _rollback_and_discard(self):
+ try:
+ await self._transaction.rollback()
+ finally:
+ # if asyncpg .rollback() was actually called, then whether or
+ # not it raised or succeeded, the transation is done, discard it
+ self._transaction = None
+ self._started = False
+
+ async def _commit_and_discard(self):
+ try:
+ await self._transaction.commit()
+ finally:
+ # if asyncpg .commit() was actually called, then whether or
+ # not it raised or succeeded, the transation is done, discard it
+ self._transaction = None
+ self._started = False
+
def rollback(self):
if self._started:
try:
- self.await_(self._transaction.rollback())
- except Exception as error:
- self._handle_exception(error)
- finally:
+ self.await_(self._rollback_and_discard())
self._transaction = None
self._started = False
+ except Exception as error:
+ # don't dereference asyncpg transaction if we didn't
+ # actually try to call rollback() on it
+ self._handle_exception(error)
def commit(self):
if self._started:
try:
- self.await_(self._transaction.commit())
- except Exception as error:
- self._handle_exception(error)
- finally:
+ self.await_(self._commit_and_discard())
self._transaction = None
self._started = False
+ except Exception as error:
+ # don't dereference asyncpg transaction if we didn't
+ # actually try to call commit() on it
+ self._handle_exception(error)
def close(self):
self.rollback()
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.testing import async_test
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
],
)
+ @testing.variation("trans", ["commit", "rollback"])
+ @async_test
+ async def test_dont_reset_open_transaction(
+ self, trans, async_testing_engine
+ ):
+ """test for #11819"""
+
+ engine = async_testing_engine()
+
+ control_conn = await engine.connect()
+ await control_conn.execution_options(isolation_level="AUTOCOMMIT")
+
+ conn = await engine.connect()
+ txid_current = (
+ await conn.exec_driver_sql("select txid_current()")
+ ).scalar()
+
+ with expect_raises(exc.MissingGreenlet):
+ if trans.commit:
+ conn.sync_connection.connection.dbapi_connection.commit()
+ elif trans.rollback:
+ conn.sync_connection.connection.dbapi_connection.rollback()
+ else:
+ trans.fail()
+
+ trans_exists = (
+ await control_conn.exec_driver_sql(
+ f"SELECT count(*) FROM pg_stat_activity "
+ f"where backend_xid={txid_current}"
+ )
+ ).scalar()
+ eq_(trans_exists, 1)
+
+ if trans.commit:
+ await conn.commit()
+ elif trans.rollback:
+ await conn.rollback()
+ else:
+ trans.fail()
+
+ trans_exists = (
+ await control_conn.exec_driver_sql(
+ f"SELECT count(*) FROM pg_stat_activity "
+ f"where backend_xid={txid_current}"
+ )
+ ).scalar()
+ eq_(trans_exists, 0)
+ await engine.dispose()
+
@async_test
async def test_failed_commit_recover(self, metadata, async_testing_engine):