]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
drop a 400 ton anvil on oracle 23c
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 27 Nov 2025 06:22:38 +0000 (01:22 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 28 Nov 2025 18:16:13 +0000 (13:16 -0500)
this DB is extremely erratic in being able to connect.   Add
a brute force connection retrier to all engines everywhere
(which for oracledb we can fortunately use their built-in feature
that also works).
This actually works and I can see it pausing under load, reconnecting,
and succeeding.  The problem is that absolutely every engine everywhere
needs this routine otherwise an engine without a retrier in it will
crash.    That then necessitates digging into testing_engine(),
making sure testing_engine() is used everywhere an engine that's going
to connect is used, then dealing with the fallout from that.

We also simplify some older workarounds for cx_oracle and
hack into config/provision to make oracledb seem like the primary
DBAPI for most tests.

testing_engine has been completely overhauled, making use of a new
post_configure_testing_engine() hook which moves and refines
the SQLite pool sharing and savepoint logic all into sqlite/provision.py
and also allows for cx_oracle to apply a retry event handler.

Change-Id: I4ea4c523effb878290d28b94d8925eb32fc5ae3b

21 files changed:
lib/sqlalchemy/dialects/oracle/provision.py
lib/sqlalchemy/dialects/sqlite/provision.py
lib/sqlalchemy/pool/impl.py
lib/sqlalchemy/testing/assertions.py
lib/sqlalchemy/testing/config.py
lib/sqlalchemy/testing/engines.py
lib/sqlalchemy/testing/fixtures/base.py
lib/sqlalchemy/testing/provision.py
lib/sqlalchemy/testing/requirements.py
test/aaa_profiling/test_orm.py
test/dialect/postgresql/test_reflection.py
test/engine/test_deprecations.py
test/engine/test_execute.py
test/engine/test_logging.py
test/engine/test_reconnect.py
test/engine/test_transaction.py
test/ext/asyncio/test_engine.py
test/ext/asyncio/test_session.py
test/requirements.py
test/sql/test_insert_exec.py
test/sql/test_lambdas.py

index 76d13c53bf4ef8483d133f2217453b067dfeed58..2ae5e93f191c1307653c81c3b44bfef1620856ef 100644 (file)
@@ -6,6 +6,8 @@
 # the MIT License: https://www.opensource.org/licenses/mit-license.php
 # mypy: ignore-errors
 
+import time
+
 from ... import create_engine
 from ... import exc
 from ... import inspect
@@ -16,13 +18,50 @@ from ...testing.provision import drop_all_schema_objects_post_tables
 from ...testing.provision import drop_all_schema_objects_pre_tables
 from ...testing.provision import drop_db
 from ...testing.provision import follower_url_from_main
+from ...testing.provision import generate_driver_url
+from ...testing.provision import is_preferred_driver
 from ...testing.provision import log
 from ...testing.provision import post_configure_engine
+from ...testing.provision import post_configure_testing_engine
 from ...testing.provision import run_reap_dbs
 from ...testing.provision import set_default_schema_on_connection
 from ...testing.provision import stop_test_class_outside_fixtures
 from ...testing.provision import temp_table_keyword_args
 from ...testing.provision import update_db_opts
+from ...testing.warnings import warn_test_suite
+
+
+@generate_driver_url.for_db("oracle")
+def _oracle_generate_driver_url(url, driver, query_str):
+
+    backend = url.get_backend_name()
+
+    new_url = url.set(
+        drivername="%s+%s" % (backend, driver),
+    )
+
+    # use oracledb's retry feature, which is essential for oracle 23c
+    # which otherwise frequently rejects connections under load
+    # for cx_oracle we have a connect event instead
+    if driver in ("oracledb", "oracledb_async"):
+        # oracledb is even nice enough to convert from string to int
+        # for these opts, apparently
+        new_url = new_url.update_query_pairs(
+            [("retry_count", "5"), ("retry_delay", "2")]
+        )
+    else:
+        # remove these params for cx_oracle if we received an
+        # already-modified URL
+        new_url = new_url.difference_update_query(
+            ["retry_count", "retry_delay"]
+        )
+
+    try:
+        new_url.get_dialect()
+    except exc.NoSuchModuleError:
+        return None
+    else:
+        return new_url
 
 
 @create_db.for_db("oracle")
@@ -103,21 +142,6 @@ def _ora_stop_test_class_outside_fixtures(config, db, cls):
     except exc.DatabaseError as err:
         log.warning("purge recyclebin command failed: %s", err)
 
-    # clear statement cache on all connections that were used
-    # https://github.com/oracle/python-cx_Oracle/issues/519
-
-    all_dbapis = {cfg.db.dialect.dbapi for cfg in config.Config.all_configs()}
-    for cx_oracle_conn in _all_conns:
-        try:
-            sc = cx_oracle_conn.stmtcachesize
-        except tuple(dbapi.InterfaceError for dbapi in all_dbapis):
-            # connection closed
-            pass
-        else:
-            cx_oracle_conn.stmtcachesize = 0
-            cx_oracle_conn.stmtcachesize = sc
-    _all_conns.clear()
-
 
 def _purge_recyclebin(eng, schema=None):
     with eng.begin() as conn:
@@ -135,24 +159,76 @@ def _purge_recyclebin(eng, schema=None):
                 conn.exec_driver_sql(f'purge {type_} {owner}."{object_name}"')
 
 
-_all_conns = set()
+@is_preferred_driver.for_db("oracle")
+def _oracle_is_preferred_driver(cfg, engine):
+    """establish oracledb as the preferred driver to use for tests, even
+    though cx_Oracle is still the "default" driver"""
+
+    return engine.dialect.driver == "oracledb" and not engine.dialect.is_async
+
+
+def _connect_with_retry(dialect, conn_rec, cargs, cparams):
+    assert dialect.driver == "cx_oracle"
+
+    def _is_couldnt_connect(err):
+        return "DPY-6005" in str(err) or "ORA-12516" in str(err)
+
+    err_ = None
+    for _ in range(5):
+        try:
+            return dialect.loaded_dbapi.connect(*cargs, **cparams)
+        except (
+            dialect.loaded_dbapi.DatabaseError,
+            dialect.loaded_dbapi.OperationalError,
+        ) as err:
+            err_ = err
+            if _is_couldnt_connect(err):
+                warn_test_suite("Oracle database reconnecting...")
+                time.sleep(2)
+                continue
+            else:
+                raise
+    if err_ is not None:
+        raise Exception("connect failed after five attempts") from err_
+
+
+@post_configure_testing_engine.for_db("oracle")
+def _oracle_post_configure_testing_engine(url, engine, options, scope):
+    from ... import event
+
+    if engine.dialect.driver == "cx_oracle":
+        event.listen(engine, "do_connect", _connect_with_retry)
 
 
 @post_configure_engine.for_db("oracle")
 def _oracle_post_configure_engine(url, engine, follower_ident):
-    from sqlalchemy import event
 
-    @event.listens_for(engine, "checkout")
-    def checkout(dbapi_con, con_record, con_proxy):
-        _all_conns.add(dbapi_con)
+    from ... import event
 
     @event.listens_for(engine, "checkin")
     def checkin(dbapi_connection, connection_record):
-        # work around cx_Oracle issue:
+        # this was meant to work around this issue:
         # https://github.com/oracle/python-cx_Oracle/issues/530
         # invalidate oracle connections that had 2pc set up
-        if "cx_oracle_xid" in connection_record.info:
-            connection_record.invalidate()
+        # however things are too complex with some of the 2pc tests,
+        # so just block cx_oracle from being used in 2pc tests (use oracledb
+        # instead)
+        # if "cx_oracle_xid" in connection_record.info:
+        #    connection_record.invalidate()
+
+        # clear statement cache on all connections that were used
+        # https://github.com/oracle/python-cx_Oracle/issues/519
+        # TODO: oracledb claims to have this feature built in somehow,
+        # see if that's in use and/or if it needs to be enabled
+        # (or if this doesn't even apply to the newer Oracle versions we're using)
+        try:
+            sc = dbapi_connection.stmtcachesize
+        except:
+            # connection closed
+            pass
+        else:
+            dbapi_connection.stmtcachesize = 0
+            dbapi_connection.stmtcachesize = sc
 
 
 @run_reap_dbs.for_db("oracle")
index e1df005e72ccd51f760be51f40dc6bc05bd037ff..6ca16ab580628f469dece3284851ca4ae66adf0c 100644 (file)
@@ -9,20 +9,22 @@
 import os
 import re
 
+from ... import event
 from ... import exc
 from ...engine import url as sa_url
+from ...testing import config
 from ...testing.provision import create_db
 from ...testing.provision import drop_db
 from ...testing.provision import follower_url_from_main
 from ...testing.provision import generate_driver_url
 from ...testing.provision import log
 from ...testing.provision import post_configure_engine
+from ...testing.provision import post_configure_testing_engine
 from ...testing.provision import run_reap_dbs
 from ...testing.provision import stop_test_class_outside_fixtures
 from ...testing.provision import temp_table_keyword_args
 from ...testing.provision import upsert
 
-
 # TODO: I can't get this to build dynamically with pytest-xdist procs
 _drivernames = {
     "pysqlite",
@@ -139,6 +141,31 @@ def _sqlite_post_configure_engine(url, engine, follower_ident):
             os.remove(filename)
 
 
+@post_configure_testing_engine.for_db("sqlite")
+def _sqlite_post_configure_testing_engine(url, engine, options, scope):
+
+    sqlite_savepoint = options.get("sqlite_savepoint", False)
+    sqlite_share_pool = options.get("sqlite_share_pool", False)
+
+    if sqlite_savepoint and engine.name == "sqlite":
+        # apply SQLite savepoint workaround
+        @event.listens_for(engine, "connect")
+        def do_connect(dbapi_connection, connection_record):
+            dbapi_connection.isolation_level = None
+
+        @event.listens_for(engine, "begin")
+        def do_begin(conn):
+            conn.exec_driver_sql("BEGIN")
+
+    if sqlite_share_pool:
+        # SingletonThreadPool, StaticPool both support "transfer"
+        # so a new pool can share the same SQLite connection
+        # (single thread only)
+        if hasattr(engine.pool, "_transfer_from"):
+            options["use_reaper"] = False
+            engine.pool._transfer_from(config.db.pool)
+
+
 @create_db.for_db("sqlite")
 def _sqlite_create_db(cfg, eng, ident):
     pass
index af39bba1700555d05391b0f7969fb692c41db94f..fbb6afa3dfcd9ecbbd419ff4f6cc508854f4ad61 100644 (file)
@@ -378,6 +378,15 @@ class SingletonThreadPool(Pool):
             dialect=self._dialect,
         )
 
+    def _transfer_from(
+        self, other_singleton_pool: SingletonThreadPool
+    ) -> None:
+        # used by the test suite to make a new engine / pool without
+        # losing the state of an existing SQLite :memory: connection
+        assert not hasattr(other_singleton_pool._fairy, "current")
+        self._conn = other_singleton_pool._conn
+        self._all_conns = other_singleton_pool._all_conns
+
     def dispose(self) -> None:
         """Dispose of this pool."""
 
index 431c2b7e98a1954b529e466332ee03bb486b13d0..05f8d355d69d32e71d17e473b7e4d49fe0b94ef6 100644 (file)
@@ -207,7 +207,10 @@ def _expect_warnings(
         if raise_on_any_unexpected:
 
             def real_warn(msg, *arg, **kw):
-                raise AssertionError("Got unexpected warning: %r" % msg)
+                if isinstance(msg, sa_exc.SATestSuiteWarning):
+                    warnings.warn(msg, *arg, **kw)
+                else:
+                    raise AssertionError("Got unexpected warning: %r" % msg)
 
         else:
             real_warn = warnings.warn
index 69bbe25ecd4231a6b26efccf6e034d3431dc43d2..fbd854ad9f03a76d0da881275ab090d293c99d93 100644 (file)
@@ -339,12 +339,9 @@ class Config:
 
         self.is_async = db.dialect.is_async
 
-        self.is_default_dialect = (
-            db.url._get_entrypoint()
-            is db.url.set(
-                drivername=db.url.get_backend_name()
-            )._get_entrypoint()
-        )
+        from . import provision
+
+        self.is_default_dialect = provision.is_preferred_driver(self, db)
 
     _stack = collections.deque()
     _configs = set()
index 2dfa07222ff2109a4b814f09189e90b2ab41e002..26a8470eb2d7638edbf7496bc7242e7ab52978fb 100644 (file)
@@ -16,6 +16,7 @@ from typing import Any
 from typing import Dict
 from typing import Literal
 from typing import Optional
+from typing import Union
 import warnings
 import weakref
 
@@ -298,33 +299,30 @@ def reconnecting_engine(url=None, options=None):
 
 @typing.overload
 def testing_engine(
-    url: Optional[URL] = None,
-    options: Optional[Dict[str, Any]] = None,
-    asyncio: Literal[False] = False,
-    transfer_staticpool: bool = False,
+    url: Optional[URL] = ...,
+    options: Optional[Dict[str, Any]] = ...,
+    *,
+    asyncio: Literal[False],
 ) -> Engine: ...
 
 
 @typing.overload
 def testing_engine(
-    url: Optional[URL] = None,
-    options: Optional[Dict[str, Any]] = None,
-    asyncio: Literal[True] = True,
-    transfer_staticpool: bool = False,
+    url: Optional[URL] = ...,
+    options: Optional[Dict[str, Any]] = ...,
+    *,
+    asyncio: Literal[True],
 ) -> AsyncEngine: ...
 
 
 def testing_engine(
-    url=None,
-    options=None,
-    asyncio=False,
-    transfer_staticpool=False,
-    share_pool=False,
-    _sqlite_savepoint=False,
-):
+    url: Optional[URL] = None,
+    options: Optional[Dict[str, Any]] = None,
+    *,
+    asyncio: bool = False,
+) -> Union[Engine, AsyncEngine]:
 
     if asyncio:
-        assert not _sqlite_savepoint
         from sqlalchemy.ext.asyncio import (
             create_async_engine as create_engine,
         )
@@ -332,54 +330,33 @@ def testing_engine(
         from sqlalchemy import create_engine
     from sqlalchemy.engine.url import make_url
 
-    if not options:
-        use_reaper = True
-        scope = "function"
-        sqlite_savepoint = False
-    else:
-        use_reaper = options.pop("use_reaper", True)
-        scope = options.pop("scope", "function")
-        sqlite_savepoint = options.pop("sqlite_savepoint", False)
-
-    url = url or config.db.url
+    url = make_url(url if url else config.db.url)
 
-    url = make_url(url)
+    if not options:
+        options = {}
 
-    if (
-        config.db is None or url.drivername == config.db.url.drivername
-    ) and config.db_opts:
-        use_options = config.db_opts.copy()
-    else:
-        use_options = {}
+    use_options = {}
 
-    if options is not None:
-        use_options.update(options)
+    for opt_dict in (config.db_opts, options):
+        if not opt_dict:
+            continue
+        use_options.update(
+            {
+                opt: value
+                for opt, value in opt_dict.items()
+                if opt not in ("scope", "use_reaper")
+                and not opt.startswith("sqlite_")
+            }
+        )
 
     engine = create_engine(url, **use_options)
 
-    if sqlite_savepoint and engine.name == "sqlite":
-        # apply SQLite savepoint workaround
-        @event.listens_for(engine, "connect")
-        def do_connect(dbapi_connection, connection_record):
-            dbapi_connection.isolation_level = None
-
-        @event.listens_for(engine, "begin")
-        def do_begin(conn):
-            conn.exec_driver_sql("BEGIN")
-
-    if transfer_staticpool:
-        from sqlalchemy.pool import StaticPool
-
-        if config.db is not None and isinstance(config.db.pool, StaticPool):
-            use_reaper = False
-            engine.pool._transfer_from(config.db.pool)
-    elif share_pool:
-        engine.pool = config.db.pool
-    elif config.options and config.options.low_connections:
+    if config.options and config.options.low_connections:
         # for suites running with --low-connections, dispose the "global"
         # engines to disconnect everything before making a testing engine
         testing_reaper._dispose_testing_engines("global")
 
+    scope = options.get("scope", "function")
     if scope == "global":
         if asyncio:
             engine.sync_engine._has_events = True
@@ -388,16 +365,25 @@ def testing_engine(
                 True  # enable event blocks, helps with profiling
             )
 
+    from . import provision
+
+    provision.post_configure_testing_engine(engine.url, engine, options, scope)
+
+    # post_configure_testing_engine may have modified the options dictionary
+    # in place; consume additional post arguments afterwards
+
+    use_reaper = options.get("use_reaper", True)
+    if use_reaper:
+        testing_reaper.add_engine(engine, scope)
+
     if (
         isinstance(engine.pool, pool.QueuePool)
-        and "pool" not in use_options
-        and "pool_timeout" not in use_options
-        and "max_overflow" not in use_options
+        and "pool" not in options
+        and "pool_timeout" not in options
+        and "max_overflow" not in options
     ):
         engine.pool._timeout = 0
         engine.pool._max_overflow = 0
-    if use_reaper:
-        testing_reaper.add_engine(engine, scope)
 
     return engine
 
index 270a1b7d73ebbd00f67715fd94772f42ad5ad423..9d02e9cb3710bb5dc1a04387d061b72e48235b7c 100644 (file)
@@ -167,10 +167,7 @@ class TestBase:
         def gen_testing_engine(
             url=None,
             options=None,
-            future=None,
             asyncio=False,
-            transfer_staticpool=False,
-            share_pool=False,
         ):
             if options is None:
                 options = {}
@@ -179,8 +176,6 @@ class TestBase:
                 url=url,
                 options=options,
                 asyncio=asyncio,
-                transfer_staticpool=transfer_staticpool,
-                share_pool=share_pool,
             )
 
         yield gen_testing_engine
index ebf7f63ca57e4cb53b9fc38a5e4d1105a13918aa..aa7d0daaa1584908c5120628be97f102f54f7501 100644 (file)
@@ -89,7 +89,9 @@ def setup_config(db_url, options, file_config, follower_ident):
     update_db_opts(db_url, db_opts, options)
     db_opts["scope"] = "global"
     eng = engines.testing_engine(db_url, db_opts)
+
     post_configure_engine(db_url, eng, follower_ident)
+
     eng.connect().close()
 
     cfg = config.Config.register(eng, db_opts, options, file_config)
@@ -205,6 +207,23 @@ def _generate_driver_urls(url, extra_drivers):
             yield new_url
 
 
+@register.init
+def is_preferred_driver(cfg, engine):
+    """Return True if the engine's URL is on the "default" driver, or
+    more generally the "preferred" driver to use for tests.
+
+    Backends can override this to make a different driver the "preferred"
+    driver that's not the default.
+
+    """
+    return (
+        engine.url._get_entrypoint()
+        is engine.url.set(
+            drivername=engine.url.get_backend_name()
+        )._get_entrypoint()
+    )
+
+
 @register.init
 def generate_driver_url(url, driver, query_str):
     backend = url.get_backend_name()
@@ -371,12 +390,25 @@ def update_db_opts(db_url, db_opts, options):
 
 @register.init
 def post_configure_engine(url, engine, follower_ident):
-    """Perform extra steps after configuring an engine for testing.
+    """Perform extra steps after configuring the main engine for testing.
 
     (For the internal dialects, currently only used by sqlite, oracle, mssql)
     """
 
 
+@register.init
+def post_configure_testing_engine(url, engine, options, scope):
+    """perform extra steps after configuring any engine within the
+    testing_engine() function.
+
+    this includes the main engine as well as most ad-hoc testing engines.
+
+    steps here should not get in the way of test cases that are looking
+    for events, etc.
+
+    """
+
+
 @register.init
 def follower_url_from_main(url, ident):
     """Create a connection URL for a dynamically-created test database.
index e17c4aab67b95bd2051b621904a63047d05897fa..b79d4b952f836b50747a3411bdcda018bab3df5c 100644 (file)
@@ -1553,9 +1553,7 @@ class SuiteRequirements(Requirements):
     def ad_hoc_engines(self):
         """Test environment must allow ad-hoc engine/connection creation.
 
-        This is now a no-op since we reconfigured ``options.low_connections``
-        to cause the ``testing_engine()`` to close off other open connections
-        when its invoked.
+        No longer used in any tests; is a no-op
 
         """
         return exclusions.open()
index 36f98d053d8802972f916c3d69580a3d359ce9ff..98cc307f497d9c6d3f9159afd19186d4f3a34f6a 100644 (file)
@@ -856,7 +856,7 @@ class JoinedEagerLoadTest(NoCache, fixtures.MappedTest):
         # this test has been reworked to use the compiled cache again,
         # as a real-world scenario.
 
-        eng = testing_engine(share_pool=True)
+        eng = testing_engine(options={"sqlite_share_pool": True})
         sess = Session(eng)
 
         q = sess.query(A).options(
index d5bf5ab26612f81873a06aa48a5d4f2eb7d2f593..4ede0f6a2e67ea91c495bca7aa72f1c8336e485a 100644 (file)
@@ -56,16 +56,14 @@ from sqlalchemy.types import NullType
 class ReflectionFixtures:
     @testing.fixture(
         params=[
-            ("engine", True),
-            ("connection", True),
-            ("engine", False),
-            ("connection", False),
+            "engine",
+            "connection",
         ]
     )
     def inspect_fixture(self, request, metadata, testing_engine):
-        engine, future = request.param
+        engine = request.param
 
-        eng = testing_engine(future=future)
+        eng = testing_engine()
 
         conn = eng.connect()
 
index a4a6f1f47cd55da5e7f61323046b95f550b16a65..a09669a7131a02fe85614bcf7cc6d566b5ab54fd 100644 (file)
@@ -359,7 +359,6 @@ class ResetEventTest(fixtures.TestBase):
 
 
 class EngineEventsTest(fixtures.TestBase):
-    __requires__ = ("ad_hoc_engines",)
     __backend__ = True
 
     def teardown_test(self):
index 9c76c160b6923d3b9c6f86dd9da05bef22d837d2..b07c0cc9a828c8ed20f7cfa253e2154905e0f7c0 100644 (file)
@@ -74,7 +74,6 @@ class Foo:
 
 
 class ExecuteTest(fixtures.TablesTest):
-    __backend__ = True
 
     @classmethod
     def define_tables(cls, metadata):
@@ -93,22 +92,6 @@ class ExecuteTest(fixtures.TablesTest):
             Column("user_name", VARCHAR(20)),
         )
 
-    def test_no_params_option(self):
-        stmt = (
-            "SELECT '%'"
-            + testing.db.dialect.statement_compiler(
-                testing.db.dialect, None
-            ).default_from()
-        )
-
-        with testing.db.connect() as conn:
-            result = (
-                conn.execution_options(no_parameters=True)
-                .exec_driver_sql(stmt)
-                .scalar()
-            )
-            eq_(result, "%")
-
     def test_no_strings(self, connection):
         with expect_raises_message(
             tsa.exc.ObjectNotExecutableError,
@@ -157,904 +140,777 @@ class ExecuteTest(fixtures.TablesTest):
             name="sally",
         )
 
-    @testing.requires.qmark_paramstyle
-    def test_raw_qmark(self, connection):
-        conn = connection
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (?, ?)",
-            (1, "jack"),
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (?, ?)",
-            (2, "fred"),
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (?, ?)",
-            [(3, "ed"), (4, "horse")],
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (?, ?)",
-            [(5, "barney"), (6, "donkey")],
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (?, ?)",
-            (7, "sally"),
-        )
-        res = conn.exec_driver_sql("select * from users order by user_id")
-        assert res.fetchall() == [
-            (1, "jack"),
-            (2, "fred"),
-            (3, "ed"),
-            (4, "horse"),
-            (5, "barney"),
-            (6, "donkey"),
-            (7, "sally"),
-        ]
+    def test_dialect_has_table_assertion(self):
+        with expect_raises_message(
+            tsa.exc.ArgumentError,
+            r"The argument passed to Dialect.has_table\(\) should be a",
+        ):
+            testing.db.dialect.has_table(testing.db, "some_table")
 
