From 3d6a109c839638d19790c20f301a6a9c69ad54e8 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Thu, 27 Nov 2025 01:22:38 -0500 Subject: [PATCH] drop a 400 ton anvil on oracle 23c this DB is extremely erratic in being able to connect. Add a brute force connection retrier to all engines everywhere (which for oracledb we can fortunately use their built-in feature that also works). This actually works and I can see it pausing under load, reconnecting, and succeeding. the problem is that absolutely every engine everywhere needs this routine otherwise an engine without a retrier in it will crash. That then necessitates digging into testing_engine(), making sure testing_engine() is used everywhere an engine that's going to connect is used, then dealing with the fallout from that. We also simplify some older workarounds for cx_oracle and hack into config/provision to make oracledb seem like the primary DBAPI for most tests. testing_engine has been completely overhauled, making use of a new post_configure_testing_engine() hook which moves and refines the SQLite pool sharing and savepoint logic all into sqlite/provision.py and also allows for cx_oracle to apply a retry event handler. Change-Id: I4ea4c523effb878290d28b94d8925eb32fc5ae3b --- lib/sqlalchemy/dialects/oracle/provision.py | 122 +- lib/sqlalchemy/dialects/sqlite/provision.py | 29 +- lib/sqlalchemy/pool/impl.py | 9 + lib/sqlalchemy/testing/assertions.py | 5 +- lib/sqlalchemy/testing/config.py | 9 +- lib/sqlalchemy/testing/engines.py | 104 +- lib/sqlalchemy/testing/fixtures/base.py | 5 - lib/sqlalchemy/testing/provision.py | 34 +- lib/sqlalchemy/testing/requirements.py | 4 +- test/aaa_profiling/test_orm.py | 2 +- test/dialect/postgresql/test_reflection.py | 10 +- test/engine/test_deprecations.py | 1 - test/engine/test_execute.py | 1613 +++++++++---------- test/engine/test_logging.py | 3 - test/engine/test_reconnect.py | 7 +- test/engine/test_transaction.py | 12 +- test/ext/asyncio/test_engine.py | 42 +- test/ext/asyncio/test_session.py | 8 +- test/requirements.py | 5 +- test/sql/test_insert_exec.py | 4 +- test/sql/test_lambdas.py | 2 +- 21 files changed, 1072 insertions(+), 958 deletions(-) diff --git a/lib/sqlalchemy/dialects/oracle/provision.py b/lib/sqlalchemy/dialects/oracle/provision.py index 76d13c53bf..2ae5e93f19 100644 --- a/lib/sqlalchemy/dialects/oracle/provision.py +++ b/lib/sqlalchemy/dialects/oracle/provision.py @@ -6,6 +6,8 @@ # the MIT License: https://www.opensource.org/licenses/mit-license.php # mypy: ignore-errors +import time + from ... import create_engine from ... import exc from ... import inspect @@ -16,13 +18,50 @@ from ...testing.provision import drop_all_schema_objects_post_tables from ...testing.provision import drop_all_schema_objects_pre_tables from ...testing.provision import drop_db from ...testing.provision import follower_url_from_main +from ...testing.provision import generate_driver_url +from ...testing.provision import is_preferred_driver from ...testing.provision import log from ...testing.provision import post_configure_engine +from ...testing.provision import post_configure_testing_engine from ...testing.provision import run_reap_dbs from ...testing.provision import set_default_schema_on_connection from ...testing.provision import stop_test_class_outside_fixtures from ...testing.provision import temp_table_keyword_args from ...testing.provision import update_db_opts +from ...testing.warnings import warn_test_suite + + +@generate_driver_url.for_db("oracle") +def _oracle_generate_driver_url(url, driver, query_str): + + backend = url.get_backend_name() + + new_url = url.set( + drivername="%s+%s" % (backend, driver), + ) + + # use oracledb's retry feature, which is essential for oracle 23c + # which otherwise frequently rejects connections under load + # for cx_oracle we have a connect event instead + if driver in ("oracledb", "oracledb_async"): + # oracledb is even nice enough to convert from string to int + # for these opts, apparently + new_url = new_url.update_query_pairs( + [("retry_count", "5"), ("retry_delay", "2")] + ) + else: + # remove these params for cx_oracle if we received an + # already-modified URL + new_url = new_url.difference_update_query( + ["retry_count", "retry_delay"] + ) + + try: + new_url.get_dialect() + except exc.NoSuchModuleError: + return None + else: + return new_url @create_db.for_db("oracle") @@ -103,21 +142,6 @@ def _ora_stop_test_class_outside_fixtures(config, db, cls): except exc.DatabaseError as err: log.warning("purge recyclebin command failed: %s", err) - # clear statement cache on all connections that were used - # https://github.com/oracle/python-cx_Oracle/issues/519 - - all_dbapis = {cfg.db.dialect.dbapi for cfg in config.Config.all_configs()} - for cx_oracle_conn in _all_conns: - try: - sc = cx_oracle_conn.stmtcachesize - except tuple(dbapi.InterfaceError for dbapi in all_dbapis): - # connection closed - pass - else: - cx_oracle_conn.stmtcachesize = 0 - cx_oracle_conn.stmtcachesize = sc - _all_conns.clear() - def _purge_recyclebin(eng, schema=None): with eng.begin() as conn: @@ -135,24 +159,76 @@ def _purge_recyclebin(eng, schema=None): conn.exec_driver_sql(f'purge {type_} {owner}."{object_name}"') -_all_conns = set() +@is_preferred_driver.for_db("oracle") +def _oracle_is_preferred_driver(cfg, engine): + """establish oracledb as the preferred driver to use for tests, even + though cx_Oracle is still the "default" driver""" + + return engine.dialect.driver == "oracledb" and not engine.dialect.is_async + + +def _connect_with_retry(dialect, conn_rec, cargs, cparams): + assert dialect.driver == "cx_oracle" + + def _is_couldnt_connect(err): + return "DPY-6005" in str(err) or "ORA-12516" in str(err) + + err_ = None + for _ in range(5): + try: + return dialect.loaded_dbapi.connect(*cargs, **cparams) + except ( + dialect.loaded_dbapi.DatabaseError, + dialect.loaded_dbapi.OperationalError, + ) as err: + err_ = err + if _is_couldnt_connect(err): + warn_test_suite("Oracle database reconnecting...") + time.sleep(2) + continue + else: + raise + if err_ is not None: + raise Exception("connect failed after five attempts") from err_ + + +@post_configure_testing_engine.for_db("oracle") +def _oracle_post_configure_testing_engine(url, engine, options, scope): + from ... import event + + if engine.dialect.driver == "cx_oracle": + event.listen(engine, "do_connect", _connect_with_retry) @post_configure_engine.for_db("oracle") def _oracle_post_configure_engine(url, engine, follower_ident): - from sqlalchemy import event - @event.listens_for(engine, "checkout") - def checkout(dbapi_con, con_record, con_proxy): - _all_conns.add(dbapi_con) + from ... import event @event.listens_for(engine, "checkin") def checkin(dbapi_connection, connection_record): - # work around cx_Oracle issue: + # this was meant to work around this issue: # https://github.com/oracle/python-cx_Oracle/issues/530 # invalidate oracle connections that had 2pc set up - if "cx_oracle_xid" in connection_record.info: - connection_record.invalidate() + # however things are too complex with some of the 2pc tests, + # so just block cx_oracle from being used in 2pc tests (use oracledb + # instead) + # if "cx_oracle_xid" in connection_record.info: + # connection_record.invalidate() + + # clear statement cache on all connections that were used + # https://github.com/oracle/python-cx_Oracle/issues/519 + # TODO: oracledb claims to have this feature built in somehow, + # see if that's in use and/or if it needs to be enabled + # (or if this doesnt even apply to the newer oracle's we're using) + try: + sc = dbapi_connection.stmtcachesize + except: + # connection closed + pass + else: + dbapi_connection.stmtcachesize = 0 + dbapi_connection.stmtcachesize = sc @run_reap_dbs.for_db("oracle") diff --git a/lib/sqlalchemy/dialects/sqlite/provision.py b/lib/sqlalchemy/dialects/sqlite/provision.py index e1df005e72..6ca16ab580 100644 --- a/lib/sqlalchemy/dialects/sqlite/provision.py +++ b/lib/sqlalchemy/dialects/sqlite/provision.py @@ -9,20 +9,22 @@ import os import re +from ... import event from ... import exc from ...engine import url as sa_url +from ...testing import config from ...testing.provision import create_db from ...testing.provision import drop_db from ...testing.provision import follower_url_from_main from ...testing.provision import generate_driver_url from ...testing.provision import log from ...testing.provision import post_configure_engine +from ...testing.provision import post_configure_testing_engine from ...testing.provision import run_reap_dbs from ...testing.provision import stop_test_class_outside_fixtures from ...testing.provision import temp_table_keyword_args from ...testing.provision import upsert - # TODO: I can't get this to build dynamically with pytest-xdist procs _drivernames = { "pysqlite", @@ -139,6 +141,31 @@ def _sqlite_post_configure_engine(url, engine, follower_ident): os.remove(filename) +@post_configure_testing_engine.for_db("sqlite") +def _sqlite_post_configure_testing_engine(url, engine, options, scope): + + sqlite_savepoint = options.get("sqlite_savepoint", False) + sqlite_share_pool = options.get("sqlite_share_pool", False) + + if sqlite_savepoint and engine.name == "sqlite": + # apply SQLite savepoint workaround + @event.listens_for(engine, "connect") + def do_connect(dbapi_connection, connection_record): + dbapi_connection.isolation_level = None + + @event.listens_for(engine, "begin") + def do_begin(conn): + conn.exec_driver_sql("BEGIN") + + if sqlite_share_pool: + # SingletonThreadPool, StaticPool both support "transfer" + # so a new pool can share the same SQLite connection + # (single thread only) + if hasattr(engine.pool, "_transfer_from"): + options["use_reaper"] = False + engine.pool._transfer_from(config.db.pool) + + @create_db.for_db("sqlite") def _sqlite_create_db(cfg, eng, ident): pass diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py index af39bba170..fbb6afa3df 100644 --- a/lib/sqlalchemy/pool/impl.py +++ b/lib/sqlalchemy/pool/impl.py @@ -378,6 +378,15 @@ class SingletonThreadPool(Pool): dialect=self._dialect, ) + def _transfer_from( + self, other_singleton_pool: SingletonThreadPool + ) -> None: + # used by the test suite to make a new engine / pool without + # losing the state of an existing SQLite :memory: connection + assert not hasattr(other_singleton_pool._fairy, "current") + self._conn = other_singleton_pool._conn + self._all_conns = other_singleton_pool._all_conns + def dispose(self) -> None: """Dispose of this pool.""" diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py index 431c2b7e98..05f8d355d6 100644 --- a/lib/sqlalchemy/testing/assertions.py +++ b/lib/sqlalchemy/testing/assertions.py @@ -207,7 +207,10 @@ def _expect_warnings( if raise_on_any_unexpected: def real_warn(msg, *arg, **kw): - raise AssertionError("Got unexpected warning: %r" % msg) + if isinstance(msg, sa_exc.SATestSuiteWarning): + warnings.warn(msg, *arg, **kw) + else: + raise AssertionError("Got unexpected warning: %r" % msg) else: real_warn = warnings.warn diff --git a/lib/sqlalchemy/testing/config.py b/lib/sqlalchemy/testing/config.py index 69bbe25ecd..fbd854ad9f 100644 --- a/lib/sqlalchemy/testing/config.py +++ b/lib/sqlalchemy/testing/config.py @@ -339,12 +339,9 @@ class Config: self.is_async = db.dialect.is_async - self.is_default_dialect = ( - db.url._get_entrypoint() - is db.url.set( - drivername=db.url.get_backend_name() - )._get_entrypoint() - ) + from . import provision + + self.is_default_dialect = provision.is_preferred_driver(self, db) _stack = collections.deque() _configs = set() diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py index 2dfa07222f..26a8470eb2 100644 --- a/lib/sqlalchemy/testing/engines.py +++ b/lib/sqlalchemy/testing/engines.py @@ -16,6 +16,7 @@ from typing import Any from typing import Dict from typing import Literal from typing import Optional +from typing import Union import warnings import weakref @@ -298,33 +299,30 @@ def reconnecting_engine(url=None, options=None): @typing.overload def testing_engine( - url: Optional[URL] = None, - options: Optional[Dict[str, Any]] = None, - asyncio: Literal[False] = False, - transfer_staticpool: bool = False, + url: Optional[URL] = ..., + options: Optional[Dict[str, Any]] = ..., + *, + asyncio: Literal[False], ) -> Engine: ... @typing.overload def testing_engine( - url: Optional[URL] = None, - options: Optional[Dict[str, Any]] = None, - asyncio: Literal[True] = True, - transfer_staticpool: bool = False, + url: Optional[URL] = ..., + options: Optional[Dict[str, Any]] = ..., + *, + asyncio: Literal[True], ) -> AsyncEngine: ... def testing_engine( - url=None, - options=None, - asyncio=False, - transfer_staticpool=False, - share_pool=False, - _sqlite_savepoint=False, -): + url: Optional[URL] = None, + options: Optional[Dict[str, Any]] = None, + *, + asyncio: bool = False, +) -> Union[Engine, AsyncEngine]: if asyncio: - assert not _sqlite_savepoint from sqlalchemy.ext.asyncio import ( create_async_engine as create_engine, ) @@ -332,54 +330,33 @@ def testing_engine( from sqlalchemy import create_engine from sqlalchemy.engine.url import make_url - if not options: - use_reaper = True - scope = "function" - sqlite_savepoint = False - else: - use_reaper = options.pop("use_reaper", True) - scope = options.pop("scope", "function") - sqlite_savepoint = options.pop("sqlite_savepoint", False) - - url = url or config.db.url + url = make_url(url if url else config.db.url) - url = make_url(url) + if not options: + options = {} - if ( - config.db is None or url.drivername == config.db.url.drivername - ) and config.db_opts: - use_options = config.db_opts.copy() - else: - use_options = {} + use_options = {} - if options is not None: - use_options.update(options) + for opt_dict in (config.db_opts, options): + if not opt_dict: + continue + use_options.update( + { + opt: value + for opt, value in opt_dict.items() + if opt not in ("scope", "use_reaper") + and not opt.startswith("sqlite_") + } + ) engine = create_engine(url, **use_options) - if sqlite_savepoint and engine.name == "sqlite": - # apply SQLite savepoint workaround - @event.listens_for(engine, "connect") - def do_connect(dbapi_connection, connection_record): - dbapi_connection.isolation_level = None - - @event.listens_for(engine, "begin") - def do_begin(conn): - conn.exec_driver_sql("BEGIN") - - if transfer_staticpool: - from sqlalchemy.pool import StaticPool - - if config.db is not None and isinstance(config.db.pool, StaticPool): - use_reaper = False - engine.pool._transfer_from(config.db.pool) - elif share_pool: - engine.pool = config.db.pool - elif config.options and config.options.low_connections: + if config.options and config.options.low_connections: # for suites running with --low-connections, dispose the "global" # engines to disconnect everything before making a testing engine testing_reaper._dispose_testing_engines("global") + scope = options.get("scope", "function") if scope == "global": if asyncio: engine.sync_engine._has_events = True @@ -388,16 +365,25 @@ def testing_engine( True # enable event blocks, helps with profiling ) + from . import provision + + provision.post_configure_testing_engine(engine.url, engine, options, scope) + + # post_configure_testing_engine may have modified the options dictionary + # in place; consume additional post arguments afterwards + + use_reaper = options.get("use_reaper", True) + if use_reaper: + testing_reaper.add_engine(engine, scope) + if ( isinstance(engine.pool, pool.QueuePool) - and "pool" not in use_options - and "pool_timeout" not in use_options - and "max_overflow" not in use_options + and "pool" not in options + and "pool_timeout" not in options + and "max_overflow" not in options ): engine.pool._timeout = 0 engine.pool._max_overflow = 0 - if use_reaper: - testing_reaper.add_engine(engine, scope) return engine diff --git a/lib/sqlalchemy/testing/fixtures/base.py b/lib/sqlalchemy/testing/fixtures/base.py index 270a1b7d73..9d02e9cb37 100644 --- a/lib/sqlalchemy/testing/fixtures/base.py +++ b/lib/sqlalchemy/testing/fixtures/base.py @@ -167,10 +167,7 @@ class TestBase: def gen_testing_engine( url=None, options=None, - future=None, asyncio=False, - transfer_staticpool=False, - share_pool=False, ): if options is None: options = {} @@ -179,8 +176,6 @@ class TestBase: url=url, options=options, asyncio=asyncio, - transfer_staticpool=transfer_staticpool, - share_pool=share_pool, ) yield gen_testing_engine diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py index ebf7f63ca5..aa7d0daaa1 100644 --- a/lib/sqlalchemy/testing/provision.py +++ b/lib/sqlalchemy/testing/provision.py @@ -89,7 +89,9 @@ def setup_config(db_url, options, file_config, follower_ident): update_db_opts(db_url, db_opts, options) db_opts["scope"] = "global" eng = engines.testing_engine(db_url, db_opts) + post_configure_engine(db_url, eng, follower_ident) + eng.connect().close() cfg = config.Config.register(eng, db_opts, options, file_config) @@ -205,6 +207,23 @@ def _generate_driver_urls(url, extra_drivers): yield new_url +@register.init +def is_preferred_driver(cfg, engine): + """Return True if the engine's URL is on the "default" driver, or + more generally the "preferred" driver to use for tests. + + Backends can override this to make a different driver the "prefeferred" + driver that's not the default. + + """ + return ( + engine.url._get_entrypoint() + is engine.url.set( + drivername=engine.url.get_backend_name() + )._get_entrypoint() + ) + + @register.init def generate_driver_url(url, driver, query_str): backend = url.get_backend_name() @@ -371,12 +390,25 @@ def update_db_opts(db_url, db_opts, options): @register.init def post_configure_engine(url, engine, follower_ident): - """Perform extra steps after configuring an engine for testing. + """Perform extra steps after configuring the main engine for testing. (For the internal dialects, currently only used by sqlite, oracle, mssql) """ +@register.init +def post_configure_testing_engine(url, engine, options, scope): + """perform extra steps after configuring any engine within the + testing_engine() function. + + this includes the main engine as well as most ad-hoc testing engines. + + steps here should not get in the way of test cases that are looking + for events, etc. + + """ + + @register.init def follower_url_from_main(url, ident): """Create a connection URL for a dynamically-created test database. diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index e17c4aab67..b79d4b952f 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -1553,9 +1553,7 @@ class SuiteRequirements(Requirements): def ad_hoc_engines(self): """Test environment must allow ad-hoc engine/connection creation. - This is now a no-op since we reconfigured ``options.low_connections`` - to cause the ``testing_engine()`` to close off other open connections - when its invoked. + No longer used in any tests; is a no-op """ return exclusions.open() diff --git a/test/aaa_profiling/test_orm.py b/test/aaa_profiling/test_orm.py index 36f98d053d..98cc307f49 100644 --- a/test/aaa_profiling/test_orm.py +++ b/test/aaa_profiling/test_orm.py @@ -856,7 +856,7 @@ class JoinedEagerLoadTest(NoCache, fixtures.MappedTest): # this test has been reworked to use the compiled cache again, # as a real-world scenario. - eng = testing_engine(share_pool=True) + eng = testing_engine(options={"sqlite_share_pool": True}) sess = Session(eng) q = sess.query(A).options( diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py index d5bf5ab266..4ede0f6a2e 100644 --- a/test/dialect/postgresql/test_reflection.py +++ b/test/dialect/postgresql/test_reflection.py @@ -56,16 +56,14 @@ from sqlalchemy.types import NullType class ReflectionFixtures: @testing.fixture( params=[ - ("engine", True), - ("connection", True), - ("engine", False), - ("connection", False), + "engine", + "connection", ] ) def inspect_fixture(self, request, metadata, testing_engine): - engine, future = request.param + engine = request.param - eng = testing_engine(future=future) + eng = testing_engine() conn = eng.connect() diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py index a4a6f1f47c..a09669a713 100644 --- a/test/engine/test_deprecations.py +++ b/test/engine/test_deprecations.py @@ -359,7 +359,6 @@ class ResetEventTest(fixtures.TestBase): class EngineEventsTest(fixtures.TestBase): - __requires__ = ("ad_hoc_engines",) __backend__ = True def teardown_test(self): diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 9c76c160b6..b07c0cc9a8 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -74,7 +74,6 @@ class Foo: class ExecuteTest(fixtures.TablesTest): - __backend__ = True @classmethod def define_tables(cls, metadata): @@ -93,22 +92,6 @@ class ExecuteTest(fixtures.TablesTest): Column("user_name", VARCHAR(20)), ) - def test_no_params_option(self): - stmt = ( - "SELECT '%'" - + testing.db.dialect.statement_compiler( - testing.db.dialect, None - ).default_from() - ) - - with testing.db.connect() as conn: - result = ( - conn.execution_options(no_parameters=True) - .exec_driver_sql(stmt) - .scalar() - ) - eq_(result, "%") - def test_no_strings(self, connection): with expect_raises_message( tsa.exc.ObjectNotExecutableError, @@ -157,904 +140,777 @@ class ExecuteTest(fixtures.TablesTest): name="sally", ) - @testing.requires.qmark_paramstyle - def test_raw_qmark(self, connection): - conn = connection - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (?, ?)", - (1, "jack"), - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (?, ?)", - (2, "fred"), - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (?, ?)", - [(3, "ed"), (4, "horse")], - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (?, ?)", - [(5, "barney"), (6, "donkey")], - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (?, ?)", - (7, "sally"), - ) - res = conn.exec_driver_sql("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "fred"), - (3, "ed"), - (4, "horse"), - (5, "barney"), - (6, "donkey"), - (7, "sally"), - ] + def test_dialect_has_table_assertion(self): + with expect_raises_message( + tsa.exc.ArgumentError, + r"The argument passed to Dialect.has_table\(\) should be a", + ): + testing.db.dialect.has_table(testing.db, "some_table") - res = conn.exec_driver_sql( - "select * from users where user_name=?", ("jack",) - ) - assert res.fetchall() == [(1, "jack")] + def test_not_an_executable(self): + for obj in ( + Table("foo", MetaData(), Column("x", Integer)), + Column("x", Integer), + tsa.and_(True), + tsa.and_(True).compile(), + column("foo"), + column("foo").compile(), + select(1).cte(), + # select(1).subquery(), + MetaData(), + Integer(), + tsa.Index(name="foo"), + tsa.UniqueConstraint("x"), + ): + with testing.db.connect() as conn: + assert_raises_message( + tsa.exc.ObjectNotExecutableError, + "Not an executable object", + conn.execute, + obj, + ) - @testing.requires.format_paramstyle - def test_raw_sprintf(self, connection): - conn = connection - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (%s, %s)", - (1, "jack"), - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (%s, %s)", - [(2, "ed"), (3, "horse")], - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (%s, %s)", - (4, "sally"), - ) - conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,)) - res = conn.exec_driver_sql("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "ed"), - (3, "horse"), - (4, "sally"), - (5, None), - ] + def test_stmt_exception_bytestring_raised(self): + name = "méil" + users = self.tables.users + with testing.db.connect() as conn: + assert_raises_message( + tsa.exc.StatementError, + "A value is required for bind parameter 'uname'\n" + ".*SELECT users.user_name AS .méil.", + conn.execute, + select(users.c.user_name.label(name)).where( + users.c.user_name == bindparam("uname") + ), + {"uname_incorrect": "foo"}, + ) - res = conn.exec_driver_sql( - "select * from users where user_name=%s", ("jack",) - ) - assert res.fetchall() == [(1, "jack")] + def test_stmt_exception_bytestring_utf8(self): + # uncommon case for Py3K, bytestring object passed + # as the error message + message = "some message méil".encode() - @testing.requires.pyformat_paramstyle - def test_raw_python(self, connection): - conn = connection - conn.exec_driver_sql( - "insert into users (user_id, user_name) " - "values (%(id)s, %(name)s)", - {"id": 1, "name": "jack"}, - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) " - "values (%(id)s, %(name)s)", - [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}], - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) " - "values (%(id)s, %(name)s)", - dict(id=4, name="sally"), - ) - res = conn.exec_driver_sql("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "ed"), - (3, "horse"), - (4, "sally"), - ] + err = tsa.exc.SQLAlchemyError(message) + eq_(str(err), "some message méil") - @testing.requires.named_paramstyle - def test_raw_named(self, connection): - conn = connection - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (:id, :name)", - {"id": 1, "name": "jack"}, - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (:id, :name)", - [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}], - ) - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (:id, :name)", - {"id": 4, "name": "sally"}, - ) - res = conn.exec_driver_sql("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "ed"), - (3, "horse"), - (4, "sally"), - ] + def test_stmt_exception_bytestring_latin1(self): + # uncommon case for Py3K, bytestring object passed + # as the error message + message = "some message méil".encode("latin-1") - def test_raw_tuple_params(self, connection): - """test #7820 + err = tsa.exc.SQLAlchemyError(message) + eq_(str(err), "some message m\\xe9il") - There was an apparent improvement in the distill params - methodology used in exec_driver_sql which allows raw tuples to - pass through. In 1.4 there seems to be a _distill_cursor_params() - function that says it can handle this kind of parameter, but it isn't - used and when I tried to substitute it in for exec_driver_sql(), - things still fail. + def test_stmt_exception_unicode_hook_unicode(self): + # uncommon case for Py2K, Unicode object passed + # as the error message + message = "some message méil" - In any case, add coverage here for the use case of passing - direct tuple params to exec_driver_sql including as the first - param, to note that it isn't mis-interpreted the way it is - in 1.x. + err = tsa.exc.SQLAlchemyError(message) + eq_(str(err), "some message méil") - """ + def test_stmt_exception_object_arg(self): + err = tsa.exc.SQLAlchemyError(Foo()) + eq_(str(err), "foo") - with patch.object(connection.dialect, "do_execute") as do_exec: - connection.exec_driver_sql( - "UPDATE users SET user_name = 'query_one' WHERE " - "user_id = %s OR user_id IN %s", - (3, (1, 2)), - ) + def test_stmt_exception_str_multi_args(self): + err = tsa.exc.SQLAlchemyError("some message", 206) + eq_(str(err), "('some message', 206)") - connection.exec_driver_sql( - "UPDATE users SET user_name = 'query_two' WHERE " - "user_id IN %s OR user_id = %s", - ((1, 2), 3), - ) + def test_stmt_exception_str_multi_args_bytestring(self): + message = "some message méil".encode() - eq_( - do_exec.mock_calls, - [ - call( - mock.ANY, - "UPDATE users SET user_name = 'query_one' " - "WHERE user_id = %s OR user_id IN %s", - connection.dialect.execute_sequence_format((3, (1, 2))), - mock.ANY, - ), - call( - mock.ANY, - "UPDATE users SET user_name = 'query_two' " - "WHERE user_id IN %s OR user_id = %s", - connection.dialect.execute_sequence_format(((1, 2), 3)), - mock.ANY, - ), - ], - ) + err = tsa.exc.SQLAlchemyError(message, 206) + eq_(str(err), str((message, 206))) - def test_non_dict_mapping(self, connection): - """ensure arbitrary Mapping works for execute()""" + def test_stmt_exception_str_multi_args_unicode(self): + message = "some message méil" - class NotADict(collections_abc.Mapping): - def __init__(self, _data): - self._data = _data + err = tsa.exc.SQLAlchemyError(message, 206) + eq_(str(err), str((message, 206))) - def __iter__(self): - return iter(self._data) + def test_generative_engine_event_dispatch_hasevents(self, testing_engine): + def l1(*arg, **kw): + pass - def __len__(self): - return len(self._data) + eng = testing_engine() + assert not eng._has_events + event.listen(eng, "before_execute", l1) + eng2 = eng.execution_options(foo="bar") + assert eng2._has_events - def __getitem__(self, key): - return self._data[key] + def test_scalar(self, connection): + conn = connection + users = self.tables.users + conn.execute( + users.insert(), + [ + {"user_id": 1, "user_name": "sandy"}, + {"user_id": 2, "user_name": "spongebob"}, + ], + ) + res = conn.scalar(select(users.c.user_name).order_by(users.c.user_id)) + eq_(res, "sandy") - def keys(self): - return self._data.keys() + def test_scalars(self, connection): + conn = connection + users = self.tables.users + conn.execute( + users.insert(), + [ + {"user_id": 1, "user_name": "sandy"}, + {"user_id": 2, "user_name": "spongebob"}, + ], + ) + res = conn.scalars(select(users.c.user_name).order_by(users.c.user_id)) + eq_(res.all(), ["sandy", "spongebob"]) - nd = NotADict({"a": 10, "b": 15}) - eq_(dict(nd), {"a": 10, "b": 15}) + @testing.combinations( + ({"user_id": 1, "user_name": "name1"},), + ([{"user_id": 1, "user_name": "name1"}],), + (({"user_id": 1, "user_name": "name1"},),), + ( + [ + {"user_id": 1, "user_name": "name1"}, + {"user_id": 2, "user_name": "name2"}, + ], + ), + argnames="parameters", + ) + def test_params_interpretation(self, connection, parameters): + users = self.tables.users - result = connection.execute( - select( - bindparam("a", type_=Integer), bindparam("b", type_=Integer) - ), - nd, - ) - eq_(result.first(), (10, 15)) + connection.execute(users.insert(), parameters) - def test_row_works_as_mapping(self, connection): - """ensure the RowMapping object works as a parameter dictionary for - execute.""" - result = connection.execute( - select(literal(10).label("a"), literal(15).label("b")) - ) - row = result.first() - eq_(row, (10, 15)) - eq_(row._mapping, {"a": 10, "b": 15}) +class ConvenienceExecuteTest(fixtures.TablesTest): - result = connection.execute( - select( - bindparam("a", type_=Integer).label("a"), - bindparam("b", type_=Integer).label("b"), - ), - row._mapping, + @classmethod + def define_tables(cls, metadata): + cls.table = Table( + "exec_test", + metadata, + Column("a", Integer), + Column("b", Integer), + test_needs_acid=True, ) - row = result.first() - eq_(row, (10, 15)) - eq_(row._mapping, {"a": 10, "b": 15}) - def test_dialect_has_table_assertion(self): - with expect_raises_message( - tsa.exc.ArgumentError, - r"The argument passed to Dialect.has_table\(\) should be a", - ): - testing.db.dialect.has_table(testing.db, "some_table") + def _trans_fn(self, is_transaction=False): + def go(conn, x, value=None): + if is_transaction: + conn = conn.connection + conn.execute(self.table.insert().values(a=x, b=value)) - def test_exception_wrapping_dbapi(self): - with testing.db.connect() as conn: - assert_raises_message( - tsa.exc.DBAPIError, - r"not_a_valid_statement", - conn.exec_driver_sql, - "not_a_valid_statement", - ) + return go - def test_exception_wrapping_orig_accessors(self): - de = None + def _trans_rollback_fn(self, is_transaction=False): + def go(conn, x, value=None): + if is_transaction: + conn = conn.connection + conn.execute(self.table.insert().values(a=x, b=value)) + raise SomeException("breakage") + return go + + def _assert_no_data(self): with testing.db.connect() as conn: - try: - conn.exec_driver_sql("not_a_valid_statement") - except tsa.exc.DBAPIError as de_caught: - de = de_caught + eq_( + conn.scalar(select(func.count("*")).select_from(self.table)), + 0, + ) - assert isinstance(de.orig, conn.dialect.dbapi.Error) + def _assert_fn(self, x, value=None): + with testing.db.connect() as conn: + eq_(conn.execute(self.table.select()).fetchall(), [(x, value)]) - # get the driver module name, the one which we know will provide - # for exceptions - top_level_dbapi_module = conn.dialect.dbapi - if isinstance(top_level_dbapi_module, AsyncAdapt_dbapi_module): - driver_module = top_level_dbapi_module.exceptions_module - else: - driver_module = top_level_dbapi_module - top_level_dbapi_module = driver_module.__name__.split(".")[0] + def test_transaction_engine_ctx_commit(self): + fn = self._trans_fn() + ctx = testing.db.begin() + testing.run_as_contextmanager(ctx, fn, 5, value=8) + self._assert_fn(5, value=8) - # check that it's not us - ne_(top_level_dbapi_module, "sqlalchemy") + def test_transaction_engine_ctx_begin_fails_dont_enter_enter(self): + """test #7272""" + engine = engines.testing_engine() - # then make sure driver_exception is from that module - assert type(de.driver_exception).__module__.startswith( - top_level_dbapi_module + mock_connection = Mock( + return_value=Mock(begin=Mock(side_effect=Exception("boom"))) ) + with mock.patch.object(engine, "_connection_cls", mock_connection): + # context manager isn't entered, doesn't actually call + # connect() or connection.begin() + engine.begin() - @testing.requires.sqlite - def test_exception_wrapping_non_dbapi_error(self): - e = create_engine("sqlite://") - e.dialect.is_disconnect = is_disconnect = Mock() - - with e.connect() as c: - c.connection.cursor = Mock( - return_value=Mock( - execute=Mock( - side_effect=TypeError("I'm not a DBAPI error") - ) - ) - ) - assert_raises_message( - TypeError, - "I'm not a DBAPI error", - c.exec_driver_sql, - "select ", - ) - eq_(is_disconnect.call_count, 0) + eq_(mock_connection.return_value.close.mock_calls, []) - def test_exception_wrapping_non_standard_dbapi_error(self): - class DBAPIError(Exception): - pass + def test_transaction_engine_ctx_begin_fails_include_enter(self): + """test #7272 - class OperationalError(DBAPIError): - pass + Note this behavior for 2.0 required that we add a new flag to + Connection _allow_autobegin=False, so that the first-connect + initialization sequence in create.py does not actually run begin() + events. previously, the initialize sequence used a future=False + connection unconditionally (and I didn't notice this). - class NonStandardException(OperationalError): - pass + """ + engine = engines.testing_engine() - # TODO: this test is assuming too much of arbitrary dialects and would - # be better suited tested against a single mock dialect that does not - # have any special behaviors + close_mock = Mock() with ( - patch.object(testing.db.dialect, "dbapi", Mock(Error=DBAPIError)), - patch.object( - testing.db.dialect, "loaded_dbapi", Mock(Error=DBAPIError) - ), - patch.object( - testing.db.dialect, "is_disconnect", lambda *arg: False - ), - patch.object( - testing.db.dialect, - "do_execute", - Mock(side_effect=NonStandardException), - ), - patch.object( - testing.db.dialect.execution_ctx_cls, - "handle_dbapi_exception", - Mock(), + mock.patch.object( + engine._connection_cls, + "begin", + Mock(side_effect=Exception("boom")), ), + mock.patch.object(engine._connection_cls, "close", close_mock), ): - with testing.db.connect() as conn: - assert_raises( - tsa.exc.OperationalError, conn.exec_driver_sql, "select 1" - ) - - def test_exception_wrapping_non_dbapi_statement(self): - class MyType(TypeDecorator): - impl = Integer - cache_ok = True + with expect_raises_message(Exception, "boom"): + with engine.begin(): + pass - def process_bind_param(self, value, dialect): - raise SomeException("nope") + eq_(close_mock.mock_calls, [call()]) - def _go(conn): - assert_raises_message( - tsa.exc.StatementError, - r"\(.*.SomeException\) " r"nope\n\[SQL\: u?SELECT 1 ", - conn.execute, - select(1).where(column("foo") == literal("bar", MyType())), - ) + def test_transaction_engine_ctx_rollback(self): + fn = self._trans_rollback_fn() + ctx = testing.db.begin() + assert_raises_message( + Exception, + "breakage", + testing.run_as_contextmanager, + ctx, + fn, + 5, + value=8, + ) + self._assert_no_data() + def test_transaction_connection_ctx_commit(self): + fn = self._trans_fn(True) with testing.db.connect() as conn: - _go(conn) - - def test_not_an_executable(self): - for obj in ( - Table("foo", MetaData(), Column("x", Integer)), - Column("x", Integer), - tsa.and_(True), - tsa.and_(True).compile(), - column("foo"), - column("foo").compile(), - select(1).cte(), - # select(1).subquery(), - MetaData(), - Integer(), - tsa.Index(name="foo"), - tsa.UniqueConstraint("x"), - ): - with testing.db.connect() as conn: - assert_raises_message( - tsa.exc.ObjectNotExecutableError, - "Not an executable object", - conn.execute, - obj, - ) + ctx = conn.begin() + testing.run_as_contextmanager(ctx, fn, 5, value=8) + self._assert_fn(5, value=8) - def test_stmt_exception_bytestring_raised(self): - name = "méil" - users = self.tables.users + def test_transaction_connection_ctx_rollback(self): + fn = self._trans_rollback_fn(True) with testing.db.connect() as conn: + ctx = conn.begin() assert_raises_message( - tsa.exc.StatementError, - "A value is required for bind parameter 'uname'\n" - ".*SELECT users.user_name AS .méil.", - conn.execute, - select(users.c.user_name.label(name)).where( - users.c.user_name == bindparam("uname") - ), - {"uname_incorrect": "foo"}, + Exception, + "breakage", + testing.run_as_contextmanager, + ctx, + fn, + 5, + value=8, ) + self._assert_no_data() - def test_stmt_exception_bytestring_utf8(self): - # uncommon case for Py3K, bytestring object passed - # as the error message - message = "some message méil".encode() - - err = tsa.exc.SQLAlchemyError(message) - eq_(str(err), "some message méil") - - def test_stmt_exception_bytestring_latin1(self): - # uncommon case for Py3K, bytestring object passed - # as the error message - message = "some message méil".encode("latin-1") - - err = tsa.exc.SQLAlchemyError(message) - eq_(str(err), "some message m\\xe9il") - - def test_stmt_exception_unicode_hook_unicode(self): - # uncommon case for Py2K, Unicode object passed - # as the error message - message = "some message méil" - - err = tsa.exc.SQLAlchemyError(message) - eq_(str(err), "some message méil") - - def test_stmt_exception_object_arg(self): - err = tsa.exc.SQLAlchemyError(Foo()) - eq_(str(err), "foo") - - def test_stmt_exception_str_multi_args(self): - err = tsa.exc.SQLAlchemyError("some message", 206) - eq_(str(err), "('some message', 206)") - - def test_stmt_exception_str_multi_args_bytestring(self): - message = "some message méil".encode() - - err = tsa.exc.SQLAlchemyError(message, 206) - eq_(str(err), str((message, 206))) - - def test_stmt_exception_str_multi_args_unicode(self): - message = "some message méil" + def test_connection_as_ctx(self): + fn = self._trans_fn() + with testing.db.begin() as conn: + fn(conn, 5, value=8) + self._assert_fn(5, value=8) - err = tsa.exc.SQLAlchemyError(message, 206) - eq_(str(err), str((message, 206))) - def test_stmt_exception_pickleable_no_dbapi(self): - self._test_stmt_exception_pickleable(Exception("hello world")) - - @testing.crashes( - "postgresql+psycopg2", - "Older versions don't support cursor pickling, newer ones do", - ) - @testing.fails_on( - "+mysqlconnector", - "Exception doesn't come back exactly the same from pickle", - ) - @testing.fails_on( - "oracle+cx_oracle", - "cx_oracle exception seems to be having some issue with pickling", - ) - @testing.fails_on( - "oracle+oracledb", - "oracledb exception seems to be having some issue with pickling", - ) - def test_stmt_exception_pickleable_plus_dbapi(self): - raw = testing.db.raw_connection() - the_orig = None - try: - try: - cursor = raw.cursor() - cursor.execute("SELECTINCORRECT") - except testing.db.dialect.dbapi.Error as orig: - # py3k has "orig" in local scope... - the_orig = orig - finally: - raw.close() - self._test_stmt_exception_pickleable(the_orig) +class ExecuteDriverTest(fixtures.TablesTest): + __backend__ = True - def _test_stmt_exception_pickleable(self, orig): - for sa_exc in ( - tsa.exc.StatementError( - "some error", - "select * from table", - {"foo": "bar"}, - orig, - False, - ), - tsa.exc.InterfaceError( - "select * from table", {"foo": "bar"}, orig, True - ), - tsa.exc.NoReferencedTableError("message", "tname"), - tsa.exc.NoReferencedColumnError("message", "tname", "cname"), - tsa.exc.CircularDependencyError( - "some message", [1, 2, 3], [(1, 2), (3, 4)] + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column("user_id", INT, primary_key=True, autoincrement=False), + Column("user_name", VARCHAR(20)), + ) + Table( + "users_autoinc", + metadata, + Column( + "user_id", INT, primary_key=True, test_needs_autoincrement=True ), - ): - for loads, dumps in picklers(): - repickled = loads(dumps(sa_exc)) - eq_(repickled.args[0], sa_exc.args[0]) - if isinstance(sa_exc, tsa.exc.StatementError): - eq_(repickled.params, {"foo": "bar"}) - eq_(repickled.statement, sa_exc.statement) - if hasattr(sa_exc, "connection_invalidated"): - eq_( - repickled.connection_invalidated, - sa_exc.connection_invalidated, - ) - eq_(repickled.orig.args[0], orig.args[0]) - - def test_dont_wrap_mixin(self): - class MyException(Exception, tsa.exc.DontWrapMixin): - pass - - class MyType(TypeDecorator): - impl = Integer - cache_ok = True - - def process_bind_param(self, value, dialect): - raise MyException("nope") - - def _go(conn): - assert_raises_message( - MyException, - "nope", - conn.execute, - select(1).where(column("foo") == literal("bar", MyType())), - ) - - conn = testing.db.connect() - try: - _go(conn) - finally: - conn.close() + Column("user_name", VARCHAR(20)), + ) - def test_empty_insert(self, connection): - """test that execute() interprets [] as a list with no params and - warns since it has nothing to do with such an executemany. - """ - users_autoinc = self.tables.users_autoinc + def test_no_params_option(self): + stmt = ( + "SELECT '%'" + + testing.db.dialect.statement_compiler( + testing.db.dialect, None + ).default_from() + ) - with expect_deprecated( - r"Empty parameter sequence passed to execute\(\). " - "This use is deprecated and will raise an exception in a " - "future SQLAlchemy release" - ): - connection.execute( - users_autoinc.insert().values( - user_name=bindparam("name", None) - ), - [], + with testing.db.connect() as conn: + result = ( + conn.execution_options(no_parameters=True) + .exec_driver_sql(stmt) + .scalar() ) + eq_(result, "%") - eq_(len(connection.execute(users_autoinc.select()).all()), 1) - - @testing.only_on("sqlite") - def test_raw_insert_with_empty_list(self, connection): - """exec_driver_sql instead does not raise if an empty list is passed. - Let the driver do that if it wants to. - """ + @testing.requires.qmark_paramstyle + def test_raw_qmark(self, connection): conn = connection - with expect_raises_message( - tsa.exc.ProgrammingError, "Incorrect number of bindings supplied" - ): - conn.exec_driver_sql( - "insert into users (user_id, user_name) values (?, ?)", [] - ) - - @testing.requires.ad_hoc_engines - def test_engine_level_options(self): - eng = engines.testing_engine( - options={"execution_options": {"foo": "bar"}} + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (?, ?)", + (1, "jack"), ) - with eng.connect() as conn: - eq_(conn._execution_options["foo"], "bar") - eq_( - conn.execution_options(bat="hoho")._execution_options["foo"], - "bar", - ) - eq_( - conn.execution_options(bat="hoho")._execution_options["bat"], - "hoho", - ) - eq_( - conn.execution_options(foo="hoho")._execution_options["foo"], - "hoho", - ) - eng.update_execution_options(foo="hoho") - conn = eng.connect() - eq_(conn._execution_options["foo"], "hoho") - - @testing.requires.ad_hoc_engines - def test_generative_engine_execution_options(self): - eng = engines.testing_engine( - options={"execution_options": {"base": "x1"}} + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (?, ?)", + (2, "fred"), ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (?, ?)", + [(3, "ed"), (4, "horse")], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (?, ?)", + [(5, "barney"), (6, "donkey")], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (?, ?)", + (7, "sally"), + ) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "fred"), + (3, "ed"), + (4, "horse"), + (5, "barney"), + (6, "donkey"), + (7, "sally"), + ] - is_(eng.engine, eng) - - eng1 = eng.execution_options(foo="b1") - is_(eng1.engine, eng1) - eng2 = eng.execution_options(foo="b2") - eng1a = eng1.execution_options(bar="a1") - eng2a = eng2.execution_options(foo="b3", bar="a2") - is_(eng2a.engine, eng2a) - - eq_(eng._execution_options, {"base": "x1"}) - eq_(eng1._execution_options, {"base": "x1", "foo": "b1"}) - eq_(eng2._execution_options, {"base": "x1", "foo": "b2"}) - eq_(eng1a._execution_options, {"base": "x1", "foo": "b1", "bar": "a1"}) - eq_(eng2a._execution_options, {"base": "x1", "foo": "b3", "bar": "a2"}) - is_(eng1a.pool, eng.pool) + res = conn.exec_driver_sql( + "select * from users where user_name=?", ("jack",) + ) + assert res.fetchall() == [(1, "jack")] - # test pool is shared - eng2.dispose() - is_(eng1a.pool, eng2.pool) - is_(eng.pool, eng2.pool) + @testing.requires.format_paramstyle + def test_raw_sprintf(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (%s, %s)", + (1, "jack"), + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (%s, %s)", + [(2, "ed"), (3, "horse")], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (%s, %s)", + (4, "sally"), + ) + conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,)) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + (5, None), + ] - @testing.requires.ad_hoc_engines - def test_autocommit_option_no_issue_first_connect(self): - eng = create_engine(testing.db.url) - eng.update_execution_options(autocommit=True) - conn = eng.connect() - eq_(conn._execution_options, {"autocommit": True}) - conn.close() + res = conn.exec_driver_sql( + "select * from users where user_name=%s", ("jack",) + ) + assert res.fetchall() == [(1, "jack")] - def test_initialize_rollback(self): - """test a rollback happens during first connect""" - eng = create_engine(testing.db.url) - with patch.object(eng.dialect, "do_rollback") as do_rollback: - assert do_rollback.call_count == 0 - connection = eng.connect() - assert do_rollback.call_count == 1 - connection.close() + @testing.requires.pyformat_paramstyle + def test_raw_python(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + {"id": 1, "name": "jack"}, + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + dict(id=4, name="sally"), + ) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + ] - @testing.requires.ad_hoc_engines - def test_dialect_init_uses_options(self): - eng = create_engine(testing.db.url) + @testing.requires.named_paramstyle + def test_raw_named(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (:id, :name)", + {"id": 1, "name": "jack"}, + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (:id, :name)", + [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (:id, :name)", + {"id": 4, "name": "sally"}, + ) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + ] - def my_init(connection): - connection.execution_options(foo="bar").execute(select(1)) + def test_raw_tuple_params(self, connection): + """test #7820 - with patch.object(eng.dialect, "initialize", my_init): - conn = eng.connect() - eq_(conn._execution_options, {}) - conn.close() + There was an apparent improvement in the distill params + methodology used in exec_driver_sql which allows raw tuples to + pass through. In 1.4 there seems to be a _distill_cursor_params() + function that says it can handle this kind of parameter, but it isn't + used and when I tried to substitute it in for exec_driver_sql(), + things still fail. - @testing.requires.ad_hoc_engines - def test_generative_engine_event_dispatch_hasevents(self): - def l1(*arg, **kw): - pass + In any case, add coverage here for the use case of passing + direct tuple params to exec_driver_sql including as the first + param, to note that it isn't mis-interpreted the way it is + in 1.x. - eng = create_engine(testing.db.url) - assert not eng._has_events - event.listen(eng, "before_execute", l1) - eng2 = eng.execution_options(foo="bar") - assert eng2._has_events + """ - def test_works_after_dispose(self): - eng = create_engine(testing.db.url) - for i in range(3): - with eng.connect() as conn: - eq_(conn.scalar(select(1)), 1) - eng.dispose() + with patch.object(connection.dialect, "do_execute") as do_exec: + connection.exec_driver_sql( + "UPDATE users SET user_name = 'query_one' WHERE " + "user_id = %s OR user_id IN %s", + (3, (1, 2)), + ) - def test_works_after_dispose_testing_engine(self): - eng = engines.testing_engine() - for i in range(3): - with eng.connect() as conn: - eq_(conn.scalar(select(1)), 1) - eng.dispose() + connection.exec_driver_sql( + "UPDATE users SET user_name = 'query_two' WHERE " + "user_id IN %s OR user_id = %s", + ((1, 2), 3), + ) - def test_scalar(self, connection): - conn = connection - users = self.tables.users - conn.execute( - users.insert(), + eq_( + do_exec.mock_calls, [ - {"user_id": 1, "user_name": "sandy"}, - {"user_id": 2, "user_name": "spongebob"}, + call( + mock.ANY, + "UPDATE users SET user_name = 'query_one' " + "WHERE user_id = %s OR user_id IN %s", + connection.dialect.execute_sequence_format((3, (1, 2))), + mock.ANY, + ), + call( + mock.ANY, + "UPDATE users SET user_name = 'query_two' " + "WHERE user_id IN %s OR user_id = %s", + connection.dialect.execute_sequence_format(((1, 2), 3)), + mock.ANY, + ), ], ) - res = conn.scalar(select(users.c.user_name).order_by(users.c.user_id)) - eq_(res, "sandy") - def test_scalars(self, connection): - conn = connection - users = self.tables.users - conn.execute( - users.insert(), - [ - {"user_id": 1, "user_name": "sandy"}, - {"user_id": 2, "user_name": "spongebob"}, - ], - ) - res = conn.scalars(select(users.c.user_name).order_by(users.c.user_id)) - eq_(res.all(), ["sandy", "spongebob"]) + def test_non_dict_mapping(self, connection): + """ensure arbitrary Mapping works for execute()""" - @testing.combinations( - ({}, {}, {}), - ({"a": "b"}, {}, {"a": "b"}), - ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}), - argnames="conn_opts, exec_opts, expected", - ) - def test_execution_opts_per_invoke( - self, connection, conn_opts, exec_opts, expected - ): - opts = [] + class NotADict(collections_abc.Mapping): + def __init__(self, _data): + self._data = _data - @event.listens_for(connection, "before_cursor_execute") - def before_cursor_execute( - conn, cursor, statement, parameters, context, executemany - ): - opts.append(context.execution_options) + def __iter__(self): + return iter(self._data) - if conn_opts: - connection = connection.execution_options(**conn_opts) + def __len__(self): + return len(self._data) - if exec_opts: - connection.execute(select(1), execution_options=exec_opts) - else: - connection.execute(select(1)) + def __getitem__(self, key): + return self._data[key] - eq_(opts, [expected]) + def keys(self): + return self._data.keys() - @testing.combinations( - ({}, {}, {}, {}), - ({}, {"a": "b"}, {}, {"a": "b"}), - ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}), - ( - {"q": "z", "p": "r"}, - {"a": "b", "p": "x", "d": "e"}, - {"a": "c"}, - {"q": "z", "p": "x", "a": "c", "d": "e"}, - ), - argnames="stmt_opts, conn_opts, exec_opts, expected", - ) - def test_execution_opts_per_invoke_execute_events( - self, connection, stmt_opts, conn_opts, exec_opts, expected - ): - opts = [] + nd = NotADict({"a": 10, "b": 15}) + eq_(dict(nd), {"a": 10, "b": 15}) + + result = connection.execute( + select( + bindparam("a", type_=Integer), bindparam("b", type_=Integer) + ), + nd, + ) + eq_(result.first(), (10, 15)) + + def test_row_works_as_mapping(self, connection): + """ensure the RowMapping object works as a parameter dictionary for + execute.""" + + result = connection.execute( + select(literal(10).label("a"), literal(15).label("b")) + ) + row = result.first() + eq_(row, (10, 15)) + eq_(row._mapping, {"a": 10, "b": 15}) - @event.listens_for(connection, "before_execute") - def before_execute( - conn, clauseelement, multiparams, params, execution_options - ): - opts.append(("before", execution_options)) + result = connection.execute( + select( + bindparam("a", type_=Integer).label("a"), + bindparam("b", type_=Integer).label("b"), + ), + row._mapping, + ) + row = result.first() + eq_(row, (10, 15)) + eq_(row._mapping, {"a": 10, "b": 15}) - @event.listens_for(connection, "after_execute") - def after_execute( - conn, - clauseelement, - multiparams, - params, - execution_options, - result, - ): - opts.append(("after", execution_options)) + def test_exception_wrapping_dbapi(self): + with testing.db.connect() as conn: + assert_raises_message( + tsa.exc.DBAPIError, + r"not_a_valid_statement", + conn.exec_driver_sql, + "not_a_valid_statement", + ) - stmt = select(1) + def test_exception_wrapping_orig_accessors(self): + de = None - if stmt_opts: - stmt = stmt.execution_options(**stmt_opts) + with testing.db.connect() as conn: + try: + conn.exec_driver_sql("not_a_valid_statement") + except tsa.exc.DBAPIError as de_caught: + de = de_caught - if conn_opts: - connection = connection.execution_options(**conn_opts) + assert isinstance(de.orig, conn.dialect.dbapi.Error) - if exec_opts: - connection.execute(stmt, execution_options=exec_opts) + # get the driver module name, the one which we know will provide + # for exceptions + top_level_dbapi_module = conn.dialect.dbapi + if isinstance(top_level_dbapi_module, AsyncAdapt_dbapi_module): + driver_module = top_level_dbapi_module.exceptions_module else: - connection.execute(stmt) + driver_module = top_level_dbapi_module + top_level_dbapi_module = driver_module.__name__.split(".")[0] - eq_(opts, [("before", expected), ("after", expected)]) + # check that it's not us + ne_(top_level_dbapi_module, "sqlalchemy") - @testing.combinations( - ({"user_id": 1, "user_name": "name1"},), - ([{"user_id": 1, "user_name": "name1"}],), - (({"user_id": 1, "user_name": "name1"},),), - ( - [ - {"user_id": 1, "user_name": "name1"}, - {"user_id": 2, "user_name": "name2"}, - ], - ), - argnames="parameters", - ) - def test_params_interpretation(self, connection, parameters): - users = self.tables.users + # then make sure driver_exception is from that module + assert type(de.driver_exception).__module__.startswith( + top_level_dbapi_module + ) - connection.execute(users.insert(), parameters) + @testing.requires.sqlite + def test_exception_wrapping_non_dbapi_error(self): + e = create_engine("sqlite://") + e.dialect.is_disconnect = is_disconnect = Mock() + with e.connect() as c: + c.connection.cursor = Mock( + return_value=Mock( + execute=Mock( + side_effect=TypeError("I'm not a DBAPI error") + ) + ) + ) + assert_raises_message( + TypeError, + "I'm not a DBAPI error", + c.exec_driver_sql, + "select ", + ) + eq_(is_disconnect.call_count, 0) -class ConvenienceExecuteTest(fixtures.TablesTest): - __sparse_driver_backend__ = True + def test_exception_wrapping_non_standard_dbapi_error(self): + class DBAPIError(Exception): + pass - @classmethod - def define_tables(cls, metadata): - cls.table = Table( - "exec_test", - metadata, - Column("a", Integer), - Column("b", Integer), - test_needs_acid=True, - ) + class OperationalError(DBAPIError): + pass - def _trans_fn(self, is_transaction=False): - def go(conn, x, value=None): - if is_transaction: - conn = conn.connection - conn.execute(self.table.insert().values(a=x, b=value)) + class NonStandardException(OperationalError): + pass - return go + # TODO: this test is assuming too much of arbitrary dialects and would + # be better suited tested against a single mock dialect that does not + # have any special behaviors + with ( + patch.object(testing.db.dialect, "dbapi", Mock(Error=DBAPIError)), + patch.object( + testing.db.dialect, "loaded_dbapi", Mock(Error=DBAPIError) + ), + patch.object( + testing.db.dialect, "is_disconnect", lambda *arg: False + ), + patch.object( + testing.db.dialect, + "do_execute", + Mock(side_effect=NonStandardException), + ), + patch.object( + testing.db.dialect.execution_ctx_cls, + "handle_dbapi_exception", + Mock(), + ), + ): + with testing.db.connect() as conn: + assert_raises( + tsa.exc.OperationalError, conn.exec_driver_sql, "select 1" + ) - def _trans_rollback_fn(self, is_transaction=False): - def go(conn, x, value=None): - if is_transaction: - conn = conn.connection - conn.execute(self.table.insert().values(a=x, b=value)) - raise SomeException("breakage") + def test_exception_wrapping_non_dbapi_statement(self): + class MyType(TypeDecorator): + impl = Integer + cache_ok = True - return go + def process_bind_param(self, value, dialect): + raise SomeException("nope") - def _assert_no_data(self): - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count("*")).select_from(self.table)), - 0, + def _go(conn): + assert_raises_message( + tsa.exc.StatementError, + r"\(.*.SomeException\) " r"nope\n\[SQL\: u?SELECT 1 ", + conn.execute, + select(1).where(column("foo") == literal("bar", MyType())), ) - def _assert_fn(self, x, value=None): with testing.db.connect() as conn: - eq_(conn.execute(self.table.select()).fetchall(), [(x, value)]) + _go(conn) - def test_transaction_engine_ctx_commit(self): - fn = self._trans_fn() - ctx = testing.db.begin() - testing.run_as_contextmanager(ctx, fn, 5, value=8) - self._assert_fn(5, value=8) + def test_stmt_exception_pickleable_no_dbapi(self): + self._test_stmt_exception_pickleable(Exception("hello world")) - def test_transaction_engine_ctx_begin_fails_dont_enter_enter(self): - """test #7272""" - engine = engines.testing_engine() + @testing.crashes( + "postgresql+psycopg2", + "Older versions don't support cursor pickling, newer ones do", + ) + @testing.fails_on( + "+mysqlconnector", + "Exception doesn't come back exactly the same from pickle", + ) + @testing.fails_on( + "oracle+cx_oracle", + "cx_oracle exception seems to be having some issue with pickling", + ) + @testing.fails_on( + "oracle+oracledb", + "oracledb exception seems to be having some issue with pickling", + ) + def test_stmt_exception_pickleable_plus_dbapi(self): + raw = testing.db.raw_connection() + the_orig = None + try: + try: + cursor = raw.cursor() + cursor.execute("SELECTINCORRECT") + except testing.db.dialect.dbapi.Error as orig: + # py3k has "orig" in local scope... + the_orig = orig + finally: + raw.close() + self._test_stmt_exception_pickleable(the_orig) - mock_connection = Mock( - return_value=Mock(begin=Mock(side_effect=Exception("boom"))) - ) - with mock.patch.object(engine, "_connection_cls", mock_connection): - # context manager isn't entered, doesn't actually call - # connect() or connection.begin() - engine.begin() + def _test_stmt_exception_pickleable(self, orig): + for sa_exc in ( + tsa.exc.StatementError( + "some error", + "select * from table", + {"foo": "bar"}, + orig, + False, + ), + tsa.exc.InterfaceError( + "select * from table", {"foo": "bar"}, orig, True + ), + tsa.exc.NoReferencedTableError("message", "tname"), + tsa.exc.NoReferencedColumnError("message", "tname", "cname"), + tsa.exc.CircularDependencyError( + "some message", [1, 2, 3], [(1, 2), (3, 4)] + ), + ): + for loads, dumps in picklers(): + repickled = loads(dumps(sa_exc)) + eq_(repickled.args[0], sa_exc.args[0]) + if isinstance(sa_exc, tsa.exc.StatementError): + eq_(repickled.params, {"foo": "bar"}) + eq_(repickled.statement, sa_exc.statement) + if hasattr(sa_exc, "connection_invalidated"): + eq_( + repickled.connection_invalidated, + sa_exc.connection_invalidated, + ) + eq_(repickled.orig.args[0], orig.args[0]) + + def test_dont_wrap_mixin(self): + class MyException(Exception, tsa.exc.DontWrapMixin): + pass + + class MyType(TypeDecorator): + impl = Integer + cache_ok = True - eq_(mock_connection.return_value.close.mock_calls, []) + def process_bind_param(self, value, dialect): + raise MyException("nope") - def test_transaction_engine_ctx_begin_fails_include_enter(self): - """test #7272 + def _go(conn): + assert_raises_message( + MyException, + "nope", + conn.execute, + select(1).where(column("foo") == literal("bar", MyType())), + ) - Note this behavior for 2.0 required that we add a new flag to - Connection _allow_autobegin=False, so that the first-connect - initialization sequence in create.py does not actually run begin() - events. previously, the initialize sequence used a future=False - connection unconditionally (and I didn't notice this). + conn = testing.db.connect() + try: + _go(conn) + finally: + conn.close() + def test_empty_insert(self, connection): + """test that execute() interprets [] as a list with no params and + warns since it has nothing to do with such an executemany. """ - engine = engines.testing_engine() + users_autoinc = self.tables.users_autoinc - close_mock = Mock() - with ( - mock.patch.object( - engine._connection_cls, - "begin", - Mock(side_effect=Exception("boom")), - ), - mock.patch.object(engine._connection_cls, "close", close_mock), + with expect_deprecated( + r"Empty parameter sequence passed to execute\(\). " + "This use is deprecated and will raise an exception in a " + "future SQLAlchemy release" ): - with expect_raises_message(Exception, "boom"): - with engine.begin(): - pass - - eq_(close_mock.mock_calls, [call()]) - - def test_transaction_engine_ctx_rollback(self): - fn = self._trans_rollback_fn() - ctx = testing.db.begin() - assert_raises_message( - Exception, - "breakage", - testing.run_as_contextmanager, - ctx, - fn, - 5, - value=8, - ) - self._assert_no_data() + connection.execute( + users_autoinc.insert().values( + user_name=bindparam("name", None) + ), + [], + ) - def test_transaction_connection_ctx_commit(self): - fn = self._trans_fn(True) - with testing.db.connect() as conn: - ctx = conn.begin() - testing.run_as_contextmanager(ctx, fn, 5, value=8) - self._assert_fn(5, value=8) + eq_(len(connection.execute(users_autoinc.select()).all()), 1) - def test_transaction_connection_ctx_rollback(self): - fn = self._trans_rollback_fn(True) - with testing.db.connect() as conn: - ctx = conn.begin() - assert_raises_message( - Exception, - "breakage", - testing.run_as_contextmanager, - ctx, - fn, - 5, - value=8, + @testing.only_on("sqlite") + def test_raw_insert_with_empty_list(self, connection): + """exec_driver_sql instead does not raise if an empty list is passed. + Let the driver do that if it wants to. + """ + conn = connection + with expect_raises_message( + tsa.exc.ProgrammingError, "Incorrect number of bindings supplied" + ): + conn.exec_driver_sql( + "insert into users (user_id, user_name) values (?, ?)", [] ) - self._assert_no_data() - def test_connection_as_ctx(self): - fn = self._trans_fn() - with testing.db.begin() as conn: - fn(conn, 5, value=8) - self._assert_fn(5, value=8) + def test_works_after_dispose_testing_engine(self): + eng = engines.testing_engine() + for i in range(3): + with eng.connect() as conn: + eq_(conn.scalar(select(1)), 1) + eng.dispose() class CompiledCacheTest(fixtures.TestBase): @@ -1653,6 +1509,157 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults): class ExecutionOptionsTest(fixtures.TestBase): + def test_engine_level_options(self): + eng = engines.testing_engine( + options={"execution_options": {"foo": "bar"}} + ) + with eng.connect() as conn: + eq_(conn._execution_options["foo"], "bar") + eq_( + conn.execution_options(bat="hoho")._execution_options["foo"], + "bar", + ) + eq_( + conn.execution_options(bat="hoho")._execution_options["bat"], + "hoho", + ) + eq_( + conn.execution_options(foo="hoho")._execution_options["foo"], + "hoho", + ) + eng.update_execution_options(foo="hoho") + conn = eng.connect() + eq_(conn._execution_options["foo"], "hoho") + + def test_generative_engine_execution_options(self): + eng = engines.testing_engine( + options={"execution_options": {"base": "x1"}} + ) + + is_(eng.engine, eng) + + eng1 = eng.execution_options(foo="b1") + is_(eng1.engine, eng1) + eng2 = eng.execution_options(foo="b2") + eng1a = eng1.execution_options(bar="a1") + eng2a = eng2.execution_options(foo="b3", bar="a2") + is_(eng2a.engine, eng2a) + + eq_(eng._execution_options, {"base": "x1"}) + eq_(eng1._execution_options, {"base": "x1", "foo": "b1"}) + eq_(eng2._execution_options, {"base": "x1", "foo": "b2"}) + eq_(eng1a._execution_options, {"base": "x1", "foo": "b1", "bar": "a1"}) + eq_(eng2a._execution_options, {"base": "x1", "foo": "b3", "bar": "a2"}) + is_(eng1a.pool, eng.pool) + + # test pool is shared + eng2.dispose() + is_(eng1a.pool, eng2.pool) + is_(eng.pool, eng2.pool) + + def test_autocommit_option_preserved_first_connect(self, testing_engine): + eng = testing_engine() + eng.update_execution_options(autocommit=True) + conn = eng.connect() + eq_(conn._execution_options, {"autocommit": True}) + conn.close() + + def test_initialize_rollback(self, testing_engine): + """test a rollback happens during first connect""" + eng = testing_engine() + with patch.object(eng.dialect, "do_rollback") as do_rollback: + assert do_rollback.call_count == 0 + connection = eng.connect() + assert do_rollback.call_count == 1 + connection.close() + + def test_dialect_init_uses_options(self, testing_engine): + eng = testing_engine() + + def my_init(connection): + connection.execution_options(foo="bar").execute(select(1)) + + with patch.object(eng.dialect, "initialize", my_init): + conn = eng.connect() + eq_(conn._execution_options, {}) + conn.close() + + @testing.combinations( + ({}, {}, {}), + ({"a": "b"}, {}, {"a": "b"}), + ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}), + argnames="conn_opts, exec_opts, expected", + ) + def test_execution_opts_per_invoke( + self, connection, conn_opts, exec_opts, expected + ): + opts = [] + + @event.listens_for(connection, "before_cursor_execute") + def before_cursor_execute( + conn, cursor, statement, parameters, context, executemany + ): + opts.append(context.execution_options) + + if conn_opts: + connection = connection.execution_options(**conn_opts) + + if exec_opts: + connection.execute(select(1), execution_options=exec_opts) + else: + connection.execute(select(1)) + + eq_(opts, [expected]) + + @testing.combinations( + ({}, {}, {}, {}), + ({}, {"a": "b"}, {}, {"a": "b"}), + ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}), + ( + {"q": "z", "p": "r"}, + {"a": "b", "p": "x", "d": "e"}, + {"a": "c"}, + {"q": "z", "p": "x", "a": "c", "d": "e"}, + ), + argnames="stmt_opts, conn_opts, exec_opts, expected", + ) + def test_execution_opts_per_invoke_execute_events( + self, connection, stmt_opts, conn_opts, exec_opts, expected + ): + opts = [] + + @event.listens_for(connection, "before_execute") + def before_execute( + conn, clauseelement, multiparams, params, execution_options + ): + opts.append(("before", execution_options)) + + @event.listens_for(connection, "after_execute") + def after_execute( + conn, + clauseelement, + multiparams, + params, + execution_options, + result, + ): + opts.append(("after", execution_options)) + + stmt = select(1) + + if stmt_opts: + stmt = stmt.execution_options(**stmt_opts) + + if conn_opts: + connection = connection.execution_options(**conn_opts) + + if exec_opts: + connection.execute(stmt, execution_options=exec_opts) + else: + connection.execute(stmt) + + eq_(opts, [("before", expected), ("after", expected)]) + def test_dialect_conn_options(self, testing_engine): engine = testing_engine("sqlite://", options=dict(_initialize=False)) engine.dialect = Mock() @@ -1719,8 +1726,6 @@ class ExecutionOptionsTest(fixtures.TestBase): class EngineEventsTest(fixtures.TestBase): - __requires__ = ("ad_hoc_engines",) - __sparse_driver_backend__ = True def teardown_test(self): Engine.dispatch._clear() @@ -1808,7 +1813,6 @@ class EngineEventsTest(fixtures.TestBase): eq_(canary.be2.call_count, 1) eq_(canary.be3.call_count, 2) - @testing.requires.ad_hoc_engines def test_option_engine_registration_issue_one(self): """test #12289""" @@ -1821,7 +1825,6 @@ class EngineEventsTest(fixtures.TestBase): {"foo": "bar", "isolation_level": "AUTOCOMMIT"}, ) - @testing.requires.ad_hoc_engines def test_option_engine_registration_issue_two(self): """test #12289""" @@ -2009,7 +2012,7 @@ class EngineEventsTest(fixtures.TestBase): # new feature as of #2978 canary = Mock() - e1 = testing_engine(config.db_url, future=False) + e1 = testing_engine(config.db_url) assert not e1._has_events conn = e1.connect() @@ -2021,7 +2024,7 @@ class EngineEventsTest(fixtures.TestBase): def test_force_conn_events_false(self, testing_engine): canary = Mock() - e1 = testing_engine(config.db_url, future=False) + e1 = testing_engine(config.db_url) assert not e1._has_events event.listen(e1, "before_execute", canary.be1) @@ -2344,7 +2347,6 @@ class EngineEventsTest(fixtures.TestBase): eq_(c3._execution_options, {"foo": "bar", "bar": "bat"}) eq_(canary, ["execute", "cursor_execute"]) - @testing.requires.ad_hoc_engines def test_generative_engine_event_dispatch(self): canary = [] @@ -2376,7 +2378,6 @@ class EngineEventsTest(fixtures.TestBase): eq_(canary, ["l1", "l2", "l3", "l1", "l2"]) - @testing.requires.ad_hoc_engines def test_clslevel_engine_event_options(self): canary = [] @@ -2423,7 +2424,6 @@ class EngineEventsTest(fixtures.TestBase): conn.execute(select(1)) eq_(canary, ["l2"]) - @testing.requires.ad_hoc_engines def test_cant_listen_to_option_engine(self): from sqlalchemy.engine import base @@ -2440,7 +2440,6 @@ class EngineEventsTest(fixtures.TestBase): evt, ) - @testing.requires.ad_hoc_engines def test_dispose_event(self, testing_engine): canary = Mock() eng = testing_engine(testing.db.url) @@ -2459,7 +2458,6 @@ class EngineEventsTest(fixtures.TestBase): eq_(canary.mock_calls, [call(eng), call(eng)]) - @testing.requires.ad_hoc_engines @testing.combinations(True, False, argnames="close") def test_close_parameter(self, testing_engine, close): eng = testing_engine( @@ -2841,7 +2839,6 @@ class EngineEventsTest(fixtures.TestBase): class HandleErrorTest(fixtures.TestBase): - __requires__ = ("ad_hoc_engines",) __sparse_driver_backend__ = True def teardown_test(self): diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index ae528d1255..e47521d6c1 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -29,7 +29,6 @@ def exec_sql(engine, sql, *args, **kwargs): class LogParamsTest(fixtures.TestBase): __only_on__ = "sqlite+pysqlite" - __requires__ = ("ad_hoc_engines",) def setup_test(self): self.eng = engines.testing_engine( @@ -611,7 +610,6 @@ class PoolLoggingTest(fixtures.TestBase): class LoggingNameTest(fixtures.TestBase): - __requires__ = ("ad_hoc_engines",) def _assert_names_in_execute(self, eng, eng_name, pool_name): with eng.connect() as conn: @@ -1059,7 +1057,6 @@ class LoggingTokenTest(fixtures.TestBase): class EchoTest(fixtures.TestBase): - __requires__ = ("ad_hoc_engines",) def setup_test(self): self.level = logging.getLogger("sqlalchemy.engine").level diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index e1515a23a8..f747d9ee05 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -1013,7 +1013,7 @@ class RealPrePingEventHandlerTest(fixtures.TestBase): """ __backend__ = True - __requires__ = "graceful_disconnects", "ad_hoc_engines" + __requires__ = ("graceful_disconnects",) @testing.fixture def ping_fixture(self, testing_engine): @@ -1155,7 +1155,7 @@ class RealPrePingEventHandlerTest(fixtures.TestBase): class RealReconnectTest(fixtures.TestBase): __backend__ = True - __requires__ = "graceful_disconnects", "ad_hoc_engines" + __requires__ = ("graceful_disconnects",) def setup_test(self): self.engine = engines.reconnecting_engine() @@ -1488,9 +1488,6 @@ class PrePingRealTest(fixtures.TestBase): class InvalidateDuringResultTest(fixtures.TestBase): __backend__ = True - # test locks SQLite file databases due to unconsumed results - __requires__ = ("ad_hoc_engines",) - def setup_test(self): self.engine = engines.reconnecting_engine() self.meta = MetaData() diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index c93ab149d4..d5c6094a04 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -565,8 +565,9 @@ class TransactionTest(fixtures.TablesTest): # a connection with an XID present @event.listens_for(eng, "invalidate") def conn_invalidated(dbapi_con, con_record, exception): - dbapi_con.close() - raise exception + if exception is not None: + dbapi_con.close() + raise exception with eng.connect() as conn: rec = conn.connection._connection_record @@ -1147,16 +1148,13 @@ class AutoRollbackTest(fixtures.TestBase): class IsolationLevelTest(fixtures.TestBase): """see also sqlalchemy/testing/suite/test_dialect.py::IsolationLevelTest - this suite has sparse_backend / ad_hoc_engines so wont take place + this suite has sparse_backend so wont take place for every dbdriver under a nox run. the suite test should cover that end of it """ - __requires__ = ( - "isolation_level", - "ad_hoc_engines", - ) + __requires__ = ("isolation_level",) __sparse_driver_backend__ = True def _default_isolation_level(self): diff --git a/test/ext/asyncio/test_engine.py b/test/ext/asyncio/test_engine.py index 49399f8e5e..7d965854cb 100644 --- a/test/ext/asyncio/test_engine.py +++ b/test/ext/asyncio/test_engine.py @@ -182,7 +182,9 @@ class EngineFixture(AsyncFixture, fixtures.TablesTest): @testing.fixture def async_engine(self): - return engines.testing_engine(asyncio=True, transfer_staticpool=True) + return engines.testing_engine( + asyncio=True, options={"sqlite_share_pool": True} + ) @testing.fixture def async_connection(self, async_engine): @@ -262,7 +264,7 @@ class AsyncEngineTest(EngineFixture): @async_test async def test_engine_eq_ne(self, async_engine): e2 = _async_engine.AsyncEngine(async_engine.sync_engine) - e3 = engines.testing_engine(asyncio=True, transfer_staticpool=True) + e3 = engines.testing_engine(asyncio=True) eq_(async_engine, e2) ne_(async_engine, e3) @@ -298,9 +300,7 @@ class AsyncEngineTest(EngineFixture): result.all() try: - engine = engines.testing_engine( - asyncio=True, transfer_staticpool=False - ) + engine = engines.testing_engine(asyncio=True) asyncio.run(main()) except Exception as err: @@ -743,12 +743,16 @@ class AsyncEngineTest(EngineFixture): @testing.requires.queue_pool @async_test - async def test_pool_exhausted_some_timeout(self, async_engine): - engine = create_async_engine( - testing.db.url, - pool_size=1, - max_overflow=0, - pool_timeout=0.1, + async def test_pool_exhausted_some_timeout( + self, testing_engine, async_engine + ): + engine = testing_engine( + asyncio=True, + options=dict( + pool_size=1, + max_overflow=0, + pool_timeout=0.1, + ), ) async with engine.connect(): with expect_raises(exc.TimeoutError): @@ -767,12 +771,16 @@ class AsyncEngineTest(EngineFixture): @testing.requires.queue_pool @async_test - async def test_pool_exhausted_no_timeout(self, async_engine): - engine = create_async_engine( - testing.db.url, - pool_size=1, - max_overflow=0, - pool_timeout=0, + async def test_pool_exhausted_no_timeout( + self, testing_engine, async_engine + ): + engine = testing_engine( + asyncio=True, + options=dict( + pool_size=1, + max_overflow=0, + pool_timeout=0, + ), ) async with engine.connect(): with expect_raises(exc.TimeoutError): diff --git a/test/ext/asyncio/test_session.py b/test/ext/asyncio/test_session.py index 22d53284ad..82e5579f41 100644 --- a/test/ext/asyncio/test_session.py +++ b/test/ext/asyncio/test_session.py @@ -61,13 +61,15 @@ class AsyncFixture(_AsyncFixture, _fixtures.FixtureTest): @testing.fixture def async_engine(self): - return engines.testing_engine(asyncio=True, transfer_staticpool=True) + return engines.testing_engine( + asyncio=True, options={"sqlite_share_pool": True} + ) # TODO: this seems to cause deadlocks in # OverrideSyncSession for some reason # @testing.fixture # def async_engine(self, async_testing_engine): - # return async_testing_engine(transfer_staticpool=True) + # return async_testing_engine(options={"sqlite_share_pool": True}) @testing.fixture def async_session(self, async_engine): @@ -1242,7 +1244,7 @@ class AsyncAttrsTest( @testing.fixture def async_engine(self, async_testing_engine): - yield async_testing_engine(transfer_staticpool=True) + yield async_testing_engine(options={"sqlite_share_pool": True}) @testing.fixture def ab_fixture(self, decl_base): diff --git a/test/requirements.py b/test/requirements.py index 556ab550ce..50eb38c2b6 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -976,6 +976,7 @@ class DefaultRequirements(SuiteRequirements): no_support( "sqlite", "two-phase xact not supported by database" ), + no_support("oracle+cx_oracle", "prefer oracledb"), # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d # we are evaluating which modern MySQL / MariaDB versions # can handle two-phase testing without too many problems @@ -1802,10 +1803,6 @@ class DefaultRequirements(SuiteRequirements): def mssql_freetds(self): return only_on(["mssql+pymssql"]) - @property - def ad_hoc_engines(self): - return skip_if(self._sqlite_file_db) - @property def no_asyncio(self): def go(config): diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py index d68e2ce357..6252ad5506 100644 --- a/test/sql/test_insert_exec.py +++ b/test/sql/test_insert_exec.py @@ -1201,9 +1201,7 @@ class InsertManyValuesTest(fixtures.RemovesEvents, fixtures.TablesTest): def test_disabled(self, testing_engine): e = testing_engine( - options={"use_insertmanyvalues": False}, - share_pool=True, - transfer_staticpool=True, + options={"use_insertmanyvalues": False, "sqlite_share_pool": True}, ) totalnum = 1275 data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)] diff --git a/test/sql/test_lambdas.py b/test/sql/test_lambdas.py index c357b92dff..007f3e189c 100644 --- a/test/sql/test_lambdas.py +++ b/test/sql/test_lambdas.py @@ -1895,7 +1895,7 @@ class LambdaElementTest( users, addresses = user_address_fixture engine = testing_engine( - share_pool=True, options={"query_cache_size": 0} + options={"query_cache_size": 0, "sqlite_share_pool": True} ) with engine.begin() as conn: conn.execute( -- 2.47.3