--- /dev/null
+.. change::
+ :tags: bug, postgresql
+ :tickets: 11819
+ :versions: 2.0.33, 1.4.54
+
+ Fixed critical issue in the asyncpg driver where a rollback or commit that
+ fails specifically for the ``MissingGreenlet`` condition or any other error
+ that is not raised by asyncpg itself would discard the asyncpg transaction
+ in any case, even though the transaction were still idle, leaving to a
+ server side condition with an idle transaction that then goes back into the
+ connection pool. The flags for "transaction closed" are now not reset for
+ errors that are raised outside of asyncpg itself. When asyncpg itself
+ raises an error for ``.commit()`` or ``.rollback()``, asyncpg does then
+ discard of this transaction.
else:
self._started = True
+ async def _rollback_and_discard(self):
+ try:
+ await self._transaction.rollback()
+ finally:
+ # if asyncpg .rollback() was actually called, then whether or
+ # not it raised or succeeded, the transation is done, discard it
+ self._transaction = None
+ self._started = False
+
+ async def _commit_and_discard(self):
+ try:
+ await self._transaction.commit()
+ finally:
+ # if asyncpg .commit() was actually called, then whether or
+ # not it raised or succeeded, the transation is done, discard it
+ self._transaction = None
+ self._started = False
+
def rollback(self):
if self._started:
assert self._transaction is not None
try:
- await_(self._transaction.rollback())
- except Exception as error:
- self._handle_exception(error)
- finally:
+ await_(self._rollback_and_discard())
self._transaction = None
self._started = False
+ except Exception as error:
+ # don't dereference asyncpg transaction if we didn't
+ # actually try to call rollback() on it
+ self._handle_exception(error)
def commit(self):
if self._started:
assert self._transaction is not None
try:
- await_(self._transaction.commit())
- except Exception as error:
- self._handle_exception(error)
- finally:
+ await_(self._commit_and_discard())
self._transaction = None
self._started = False
+ except Exception as error:
+ # don't dereference asyncpg transaction if we didn't
+ # actually try to call commit() on it
+ self._handle_exception(error)
def close(self):
self.rollback()
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.testing import async_test
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
],
)
+ @testing.variation("trans", ["commit", "rollback"])
+ @async_test
+ async def test_dont_reset_open_transaction(
+ self, trans, async_testing_engine
+ ):
+ """test for #11819"""
+
+ engine = async_testing_engine()
+
+ control_conn = await engine.connect()
+ await control_conn.execution_options(isolation_level="AUTOCOMMIT")
+
+ conn = await engine.connect()
+ txid_current = (
+ await conn.exec_driver_sql("select txid_current()")
+ ).scalar()
+
+ with expect_raises(exc.MissingGreenlet):
+ if trans.commit:
+ conn.sync_connection.connection.dbapi_connection.commit()
+ elif trans.rollback:
+ conn.sync_connection.connection.dbapi_connection.rollback()
+ else:
+ trans.fail()
+
+ trans_exists = (
+ await control_conn.exec_driver_sql(
+ f"SELECT count(*) FROM pg_stat_activity "
+ f"where backend_xid={txid_current}"
+ )
+ ).scalar()
+ eq_(trans_exists, 1)
+
+ if trans.commit:
+ await conn.commit()
+ elif trans.rollback:
+ await conn.rollback()
+ else:
+ trans.fail()
+
+ trans_exists = (
+ await control_conn.exec_driver_sql(
+ f"SELECT count(*) FROM pg_stat_activity "
+ f"where backend_xid={txid_current}"
+ )
+ ).scalar()
+ eq_(trans_exists, 0)
+
@async_test
async def test_failed_commit_recover(self, metadata, async_testing_engine):
Table("t1", metadata, Column("id", Integer, primary_key=True))