-        res = conn.exec_driver_sql(
-            "select * from users where user_name=?", ("jack",)
-        )
-        assert res.fetchall() == [(1, "jack")]
+    def test_not_an_executable(self):
+        for obj in (
+            Table("foo", MetaData(), Column("x", Integer)),
+            Column("x", Integer),
+            tsa.and_(True),
+            tsa.and_(True).compile(),
+            column("foo"),
+            column("foo").compile(),
+            select(1).cte(),
+            # select(1).subquery(),
+            MetaData(),
+            Integer(),
+            tsa.Index(name="foo"),
+            tsa.UniqueConstraint("x"),
+        ):
+            with testing.db.connect() as conn:
+                assert_raises_message(
+                    tsa.exc.ObjectNotExecutableError,
+                    "Not an executable object",
+                    conn.execute,
+                    obj,
+                )
 
-    @testing.requires.format_paramstyle
-    def test_raw_sprintf(self, connection):
-        conn = connection
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (%s, %s)",
-            (1, "jack"),
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (%s, %s)",
-            [(2, "ed"), (3, "horse")],
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (%s, %s)",
-            (4, "sally"),
-        )
-        conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,))
-        res = conn.exec_driver_sql("select * from users order by user_id")
-        assert res.fetchall() == [
-            (1, "jack"),
-            (2, "ed"),
-            (3, "horse"),
-            (4, "sally"),
-            (5, None),
-        ]
+    def test_stmt_exception_bytestring_raised(self):
+        name = "méil"
+        users = self.tables.users
+        with testing.db.connect() as conn:
+            assert_raises_message(
+                tsa.exc.StatementError,
+                "A value is required for bind parameter 'uname'\n"
+                ".*SELECT users.user_name AS .méil.",
+                conn.execute,
+                select(users.c.user_name.label(name)).where(
+                    users.c.user_name == bindparam("uname")
+                ),
+                {"uname_incorrect": "foo"},
+            )
 
