From: Federico Caselli Date: Mon, 13 Apr 2026 21:53:00 +0000 (+0200) Subject: Improve pg two-phase transactions X-Git-Url: http://git.ipfire.org/gitweb/?a=commitdiff_plain;h=57cc9e7921141dc2212974944d67d025ededd706;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Improve pg two-phase transactions Improve handling of two phase transaction identifiers for PostgreSQL when the identifier is provided by the user. As part of this change the psycopg dialect was updated to use the DBAPI two phase transaction API instead of executing the SQL directly. Fixes: #13229 Change-Id: If8301a7253b4a0c88e5323c9a052c3a9fa258780 (cherry picked from commit 08cef20f4a2bfbeda61abfe6caee975190f0794c) --- diff --git a/doc/build/changelog/unreleased_20/13229.rst b/doc/build/changelog/unreleased_20/13229.rst new file mode 100644 index 0000000000..e02886bed0 --- /dev/null +++ b/doc/build/changelog/unreleased_20/13229.rst @@ -0,0 +1,8 @@ +.. change:: + :tags: postgresql, bug + :tickets: 13229 + + Improve handling of two phase transaction identifiers for PostgreSQL + when the identifier is provided by the user. + As part of this change the psycopg dialect was updated to use the DBAPI + two phase transaction API instead of executing the SQL directly. diff --git a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py index 03b0f76ec3..dcd93b12cc 100644 --- a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py +++ b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py @@ -187,3 +187,39 @@ class _PGDialect_common_psycopg(PGDialect): dbapi_connection.autocommit = before_autocommit return True + + def do_begin_twophase(self, connection, xid): + connection.connection.tpc_begin(xid) + + def do_prepare_twophase(self, connection, xid): + connection.connection.tpc_prepare() + + def _do_twophase(self, dbapi_conn, operation, xid, recover=False): + if recover: + if not self._twophase_idle_check(dbapi_conn): + dbapi_conn.rollback() + operation(xid) + else: + operation() + + def _twophase_idle_check(self, dbapi_conn): + raise NotImplementedError + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + dbapi_conn = connection.connection.dbapi_connection + self._do_twophase( + dbapi_conn, dbapi_conn.tpc_rollback, xid, recover=recover + ) + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + dbapi_conn = connection.connection.dbapi_connection + self._do_twophase( + dbapi_conn, dbapi_conn.tpc_commit, xid, recover=recover + ) + + def do_recover_twophase(self, connection): + return [row[1] for row in connection.connection.tpc_recover()] diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index e2b6f257e9..8637f35d4f 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -3478,7 +3478,11 @@ class PGDialect(default.DefaultDialect): self.do_begin(connection.connection) def do_prepare_twophase(self, connection, xid): - connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid) + connection.execute( + sql.text("PREPARE TRANSACTION :xid'").bindparams( + sql.bindparam("xid", xid, literal_execute=True) + ) + ) def do_rollback_twophase( self, connection, xid, is_prepared=True, recover=False @@ -3490,7 +3494,11 @@ class PGDialect(default.DefaultDialect): # Must find out a way how to make the dbapi not # open a transaction. connection.exec_driver_sql("ROLLBACK") - connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) + connection.execute( + sql.text("ROLLBACK PREPARED :xid").bindparams( + sql.bindparam("xid", xid, literal_execute=True) + ) + ) connection.exec_driver_sql("BEGIN") self.do_rollback(connection.connection) else: @@ -3502,7 +3510,11 @@ class PGDialect(default.DefaultDialect): if is_prepared: if recover: connection.exec_driver_sql("ROLLBACK") - connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid) + connection.execute( + sql.text("COMMIT PREPARED :xid").bindparams( + sql.bindparam("xid", xid, literal_execute=True) + ) + ) connection.exec_driver_sql("BEGIN") self.do_rollback(connection.connection) else: diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py index dfe0f627d2..dfe67dce9c 100644 --- a/lib/sqlalchemy/dialects/postgresql/provision.py +++ b/lib/sqlalchemy/dialects/postgresql/provision.py @@ -95,9 +95,10 @@ def _postgresql_set_default_schema_on_connection( def drop_all_schema_objects_pre_tables(cfg, eng): with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: for xid in conn.exec_driver_sql( - "select gid from pg_prepared_xacts" + "SELECT gid FROM pg_prepared_xacts " + "WHERE database = current_database()" ).scalars(): - conn.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) + eng.dialect.do_rollback_twophase(conn, xid, recover=True) @drop_all_schema_objects_post_tables.for_db("postgresql") diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg.py b/lib/sqlalchemy/dialects/postgresql/psycopg.py index 67dc5ca86c..4af214dabb 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg.py @@ -596,44 +596,12 @@ class PGDialect_psycopg(_PGDialect_common_psycopg): return True return False - def _do_prepared_twophase(self, connection, command, recover=False): - dbapi_conn = connection.connection.dbapi_connection - if ( - recover - # don't rely on psycopg providing enum symbols, compare with - # eq/ne - or dbapi_conn.info.transaction_status - != self._psycopg_TransactionStatus.IDLE - ): - dbapi_conn.rollback() - before_autocommit = dbapi_conn.autocommit - try: - if not before_autocommit: - self._do_autocommit(dbapi_conn, True) - dbapi_conn.execute(command) - finally: - if not before_autocommit: - self._do_autocommit(dbapi_conn, before_autocommit) - - def do_rollback_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - if is_prepared: - self._do_prepared_twophase( - connection, f"ROLLBACK PREPARED '{xid}'", recover=recover - ) - else: - self.do_rollback(connection.connection) - - def do_commit_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - if is_prepared: - self._do_prepared_twophase( - connection, f"COMMIT PREPARED '{xid}'", recover=recover - ) - else: - self.do_commit(connection.connection) + def _twophase_idle_check(self, dbapi_conn): + # don't rely on psycopg providing enum symbols, compare with eq/ne + return ( + dbapi_conn.info.transaction_status + == self._psycopg_TransactionStatus.IDLE + ) @util.memoized_property def _dialect_specific_select_one(self): @@ -784,6 +752,21 @@ class AsyncAdapt_psycopg_connection(AdaptedConnection): def set_deferrable(self, value): self.await_(self._connection.set_deferrable(value)) + def tpc_begin(self, xid): + return self.await_(self._connection.tpc_begin(xid)) + + def tpc_prepare(self): + return self.await_(self._connection.tpc_prepare()) + + def tpc_commit(self, xid=None): + return self.await_(self._connection.tpc_commit(xid)) + + def tpc_rollback(self, xid=None): + return self.await_(self._connection.tpc_rollback(xid)) + + def tpc_recover(self): + return self.await_(self._connection.tpc_recover()) + class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection): __slots__ = () diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 189e6566cf..48ed9bf1e1 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -799,35 +799,8 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg): else: cursor.executemany(statement, parameters) - def do_begin_twophase(self, connection, xid): - connection.connection.tpc_begin(xid) - - def do_prepare_twophase(self, connection, xid): - connection.connection.tpc_prepare() - - def _do_twophase(self, dbapi_conn, operation, xid, recover=False): - if recover: - if dbapi_conn.status != self._psycopg2_extensions.STATUS_READY: - dbapi_conn.rollback() - operation(xid) - else: - operation() - - def do_rollback_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - dbapi_conn = connection.connection.dbapi_connection - self._do_twophase( - dbapi_conn, dbapi_conn.tpc_rollback, xid, recover=recover - ) - - def do_commit_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - dbapi_conn = connection.connection.dbapi_connection - self._do_twophase( - dbapi_conn, dbapi_conn.tpc_commit, xid, recover=recover - ) + def _twophase_idle_check(self, dbapi_conn): + return dbapi_conn.status == self._psycopg2_extensions.STATUS_READY @util.memoized_instancemethod def _hstore_oids(self, dbapi_connection): diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 26f162b88c..e3b71fe5b5 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -951,7 +951,8 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): :meth:`~.TwoPhaseTransaction.prepare` method. :param xid: the two phase transaction id. If not supplied, a - random id will be generated. + random id will be generated. The accepted type and value depends on + the driver in use. .. seealso:: diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 4c6bf8a28f..e96cfd06dc 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1502,8 +1502,9 @@ class ExecutionOptionsTest(fixtures.TestBase): "hoho", ) eng.update_execution_options(foo="hoho") - conn = eng.connect() - eq_(conn._execution_options["foo"], "hoho") + conn2 = eng.connect() + eq_(conn2._execution_options["foo"], "hoho") + conn2.close() def test_generative_engine_execution_options(self): eng = engines.testing_engine( @@ -2811,6 +2812,7 @@ class EngineEventsTest(fixtures.TestBase): "commit_twophase", ], ) + conn.close() class HandleErrorTest(fixtures.TestBase):