From: Federico Caselli Date: Mon, 10 Jun 2024 19:20:56 +0000 (+0200) Subject: Add support for two-phase commit in oracledb. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a9c0487c024410d446b8be3f528e051318dd150e;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Add support for two-phase commit in oracledb. Implemented two-phase transactions for the oracledb dialect. Historically, this feature never worked with the cx_Oracle dialect, however recent improvements to the oracledb successor now allow this to be possible. The two phase transaction API is available at the Core level via the :meth:`_engine.Connection.begin_twophase` method. As part of this change, added new facility for testing that allows a test to skip if a certain step takes too long, allowing for a separate cleanup step. this is needed as oracle tpc wont allow commit recovery if transaction is older than about 1 second, could not find any docs on how to increase this timeout. Fixed an execute call in the PostgreSQL dialect's provisioning that drops old tpc transactions which was non-working, which indicates that we've apparently never had any PG tpc transactions needing to be cleaned up in CI for some years now, so that's good Fixes: #11480 Change-Id: If3ad19cc29999e70f07f767b88afd330f6e5a4be --- diff --git a/doc/build/changelog/unreleased_20/11480.rst b/doc/build/changelog/unreleased_20/11480.rst new file mode 100644 index 0000000000..7a653a6b69 --- /dev/null +++ b/doc/build/changelog/unreleased_20/11480.rst @@ -0,0 +1,9 @@ +.. change:: + :tags: usecase, oracle + :tickets: 11480 + + Implemented two-phase transactions for the oracledb dialect. Historically, + this feature never worked with the cx_Oracle dialect, however recent + improvements to the oracledb successor now allow this to be possible. The + two phase transaction API is available at the Core level via the + :meth:`_engine.Connection.begin_twophase` method. diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py index 8e5989990e..5873fd070d 100644 --- a/lib/sqlalchemy/dialects/oracle/base.py +++ b/lib/sqlalchemy/dialects/oracle/base.py @@ -338,7 +338,6 @@ returned as well. on parity with other backends. - ON UPDATE CASCADE ----------------- @@ -479,7 +478,7 @@ is reflected and the type is reported as ``DATE``, the time-supporting .. _oracle_table_options: Oracle Table Options -------------------------- +-------------------- The CREATE TABLE phrase supports the following options with Oracle in conjunction with the :class:`_schema.Table` construct: diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py index 9346224664..873d943371 100644 --- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py +++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py @@ -377,14 +377,12 @@ buffered objects with a ``read()`` method, the parameter ``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`, which takes place only engine-wide. -Two Phase Transactions Not Supported -------------------------------------- +Two Phase Transactions Not Supported (use oracledb) +--------------------------------------------------- -Two phase transactions are **not supported** under cx_Oracle due to poor -driver support. As of cx_Oracle 6.0b1, the interface for -two phase transactions has been changed to be more of a direct pass-through -to the underlying OCI layer with less automation. The additional logic -to support this system is not implemented in SQLAlchemy. +Two phase transactions are **not supported** under cx_Oracle due to poor driver +support. The newer :ref:`oracledb` dialect however **does** support two phase +transactions and should be preferred. .. _cx_oracle_numeric: @@ -1423,13 +1421,6 @@ class OracleDialect_cx_oracle(OracleDialect): return False def create_xid(self): - """create a two-phase transaction ID. - - this id will be passed to do_begin_twophase(), do_rollback_twophase(), - do_commit_twophase(). its format is unspecified. - - """ - id_ = random.randint(0, 2**128) return (0x1234, "%032x" % id_, "%032x" % 9) diff --git a/lib/sqlalchemy/dialects/oracle/oracledb.py b/lib/sqlalchemy/dialects/oracle/oracledb.py index de5be44d90..e48dcdc6bb 100644 --- a/lib/sqlalchemy/dialects/oracle/oracledb.py +++ b/lib/sqlalchemy/dialects/oracle/oracledb.py @@ -13,6 +13,9 @@ r""" :connectstring: oracle+oracledb://user:pass@hostname:port[/dbname][?service_name=[&key=value&key=value...]] :url: https://oracle.github.io/python-oracledb/ +Description +----------- + python-oracledb is released by Oracle to supersede the cx_Oracle driver. It is fully compatible with cx_Oracle and features both a "thin" client mode that requires no dependencies, as well as a "thick" mode that uses @@ -21,7 +24,7 @@ the Oracle Client Interface in the same way as cx_Oracle. .. seealso:: :ref:`cx_oracle` - all of cx_Oracle's notes apply to the oracledb driver - as well. + as well, with the exception that oracledb supports two phase transactions. The SQLAlchemy ``oracledb`` dialect provides both a sync and an async implementation under the same dialect name. The proper version is @@ -70,6 +73,16 @@ like the ``lib_dir`` path, a dict may be passed to this parameter, as in:: https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.init_oracle_client +Two Phase Transactions Supported +-------------------------------- + +Two phase transactions are fully supported under oracledb. Starting with +oracledb 2.3 two phase transactions are supported also in thin mode. APIs +for two phase transactions are provided at the Core level via +:meth:`_engine.Connection.begin_twophase` and :paramref:`_orm.Session.twophase` +for transparent ORM use. + +.. versionchanged:: 2.0.32 added support for two phase transactions .. versionadded:: 2.0.0 added support for oracledb driver. @@ -151,6 +164,49 @@ class OracleDialect_oracledb(_OracleDialect_cx_oracle): f"oracledb version {self._min_version} and above are supported" ) + def do_begin_twophase(self, connection, xid): + conn_xis = connection.connection.xid(*xid) + connection.connection.tpc_begin(conn_xis) + connection.connection.info["oracledb_xid"] = conn_xis + + def do_prepare_twophase(self, connection, xid): + should_commit = connection.connection.tpc_prepare() + connection.info["oracledb_should_commit"] = should_commit + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + if recover: + conn_xid = connection.connection.xid(*xid) + else: + conn_xid = None + connection.connection.tpc_rollback(conn_xid) + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + conn_xid = None + if not is_prepared: + should_commit = connection.connection.tpc_prepare() + elif recover: + conn_xid = connection.connection.xid(*xid) + should_commit = True + else: + should_commit = connection.info["oracledb_should_commit"] + if should_commit: + connection.connection.tpc_commit(conn_xid) + + def do_recover_twophase(self, connection): + return [ + # oracledb seems to return bytes + ( + fi, + gti.decode() if isinstance(gti, bytes) else gti, + bq.decode() if isinstance(bq, bytes) else bq, + ) + for fi, gti, bq in connection.connection.tpc_recover() + ] + class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor): _cursor: AsyncCursor @@ -241,6 +297,24 @@ class AsyncAdapt_oracledb_connection(AsyncAdapt_dbapi_connection): def cursor(self): return AsyncAdapt_oracledb_cursor(self) + def xid(self, *args: Any, **kwargs: Any) -> Any: + return self._connection.xid(*args, **kwargs) + + def tpc_begin(self, *args: Any, **kwargs: Any) -> Any: + return await_(self._connection.tpc_begin(*args, **kwargs)) + + def tpc_commit(self, *args: Any, **kwargs: Any) -> Any: + return await_(self._connection.tpc_commit(*args, **kwargs)) + + def tpc_prepare(self, *args: Any, **kwargs: Any) -> Any: + return await_(self._connection.tpc_prepare(*args, **kwargs)) + + def tpc_recover(self, *args: Any, **kwargs: Any) -> Any: + return await_(self._connection.tpc_recover(*args, **kwargs)) + + def tpc_rollback(self, *args: Any, **kwargs: Any) -> Any: + return await_(self._connection.tpc_rollback(*args, **kwargs)) + class OracledbAdaptDBAPI: def __init__(self, oracledb) -> None: diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py index a87bb93206..38573c77ad 100644 --- a/lib/sqlalchemy/dialects/postgresql/provision.py +++ b/lib/sqlalchemy/dialects/postgresql/provision.py @@ -97,7 +97,7 @@ def drop_all_schema_objects_pre_tables(cfg, eng): for xid in conn.exec_driver_sql( "select gid from pg_prepared_xacts" ).scalars(): - conn.execute("ROLLBACK PREPARED '%s'" % xid) + conn.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) @drop_all_schema_objects_post_tables.for_db("postgresql") diff --git a/lib/sqlalchemy/testing/__init__.py b/lib/sqlalchemy/testing/__init__.py index d3a6f32c71..7fa361c9b9 100644 --- a/lib/sqlalchemy/testing/__init__.py +++ b/lib/sqlalchemy/testing/__init__.py @@ -83,6 +83,7 @@ from .util import provide_metadata from .util import resolve_lambda from .util import rowset from .util import run_as_contextmanager +from .util import skip_if_timeout from .util import teardown_events from .warnings import assert_warnings from .warnings import warn_test_suite diff --git a/lib/sqlalchemy/testing/util.py b/lib/sqlalchemy/testing/util.py index a6ce6ca3cc..f6fad11d0e 100644 --- a/lib/sqlalchemy/testing/util.py +++ b/lib/sqlalchemy/testing/util.py @@ -10,13 +10,16 @@ from __future__ import annotations from collections import deque +import contextlib import decimal import gc from itertools import chain import random import sys from sys import getsizeof +import time import types +from typing import Any from . import config from . import mock @@ -517,3 +520,18 @@ def count_cache_key_tuples(tup): if elem: stack = list(elem) + [sentinel] + stack return num_elements + + +@contextlib.contextmanager +def skip_if_timeout(seconds: float, cleanup: Any = None): + + now = time.time() + yield + sec = time.time() - now + if sec > seconds: + try: + cleanup() + finally: + config.skip_test( + f"test took too long ({sec:.4f} seconds > {seconds})" + ) diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index 68650d6d2b..9fe040c3a0 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -473,7 +473,8 @@ class TransactionTest(fixtures.TablesTest): @testing.requires.two_phase_transactions @testing.requires.two_phase_recovery - def test_two_phase_recover(self): + @testing.variation("commit", [True, False]) + def test_two_phase_recover(self, commit): users = self.tables.users # 2020, still can't get this to work w/ modern MySQL or MariaDB. @@ -501,17 +502,29 @@ class TransactionTest(fixtures.TablesTest): [], ) # recover_twophase needs to be run in a new transaction - with testing.db.connect() as connection2: - recoverables = connection2.recover_twophase() - assert transaction.xid in recoverables - connection2.commit_prepared(transaction.xid, recover=True) - - eq_( - connection2.execute( - select(users.c.user_id).order_by(users.c.user_id) - ).fetchall(), - [(1,)], - ) + with testing.db.connect() as connection3: + # oracle transactions can't be recovered for commit after... + # about 1 second? OK + with testing.skip_if_timeout( + 0.75, + cleanup=( + lambda: connection3.rollback_prepared( + transaction.xid, recover=True + ) + ), + ): + recoverables = connection3.recover_twophase() + assert transaction.xid in recoverables + + if commit: + connection3.commit_prepared(transaction.xid, recover=True) + res = [(1,)] + else: + connection3.rollback_prepared(transaction.xid, recover=True) + res = [] + + stmt = select(users.c.user_id).order_by(users.c.user_id) + eq_(connection3.execute(stmt).fetchall(), res) @testing.requires.two_phase_transactions def test_multiple_two_phase(self, local_connection): diff --git a/test/requirements.py b/test/requirements.py index 0f6fb3f0e3..b021863456 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -858,32 +858,27 @@ class DefaultRequirements(SuiteRequirements): else: return num > 0 - return ( - skip_if( - [ - no_support( - "mssql", "two-phase xact not supported by drivers" - ), - no_support( - "sqlite", "two-phase xact not supported by database" - ), - # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d - # we are evaluating which modern MySQL / MariaDB versions - # can handle two-phase testing without too many problems - # no_support( - # "mysql", - # "recent MySQL community editions have too many " - # "issues (late 2016), disabling for now", - # ), - NotPredicate( - LambdaPredicate( - pg_prepared_transaction, - "max_prepared_transactions not available or zero", - ) - ), - ] - ) - + self.skip_on_oracledb_thin + return skip_if( + [ + no_support("mssql", "two-phase xact not supported by drivers"), + no_support( + "sqlite", "two-phase xact not supported by database" + ), + # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d + # we are evaluating which modern MySQL / MariaDB versions + # can handle two-phase testing without too many problems + # no_support( + # "mysql", + # "recent MySQL community editions have too many " + # "issues (late 2016), disabling for now", + # ), + NotPredicate( + LambdaPredicate( + pg_prepared_transaction, + "max_prepared_transactions not available or zero", + ) + ), + ] ) @property @@ -893,7 +888,7 @@ class DefaultRequirements(SuiteRequirements): ["mysql", "mariadb"], "still can't get recover to work w/ MariaDB / MySQL", ) - + skip_if("oracle", "recovery not functional") + + skip_if("oracle+cx_oracle", "recovery not functional") ) @property @@ -1870,16 +1865,6 @@ class DefaultRequirements(SuiteRequirements): return only_if(go) - @property - def skip_on_oracledb_thin(self): - def go(config): - if against(config, "oracle+oracledb"): - with config.db.connect() as conn: - return config.db.dialect.is_thin_mode(conn) - return False - - return skip_if(go) - @property def computed_columns(self): return skip_if(["postgresql < 12", "sqlite < 3.31", "mysql < 5.7"])