-        res = conn.exec_driver_sql(
-            "select * from users where user_name=%s", ("jack",)
-        )
-        assert res.fetchall() == [(1, "jack")]
+    def test_stmt_exception_bytestring_utf8(self):
+        # uncommon case for Py3K, bytestring object passed
+        # as the error message
+        message = "some message méil".encode()
 
-    @testing.requires.pyformat_paramstyle
-    def test_raw_python(self, connection):
-        conn = connection
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) "
-            "values (%(id)s, %(name)s)",
-            {"id": 1, "name": "jack"},
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) "
-            "values (%(id)s, %(name)s)",
-            [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) "
-            "values (%(id)s, %(name)s)",
-            dict(id=4, name="sally"),
-        )
-        res = conn.exec_driver_sql("select * from users order by user_id")
-        assert res.fetchall() == [
-            (1, "jack"),
-            (2, "ed"),
-            (3, "horse"),
-            (4, "sally"),
-        ]
+        err = tsa.exc.SQLAlchemyError(message)
+        eq_(str(err), "some message méil")
 
-    @testing.requires.named_paramstyle
-    def test_raw_named(self, connection):
-        conn = connection
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (:id, :name)",
-            {"id": 1, "name": "jack"},
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (:id, :name)",
-            [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
-        )
-        conn.exec_driver_sql(
-            "insert into users (user_id, user_name) values (:id, :name)",
-            {"id": 4, "name": "sally"},
-        )
-        res = conn.exec_driver_sql("select * from users order by user_id")
-        assert res.fetchall() == [
-            (1, "jack"),
-            (2, "ed"),
-            (3, "horse"),
-            (4, "sally"),
-        ]
+    def test_stmt_exception_bytestring_latin1(self):
+        # uncommon case for Py3K, bytestring object passed
+        # as the error message
+        message = "some message méil".encode("latin-1")
 
-    def test_raw_tuple_params(self, connection):
-        """test #7820
+        err = tsa.exc.SQLAlchemyError(message)
+        eq_(str(err), "some message m\\xe9il")
 
-        There was an apparent improvement in the distill params
-        methodology used in exec_driver_sql which allows raw tuples to
-        pass through.  In 1.4 there seems to be a _distill_cursor_params()
-        function that says it can handle this kind of parameter, but it isn't
-        used and when I tried to substitute it in for exec_driver_sql(),
-        things still fail.
+    def test_stmt_exception_unicode_hook_unicode(self):
+        # uncommon case for Py2K, Unicode object passed
+        # as the error message
+        message = "some message méil"
 
-        In any case, add coverage here for the use case of passing
-        direct tuple params to exec_driver_sql including as the first
-        param, to note that it isn't mis-interpreted the way it is
-        in 1.x.
+        err = tsa.exc.SQLAlchemyError(message)
+        eq_(str(err), "some message méil")
 
-        """
+    def test_stmt_exception_object_arg(self):
+        err = tsa.exc.SQLAlchemyError(Foo())
+        eq_(str(err), "foo")
 
-        with patch.object(connection.dialect, "do_execute") as do_exec:
-            connection.exec_driver_sql(
-                "UPDATE users SET user_name = 'query_one' WHERE "
-                "user_id = %s OR user_id IN %s",
-                (3, (1, 2)),
-            )
+    def test_stmt_exception_str_multi_args(self):
+        err = tsa.exc.SQLAlchemyError("some message", 206)
+        eq_(str(err), "('some message', 206)")
 
-            connection.exec_driver_sql(
-                "UPDATE users SET user_name = 'query_two' WHERE "
-                "user_id IN %s OR user_id = %s",
-                ((1, 2), 3),
-            )
+    def test_stmt_exception_str_multi_args_bytestring(self):
+        message = "some message méil".encode()
 
-        eq_(
-            do_exec.mock_calls,
-            [
-                call(
-                    mock.ANY,
-                    "UPDATE users SET user_name = 'query_one' "
-                    "WHERE user_id = %s OR user_id IN %s",
-                    connection.dialect.execute_sequence_format((3, (1, 2))),
-                    mock.ANY,
-                ),
-                call(
-                    mock.ANY,
-                    "UPDATE users SET user_name = 'query_two' "
-                    "WHERE user_id IN %s OR user_id = %s",
-                    connection.dialect.execute_sequence_format(((1, 2), 3)),
-                    mock.ANY,
-                ),
-            ],
-        )
+        err = tsa.exc.SQLAlchemyError(message, 206)
+        eq_(str(err), str((message, 206)))
 
-    def test_non_dict_mapping(self, connection):
-        """ensure arbitrary Mapping works for execute()"""
+    def test_stmt_exception_str_multi_args_unicode(self):
+        message = "some message méil"
 
-        class NotADict(collections_abc.Mapping):
-            def __init__(self, _data):
-                self._data = _data
+        err = tsa.exc.SQLAlchemyError(message, 206)
+        eq_(str(err), str((message, 206)))
 
-            def __iter__(self):
-                return iter(self._data)
+    def test_generative_engine_event_dispatch_hasevents(self, testing_engine):
+        def l1(*arg, **kw):
+            pass
 
-            def __len__(self):
-                return len(self._data)
+        eng = testing_engine()
+        assert not eng._has_events
+        event.listen(eng, "before_execute", l1)
+        eng2 = eng.execution_options(foo="bar")
+        assert eng2._has_events
 
-            def __getitem__(self, key):
-                return self._data[key]
+    def test_scalar(self, connection):
+        conn = connection
+        users = self.tables.users
+        conn.execute(
+            users.insert(),
+            [
+                {"user_id": 1, "user_name": "sandy"},
+                {"user_id": 2, "user_name": "spongebob"},
+            ],
+        )
+        res = conn.scalar(select(users.c.user_name).order_by(users.c.user_id))
+        eq_(res, "sandy")
 
-            def keys(self):
-                return self._data.keys()
+    def test_scalars(self, connection):
+        conn = connection
+        users = self.tables.users
+        conn.execute(
+            users.insert(),
+            [
+                {"user_id": 1, "user_name": "sandy"},
+                {"user_id": 2, "user_name": "spongebob"},
+            ],
+        )
+        res = conn.scalars(select(users.c.user_name).order_by(users.c.user_id))
+        eq_(res.all(), ["sandy", "spongebob"])
 
-        nd = NotADict({"a": 10, "b": 15})
-        eq_(dict(nd), {"a": 10, "b": 15})
+    @testing.combinations(
+        ({"user_id": 1, "user_name": "name1"},),
+        ([{"user_id": 1, "user_name": "name1"}],),
+        (({"user_id": 1, "user_name": "name1"},),),
+        (
+            [
+                {"user_id": 1, "user_name": "name1"},
+                {"user_id": 2, "user_name": "name2"},
+            ],
+        ),
+        argnames="parameters",
+    )
+    def test_params_interpretation(self, connection, parameters):
+        users = self.tables.users
 
-        result = connection.execute(
-            select(
-                bindparam("a", type_=Integer), bindparam("b", type_=Integer)
-            ),
-            nd,
-        )
-        eq_(result.first(), (10, 15))
+        connection.execute(users.insert(), parameters)
 
-    def test_row_works_as_mapping(self, connection):
-        """ensure the RowMapping object works as a parameter dictionary for
-        execute."""
 
-        result = connection.execute(
-            select(literal(10).label("a"), literal(15).label("b"))
-        )
-        row = result.first()
-        eq_(row, (10, 15))
-        eq_(row._mapping, {"a": 10, "b": 15})
+class ConvenienceExecuteTest(fixtures.TablesTest):
 
