--- /dev/null
+.. change::
+ :tags: usecase, oracle
+ :tickets: 11480
+
+ Implemented two-phase transactions for the oracledb dialect. Historically,
+ this feature never worked with the cx_Oracle dialect, however recent
+ improvements to the oracledb successor now allow this to be possible. The
+ two phase transaction API is available at the Core level via the
+ :meth:`_engine.Connection.begin_twophase` method.
on parity with other backends.
-
ON UPDATE CASCADE
-----------------
.. _oracle_table_options:
Oracle Table Options
--------------------------
+--------------------
The CREATE TABLE phrase supports the following options with Oracle
in conjunction with the :class:`_schema.Table` construct:
``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`,
which takes place only engine-wide.
-Two Phase Transactions Not Supported
--------------------------------------
+Two Phase Transactions Not Supported (use oracledb)
+---------------------------------------------------
-Two phase transactions are **not supported** under cx_Oracle due to poor
-driver support. As of cx_Oracle 6.0b1, the interface for
-two phase transactions has been changed to be more of a direct pass-through
-to the underlying OCI layer with less automation. The additional logic
-to support this system is not implemented in SQLAlchemy.
+Two phase transactions are **not supported** under cx_Oracle due to poor driver
+support. The newer :ref:`oracledb` dialect however **does** support two phase
+transactions and should be preferred.
.. _cx_oracle_numeric:
return False
def create_xid(self):
- """create a two-phase transaction ID.
-
- this id will be passed to do_begin_twophase(), do_rollback_twophase(),
- do_commit_twophase(). its format is unspecified.
-
- """
-
id_ = random.randint(0, 2**128)
return (0x1234, "%032x" % id_, "%032x" % 9)
:connectstring: oracle+oracledb://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]]
:url: https://oracle.github.io/python-oracledb/
+Description
+-----------
+
python-oracledb is released by Oracle to supersede the cx_Oracle driver.
It is fully compatible with cx_Oracle and features both a "thin" client
mode that requires no dependencies, as well as a "thick" mode that uses
.. seealso::
:ref:`cx_oracle` - all of cx_Oracle's notes apply to the oracledb driver
- as well.
+ as well, with the exception that oracledb supports two phase transactions.
The SQLAlchemy ``oracledb`` dialect provides both a sync and an async
implementation under the same dialect name. The proper version is
https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.init_oracle_client
+Two Phase Transactions Supported
+--------------------------------
+
+Two phase transactions are fully supported under oracledb. Starting with
+oracledb 2.3 two phase transactions are supported also in thin mode. APIs
+for two phase transactions are provided at the Core level via
+:meth:`_engine.Connection.begin_twophase` and :paramref:`_orm.Session.twophase`
+for transparent ORM use.
+
+.. versionchanged:: 2.0.32 added support for two phase transactions
.. versionadded:: 2.0.0 added support for oracledb driver.
f"oracledb version {self._min_version} and above are supported"
)
+ def do_begin_twophase(self, connection, xid):
+ conn_xis = connection.connection.xid(*xid)
+ connection.connection.tpc_begin(conn_xis)
+ connection.connection.info["oracledb_xid"] = conn_xis
+
+ def do_prepare_twophase(self, connection, xid):
+ should_commit = connection.connection.tpc_prepare()
+ connection.info["oracledb_should_commit"] = should_commit
+
+ def do_rollback_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+ if recover:
+ conn_xid = connection.connection.xid(*xid)
+ else:
+ conn_xid = None
+ connection.connection.tpc_rollback(conn_xid)
+
+ def do_commit_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+ conn_xid = None
+ if not is_prepared:
+ should_commit = connection.connection.tpc_prepare()
+ elif recover:
+ conn_xid = connection.connection.xid(*xid)
+ should_commit = True
+ else:
+ should_commit = connection.info["oracledb_should_commit"]
+ if should_commit:
+ connection.connection.tpc_commit(conn_xid)
+
+ def do_recover_twophase(self, connection):
+ return [
+ # oracledb seems to return bytes
+ (
+ fi,
+ gti.decode() if isinstance(gti, bytes) else gti,
+ bq.decode() if isinstance(bq, bytes) else bq,
+ )
+ for fi, gti, bq in connection.connection.tpc_recover()
+ ]
+
class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor):
_cursor: AsyncCursor
def cursor(self):
return AsyncAdapt_oracledb_cursor(self)
+ def xid(self, *args: Any, **kwargs: Any) -> Any:
+ return self._connection.xid(*args, **kwargs)
+
+ def tpc_begin(self, *args: Any, **kwargs: Any) -> Any:
+ return await_(self._connection.tpc_begin(*args, **kwargs))
+
+ def tpc_commit(self, *args: Any, **kwargs: Any) -> Any:
+ return await_(self._connection.tpc_commit(*args, **kwargs))
+
+ def tpc_prepare(self, *args: Any, **kwargs: Any) -> Any:
+ return await_(self._connection.tpc_prepare(*args, **kwargs))
+
+ def tpc_recover(self, *args: Any, **kwargs: Any) -> Any:
+ return await_(self._connection.tpc_recover(*args, **kwargs))
+
+ def tpc_rollback(self, *args: Any, **kwargs: Any) -> Any:
+ return await_(self._connection.tpc_rollback(*args, **kwargs))
+
class OracledbAdaptDBAPI:
def __init__(self, oracledb) -> None:
for xid in conn.exec_driver_sql(
"select gid from pg_prepared_xacts"
).scalars():
- conn.execute("ROLLBACK PREPARED '%s'" % xid)
+ conn.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
@drop_all_schema_objects_post_tables.for_db("postgresql")
from .util import resolve_lambda
from .util import rowset
from .util import run_as_contextmanager
+from .util import skip_if_timeout
from .util import teardown_events
from .warnings import assert_warnings
from .warnings import warn_test_suite
from __future__ import annotations
from collections import deque
+import contextlib
import decimal
import gc
from itertools import chain
import random
import sys
from sys import getsizeof
+import time
import types
+from typing import Any
from . import config
from . import mock
if elem:
stack = list(elem) + [sentinel] + stack
return num_elements
+
+
+@contextlib.contextmanager
+def skip_if_timeout(seconds: float, cleanup: Any = None):
+
+ now = time.time()
+ yield
+ sec = time.time() - now
+ if sec > seconds:
+ try:
+ cleanup()
+ finally:
+ config.skip_test(
+ f"test took too long ({sec:.4f} seconds > {seconds})"
+ )
@testing.requires.two_phase_transactions
@testing.requires.two_phase_recovery
- def test_two_phase_recover(self):
+ @testing.variation("commit", [True, False])
+ def test_two_phase_recover(self, commit):
users = self.tables.users
# 2020, still can't get this to work w/ modern MySQL or MariaDB.
[],
)
# recover_twophase needs to be run in a new transaction
- with testing.db.connect() as connection2:
- recoverables = connection2.recover_twophase()
- assert transaction.xid in recoverables
- connection2.commit_prepared(transaction.xid, recover=True)
-
- eq_(
- connection2.execute(
- select(users.c.user_id).order_by(users.c.user_id)
- ).fetchall(),
- [(1,)],
- )
+ with testing.db.connect() as connection3:
+ # oracle transactions can't be recovered for commit after...
+ # about 1 second? OK
+ with testing.skip_if_timeout(
+ 0.75,
+ cleanup=(
+ lambda: connection3.rollback_prepared(
+ transaction.xid, recover=True
+ )
+ ),
+ ):
+ recoverables = connection3.recover_twophase()
+ assert transaction.xid in recoverables
+
+ if commit:
+ connection3.commit_prepared(transaction.xid, recover=True)
+ res = [(1,)]
+ else:
+ connection3.rollback_prepared(transaction.xid, recover=True)
+ res = []
+
+ stmt = select(users.c.user_id).order_by(users.c.user_id)
+ eq_(connection3.execute(stmt).fetchall(), res)
@testing.requires.two_phase_transactions
def test_multiple_two_phase(self, local_connection):
else:
return num > 0
- return (
- skip_if(
- [
- no_support(
- "mssql", "two-phase xact not supported by drivers"
- ),
- no_support(
- "sqlite", "two-phase xact not supported by database"
- ),
- # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
- # we are evaluating which modern MySQL / MariaDB versions
- # can handle two-phase testing without too many problems
- # no_support(
- # "mysql",
- # "recent MySQL community editions have too many "
- # "issues (late 2016), disabling for now",
- # ),
- NotPredicate(
- LambdaPredicate(
- pg_prepared_transaction,
- "max_prepared_transactions not available or zero",
- )
- ),
- ]
- )
- + self.skip_on_oracledb_thin
+ return skip_if(
+ [
+ no_support("mssql", "two-phase xact not supported by drivers"),
+ no_support(
+ "sqlite", "two-phase xact not supported by database"
+ ),
+ # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
+ # we are evaluating which modern MySQL / MariaDB versions
+ # can handle two-phase testing without too many problems
+ # no_support(
+ # "mysql",
+ # "recent MySQL community editions have too many "
+ # "issues (late 2016), disabling for now",
+ # ),
+ NotPredicate(
+ LambdaPredicate(
+ pg_prepared_transaction,
+ "max_prepared_transactions not available or zero",
+ )
+ ),
+ ]
)
@property
["mysql", "mariadb"],
"still can't get recover to work w/ MariaDB / MySQL",
)
- + skip_if("oracle", "recovery not functional")
+ + skip_if("oracle+cx_oracle", "recovery not functional")
)
@property
return only_if(go)
- @property
- def skip_on_oracledb_thin(self):
- def go(config):
- if against(config, "oracle+oracledb"):
- with config.db.connect() as conn:
- return config.db.dialect.is_thin_mode(conn)
- return False
-
- return skip_if(go)
-
@property
def computed_columns(self):
return skip_if(["postgresql < 12", "sqlite < 3.31", "mysql < 5.7"])