]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add support for two-phase commit in oracledb.
authorFederico Caselli <cfederico87@gmail.com>
Mon, 10 Jun 2024 19:20:56 +0000 (21:20 +0200)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 1 Aug 2024 14:43:11 +0000 (10:43 -0400)
Implemented two-phase transactions for the oracledb dialect. Historically,
this feature never worked with the cx_Oracle dialect, however recent
improvements to the oracledb successor now allow this to be possible.  The
two phase transaction API is available at the Core level via the
:meth:`_engine.Connection.begin_twophase` method.

As part of this change, added new facility for testing that allows
a test to skip if a certain step takes too long, allowing for a
separate cleanup step.  this is needed as oracle tpc wont allow
commit recovery if transaction is older than about 1 second, could not
find any docs on how to increase this timeout.

Fixed an execute call in the PostgreSQL dialect's provisioning that
drops old tpc transactions which was non-working, which indicates
that we've apparently never had any PG tpc transactions needing to
be cleaned up in CI for some years now, so that's good

Fixes: #11480
Change-Id: If3ad19cc29999e70f07f767b88afd330f6e5a4be

doc/build/changelog/unreleased_20/11480.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/oracle/base.py
lib/sqlalchemy/dialects/oracle/cx_oracle.py
lib/sqlalchemy/dialects/oracle/oracledb.py
lib/sqlalchemy/dialects/postgresql/provision.py
lib/sqlalchemy/testing/__init__.py
lib/sqlalchemy/testing/util.py
test/engine/test_transaction.py
test/requirements.py

diff --git a/doc/build/changelog/unreleased_20/11480.rst b/doc/build/changelog/unreleased_20/11480.rst
new file mode 100644 (file)
index 0000000..7a653a6
--- /dev/null
@@ -0,0 +1,9 @@
+.. change::
+    :tags: usecase, oracle
+    :tickets: 11480
+
+    Implemented two-phase transactions for the oracledb dialect. Historically,
+    this feature never worked with the cx_Oracle dialect, however recent
+    improvements to the oracledb successor now allow this to be possible.  The
+    two phase transaction API is available at the Core level via the
+    :meth:`_engine.Connection.begin_twophase` method.
index 8e5989990ef1badbfd97ad9954c04a8fb4f12d10..5873fd070dc2c31cdb591f766deaa848778e767a 100644 (file)
@@ -338,7 +338,6 @@ returned as well.
    on parity with other backends.
 
 
-
 ON UPDATE CASCADE
 -----------------
 
@@ -479,7 +478,7 @@ is reflected and the type is reported as ``DATE``, the time-supporting
 .. _oracle_table_options:
 
 Oracle Table Options
--------------------------
+--------------------
 
 The CREATE TABLE phrase supports the following options with Oracle
 in conjunction with the :class:`_schema.Table` construct:
index 93462246647c36c00395dfee286a13d68e80f501..873d943371df392bde9a2ae7d901a553c111e75b 100644 (file)
@@ -377,14 +377,12 @@ buffered objects with a ``read()`` method, the parameter
 ``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`,
 which takes place only engine-wide.
 
-Two Phase Transactions Not Supported
--------------------------------------
+Two Phase Transactions Not Supported (use oracledb)
+---------------------------------------------------
 
-Two phase transactions are **not supported** under cx_Oracle due to poor
-driver support.   As of cx_Oracle 6.0b1, the interface for
-two phase transactions has been changed to be more of a direct pass-through
-to the underlying OCI layer with less automation.  The additional logic
-to support this system is not implemented in SQLAlchemy.
+Two phase transactions are **not supported** under cx_Oracle due to poor driver
+support.   The newer :ref:`oracledb` dialect however **does** support two phase
+transactions and should be preferred.
 
 .. _cx_oracle_numeric:
 
@@ -1423,13 +1421,6 @@ class OracleDialect_cx_oracle(OracleDialect):
         return False
 
     def create_xid(self):
-        """create a two-phase transaction ID.
-
-        this id will be passed to do_begin_twophase(), do_rollback_twophase(),
-        do_commit_twophase().  its format is unspecified.
-
-        """
-
         id_ = random.randint(0, 2**128)
         return (0x1234, "%032x" % id_, "%032x" % 9)
 
index de5be44d90477a6cce36091e462f5cb79d1a4063..e48dcdc6bbe0a54d33f54841bd99f9e74842b061 100644 (file)
@@ -13,6 +13,9 @@ r"""
     :connectstring: oracle+oracledb://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]]
     :url: https://oracle.github.io/python-oracledb/
 
+Description
+-----------
+
 python-oracledb is released by Oracle to supersede the cx_Oracle driver.
 It is fully compatible with cx_Oracle and features both a "thin" client
 mode that requires no dependencies, as well as a "thick" mode that uses
@@ -21,7 +24,7 @@ the Oracle Client Interface in the same way as cx_Oracle.
 .. seealso::
 
     :ref:`cx_oracle` - all of cx_Oracle's notes apply to the oracledb driver
-    as well.
+    as well, with the exception that oracledb supports two phase transactions.
 
 The SQLAlchemy ``oracledb`` dialect provides both a sync and an async
 implementation under the same dialect name. The proper version is
@@ -70,6 +73,16 @@ like the ``lib_dir`` path, a dict may be passed to this parameter, as in::
 
     https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.init_oracle_client
 
+Two Phase Transactions Supported
+--------------------------------
+
+Two phase transactions are fully supported under oracledb. Starting with
+oracledb 2.3 two phase transactions are supported also in thin mode.    APIs
+for two phase transactions are provided at the Core level via
+:meth:`_engine.Connection.begin_twophase` and :paramref:`_orm.Session.twophase`
+for transparent ORM use.
+
+.. versionchanged:: 2.0.32 added support for two phase transactions
 
 .. versionadded:: 2.0.0 added support for oracledb driver.
 
@@ -151,6 +164,49 @@ class OracleDialect_oracledb(_OracleDialect_cx_oracle):
                 f"oracledb version {self._min_version} and above are supported"
             )
 
+    def do_begin_twophase(self, connection, xid):
+        conn_xis = connection.connection.xid(*xid)
+        connection.connection.tpc_begin(conn_xis)
+        connection.connection.info["oracledb_xid"] = conn_xis
+
+    def do_prepare_twophase(self, connection, xid):
+        should_commit = connection.connection.tpc_prepare()
+        connection.info["oracledb_should_commit"] = should_commit
+
+    def do_rollback_twophase(
+        self, connection, xid, is_prepared=True, recover=False
+    ):
+        if recover:
+            conn_xid = connection.connection.xid(*xid)
+        else:
+            conn_xid = None
+        connection.connection.tpc_rollback(conn_xid)
+
+    def do_commit_twophase(
+        self, connection, xid, is_prepared=True, recover=False
+    ):
+        conn_xid = None
+        if not is_prepared:
+            should_commit = connection.connection.tpc_prepare()
+        elif recover:
+            conn_xid = connection.connection.xid(*xid)
+            should_commit = True
+        else:
+            should_commit = connection.info["oracledb_should_commit"]
+        if should_commit:
+            connection.connection.tpc_commit(conn_xid)
+
+    def do_recover_twophase(self, connection):
+        return [
+            # oracledb seems to return bytes
+            (
+                fi,
+                gti.decode() if isinstance(gti, bytes) else gti,
+                bq.decode() if isinstance(bq, bytes) else bq,
+            )
+            for fi, gti, bq in connection.connection.tpc_recover()
+        ]
+
 
 class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor):
     _cursor: AsyncCursor
@@ -241,6 +297,24 @@ class AsyncAdapt_oracledb_connection(AsyncAdapt_dbapi_connection):
     def cursor(self):
         return AsyncAdapt_oracledb_cursor(self)
 
+    def xid(self, *args: Any, **kwargs: Any) -> Any:
+        return self._connection.xid(*args, **kwargs)
+
+    def tpc_begin(self, *args: Any, **kwargs: Any) -> Any:
+        return await_(self._connection.tpc_begin(*args, **kwargs))
+
+    def tpc_commit(self, *args: Any, **kwargs: Any) -> Any:
+        return await_(self._connection.tpc_commit(*args, **kwargs))
+
+    def tpc_prepare(self, *args: Any, **kwargs: Any) -> Any:
+        return await_(self._connection.tpc_prepare(*args, **kwargs))
+
+    def tpc_recover(self, *args: Any, **kwargs: Any) -> Any:
+        return await_(self._connection.tpc_recover(*args, **kwargs))
+
+    def tpc_rollback(self, *args: Any, **kwargs: Any) -> Any:
+        return await_(self._connection.tpc_rollback(*args, **kwargs))
+
 
 class OracledbAdaptDBAPI:
     def __init__(self, oracledb) -> None:
index a87bb932066500ac08b51cdaf94004b9799d21ff..38573c77ad60f0ffe1328b9d90aefcd2bab8db62 100644 (file)
@@ -97,7 +97,7 @@ def drop_all_schema_objects_pre_tables(cfg, eng):
         for xid in conn.exec_driver_sql(
             "select gid from pg_prepared_xacts"
         ).scalars():
-            conn.execute("ROLLBACK PREPARED '%s'" % xid)
+            conn.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
 
 
 @drop_all_schema_objects_post_tables.for_db("postgresql")
index d3a6f32c71620c8d33350bed91717124649427ac..7fa361c9b92aa226afab46488488f0005756184f 100644 (file)
@@ -83,6 +83,7 @@ from .util import provide_metadata
 from .util import resolve_lambda
 from .util import rowset
 from .util import run_as_contextmanager
+from .util import skip_if_timeout
 from .util import teardown_events
 from .warnings import assert_warnings
 from .warnings import warn_test_suite
index a6ce6ca3cc2eb154d4a65f9317d8ac11d9549792..f6fad11d0e2a7869e22e0eb100639e32555f485d 100644 (file)
 from __future__ import annotations
 
 from collections import deque
+import contextlib
 import decimal
 import gc
 from itertools import chain
 import random
 import sys
 from sys import getsizeof
+import time
 import types
+from typing import Any
 
 from . import config
 from . import mock
@@ -517,3 +520,18 @@ def count_cache_key_tuples(tup):
             if elem:
                 stack = list(elem) + [sentinel] + stack
     return num_elements
+
+
+@contextlib.contextmanager
+def skip_if_timeout(seconds: float, cleanup: Any = None):
+
+    now = time.time()
+    yield
+    sec = time.time() - now
+    if sec > seconds:
+        try:
+            cleanup()
+        finally:
+            config.skip_test(
+                f"test took too long ({sec:.4f} seconds > {seconds})"
+            )
index 68650d6d2bc7767027f43f0e8e10f080a5c4630f..9fe040c3a050d7de35d261f97e493a3384153782 100644 (file)
@@ -473,7 +473,8 @@ class TransactionTest(fixtures.TablesTest):
 
     @testing.requires.two_phase_transactions
     @testing.requires.two_phase_recovery
-    def test_two_phase_recover(self):
+    @testing.variation("commit", [True, False])
+    def test_two_phase_recover(self, commit):
         users = self.tables.users
 
         # 2020, still can't get this to work w/ modern MySQL or MariaDB.
@@ -501,17 +502,29 @@ class TransactionTest(fixtures.TablesTest):
                 [],
             )
         # recover_twophase needs to be run in a new transaction
-        with testing.db.connect() as connection2:
-            recoverables = connection2.recover_twophase()
-            assert transaction.xid in recoverables
-            connection2.commit_prepared(transaction.xid, recover=True)
-
-            eq_(
-                connection2.execute(
-                    select(users.c.user_id).order_by(users.c.user_id)
-                ).fetchall(),
-                [(1,)],
-            )
+        with testing.db.connect() as connection3:
+            # oracle transactions can't be recovered for commit after...
+            # about 1 second?  OK
+            with testing.skip_if_timeout(
+                0.75,
+                cleanup=(
+                    lambda: connection3.rollback_prepared(
+                        transaction.xid, recover=True
+                    )
+                ),
+            ):
+                recoverables = connection3.recover_twophase()
+                assert transaction.xid in recoverables
+
+            if commit:
+                connection3.commit_prepared(transaction.xid, recover=True)
+                res = [(1,)]
+            else:
+                connection3.rollback_prepared(transaction.xid, recover=True)
+                res = []
+
+            stmt = select(users.c.user_id).order_by(users.c.user_id)
+            eq_(connection3.execute(stmt).fetchall(), res)
 
     @testing.requires.two_phase_transactions
     def test_multiple_two_phase(self, local_connection):
index 0f6fb3f0e3829f8d59f5a9e3cf3ee8dbd2b875ce..b0218634561ffc8e756a5071f296ccecebb57ca7 100644 (file)
@@ -858,32 +858,27 @@ class DefaultRequirements(SuiteRequirements):
                 else:
                     return num > 0
 
-        return (
-            skip_if(
-                [
-                    no_support(
-                        "mssql", "two-phase xact not supported by drivers"
-                    ),
-                    no_support(
-                        "sqlite", "two-phase xact not supported by database"
-                    ),
-                    # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
-                    # we are evaluating which modern MySQL / MariaDB versions
-                    # can handle two-phase testing without too many problems
-                    # no_support(
-                    #     "mysql",
-                    #    "recent MySQL community editions have too many "
-                    #    "issues (late 2016), disabling for now",
-                    # ),
-                    NotPredicate(
-                        LambdaPredicate(
-                            pg_prepared_transaction,
-                            "max_prepared_transactions not available or zero",
-                        )
-                    ),
-                ]
-            )
-            + self.skip_on_oracledb_thin
+        return skip_if(
+            [
+                no_support("mssql", "two-phase xact not supported by drivers"),
+                no_support(
+                    "sqlite", "two-phase xact not supported by database"
+                ),
+                # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
+                # we are evaluating which modern MySQL / MariaDB versions
+                # can handle two-phase testing without too many problems
+                # no_support(
+                #     "mysql",
+                #    "recent MySQL community editions have too many "
+                #    "issues (late 2016), disabling for now",
+                # ),
+                NotPredicate(
+                    LambdaPredicate(
+                        pg_prepared_transaction,
+                        "max_prepared_transactions not available or zero",
+                    )
+                ),
+            ]
         )
 
     @property
@@ -893,7 +888,7 @@ class DefaultRequirements(SuiteRequirements):
                 ["mysql", "mariadb"],
                 "still can't get recover to work w/ MariaDB / MySQL",
             )
-            + skip_if("oracle", "recovery not functional")
+            + skip_if("oracle+cx_oracle", "recovery not functional")
         )
 
     @property
@@ -1870,16 +1865,6 @@ class DefaultRequirements(SuiteRequirements):
 
         return only_if(go)
 
-    @property
-    def skip_on_oracledb_thin(self):
-        def go(config):
-            if against(config, "oracle+oracledb"):
-                with config.db.connect() as conn:
-                    return config.db.dialect.is_thin_mode(conn)
-            return False
-
-        return skip_if(go)
-
     @property
     def computed_columns(self):
         return skip_if(["postgresql < 12", "sqlite < 3.31", "mysql < 5.7"])