-        result = connection.execute(
-            select(
-                bindparam("a", type_=Integer).label("a"),
-                bindparam("b", type_=Integer).label("b"),
-            ),
-            row._mapping,
+    @classmethod
+    def define_tables(cls, metadata):
+        cls.table = Table(
+            "exec_test",
+            metadata,
+            Column("a", Integer),
+            Column("b", Integer),
+            test_needs_acid=True,
         )
-        row = result.first()
-        eq_(row, (10, 15))
-        eq_(row._mapping, {"a": 10, "b": 15})
 
-    def test_dialect_has_table_assertion(self):
-        with expect_raises_message(
-            tsa.exc.ArgumentError,
-            r"The argument passed to Dialect.has_table\(\) should be a",
-        ):
-            testing.db.dialect.has_table(testing.db, "some_table")
+    def _trans_fn(self, is_transaction=False):
+        def go(conn, x, value=None):
+            if is_transaction:
+                conn = conn.connection
+            conn.execute(self.table.insert().values(a=x, b=value))
 
-    def test_exception_wrapping_dbapi(self):
-        with testing.db.connect() as conn:
-            assert_raises_message(
-                tsa.exc.DBAPIError,
-                r"not_a_valid_statement",
-                conn.exec_driver_sql,
-                "not_a_valid_statement",
-            )
+        return go
 
-    def test_exception_wrapping_orig_accessors(self):
-        de = None
+    def _trans_rollback_fn(self, is_transaction=False):
+        def go(conn, x, value=None):
+            if is_transaction:
+                conn = conn.connection
+            conn.execute(self.table.insert().values(a=x, b=value))
+            raise SomeException("breakage")
 
+        return go
+
+    def _assert_no_data(self):
         with testing.db.connect() as conn:
-            try:
-                conn.exec_driver_sql("not_a_valid_statement")
-            except tsa.exc.DBAPIError as de_caught:
-                de = de_caught
+            eq_(
+                conn.scalar(select(func.count("*")).select_from(self.table)),
+                0,
+            )
 
-        assert isinstance(de.orig, conn.dialect.dbapi.Error)
+    def _assert_fn(self, x, value=None):
+        with testing.db.connect() as conn:
+            eq_(conn.execute(self.table.select()).fetchall(), [(x, value)])
 
-        # get the driver module name, the one which we know will provide
-        # for exceptions
-        top_level_dbapi_module = conn.dialect.dbapi
-        if isinstance(top_level_dbapi_module, AsyncAdapt_dbapi_module):
-            driver_module = top_level_dbapi_module.exceptions_module
-        else:
-            driver_module = top_level_dbapi_module
-        top_level_dbapi_module = driver_module.__name__.split(".")[0]
+    def test_transaction_engine_ctx_commit(self):
+        fn = self._trans_fn()
+        ctx = testing.db.begin()
+        testing.run_as_contextmanager(ctx, fn, 5, value=8)
+        self._assert_fn(5, value=8)
 
-        # check that it's not us
-        ne_(top_level_dbapi_module, "sqlalchemy")
+    def test_transaction_engine_ctx_begin_fails_dont_enter_enter(self):
+        """test #7272"""
+        engine = engines.testing_engine()
 
-        # then make sure driver_exception is from that module
-        assert type(de.driver_exception).__module__.startswith(
-            top_level_dbapi_module
+        mock_connection = Mock(
+            return_value=Mock(begin=Mock(side_effect=Exception("boom")))
         )
+        with mock.patch.object(engine, "_connection_cls", mock_connection):
+            # context manager isn't entered, doesn't actually call
+            # connect() or connection.begin()
+            engine.begin()
 
-    @testing.requires.sqlite
-    def test_exception_wrapping_non_dbapi_error(self):
-        e = create_engine("sqlite://")
-        e.dialect.is_disconnect = is_disconnect = Mock()
-
-        with e.connect() as c:
-            c.connection.cursor = Mock(
-                return_value=Mock(
-                    execute=Mock(
-                        side_effect=TypeError("I'm not a DBAPI error")
-                    )
-                )
-            )
-            assert_raises_message(
-                TypeError,
-                "I'm not a DBAPI error",
-                c.exec_driver_sql,
-                "select ",
-            )
-            eq_(is_disconnect.call_count, 0)
+        eq_(mock_connection.return_value.close.mock_calls, [])
 
-    def test_exception_wrapping_non_standard_dbapi_error(self):
-        class DBAPIError(Exception):
-            pass
+    def test_transaction_engine_ctx_begin_fails_include_enter(self):
+        """test #7272
 
-        class OperationalError(DBAPIError):
-            pass
+        Note this behavior for 2.0 required that we add a new flag to
+        Connection, _allow_autobegin=False, so that the first-connect
+        initialization sequence in create.py does not actually run begin()
+        events. Previously, the initialize sequence used a future=False
+        connection unconditionally (and I didn't notice this).
 
-        class NonStandardException(OperationalError):
-            pass
+        """
+        engine = engines.testing_engine()
 
-        # TODO: this test is assuming too much of arbitrary dialects and would
-        # be better suited tested against a single mock dialect that does not
-        # have any special behaviors
+        close_mock = Mock()
         with (
-            patch.object(testing.db.dialect, "dbapi", Mock(Error=DBAPIError)),
-            patch.object(
-                testing.db.dialect, "loaded_dbapi", Mock(Error=DBAPIError)
-            ),
-            patch.object(
-                testing.db.dialect, "is_disconnect", lambda *arg: False
-            ),
-            patch.object(
-                testing.db.dialect,
-                "do_execute",
-                Mock(side_effect=NonStandardException),
-            ),
-            patch.object(
-                testing.db.dialect.execution_ctx_cls,
-                "handle_dbapi_exception",
-                Mock(),
+            mock.patch.object(
+                engine._connection_cls,
+                "begin",
+                Mock(side_effect=Exception("boom")),
             ),
+            mock.patch.object(engine._connection_cls, "close", close_mock),
         ):
-            with testing.db.connect() as conn:
-                assert_raises(
-                    tsa.exc.OperationalError, conn.exec_driver_sql, "select 1"
-                )
-
-    def test_exception_wrapping_non_dbapi_statement(self):
-        class MyType(TypeDecorator):
-            impl = Integer
-            cache_ok = True
+            with expect_raises_message(Exception, "boom"):
+                with engine.begin():
+                    pass
 
-            def process_bind_param(self, value, dialect):
-                raise SomeException("nope")
+        eq_(close_mock.mock_calls, [call()])
 
-        def _go(conn):
-            assert_raises_message(
-                tsa.exc.StatementError,
-                r"\(.*.SomeException\) " r"nope\n\[SQL\: u?SELECT 1 ",
-                conn.execute,
-                select(1).where(column("foo") == literal("bar", MyType())),
-            )
+    def test_transaction_engine_ctx_rollback(self):
+        fn = self._trans_rollback_fn()
+        ctx = testing.db.begin()
+        assert_raises_message(
+            Exception,
+            "breakage",
+            testing.run_as_contextmanager,
+            ctx,
+            fn,
+            5,
+            value=8,
+        )
+        self._assert_no_data()
 
+    def test_transaction_connection_ctx_commit(self):
+        fn = self._trans_fn(True)
         with testing.db.connect() as conn:
-            _go(conn)
-
-    def test_not_an_executable(self):
-        for obj in (
-            Table("foo", MetaData(), Column("x", Integer)),
-            Column("x", Integer),
-            tsa.and_(True),
-            tsa.and_(True).compile(),
-            column("foo"),
-            column("foo").compile(),
-            select(1).cte(),
-            # select(1).subquery(),
-            MetaData(),
-            Integer(),
-            tsa.Index(name="foo"),
-            tsa.UniqueConstraint("x"),
-        ):
-            with testing.db.connect() as conn:
-                assert_raises_message(
-                    tsa.exc.ObjectNotExecutableError,
-                    "Not an executable object",
-                    conn.execute,
-                    obj,
-                )
+            ctx = conn.begin()
+            testing.run_as_contextmanager(ctx, fn, 5, value=8)
+            self._assert_fn(5, value=8)
 
-    def test_stmt_exception_bytestring_raised(self):
-        name = "méil"
-        users = self.tables.users
+    def test_transaction_connection_ctx_rollback(self):
+        fn = self._trans_rollback_fn(True)
         with testing.db.connect() as conn:
+            ctx = conn.begin()
             assert_raises_message(
-                tsa.exc.StatementError,
-                "A value is required for bind parameter 'uname'\n"
-                ".*SELECT users.user_name AS .méil.",
-                conn.execute,
-                select(users.c.user_name.label(name)).where(
-                    users.c.user_name == bindparam("uname")
-                ),
-                {"uname_incorrect": "foo"},
+                Exception,
+                "breakage",
+                testing.run_as_contextmanager,
+                ctx,
+                fn,
+                5,
+                value=8,
             )
+            self._assert_no_data()
 
-    def test_stmt_exception_bytestring_utf8(self):
-        # uncommon case for Py3K, bytestring object passed
-        # as the error message
-        message = "some message méil".encode()
-
-        err = tsa.exc.SQLAlchemyError(message)
-        eq_(str(err), "some message méil")
-
-    def test_stmt_exception_bytestring_latin1(self):
-        # uncommon case for Py3K, bytestring object passed
-        # as the error message
-        message = "some message méil".encode("latin-1")
-
-        err = tsa.exc.SQLAlchemyError(message)
-        eq_(str(err), "some message m\\xe9il")
-
-    def test_stmt_exception_unicode_hook_unicode(self):
-        # uncommon case for Py2K, Unicode object passed
-        # as the error message
-        message = "some message méil"
-
-        err = tsa.exc.SQLAlchemyError(message)
-        eq_(str(err), "some message méil")
-
-    def test_stmt_exception_object_arg(self):
-        err = tsa.exc.SQLAlchemyError(Foo())
-        eq_(str(err), "foo")
-
-    def test_stmt_exception_str_multi_args(self):
-        err = tsa.exc.SQLAlchemyError("some message", 206)
-        eq_(str(err), "('some message', 206)")
-
-    def test_stmt_exception_str_multi_args_bytestring(self):
-        message = "some message méil".encode()
-
-        err = tsa.exc.SQLAlchemyError(message, 206)
-        eq_(str(err), str((message, 206)))
-
-    def test_stmt_exception_str_multi_args_unicode(self):
-        message = "some message méil"
+    def test_connection_as_ctx(self):
+        fn = self._trans_fn()
+        with testing.db.begin() as conn:
+            fn(conn, 5, value=8)
+        self._assert_fn(5, value=8)
 
-        err = tsa.exc.SQLAlchemyError(message, 206)
-        eq_(str(err), str((message, 206)))
 
-    def test_stmt_exception_pickleable_no_dbapi(self):
-        self._test_stmt_exception_pickleable(Exception("hello world"))
-
-    @testing.crashes(
-        "postgresql+psycopg2",
-        "Older versions don't support cursor pickling, newer ones do",
-    )
-    @testing.fails_on(
-        "+mysqlconnector",
-        "Exception doesn't come back exactly the same from pickle",
-    )
-    @testing.fails_on(
-        "oracle+cx_oracle",
-        "cx_oracle exception seems to be having some issue with pickling",
-    )
-    @testing.fails_on(
-        "oracle+oracledb",
-        "oracledb exception seems to be having some issue with pickling",
-    )
-    def test_stmt_exception_pickleable_plus_dbapi(self):
-        raw = testing.db.raw_connection()
-        the_orig = None
-        try:
-            try:
-                cursor = raw.cursor()
-                cursor.execute("SELECTINCORRECT")
-            except testing.db.dialect.dbapi.Error as orig:
-                # py3k has "orig" in local scope...
-                the_orig = orig
-        finally:
-            raw.close()
-        self._test_stmt_exception_pickleable(the_orig)
+class ExecuteDriverTest(fixtures.TablesTest):
+    __backend__ = True
 
-    def _test_stmt_exception_pickleable(self, orig):
-        for sa_exc in (
-            tsa.exc.StatementError(
-                "some error",
-                "select * from table",
-                {"foo": "bar"},
-                orig,
-                False,
-            ),
-            tsa.exc.InterfaceError(
-                "select * from table", {"foo": "bar"}, orig, True
-            ),
-            tsa.exc.NoReferencedTableError("message", "tname"),
-            tsa.exc.NoReferencedColumnError("message", "tname", "cname"),
-            tsa.exc.CircularDependencyError(
-                "some message", [1, 2, 3], [(1, 2), (3, 4)]
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "users",
+            metadata,
+            Column("user_id", INT, primary_key=True, autoincrement=False),
+            Column("user_name", VARCHAR(20)),
+        )
+        Table(
+            "users_autoinc",
+            metadata,
+            Column(
+                "user_id", INT, primary_key=True, test_needs_autoincrement=True
             ),
-        ):
-            for loads, dumps in picklers():
-                repickled = loads(dumps(sa_exc))
-                eq_(repickled.args[0], sa_exc.args[0])
-                if isinstance(sa_exc, tsa.exc.StatementError):
-                    eq_(repickled.params, {"foo": "bar"})
-                    eq_(repickled.statement, sa_exc.statement)
-                    if hasattr(sa_exc, "connection_invalidated"):
-                        eq_(
-                            repickled.connection_invalidated,
-                            sa_exc.connection_invalidated,
-                        )
-                    eq_(repickled.orig.args[0], orig.args[0])
-
-    def test_dont_wrap_mixin(self):
-        class MyException(Exception, tsa.exc.DontWrapMixin):
-            pass
-
-        class MyType(TypeDecorator):
-            impl = Integer
-            cache_ok = True
-
-            def process_bind_param(self, value, dialect):
-                raise MyException("nope")
-
-        def _go(conn):
-            assert_raises_message(
-                MyException,
-                "nope",
-                conn.execute,
-                select(1).where(column("foo") == literal("bar", MyType())),
-            )
-
-        conn = testing.db.connect()
-        try:
-            _go(conn)
-        finally:
-            conn.close()
+            Column("user_name", VARCHAR(20)),
+        )
 
-    def test_empty_insert(self, connection):
-        """test that execute() interprets [] as a list with no params and
-        warns since it has nothing to do with such an executemany.
-        """
-        users_autoinc = self.tables.users_autoinc
+    def test_no_params_option(self):
+        stmt = (
+            "SELECT '%'"
+            + testing.db.dialect.statement_compiler(
+                testing.db.dialect, None
+            ).default_from()
+        )
 
-        with expect_deprecated(
-            r"Empty parameter sequence passed to execute\(\). "
-            "This use is deprecated and will raise an exception in a "
-            "future SQLAlchemy release"
-        ):
-            connection.execute(
-                users_autoinc.insert().values(
-                    user_name=bindparam("name", None)
-                ),
-                [],
+        with testing.db.connect() as conn:
+            result = (
+                conn.execution_options(no_parameters=True)
+                .exec_driver_sql(stmt)
+                .scalar()
             )
+            eq_(result, "%")
 
-        eq_(len(connection.execute(users_autoinc.select()).all()), 1)
-
-    @testing.only_on("sqlite")
-    def test_raw_insert_with_empty_list(self, connection):
-        """exec_driver_sql instead does not raise if an empty list is passed.
-        Let the driver do that if it wants to.
-        """
+    @testing.requires.qmark_paramstyle
+    def test_raw_qmark(self, connection):
         conn = connection
-        with expect_raises_message(
-            tsa.exc.ProgrammingError, "Incorrect number of bindings supplied"
-        ):
-            conn.exec_driver_sql(
-                "insert into users (user_id, user_name) values (?, ?)", []
-            )
-
-    @testing.requires.ad_hoc_engines
-    def test_engine_level_options(self):
-        eng = engines.testing_engine(
-            options={"execution_options": {"foo": "bar"}}
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (?, ?)",
+            (1, "jack"),
         )
-        with eng.connect() as conn:
-            eq_(conn._execution_options["foo"], "bar")
-            eq_(
-                conn.execution_options(bat="hoho")._execution_options["foo"],
-                "bar",
-            )
-            eq_(
-                conn.execution_options(bat="hoho")._execution_options["bat"],
-                "hoho",
-            )
-            eq_(
-                conn.execution_options(foo="hoho")._execution_options["foo"],
-                "hoho",
-            )
-            eng.update_execution_options(foo="hoho")
-            conn = eng.connect()
-            eq_(conn._execution_options["foo"], "hoho")
-
-    @testing.requires.ad_hoc_engines
-    def test_generative_engine_execution_options(self):
-        eng = engines.testing_engine(
-            options={"execution_options": {"base": "x1"}}
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (?, ?)",
+            (2, "fred"),
         )
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (?, ?)",
+            [(3, "ed"), (4, "horse")],
+        )
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (?, ?)",
+            [(5, "barney"), (6, "donkey")],
+        )
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (?, ?)",
+            (7, "sally"),
+        )
+        res = conn.exec_driver_sql("select * from users order by user_id")
+        assert res.fetchall() == [
+            (1, "jack"),
+            (2, "fred"),
+            (3, "ed"),
+            (4, "horse"),
+            (5, "barney"),
+            (6, "donkey"),
+            (7, "sally"),
+        ]
 
-        is_(eng.engine, eng)
-
-        eng1 = eng.execution_options(foo="b1")
-        is_(eng1.engine, eng1)
-        eng2 = eng.execution_options(foo="b2")
-        eng1a = eng1.execution_options(bar="a1")
-        eng2a = eng2.execution_options(foo="b3", bar="a2")
-        is_(eng2a.engine, eng2a)
-
-        eq_(eng._execution_options, {"base": "x1"})
-        eq_(eng1._execution_options, {"base": "x1", "foo": "b1"})
-        eq_(eng2._execution_options, {"base": "x1", "foo": "b2"})
-        eq_(eng1a._execution_options, {"base": "x1", "foo": "b1", "bar": "a1"})
-        eq_(eng2a._execution_options, {"base": "x1", "foo": "b3", "bar": "a2"})
-        is_(eng1a.pool, eng.pool)
+        res = conn.exec_driver_sql(
+            "select * from users where user_name=?", ("jack",)
+        )
+        assert res.fetchall() == [(1, "jack")]
 
-        # test pool is shared
-        eng2.dispose()
-        is_(eng1a.pool, eng2.pool)
-        is_(eng.pool, eng2.pool)
+    @testing.requires.format_paramstyle
+    def test_raw_sprintf(self, connection):
+        conn = connection
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (%s, %s)",
+            (1, "jack"),
+        )
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (%s, %s)",
+            [(2, "ed"), (3, "horse")],
+        )
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (%s, %s)",
+            (4, "sally"),
+        )
+        conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,))
+        res = conn.exec_driver_sql("select * from users order by user_id")
+        assert res.fetchall() == [
+            (1, "jack"),
+            (2, "ed"),
+            (3, "horse"),
+            (4, "sally"),
+            (5, None),
+        ]
 
