From: Federico Caselli Date: Mon, 13 Apr 2026 21:53:00 +0000 (+0200) Subject: Improve pg two-phase transactions X-Git-Tag: rel_2_1_0b2~5^2 X-Git-Url: http://git.ipfire.org/index.cgi?a=commitdiff_plain;h=08cef20f4a2bfbeda61abfe6caee975190f0794c;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Improve pg two-phase transactions Improve handling of two phase transaction identifiers for PostgreSQL when the identifier is provided by the user. As part of this change the psycopg dialect was updated to use the DBAPI two phase transaction API instead of executing the SQL directly. Fixes: #13229 Change-Id: If8301a7253b4a0c88e5323c9a052c3a9fa258780 --- diff --git a/doc/build/changelog/unreleased_20/13229.rst b/doc/build/changelog/unreleased_20/13229.rst new file mode 100644 index 0000000000..e02886bed0 --- /dev/null +++ b/doc/build/changelog/unreleased_20/13229.rst @@ -0,0 +1,8 @@ +.. change:: + :tags: postgresql, bug + :tickets: 13229 + + Improve handling of two phase transaction identifiers for PostgreSQL + when the identifier is provided by the user. + As part of this change the psycopg dialect was updated to use the DBAPI + two phase transaction API instead of executing the SQL directly. diff --git a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py index bc4994a976..97b3d30bbc 100644 --- a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py +++ b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py @@ -191,3 +191,39 @@ class _PGDialect_common_psycopg(PGDialect): dbapi_connection.autocommit = before_autocommit return True + + def do_begin_twophase(self, connection, xid): + connection.connection.tpc_begin(xid) + + def do_prepare_twophase(self, connection, xid): + connection.connection.tpc_prepare() + + def _do_twophase(self, dbapi_conn, operation, xid, recover=False): + if recover: + if not self._twophase_idle_check(dbapi_conn): + dbapi_conn.rollback() + operation(xid) + else: + operation() + + def _twophase_idle_check(self, dbapi_conn): + raise NotImplementedError + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + dbapi_conn = connection.connection.dbapi_connection + self._do_twophase( + dbapi_conn, dbapi_conn.tpc_rollback, xid, recover=recover + ) + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + dbapi_conn = connection.connection.dbapi_connection + self._do_twophase( + dbapi_conn, dbapi_conn.tpc_commit, xid, recover=recover + ) + + def do_recover_twophase(self, connection): + return [row[1] for row in connection.connection.tpc_recover()] diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index 25d6b6fbea..1702bc70c9 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -3687,7 +3687,11 @@ class PGDialect(default.DefaultDialect): self.do_begin(connection.connection) def do_prepare_twophase(self, connection, xid): - connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid) + connection.execute( + sql.text("PREPARE TRANSACTION :xid'").bindparams( + sql.bindparam("xid", xid, literal_execute=True) + ) + ) def do_rollback_twophase( self, connection, xid, is_prepared=True, recover=False @@ -3699,7 +3703,11 @@ class PGDialect(default.DefaultDialect): # Must find out a way how to make the dbapi not # open a transaction. connection.exec_driver_sql("ROLLBACK") - connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) + connection.execute( + sql.text("ROLLBACK PREPARED :xid").bindparams( + sql.bindparam("xid", xid, literal_execute=True) + ) + ) connection.exec_driver_sql("BEGIN") self.do_rollback(connection.connection) else: @@ -3711,7 +3719,11 @@ class PGDialect(default.DefaultDialect): if is_prepared: if recover: connection.exec_driver_sql("ROLLBACK") - connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid) + connection.execute( + sql.text("COMMIT PREPARED :xid").bindparams( + sql.bindparam("xid", xid, literal_execute=True) + ) + ) connection.exec_driver_sql("BEGIN") self.do_rollback(connection.connection) else: diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py index dfe0f627d2..dfe67dce9c 100644 --- a/lib/sqlalchemy/dialects/postgresql/provision.py +++ b/lib/sqlalchemy/dialects/postgresql/provision.py @@ -95,9 +95,10 @@ def _postgresql_set_default_schema_on_connection( def drop_all_schema_objects_pre_tables(cfg, eng): with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: for xid in conn.exec_driver_sql( - "select gid from pg_prepared_xacts" + "SELECT gid FROM pg_prepared_xacts " + "WHERE database = current_database()" ).scalars(): - conn.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) + eng.dialect.do_rollback_twophase(conn, xid, recover=True) @drop_all_schema_objects_post_tables.for_db("postgresql") diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg.py b/lib/sqlalchemy/dialects/postgresql/psycopg.py index b23ac6319f..5422849e82 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg.py @@ -598,45 +598,12 @@ class PGDialect_psycopg(_PGDialect_common_psycopg): return True return False - def _do_prepared_twophase(self, connection, command, recover=False): - dbapi_conn = connection.connection.dbapi_connection - if ( - recover - # don't rely on psycopg providing enum symbols, compare with - # eq/ne - or dbapi_conn.info.transaction_status - != self._psycopg_TransactionStatus.IDLE - ): - dbapi_conn.rollback() - before_autocommit = dbapi_conn.autocommit - try: - if not before_autocommit: - self._do_autocommit(dbapi_conn, True) - with dbapi_conn.cursor() as cursor: - cursor.execute(command) - finally: - if not before_autocommit: - self._do_autocommit(dbapi_conn, before_autocommit) - - def do_rollback_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - if is_prepared: - self._do_prepared_twophase( - connection, f"ROLLBACK PREPARED '{xid}'", recover=recover - ) - else: - self.do_rollback(connection.connection) - - def do_commit_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - if is_prepared: - self._do_prepared_twophase( - connection, f"COMMIT PREPARED '{xid}'", recover=recover - ) - else: - self.do_commit(connection.connection) + def _twophase_idle_check(self, dbapi_conn): + # don't rely on psycopg providing enum symbols, compare with eq/ne + return ( + dbapi_conn.info.transaction_status + == self._psycopg_TransactionStatus.IDLE + ) @util.memoized_property def _dialect_specific_select_one(self): @@ -759,6 +726,21 @@ class AsyncAdapt_psycopg_connection(AsyncAdapt_dbapi_connection): else: return AsyncAdapt_psycopg_cursor(self) + def tpc_begin(self, xid): + return await_(self._connection.tpc_begin(xid)) + + def tpc_prepare(self): + return await_(self._connection.tpc_prepare()) + + def tpc_commit(self, xid=None): + return await_(self._connection.tpc_commit(xid)) + + def tpc_rollback(self, xid=None): + return await_(self._connection.tpc_rollback(xid)) + + def tpc_recover(self): + return await_(self._connection.tpc_recover()) + class PsycopgAdaptDBAPI(AsyncAdapt_dbapi_module): def __init__(self, psycopg, ExecStatus) -> None: diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 2f886c9df6..bde9e1c93e 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -794,35 +794,8 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg): else: cursor.executemany(statement, parameters) - def do_begin_twophase(self, connection, xid): - connection.connection.tpc_begin(xid) - - def do_prepare_twophase(self, connection, xid): - connection.connection.tpc_prepare() - - def _do_twophase(self, dbapi_conn, operation, xid, recover=False): - if recover: - if dbapi_conn.status != self._psycopg2_extensions.STATUS_READY: - dbapi_conn.rollback() - operation(xid) - else: - operation() - - def do_rollback_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - dbapi_conn = connection.connection.dbapi_connection - self._do_twophase( - dbapi_conn, dbapi_conn.tpc_rollback, xid, recover=recover - ) - - def do_commit_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - dbapi_conn = connection.connection.dbapi_connection - self._do_twophase( - dbapi_conn, dbapi_conn.tpc_commit, xid, recover=recover - ) + def _twophase_idle_check(self, dbapi_conn): + return dbapi_conn.status == self._psycopg2_extensions.STATUS_READY @util.memoized_instancemethod def _hstore_oids(self, dbapi_connection): diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 9aaf02f4a2..7f8af56a8c 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -967,7 +967,8 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): :meth:`~.TwoPhaseTransaction.prepare` method. :param xid: the two phase transaction id. If not supplied, a - random id will be generated. + random id will be generated. The accepted type and value depends on + the driver in use. .. seealso:: diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 66ef5ce189..aba37ae14a 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1565,8 +1565,9 @@ class ExecutionOptionsTest(fixtures.TestBase): "hoho", ) eng.update_execution_options(foo="hoho") - conn = eng.connect() - eq_(conn._execution_options["foo"], "hoho") + conn2 = eng.connect() + eq_(conn2._execution_options["foo"], "hoho") + conn2.close() def test_generative_engine_execution_options(self): eng = engines.testing_engine( @@ -2873,6 +2874,7 @@ class EngineEventsTest(fixtures.TestBase): "commit_twophase", ], ) + conn.close() class HandleErrorTest(fixtures.TestBase):