From: Federico Caselli Date: Fri, 5 Jun 2026 19:51:13 +0000 (+0200) Subject: repair xid in psycopg X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=75cb5aec658196a0bae5d07216d790126ebc0739;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git repair xid in psycopg Repaired bug introduced in :ticket:`13229` where a two-phase transaction recovery would not return the correct transaction identifier when generating the identifiers using the ``xid()`` method of the psycopg connection. Fixes: #13355 Change-Id: Iffe68c1701afaa678fa7b598559dd396d3f8db41 --- diff --git a/doc/build/changelog/unreleased_20/13355.rst b/doc/build/changelog/unreleased_20/13355.rst new file mode 100644 index 0000000000..cad4508ef0 --- /dev/null +++ b/doc/build/changelog/unreleased_20/13355.rst @@ -0,0 +1,8 @@ +.. change:: + :tags: bug, postgresql + :tickets: 13355 + + Repaired bug introduced in :ticket:`13229` where a two-phase + transaction recovery would not return the correct transaction + identifier when generating the identifiers using the ``xid()`` + method of the psycopg connection. diff --git a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py index 97b3d30bbc..3a3f823cca 100644 --- a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py +++ b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py @@ -226,4 +226,4 @@ class _PGDialect_common_psycopg(PGDialect): ) def do_recover_twophase(self, connection): - return [row[1] for row in connection.connection.tpc_recover()] + return [str(row) for row in connection.connection.tpc_recover()] diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index 97a553bec1..0d5ca08221 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -3711,7 +3711,7 @@ class PGDialect(default.DefaultDialect): def do_prepare_twophase(self, connection, xid): connection.execute( - sql.text("PREPARE TRANSACTION :xid'").bindparams( + sql.text("PREPARE TRANSACTION :xid").bindparams( sql.bindparam("xid", xid, literal_execute=True) ) ) diff --git a/lib/sqlalchemy/testing/fixtures/base.py b/lib/sqlalchemy/testing/fixtures/base.py index 4a31e5a9c2..d45ff46151 100644 --- a/lib/sqlalchemy/testing/fixtures/base.py +++ b/lib/sqlalchemy/testing/fixtures/base.py @@ -9,6 +9,9 @@ from __future__ import annotations +from collections.abc import Callable +from collections.abc import Sequence + import sqlalchemy as sa from .. import assertions from .. import config @@ -28,23 +31,25 @@ from ...orm import registry @config.mark_base_test_class() class TestBase: # A sequence of requirement names matching testing.requires decorators - __requires__ = () + __requires__: tuple[str, ...] = () # A sequence of dialect names to exclude from the test class. - __unsupported_on__ = () + __unsupported_on__: tuple[str, ...] = () # If present, test class is only runnable for the *single* specified # dialect. If you need multiple, use __unsupported_on__ and invert. - __only_on__ = None + __only_on__: tuple[str, ...] | str | None = None # A sequence of no-arg callables. If any are True, the entire testcase is # skipped. - __skip_if__ = None + __skip_if__: Sequence[Callable[[], bool]] | None = None # if True, the testing reaper will not attempt to touch connection # state after a test is completed and before the outer teardown # starts - __leave_connections_for_teardown__ = False + __leave_connections_for_teardown__: bool = False + + __backend__: bool def assert_(self, val, msg=None): assert val, msg diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index 59d41ee895..2efac75c38 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -56,7 +56,9 @@ from sqlalchemy.testing.assertions import AssertsExecutionResults from sqlalchemy.testing.assertions import eq_ from sqlalchemy.testing.assertions import eq_regex from sqlalchemy.testing.assertions import expect_raises +from sqlalchemy.testing.assertions import in_ from sqlalchemy.testing.assertions import ne_ +from sqlalchemy.testing.assertions import not_in class DialectTest(fixtures.TestBase): @@ -1662,3 +1664,70 @@ class Psycopg3Test(fixtures.TestBase): is_true(isinstance(cursor, AsyncClientCursor)) await engine.dispose() + + +class TwoPhaseCommitTest(fixtures.TestBase): + __only_on__ = ("+psycopg2", "+psycopg") + __backend__ = True + + @testing.fixture(autouse=True) + def reap_xid(self): + with config.db.connect() as connection: + before = connection.recover_twophase() + yield + with config.db.connect() as connection: + for xid in connection.recover_twophase(): + if xid not in before: + connection.rollback_prepared(xid, recover=True) + + @testing.variation("mode", ["noid", "withid", "driverid"]) + @testing.variation("commit", [True, False]) + def test_provided_id_round_trip(self, mode: testing.Variation, commit): + c1 = config.db.connect() + dc = c1.connection.driver_connection + c2 = config.db.connect() + if mode.noid: + transaction = c1.begin_twophase() + xid = transaction.xid + elif mode.withid: + xid = "myid" + transaction = c1.begin_twophase(xid) + eq_(transaction.xid, "myid") + elif mode.driverid: + xid_obj = dc.xid(42, "abc", "def") + xid = str(xid_obj) + transaction = c1.begin_twophase(xid_obj) + eq_(transaction.xid, xid_obj) + else: + mode.fail() + transaction.prepare() + in_(xid, c2.recover_twophase()) + if commit: + c2.commit_prepared(xid, recover=True) + else: + c2.rollback_prepared(xid, recover=True) + not_in(xid, c2.recover_twophase()) + c2.close() + c1.detach() + dc.close() + + @testing.variation("commit", [True, False]) + def test_default_pg_dialect(self, commit): + dialect = postgresql.PGDialect + c1 = config.db.connect() + dc = c1.connection.driver_connection + c2 = config.db.connect() + c2.execution_options(isolation_level="AUTOCOMMIT") + xid = "myid" + dialect.do_begin_twophase(c1.dialect, c1, xid) + dialect.do_prepare_twophase(c1.dialect, c1, xid) + + in_(xid, dialect.do_recover_twophase(c2.dialect, c2)) + if commit: + dialect.do_commit_twophase(c2.dialect, c2, xid, recover=True) + else: + dialect.do_rollback_twophase(c2.dialect, c2, xid, recover=True) + not_in(xid, dialect.do_recover_twophase(c2.dialect, c2)) + c2.close() + c1.detach() + dc.close()