-    @testing.requires.ad_hoc_engines
-    def test_autocommit_option_no_issue_first_connect(self):
-        eng = create_engine(testing.db.url)
-        eng.update_execution_options(autocommit=True)
-        conn = eng.connect()
-        eq_(conn._execution_options, {"autocommit": True})
-        conn.close()
+        res = conn.exec_driver_sql(
+            "select * from users where user_name=%s", ("jack",)
+        )
+        assert res.fetchall() == [(1, "jack")]
 
-    def test_initialize_rollback(self):
-        """test a rollback happens during first connect"""
-        eng = create_engine(testing.db.url)
-        with patch.object(eng.dialect, "do_rollback") as do_rollback:
-            assert do_rollback.call_count == 0
-            connection = eng.connect()
-            assert do_rollback.call_count == 1
-        connection.close()
+    @testing.requires.pyformat_paramstyle
+    def test_raw_python(self, connection):
+        conn = connection
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) "
+            "values (%(id)s, %(name)s)",
+            {"id": 1, "name": "jack"},
+        )
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) "
+            "values (%(id)s, %(name)s)",
+            [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+        )
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) "
+            "values (%(id)s, %(name)s)",
+            dict(id=4, name="sally"),
+        )
+        res = conn.exec_driver_sql("select * from users order by user_id")
+        assert res.fetchall() == [
+            (1, "jack"),
+            (2, "ed"),
+            (3, "horse"),
+            (4, "sally"),
+        ]
 
-    @testing.requires.ad_hoc_engines
-    def test_dialect_init_uses_options(self):
-        eng = create_engine(testing.db.url)
+    @testing.requires.named_paramstyle
+    def test_raw_named(self, connection):
+        conn = connection
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (:id, :name)",
+            {"id": 1, "name": "jack"},
+        )
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (:id, :name)",
+            [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+        )
+        conn.exec_driver_sql(
+            "insert into users (user_id, user_name) values (:id, :name)",
+            {"id": 4, "name": "sally"},
+        )
+        res = conn.exec_driver_sql("select * from users order by user_id")
+        assert res.fetchall() == [
+            (1, "jack"),
+            (2, "ed"),
+            (3, "horse"),
+            (4, "sally"),
+        ]
 
-        def my_init(connection):
-            connection.execution_options(foo="bar").execute(select(1))
+    def test_raw_tuple_params(self, connection):
+        """test #7820
 
-        with patch.object(eng.dialect, "initialize", my_init):
-            conn = eng.connect()
-            eq_(conn._execution_options, {})
-            conn.close()
+        There was an apparent improvement in the distill params
+        methodology used in exec_driver_sql, which allows raw tuples to
+        pass through.  In 1.4 there seems to be a _distill_cursor_params()
+        function that says it can handle this kind of parameter, but it
+        isn't used, and when it was substituted in for exec_driver_sql(),
+        things still failed.
 
-    @testing.requires.ad_hoc_engines
-    def test_generative_engine_event_dispatch_hasevents(self):
-        def l1(*arg, **kw):
-            pass
+        In any case, add coverage here for the use case of passing
+        direct tuple params to exec_driver_sql including as the first
+        param, to note that it isn't mis-interpreted the way it is
+        in 1.x.
 
-        eng = create_engine(testing.db.url)
-        assert not eng._has_events
-        event.listen(eng, "before_execute", l1)
-        eng2 = eng.execution_options(foo="bar")
-        assert eng2._has_events
+        """
 
-    def test_works_after_dispose(self):
-        eng = create_engine(testing.db.url)
-        for i in range(3):
-            with eng.connect() as conn:
-                eq_(conn.scalar(select(1)), 1)
-            eng.dispose()
+        with patch.object(connection.dialect, "do_execute") as do_exec:
+            connection.exec_driver_sql(
+                "UPDATE users SET user_name = 'query_one' WHERE "
+                "user_id = %s OR user_id IN %s",
+                (3, (1, 2)),
+            )
 
-    def test_works_after_dispose_testing_engine(self):
-        eng = engines.testing_engine()
-        for i in range(3):
-            with eng.connect() as conn:
-                eq_(conn.scalar(select(1)), 1)
-            eng.dispose()
+            connection.exec_driver_sql(
+                "UPDATE users SET user_name = 'query_two' WHERE "
+                "user_id IN %s OR user_id = %s",
+                ((1, 2), 3),
+            )
 
