]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
repair xid in psycopg
authorFederico Caselli <cfederico87@gmail.com>
Fri, 5 Jun 2026 19:51:13 +0000 (21:51 +0200)
committerFederico Caselli <cfederico87@gmail.com>
Mon, 8 Jun 2026 20:17:57 +0000 (22:17 +0200)
Repaired bug introduced in :ticket:`13229` where a two-phase
transaction recovery would not return the correct transaction
identifier when generating the identifiers using the ``xid()``
method of the psycopg connection.

Fixes: #13355
Change-Id: Iffe68c1701afaa678fa7b598559dd396d3f8db41

doc/build/changelog/unreleased_20/13355.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/_psycopg_common.py
lib/sqlalchemy/dialects/postgresql/base.py
lib/sqlalchemy/testing/fixtures/base.py
test/dialect/postgresql/test_dialect.py

diff --git a/doc/build/changelog/unreleased_20/13355.rst b/doc/build/changelog/unreleased_20/13355.rst
new file mode 100644 (file)
index 0000000..cad4508
--- /dev/null
@@ -0,0 +1,8 @@
+.. change::
+    :tags: bug, postgresql
+    :tickets: 13355
+
+    Repaired bug introduced in :ticket:`13229` where a two-phase
+    transaction recovery would not return the correct transaction
+    identifier when generating the identifiers using the ``xid()``
+    method of the psycopg connection.
index 97b3d30bbcba35146238dee5b6f0c13a531d5865..3a3f823cca11c67406c35671a74bfad8712739d1 100644 (file)
@@ -226,4 +226,4 @@ class _PGDialect_common_psycopg(PGDialect):
         )
 
     def do_recover_twophase(self, connection):
-        return [row[1] for row in connection.connection.tpc_recover()]
+        return [str(row) for row in connection.connection.tpc_recover()]
index 97a553bec162b6c91f41efc8e9282129b495e377..0d5ca08221ce1dc9fefb74d1dd08144b8ca031c8 100644 (file)
@@ -3711,7 +3711,7 @@ class PGDialect(default.DefaultDialect):
 
     def do_prepare_twophase(self, connection, xid):
         connection.execute(
-            sql.text("PREPARE TRANSACTION :xid'").bindparams(
+            sql.text("PREPARE TRANSACTION :xid").bindparams(
                 sql.bindparam("xid", xid, literal_execute=True)
             )
         )
index 4a31e5a9c2f2909f8840a0a75a3f216513642b96..d45ff46151b69ea92d56a7a3b2dcde8599407181 100644 (file)
@@ -9,6 +9,9 @@
 
 from __future__ import annotations
 
+from collections.abc import Callable
+from collections.abc import Sequence
+
 import sqlalchemy as sa
 from .. import assertions
 from .. import config
@@ -28,23 +31,25 @@ from ...orm import registry
 @config.mark_base_test_class()
 class TestBase:
     # A sequence of requirement names matching testing.requires decorators
-    __requires__ = ()
+    __requires__: tuple[str, ...] = ()
 
     # A sequence of dialect names to exclude from the test class.
-    __unsupported_on__ = ()
+    __unsupported_on__: tuple[str, ...] = ()
 
     # If present, test class is only runnable for the *single* specified
     # dialect.  If you need multiple, use __unsupported_on__ and invert.
-    __only_on__ = None
+    __only_on__: tuple[str, ...] | str | None = None
 
     # A sequence of no-arg callables. If any are True, the entire testcase is
     # skipped.
-    __skip_if__ = None
+    __skip_if__: Sequence[Callable[[], bool]] | None = None
 
     # if True, the testing reaper will not attempt to touch connection
     # state after a test is completed and before the outer teardown
     # starts
-    __leave_connections_for_teardown__ = False
+    __leave_connections_for_teardown__: bool = False
+
+    __backend__: bool
 
     def assert_(self, val, msg=None):
         assert val, msg
index 59d41ee8959c130e5ebfc3e633aa6a27c479e90f..2efac75c381a320125348c3dc0a3f43a0d014699 100644 (file)
@@ -56,7 +56,9 @@ from sqlalchemy.testing.assertions import AssertsExecutionResults
 from sqlalchemy.testing.assertions import eq_
 from sqlalchemy.testing.assertions import eq_regex
 from sqlalchemy.testing.assertions import expect_raises
+from sqlalchemy.testing.assertions import in_
 from sqlalchemy.testing.assertions import ne_
+from sqlalchemy.testing.assertions import not_in
 
 
 class DialectTest(fixtures.TestBase):
@@ -1662,3 +1664,70 @@ class Psycopg3Test(fixtures.TestBase):
                 is_true(isinstance(cursor, AsyncClientCursor))
 
         await engine.dispose()
+
+
+class TwoPhaseCommitTest(fixtures.TestBase):
+    __only_on__ = ("+psycopg2", "+psycopg")
+    __backend__ = True
+
+    @testing.fixture(autouse=True)
+    def reap_xid(self):
+        with config.db.connect() as connection:
+            before = connection.recover_twophase()
+        yield
+        with config.db.connect() as connection:
+            for xid in connection.recover_twophase():
+                if xid not in before:
+                    connection.rollback_prepared(xid, recover=True)
+
+    @testing.variation("mode", ["noid", "withid", "driverid"])
+    @testing.variation("commit", [True, False])
+    def test_provided_id_round_trip(self, mode: testing.Variation, commit):
+        c1 = config.db.connect()
+        dc = c1.connection.driver_connection
+        c2 = config.db.connect()
+        if mode.noid:
+            transaction = c1.begin_twophase()
+            xid = transaction.xid
+        elif mode.withid:
+            xid = "myid"
+            transaction = c1.begin_twophase(xid)
+            eq_(transaction.xid, "myid")
+        elif mode.driverid:
+            xid_obj = dc.xid(42, "abc", "def")
+            xid = str(xid_obj)
+            transaction = c1.begin_twophase(xid_obj)
+            eq_(transaction.xid, xid_obj)
+        else:
+            mode.fail()
+        transaction.prepare()
+        in_(xid, c2.recover_twophase())
+        if commit:
+            c2.commit_prepared(xid, recover=True)
+        else:
+            c2.rollback_prepared(xid, recover=True)
+        not_in(xid, c2.recover_twophase())
+        c2.close()
+        c1.detach()
+        dc.close()
+
+    @testing.variation("commit", [True, False])
+    def test_default_pg_dialect(self, commit):
+        dialect = postgresql.PGDialect
+        c1 = config.db.connect()
+        dc = c1.connection.driver_connection
+        c2 = config.db.connect()
+        c2.execution_options(isolation_level="AUTOCOMMIT")
+        xid = "myid"
+        dialect.do_begin_twophase(c1.dialect, c1, xid)
+        dialect.do_prepare_twophase(c1.dialect, c1, xid)
+
+        in_(xid, dialect.do_recover_twophase(c2.dialect, c2))
+        if commit:
+            dialect.do_commit_twophase(c2.dialect, c2, xid, recover=True)
+        else:
+            dialect.do_rollback_twophase(c2.dialect, c2, xid, recover=True)
+        not_in(xid, dialect.do_recover_twophase(c2.dialect, c2))
+        c2.close()
+        c1.detach()
+        dc.close()