-    def test_scalar(self, connection):
-        conn = connection
-        users = self.tables.users
-        conn.execute(
-            users.insert(),
+        eq_(
+            do_exec.mock_calls,
             [
-                {"user_id": 1, "user_name": "sandy"},
-                {"user_id": 2, "user_name": "spongebob"},
+                call(
+                    mock.ANY,
+                    "UPDATE users SET user_name = 'query_one' "
+                    "WHERE user_id = %s OR user_id IN %s",
+                    connection.dialect.execute_sequence_format((3, (1, 2))),
+                    mock.ANY,
+                ),
+                call(
+                    mock.ANY,
+                    "UPDATE users SET user_name = 'query_two' "
+                    "WHERE user_id IN %s OR user_id = %s",
+                    connection.dialect.execute_sequence_format(((1, 2), 3)),
+                    mock.ANY,
+                ),
             ],
         )
-        res = conn.scalar(select(users.c.user_name).order_by(users.c.user_id))
-        eq_(res, "sandy")
 
-    def test_scalars(self, connection):
-        conn = connection
-        users = self.tables.users
-        conn.execute(
-            users.insert(),
-            [
-                {"user_id": 1, "user_name": "sandy"},
-                {"user_id": 2, "user_name": "spongebob"},
-            ],
-        )
-        res = conn.scalars(select(users.c.user_name).order_by(users.c.user_id))
-        eq_(res.all(), ["sandy", "spongebob"])
+    def test_non_dict_mapping(self, connection):
+        """ensure arbitrary Mapping works for execute()"""
 
-    @testing.combinations(
-        ({}, {}, {}),
-        ({"a": "b"}, {}, {"a": "b"}),
-        ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
-        argnames="conn_opts, exec_opts, expected",
-    )
-    def test_execution_opts_per_invoke(
-        self, connection, conn_opts, exec_opts, expected
-    ):
-        opts = []
+        class NotADict(collections_abc.Mapping):
+            def __init__(self, _data):
+                self._data = _data
 
-        @event.listens_for(connection, "before_cursor_execute")
-        def before_cursor_execute(
-            conn, cursor, statement, parameters, context, executemany
-        ):
-            opts.append(context.execution_options)
+            def __iter__(self):
+                return iter(self._data)
 
-        if conn_opts:
-            connection = connection.execution_options(**conn_opts)
+            def __len__(self):
+                return len(self._data)
 
-        if exec_opts:
-            connection.execute(select(1), execution_options=exec_opts)
-        else:
-            connection.execute(select(1))
+            def __getitem__(self, key):
+                return self._data[key]
 
-        eq_(opts, [expected])
+            def keys(self):
+                return self._data.keys()
 
-    @testing.combinations(
-        ({}, {}, {}, {}),
-        ({}, {"a": "b"}, {}, {"a": "b"}),
-        ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
-        (
-            {"q": "z", "p": "r"},
-            {"a": "b", "p": "x", "d": "e"},
-            {"a": "c"},
-            {"q": "z", "p": "x", "a": "c", "d": "e"},
-        ),
-        argnames="stmt_opts, conn_opts, exec_opts, expected",
-    )
-    def test_execution_opts_per_invoke_execute_events(
-        self, connection, stmt_opts, conn_opts, exec_opts, expected
-    ):
-        opts = []
+        nd = NotADict({"a": 10, "b": 15})
+        eq_(dict(nd), {"a": 10, "b": 15})
+
+        result = connection.execute(
+            select(
+                bindparam("a", type_=Integer), bindparam("b", type_=Integer)
+            ),
+            nd,
+        )
+        eq_(result.first(), (10, 15))
+
+    def test_row_works_as_mapping(self, connection):
+        """ensure the RowMapping object works as a parameter dictionary for
+        execute."""
+
+        result = connection.execute(
+            select(literal(10).label("a"), literal(15).label("b"))
+        )
+        row = result.first()
+        eq_(row, (10, 15))
+        eq_(row._mapping, {"a": 10, "b": 15})
 
-        @event.listens_for(connection, "before_execute")
-        def before_execute(
-            conn, clauseelement, multiparams, params, execution_options
-        ):
-            opts.append(("before", execution_options))
+        result = connection.execute(
+            select(
+                bindparam("a", type_=Integer).label("a"),
+                bindparam("b", type_=Integer).label("b"),
+            ),
+            row._mapping,
+        )
+        row = result.first()
+        eq_(row, (10, 15))
+        eq_(row._mapping, {"a": 10, "b": 15})
 
-        @event.listens_for(connection, "after_execute")
-        def after_execute(
-            conn,
-            clauseelement,
-            multiparams,
-            params,
-            execution_options,
-            result,
-        ):
-            opts.append(("after", execution_options))
+    def test_exception_wrapping_dbapi(self):
+        with testing.db.connect() as conn:
+            assert_raises_message(
+                tsa.exc.DBAPIError,
+                r"not_a_valid_statement",
+                conn.exec_driver_sql,
+                "not_a_valid_statement",
+            )
 
-        stmt = select(1)
+    def test_exception_wrapping_orig_accessors(self):
+        de = None
 
-        if stmt_opts:
-            stmt = stmt.execution_options(**stmt_opts)
+        with testing.db.connect() as conn:
+            try:
+                conn.exec_driver_sql("not_a_valid_statement")
+            except tsa.exc.DBAPIError as de_caught:
+                de = de_caught
 
-        if conn_opts:
-            connection = connection.execution_options(**conn_opts)
+        assert isinstance(de.orig, conn.dialect.dbapi.Error)
 
-        if exec_opts:
-            connection.execute(stmt, execution_options=exec_opts)
+        # get the driver module name, the one which we know will provide
+        # for exceptions
+        top_level_dbapi_module = conn.dialect.dbapi
+        if isinstance(top_level_dbapi_module, AsyncAdapt_dbapi_module):
+            driver_module = top_level_dbapi_module.exceptions_module
         else:
-            connection.execute(stmt)
+            driver_module = top_level_dbapi_module
+        top_level_dbapi_module = driver_module.__name__.split(".")[0]
 
-        eq_(opts, [("before", expected), ("after", expected)])
+        # check that it's not us
+        ne_(top_level_dbapi_module, "sqlalchemy")
 
-    @testing.combinations(
-        ({"user_id": 1, "user_name": "name1"},),
-        ([{"user_id": 1, "user_name": "name1"}],),
-        (({"user_id": 1, "user_name": "name1"},),),
-        (
-            [
-                {"user_id": 1, "user_name": "name1"},
-                {"user_id": 2, "user_name": "name2"},
-            ],
-        ),
-        argnames="parameters",
-    )
-    def test_params_interpretation(self, connection, parameters):
-        users = self.tables.users
+        # then make sure driver_exception is from that module
+        assert type(de.driver_exception).__module__.startswith(
+            top_level_dbapi_module
+        )
 
-        connection.execute(users.insert(), parameters)
+    @testing.requires.sqlite
+    def test_exception_wrapping_non_dbapi_error(self):
+        e = create_engine("sqlite://")
+        e.dialect.is_disconnect = is_disconnect = Mock()
 
+        with e.connect() as c:
+            c.connection.cursor = Mock(
+                return_value=Mock(
+                    execute=Mock(
+                        side_effect=TypeError("I'm not a DBAPI error")
+                    )
+                )
+            )
+            assert_raises_message(
+                TypeError,
+                "I'm not a DBAPI error",
+                c.exec_driver_sql,
+                "select ",
+            )
+            eq_(is_disconnect.call_count, 0)
 
-class ConvenienceExecuteTest(fixtures.TablesTest):
-    __sparse_driver_backend__ = True
+    def test_exception_wrapping_non_standard_dbapi_error(self):
+        class DBAPIError(Exception):
+            pass
 
-    @classmethod
-    def define_tables(cls, metadata):
-        cls.table = Table(
-            "exec_test",
-            metadata,
-            Column("a", Integer),
-            Column("b", Integer),
-            test_needs_acid=True,
-        )
+        class OperationalError(DBAPIError):
+            pass
 
-    def _trans_fn(self, is_transaction=False):
-        def go(conn, x, value=None):
-            if is_transaction:
-                conn = conn.connection
-            conn.execute(self.table.insert().values(a=x, b=value))
+        class NonStandardException(OperationalError):
+            pass
 
-        return go
+        # TODO: this test is assuming too much of arbitrary dialects and would
+        # be better suited tested against a single mock dialect that does not
+        # have any special behaviors
+        with (
+            patch.object(testing.db.dialect, "dbapi", Mock(Error=DBAPIError)),
+            patch.object(
+                testing.db.dialect, "loaded_dbapi", Mock(Error=DBAPIError)
+            ),
+            patch.object(
+                testing.db.dialect, "is_disconnect", lambda *arg: False
+            ),
+            patch.object(
+                testing.db.dialect,
+                "do_execute",
+                Mock(side_effect=NonStandardException),
+            ),
+            patch.object(
+                testing.db.dialect.execution_ctx_cls,
+                "handle_dbapi_exception",
+                Mock(),
+            ),
+        ):
+            with testing.db.connect() as conn:
+                assert_raises(
+                    tsa.exc.OperationalError, conn.exec_driver_sql, "select 1"
+                )
 
-    def _trans_rollback_fn(self, is_transaction=False):
-        def go(conn, x, value=None):
-            if is_transaction:
-                conn = conn.connection
-            conn.execute(self.table.insert().values(a=x, b=value))
-            raise SomeException("breakage")
+    def test_exception_wrapping_non_dbapi_statement(self):
+        class MyType(TypeDecorator):
+            impl = Integer
+            cache_ok = True
 
-        return go
+            def process_bind_param(self, value, dialect):
+                raise SomeException("nope")
 
-    def _assert_no_data(self):
-        with testing.db.connect() as conn:
-            eq_(
-                conn.scalar(select(func.count("*")).select_from(self.table)),
-                0,
+        def _go(conn):
+            assert_raises_message(
+                tsa.exc.StatementError,
+                r"\(.*.SomeException\) " r"nope\n\[SQL\: u?SELECT 1 ",
+                conn.execute,
+                select(1).where(column("foo") == literal("bar", MyType())),
             )
 
-    def _assert_fn(self, x, value=None):
         with testing.db.connect() as conn:
-            eq_(conn.execute(self.table.select()).fetchall(), [(x, value)])
+            _go(conn)
 
-    def test_transaction_engine_ctx_commit(self):
-        fn = self._trans_fn()
-        ctx = testing.db.begin()
-        testing.run_as_contextmanager(ctx, fn, 5, value=8)
-        self._assert_fn(5, value=8)
+    def test_stmt_exception_pickleable_no_dbapi(self):
+        self._test_stmt_exception_pickleable(Exception("hello world"))
 
-    def test_transaction_engine_ctx_begin_fails_dont_enter_enter(self):
-        """test #7272"""
-        engine = engines.testing_engine()
+    @testing.crashes(
+        "postgresql+psycopg2",
+        "Older versions don't support cursor pickling, newer ones do",
+    )
+    @testing.fails_on(
+        "+mysqlconnector",
+        "Exception doesn't come back exactly the same from pickle",
+    )
+    @testing.fails_on(
+        "oracle+cx_oracle",
+        "cx_oracle exception seems to be having some issue with pickling",
+    )
+    @testing.fails_on(
+        "oracle+oracledb",
+        "oracledb exception seems to be having some issue with pickling",
+    )
+    def test_stmt_exception_pickleable_plus_dbapi(self):
+        raw = testing.db.raw_connection()
+        the_orig = None
+        try:
+            try:
+                cursor = raw.cursor()
+                cursor.execute("SELECTINCORRECT")
+            except testing.db.dialect.dbapi.Error as orig:
+                # py3k has "orig" in local scope...
+                the_orig = orig
+        finally:
+            raw.close()
+        self._test_stmt_exception_pickleable(the_orig)
 
-        mock_connection = Mock(
-            return_value=Mock(begin=Mock(side_effect=Exception("boom")))
-        )
-        with mock.patch.object(engine, "_connection_cls", mock_connection):
-            # context manager isn't entered, doesn't actually call
-            # connect() or connection.begin()
-            engine.begin()
+    def _test_stmt_exception_pickleable(self, orig):
+        for sa_exc in (
+            tsa.exc.StatementError(
+                "some error",
+                "select * from table",
+                {"foo": "bar"},
+                orig,
+                False,
+            ),
+            tsa.exc.InterfaceError(
+                "select * from table", {"foo": "bar"}, orig, True
+            ),
+            tsa.exc.NoReferencedTableError("message", "tname"),
+            tsa.exc.NoReferencedColumnError("message", "tname", "cname"),
+            tsa.exc.CircularDependencyError(
+                "some message", [1, 2, 3], [(1, 2), (3, 4)]
+            ),
+        ):
+            for loads, dumps in picklers():
+                repickled = loads(dumps(sa_exc))
+                eq_(repickled.args[0], sa_exc.args[0])
+                if isinstance(sa_exc, tsa.exc.StatementError):
+                    eq_(repickled.params, {"foo": "bar"})
+                    eq_(repickled.statement, sa_exc.statement)
+                    if hasattr(sa_exc, "connection_invalidated"):
+                        eq_(
+                            repickled.connection_invalidated,
+                            sa_exc.connection_invalidated,
+                        )
+                    eq_(repickled.orig.args[0], orig.args[0])
+
+    def test_dont_wrap_mixin(self):
+        class MyException(Exception, tsa.exc.DontWrapMixin):
+            pass
+
+        class MyType(TypeDecorator):
+            impl = Integer
+            cache_ok = True
 
-        eq_(mock_connection.return_value.close.mock_calls, [])
+            def process_bind_param(self, value, dialect):
+                raise MyException("nope")
 
-    def test_transaction_engine_ctx_begin_fails_include_enter(self):
-        """test #7272
+        def _go(conn):
+            assert_raises_message(
+                MyException,
+                "nope",
+                conn.execute,
+                select(1).where(column("foo") == literal("bar", MyType())),
+            )
 
-        Note this behavior for 2.0 required that we add a new flag to
-        Connection _allow_autobegin=False, so that the first-connect
-        initialization sequence in create.py does not actually run begin()
-        events. previously, the initialize sequence used a future=False
-        connection unconditionally (and I didn't notice this).
+        conn = testing.db.connect()
+        try:
+            _go(conn)
+        finally:
+            conn.close()
 
+    def test_empty_insert(self, connection):
+        """test that execute() interprets [] as a list with no params and
+        warns since it has nothing to do with such an executemany.
         """
-        engine = engines.testing_engine()
+        users_autoinc = self.tables.users_autoinc
 
-        close_mock = Mock()
-        with (
-            mock.patch.object(
-                engine._connection_cls,
-                "begin",
-                Mock(side_effect=Exception("boom")),
-            ),
-            mock.patch.object(engine._connection_cls, "close", close_mock),
+        with expect_deprecated(
+            r"Empty parameter sequence passed to execute\(\). "
+            "This use is deprecated and will raise an exception in a "
+            "future SQLAlchemy release"
         ):
-            with expect_raises_message(Exception, "boom"):
-                with engine.begin():
-                    pass
-
-        eq_(close_mock.mock_calls, [call()])
-
-    def test_transaction_engine_ctx_rollback(self):
-        fn = self._trans_rollback_fn()
-        ctx = testing.db.begin()
-        assert_raises_message(
-            Exception,
-            "breakage",
-            testing.run_as_contextmanager,
-            ctx,
-            fn,
-            5,
-            value=8,
-        )
-        self._assert_no_data()
+            connection.execute(
+                users_autoinc.insert().values(
+                    user_name=bindparam("name", None)
+                ),
+                [],
+            )
 
-    def test_transaction_connection_ctx_commit(self):
-        fn = self._trans_fn(True)
-        with testing.db.connect() as conn:
-            ctx = conn.begin()
-            testing.run_as_contextmanager(ctx, fn, 5, value=8)
-            self._assert_fn(5, value=8)
+        eq_(len(connection.execute(users_autoinc.select()).all()), 1)
 
-    def test_transaction_connection_ctx_rollback(self):
-        fn = self._trans_rollback_fn(True)
-        with testing.db.connect() as conn:
-            ctx = conn.begin()
-            assert_raises_message(
-                Exception,
-                "breakage",
-                testing.run_as_contextmanager,
-                ctx,
-                fn,
-                5,
-                value=8,
+    @testing.only_on("sqlite")
+    def test_raw_insert_with_empty_list(self, connection):
+        """exec_driver_sql instead does not raise if an empty list is passed.
+        Let the driver do that if it wants to.
+        """
+        conn = connection
+        with expect_raises_message(
+            tsa.exc.ProgrammingError, "Incorrect number of bindings supplied"
+        ):
+            conn.exec_driver_sql(
+                "insert into users (user_id, user_name) values (?, ?)", []
             )
-            self._assert_no_data()
 
-    def test_connection_as_ctx(self):
-        fn = self._trans_fn()
-        with testing.db.begin() as conn:
-            fn(conn, 5, value=8)
-        self._assert_fn(5, value=8)
+    def test_works_after_dispose_testing_engine(self):
+        eng = engines.testing_engine()
+        for i in range(3):
+            with eng.connect() as conn:
+                eq_(conn.scalar(select(1)), 1)
+            eng.dispose()
 
 
 class CompiledCacheTest(fixtures.TestBase):
@@ -1653,6 +1509,157 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
 
 
 class ExecutionOptionsTest(fixtures.TestBase):
+    def test_engine_level_options(self):
+        eng = engines.testing_engine(
+            options={"execution_options": {"foo": "bar"}}
+        )
+        with eng.connect() as conn:
+            eq_(conn._execution_options["foo"], "bar")
+            eq_(
+                conn.execution_options(bat="hoho")._execution_options["foo"],
+                "bar",
+            )
+            eq_(
+                conn.execution_options(bat="hoho")._execution_options["bat"],
+                "hoho",
+            )
+            eq_(
+                conn.execution_options(foo="hoho")._execution_options["foo"],
+                "hoho",
+            )
+            eng.update_execution_options(foo="hoho")
+            conn = eng.connect()
+            eq_(conn._execution_options["foo"], "hoho")
+
+    def test_generative_engine_execution_options(self):
+        eng = engines.testing_engine(
+            options={"execution_options": {"base": "x1"}}
+        )
+
+        is_(eng.engine, eng)
+
+        eng1 = eng.execution_options(foo="b1")
+        is_(eng1.engine, eng1)
+        eng2 = eng.execution_options(foo="b2")
+        eng1a = eng1.execution_options(bar="a1")
+        eng2a = eng2.execution_options(foo="b3", bar="a2")
+        is_(eng2a.engine, eng2a)
+
+        eq_(eng._execution_options, {"base": "x1"})
+        eq_(eng1._execution_options, {"base": "x1", "foo": "b1"})
+        eq_(eng2._execution_options, {"base": "x1", "foo": "b2"})
+        eq_(eng1a._execution_options, {"base": "x1", "foo": "b1", "bar": "a1"})
+        eq_(eng2a._execution_options, {"base": "x1", "foo": "b3", "bar": "a2"})
+        is_(eng1a.pool, eng.pool)
+
+        # test pool is shared
+        eng2.dispose()
+        is_(eng1a.pool, eng2.pool)
+        is_(eng.pool, eng2.pool)
+
+    def test_autocommit_option_preserved_first_connect(self, testing_engine):
+        eng = testing_engine()
+        eng.update_execution_options(autocommit=True)
+        conn = eng.connect()
+        eq_(conn._execution_options, {"autocommit": True})
+        conn.close()
+
+    def test_initialize_rollback(self, testing_engine):
+        """test a rollback happens during first connect"""
+        eng = testing_engine()
+        with patch.object(eng.dialect, "do_rollback") as do_rollback:
+            assert do_rollback.call_count == 0
+            connection = eng.connect()
+            assert do_rollback.call_count == 1
+        connection.close()
+
+    def test_dialect_init_uses_options(self, testing_engine):
+        eng = testing_engine()
+
+        def my_init(connection):
+            connection.execution_options(foo="bar").execute(select(1))
+
+        with patch.object(eng.dialect, "initialize", my_init):
+            conn = eng.connect()
+            eq_(conn._execution_options, {})
+            conn.close()
+
+    @testing.combinations(
+        ({}, {}, {}),
+        ({"a": "b"}, {}, {"a": "b"}),
+        ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+        argnames="conn_opts, exec_opts, expected",
+    )
+    def test_execution_opts_per_invoke(
+        self, connection, conn_opts, exec_opts, expected
+    ):
+        opts = []
+
+        @event.listens_for(connection, "before_cursor_execute")
+        def before_cursor_execute(
+            conn, cursor, statement, parameters, context, executemany
+        ):
+            opts.append(context.execution_options)
+
+        if conn_opts:
+            connection = connection.execution_options(**conn_opts)
+
+        if exec_opts:
+            connection.execute(select(1), execution_options=exec_opts)
+        else:
+            connection.execute(select(1))
+
+        eq_(opts, [expected])
+
+    @testing.combinations(
+        ({}, {}, {}, {}),
+        ({}, {"a": "b"}, {}, {"a": "b"}),
+        ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+        (
+            {"q": "z", "p": "r"},
+            {"a": "b", "p": "x", "d": "e"},
+            {"a": "c"},
+            {"q": "z", "p": "x", "a": "c", "d": "e"},
+        ),
+        argnames="stmt_opts, conn_opts, exec_opts, expected",
+    )
+    def test_execution_opts_per_invoke_execute_events(
+        self, connection, stmt_opts, conn_opts, exec_opts, expected
+    ):
+        opts = []
+
+        @event.listens_for(connection, "before_execute")
+        def before_execute(
+            conn, clauseelement, multiparams, params, execution_options
+        ):
+            opts.append(("before", execution_options))
+
+        @event.listens_for(connection, "after_execute")
+        def after_execute(
+            conn,
+            clauseelement,
+            multiparams,
+            params,
+            execution_options,
+            result,
+        ):
+            opts.append(("after", execution_options))
+
+        stmt = select(1)
+
+        if stmt_opts:
+            stmt = stmt.execution_options(**stmt_opts)
+
+        if conn_opts:
+            connection = connection.execution_options(**conn_opts)
+
+        if exec_opts:
+            connection.execute(stmt, execution_options=exec_opts)
+        else:
+            connection.execute(stmt)
+
+        eq_(opts, [("before", expected), ("after", expected)])
+
     def test_dialect_conn_options(self, testing_engine):
         engine = testing_engine("sqlite://", options=dict(_initialize=False))
         engine.dialect = Mock()
@@ -1719,8 +1726,6 @@ class ExecutionOptionsTest(fixtures.TestBase):
 
 
 class EngineEventsTest(fixtures.TestBase):
-    __requires__ = ("ad_hoc_engines",)
-    __sparse_driver_backend__ = True
 
     def teardown_test(self):
         Engine.dispatch._clear()
@@ -1808,7 +1813,6 @@ class EngineEventsTest(fixtures.TestBase):
         eq_(canary.be2.call_count, 1)
         eq_(canary.be3.call_count, 2)
 
-    @testing.requires.ad_hoc_engines
     def test_option_engine_registration_issue_one(self):
         """test #12289"""
 
@@ -1821,7 +1825,6 @@ class EngineEventsTest(fixtures.TestBase):
             {"foo": "bar", "isolation_level": "AUTOCOMMIT"},
         )
 
-    @testing.requires.ad_hoc_engines
     def test_option_engine_registration_issue_two(self):
         """test #12289"""
 
@@ -2009,7 +2012,7 @@ class EngineEventsTest(fixtures.TestBase):
         # new feature as of #2978
 
         canary = Mock()
-        e1 = testing_engine(config.db_url, future=False)
+        e1 = testing_engine(config.db_url)
         assert not e1._has_events
 
         conn = e1.connect()
@@ -2021,7 +2024,7 @@ class EngineEventsTest(fixtures.TestBase):
 
     def test_force_conn_events_false(self, testing_engine):
         canary = Mock()
-        e1 = testing_engine(config.db_url, future=False)
+        e1 = testing_engine(config.db_url)
         assert not e1._has_events
 
         event.listen(e1, "before_execute", canary.be1)
@@ -2344,7 +2347,6 @@ class EngineEventsTest(fixtures.TestBase):
         eq_(c3._execution_options, {"foo": "bar", "bar": "bat"})
         eq_(canary, ["execute", "cursor_execute"])
 
-    @testing.requires.ad_hoc_engines
     def test_generative_engine_event_dispatch(self):
         canary = []
 
@@ -2376,7 +2378,6 @@ class EngineEventsTest(fixtures.TestBase):
 
         eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
 
-    @testing.requires.ad_hoc_engines
     def test_clslevel_engine_event_options(self):
         canary = []
 
@@ -2423,7 +2424,6 @@ class EngineEventsTest(fixtures.TestBase):
             conn.execute(select(1))
         eq_(canary, ["l2"])
 
-    @testing.requires.ad_hoc_engines
     def test_cant_listen_to_option_engine(self):
         from sqlalchemy.engine import base
 
@@ -2440,7 +2440,6 @@ class EngineEventsTest(fixtures.TestBase):
             evt,
         )
 
-    @testing.requires.ad_hoc_engines
     def test_dispose_event(self, testing_engine):
         canary = Mock()
         eng = testing_engine(testing.db.url)
@@ -2459,7 +2458,6 @@ class EngineEventsTest(fixtures.TestBase):
 
         eq_(canary.mock_calls, [call(eng), call(eng)])
 
-    @testing.requires.ad_hoc_engines
     @testing.combinations(True, False, argnames="close")
     def test_close_parameter(self, testing_engine, close):
         eng = testing_engine(
@@ -2841,7 +2839,6 @@ class EngineEventsTest(fixtures.TestBase):
 
 
 class HandleErrorTest(fixtures.TestBase):
-    __requires__ = ("ad_hoc_engines",)
     __sparse_driver_backend__ = True
 
     def teardown_test(self):
index ae528d12558118a1add73b7ddf67af622435e7fc..e47521d6c13009d35a73c50303b31100fe236b3f 100644 (file)
@@ -29,7 +29,6 @@ def exec_sql(engine, sql, *args, **kwargs):
 
 class LogParamsTest(fixtures.TestBase):
     __only_on__ = "sqlite+pysqlite"
-    __requires__ = ("ad_hoc_engines",)
 
     def setup_test(self):
         self.eng = engines.testing_engine(
@@ -611,7 +610,6 @@ class PoolLoggingTest(fixtures.TestBase):
 
 
 class LoggingNameTest(fixtures.TestBase):
-    __requires__ = ("ad_hoc_engines",)
 
     def _assert_names_in_execute(self, eng, eng_name, pool_name):
         with eng.connect() as conn:
@@ -1059,7 +1057,6 @@ class LoggingTokenTest(fixtures.TestBase):
 
 
 class EchoTest(fixtures.TestBase):
-    __requires__ = ("ad_hoc_engines",)
 
     def setup_test(self):
         self.level = logging.getLogger("sqlalchemy.engine").level
index e1515a23a863c22153832c3fc1d56bf57e11f7b6..f747d9ee0568046c6baeac6471f590726b161209 100644 (file)
@@ -1013,7 +1013,7 @@ class RealPrePingEventHandlerTest(fixtures.TestBase):
     """
 
     __backend__ = True
-    __requires__ = "graceful_disconnects", "ad_hoc_engines"
+    __requires__ = ("graceful_disconnects",)
 
     @testing.fixture
     def ping_fixture(self, testing_engine):
@@ -1155,7 +1155,7 @@ class RealPrePingEventHandlerTest(fixtures.TestBase):
 
 class RealReconnectTest(fixtures.TestBase):
     __backend__ = True
-    __requires__ = "graceful_disconnects", "ad_hoc_engines"
+    __requires__ = ("graceful_disconnects",)
 
     def setup_test(self):
         self.engine = engines.reconnecting_engine()
@@ -1488,9 +1488,6 @@ class PrePingRealTest(fixtures.TestBase):
 class InvalidateDuringResultTest(fixtures.TestBase):
     __backend__ = True
 
-    # test locks SQLite file databases due to unconsumed results
-    __requires__ = ("ad_hoc_engines",)
-
     def setup_test(self):
         self.engine = engines.reconnecting_engine()
         self.meta = MetaData()
index c93ab149d481e09806ba0d17bbebc35364c3f4a5..d5c6094a0406267bb440850c11dfc659cb805498 100644 (file)
@@ -565,8 +565,9 @@ class TransactionTest(fixtures.TablesTest):
         # a connection with an XID present
         @event.listens_for(eng, "invalidate")
         def conn_invalidated(dbapi_con, con_record, exception):
-            dbapi_con.close()
-            raise exception
+            if exception is not None:
+                dbapi_con.close()
+                raise exception
 
         with eng.connect() as conn:
             rec = conn.connection._connection_record
@@ -1147,16 +1148,13 @@ class AutoRollbackTest(fixtures.TestBase):
 class IsolationLevelTest(fixtures.TestBase):
     """see also sqlalchemy/testing/suite/test_dialect.py::IsolationLevelTest
 
-    this suite has sparse_backend / ad_hoc_engines so wont take place
+    this suite has sparse_backend so wont take place
     for every dbdriver under a nox run.   the suite test should cover
     that end of it
 
     """
 
-    __requires__ = (
-        "isolation_level",
-        "ad_hoc_engines",
-    )
+    __requires__ = ("isolation_level",)
     __sparse_driver_backend__ = True
 
     def _default_isolation_level(self):
index 49399f8e5ecec98241847cd37373d96be0c51845..7d965854cb492cf827cfb623dc8b1e4cbd185e8e 100644 (file)
@@ -182,7 +182,9 @@ class EngineFixture(AsyncFixture, fixtures.TablesTest):
 
     @testing.fixture
     def async_engine(self):
-        return engines.testing_engine(asyncio=True, transfer_staticpool=True)
+        return engines.testing_engine(
+            asyncio=True, options={"sqlite_share_pool": True}
+        )
 
     @testing.fixture
     def async_connection(self, async_engine):
@@ -262,7 +264,7 @@ class AsyncEngineTest(EngineFixture):
     @async_test
     async def test_engine_eq_ne(self, async_engine):
         e2 = _async_engine.AsyncEngine(async_engine.sync_engine)
-        e3 = engines.testing_engine(asyncio=True, transfer_staticpool=True)
+        e3 = engines.testing_engine(asyncio=True)
 
         eq_(async_engine, e2)
         ne_(async_engine, e3)
@@ -298,9 +300,7 @@ class AsyncEngineTest(EngineFixture):
                     result.all()
 
             try:
-                engine = engines.testing_engine(
-                    asyncio=True, transfer_staticpool=False
-                )
+                engine = engines.testing_engine(asyncio=True)
 
                 asyncio.run(main())
             except Exception as err:
@@ -743,12 +743,16 @@ class AsyncEngineTest(EngineFixture):
 
     @testing.requires.queue_pool
     @async_test
-    async def test_pool_exhausted_some_timeout(self, async_engine):
-        engine = create_async_engine(
-            testing.db.url,
-            pool_size=1,
-            max_overflow=0,
-            pool_timeout=0.1,
+    async def test_pool_exhausted_some_timeout(
+        self, testing_engine, async_engine
+    ):
+        engine = testing_engine(
+            asyncio=True,
+            options=dict(
+                pool_size=1,
+                max_overflow=0,
+                pool_timeout=0.1,
+            ),
         )
         async with engine.connect():
             with expect_raises(exc.TimeoutError):
@@ -767,12 +771,16 @@ class AsyncEngineTest(EngineFixture):
 
     @testing.requires.queue_pool
     @async_test
-    async def test_pool_exhausted_no_timeout(self, async_engine):
-        engine = create_async_engine(
-            testing.db.url,
-            pool_size=1,
-            max_overflow=0,
-            pool_timeout=0,
+    async def test_pool_exhausted_no_timeout(
+        self, testing_engine, async_engine
+    ):
+        engine = testing_engine(
+            asyncio=True,
+            options=dict(
+                pool_size=1,
+                max_overflow=0,
+                pool_timeout=0,
+            ),
         )
         async with engine.connect():
             with expect_raises(exc.TimeoutError):
index 22d53284ad19215250a7d10f8e00877ccc7e3509..82e5579f41a545923467c9ffc74bbf073d22075a 100644 (file)
@@ -61,13 +61,15 @@ class AsyncFixture(_AsyncFixture, _fixtures.FixtureTest):
 
     @testing.fixture
     def async_engine(self):
-        return engines.testing_engine(asyncio=True, transfer_staticpool=True)
+        return engines.testing_engine(
+            asyncio=True, options={"sqlite_share_pool": True}
+        )
 
     # TODO: this seems to cause deadlocks in
     # OverrideSyncSession for some reason
     # @testing.fixture
     # def async_engine(self, async_testing_engine):
-    # return async_testing_engine(transfer_staticpool=True)
+    # return async_testing_engine(options={"sqlite_share_pool": True})
 
     @testing.fixture
     def async_session(self, async_engine):
@@ -1242,7 +1244,7 @@ class AsyncAttrsTest(
 
     @testing.fixture
     def async_engine(self, async_testing_engine):
-        yield async_testing_engine(transfer_staticpool=True)
+        yield async_testing_engine(options={"sqlite_share_pool": True})
 
     @testing.fixture
     def ab_fixture(self, decl_base):
index 556ab550ceadc448fd5c34705a1cf4bce85e0a4f..50eb38c2b6394c9d9c1bf6146ea0e4c73b979cf1 100644 (file)
@@ -976,6 +976,7 @@ class DefaultRequirements(SuiteRequirements):
                 no_support(
                     "sqlite", "two-phase xact not supported by database"
                 ),
+                no_support("oracle+cx_oracle", "prefer oracledb"),
                 # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
                 # we are evaluating which modern MySQL / MariaDB versions
                 # can handle two-phase testing without too many problems
@@ -1802,10 +1803,6 @@ class DefaultRequirements(SuiteRequirements):
     def mssql_freetds(self):
         return only_on(["mssql+pymssql"])
 
-    @property
-    def ad_hoc_engines(self):
-        return skip_if(self._sqlite_file_db)
-
     @property
     def no_asyncio(self):
         def go(config):
index d68e2ce357044e6cfe09031181e007719c1b81e4..6252ad55067ef7cf9735c6743af1e5062d2fdb2d 100644 (file)
@@ -1201,9 +1201,7 @@ class InsertManyValuesTest(fixtures.RemovesEvents, fixtures.TablesTest):
 
     def test_disabled(self, testing_engine):
         e = testing_engine(
-            options={"use_insertmanyvalues": False},
-            share_pool=True,
-            transfer_staticpool=True,
+            options={"use_insertmanyvalues": False, "sqlite_share_pool": True},
         )
         totalnum = 1275
         data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)]
index c357b92dffb93d3dd7a95a5032252f5de7826c16..007f3e189ccc6568104ae0ac86c0eac86caf9fe3 100644 (file)
@@ -1895,7 +1895,7 @@ class LambdaElementTest(
         users, addresses = user_address_fixture
 
         engine = testing_engine(
-            share_pool=True, options={"query_cache_size": 0}
+            options={"query_cache_size": 0, "sqlite_share_pool": True}
         )
         with engine.begin() as conn:
             conn.execute(