class ExecuteTest(fixtures.TablesTest):
- __backend__ = True
@classmethod
def define_tables(cls, metadata):
Column("user_name", VARCHAR(20)),
)
- def test_no_params_option(self):
- stmt = (
- "SELECT '%'"
- + testing.db.dialect.statement_compiler(
- testing.db.dialect, None
- ).default_from()
- )
-
- with testing.db.connect() as conn:
- result = (
- conn.execution_options(no_parameters=True)
- .exec_driver_sql(stmt)
- .scalar()
- )
- eq_(result, "%")
-
def test_no_strings(self, connection):
with expect_raises_message(
tsa.exc.ObjectNotExecutableError,
name="sally",
)
- @testing.requires.qmark_paramstyle
- def test_raw_qmark(self, connection):
- conn = connection
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (?, ?)",
- (1, "jack"),
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (?, ?)",
- (2, "fred"),
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (?, ?)",
- [(3, "ed"), (4, "horse")],
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (?, ?)",
- [(5, "barney"), (6, "donkey")],
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (?, ?)",
- (7, "sally"),
- )
- res = conn.exec_driver_sql("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "fred"),
- (3, "ed"),
- (4, "horse"),
- (5, "barney"),
- (6, "donkey"),
- (7, "sally"),
- ]
+ def test_dialect_has_table_assertion(self):
+ with expect_raises_message(
+ tsa.exc.ArgumentError,
+ r"The argument passed to Dialect.has_table\(\) should be a",
+ ):
+ testing.db.dialect.has_table(testing.db, "some_table")
- res = conn.exec_driver_sql(
- "select * from users where user_name=?", ("jack",)
- )
- assert res.fetchall() == [(1, "jack")]
+ def test_not_an_executable(self):
+ for obj in (
+ Table("foo", MetaData(), Column("x", Integer)),
+ Column("x", Integer),
+ tsa.and_(True),
+ tsa.and_(True).compile(),
+ column("foo"),
+ column("foo").compile(),
+ select(1).cte(),
+ # select(1).subquery(),
+ MetaData(),
+ Integer(),
+ tsa.Index(name="foo"),
+ tsa.UniqueConstraint("x"),
+ ):
+ with testing.db.connect() as conn:
+ assert_raises_message(
+ tsa.exc.ObjectNotExecutableError,
+ "Not an executable object",
+ conn.execute,
+ obj,
+ )
- @testing.requires.format_paramstyle
- def test_raw_sprintf(self, connection):
- conn = connection
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (%s, %s)",
- (1, "jack"),
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (%s, %s)",
- [(2, "ed"), (3, "horse")],
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (%s, %s)",
- (4, "sally"),
- )
- conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,))
- res = conn.exec_driver_sql("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- (5, None),
- ]
+ def test_stmt_exception_bytestring_raised(self):
+ name = "méil"
+ users = self.tables.users
+ with testing.db.connect() as conn:
+ assert_raises_message(
+ tsa.exc.StatementError,
+ "A value is required for bind parameter 'uname'\n"
+ ".*SELECT users.user_name AS .méil.",
+ conn.execute,
+ select(users.c.user_name.label(name)).where(
+ users.c.user_name == bindparam("uname")
+ ),
+ {"uname_incorrect": "foo"},
+ )
- res = conn.exec_driver_sql(
- "select * from users where user_name=%s", ("jack",)
- )
- assert res.fetchall() == [(1, "jack")]
+ def test_stmt_exception_bytestring_utf8(self):
+ # uncommon case for Py3K, bytestring object passed
+ # as the error message
+ message = "some message méil".encode()
- @testing.requires.pyformat_paramstyle
- def test_raw_python(self, connection):
- conn = connection
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- {"id": 1, "name": "jack"},
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- dict(id=4, name="sally"),
- )
- res = conn.exec_driver_sql("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
+ err = tsa.exc.SQLAlchemyError(message)
+ eq_(str(err), "some message méil")
- @testing.requires.named_paramstyle
- def test_raw_named(self, connection):
- conn = connection
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (:id, :name)",
- {"id": 1, "name": "jack"},
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (:id, :name)",
- [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
- )
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (:id, :name)",
- {"id": 4, "name": "sally"},
- )
- res = conn.exec_driver_sql("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
+ def test_stmt_exception_bytestring_latin1(self):
+ # uncommon case for Py3K, bytestring object passed
+ # as the error message
+ message = "some message méil".encode("latin-1")
- def test_raw_tuple_params(self, connection):
- """test #7820
+ err = tsa.exc.SQLAlchemyError(message)
+ eq_(str(err), "some message m\\xe9il")
- There was an apparent improvement in the distill params
- methodology used in exec_driver_sql which allows raw tuples to
- pass through. In 1.4 there seems to be a _distill_cursor_params()
- function that says it can handle this kind of parameter, but it isn't
- used and when I tried to substitute it in for exec_driver_sql(),
- things still fail.
+ def test_stmt_exception_unicode_hook_unicode(self):
+ # uncommon case for Py2K, Unicode object passed
+ # as the error message
+ message = "some message méil"
- In any case, add coverage here for the use case of passing
- direct tuple params to exec_driver_sql including as the first
- param, to note that it isn't mis-interpreted the way it is
- in 1.x.
+ err = tsa.exc.SQLAlchemyError(message)
+ eq_(str(err), "some message méil")
- """
+ def test_stmt_exception_object_arg(self):
+ err = tsa.exc.SQLAlchemyError(Foo())
+ eq_(str(err), "foo")
- with patch.object(connection.dialect, "do_execute") as do_exec:
- connection.exec_driver_sql(
- "UPDATE users SET user_name = 'query_one' WHERE "
- "user_id = %s OR user_id IN %s",
- (3, (1, 2)),
- )
+ def test_stmt_exception_str_multi_args(self):
+ err = tsa.exc.SQLAlchemyError("some message", 206)
+ eq_(str(err), "('some message', 206)")
- connection.exec_driver_sql(
- "UPDATE users SET user_name = 'query_two' WHERE "
- "user_id IN %s OR user_id = %s",
- ((1, 2), 3),
- )
+ def test_stmt_exception_str_multi_args_bytestring(self):
+ message = "some message méil".encode()
- eq_(
- do_exec.mock_calls,
- [
- call(
- mock.ANY,
- "UPDATE users SET user_name = 'query_one' "
- "WHERE user_id = %s OR user_id IN %s",
- connection.dialect.execute_sequence_format((3, (1, 2))),
- mock.ANY,
- ),
- call(
- mock.ANY,
- "UPDATE users SET user_name = 'query_two' "
- "WHERE user_id IN %s OR user_id = %s",
- connection.dialect.execute_sequence_format(((1, 2), 3)),
- mock.ANY,
- ),
- ],
- )
+ err = tsa.exc.SQLAlchemyError(message, 206)
+ eq_(str(err), str((message, 206)))
- def test_non_dict_mapping(self, connection):
- """ensure arbitrary Mapping works for execute()"""
+ def test_stmt_exception_str_multi_args_unicode(self):
+ message = "some message méil"
- class NotADict(collections_abc.Mapping):
- def __init__(self, _data):
- self._data = _data
+ err = tsa.exc.SQLAlchemyError(message, 206)
+ eq_(str(err), str((message, 206)))
- def __iter__(self):
- return iter(self._data)
+ def test_generative_engine_event_dispatch_hasevents(self, testing_engine):
+ def l1(*arg, **kw):
+ pass
- def __len__(self):
- return len(self._data)
+ eng = testing_engine()
+ assert not eng._has_events
+ event.listen(eng, "before_execute", l1)
+ eng2 = eng.execution_options(foo="bar")
+ assert eng2._has_events
- def __getitem__(self, key):
- return self._data[key]
+ def test_scalar(self, connection):
+ conn = connection
+ users = self.tables.users
+ conn.execute(
+ users.insert(),
+ [
+ {"user_id": 1, "user_name": "sandy"},
+ {"user_id": 2, "user_name": "spongebob"},
+ ],
+ )
+ res = conn.scalar(select(users.c.user_name).order_by(users.c.user_id))
+ eq_(res, "sandy")
- def keys(self):
- return self._data.keys()
+ def test_scalars(self, connection):
+ conn = connection
+ users = self.tables.users
+ conn.execute(
+ users.insert(),
+ [
+ {"user_id": 1, "user_name": "sandy"},
+ {"user_id": 2, "user_name": "spongebob"},
+ ],
+ )
+ res = conn.scalars(select(users.c.user_name).order_by(users.c.user_id))
+ eq_(res.all(), ["sandy", "spongebob"])
- nd = NotADict({"a": 10, "b": 15})
- eq_(dict(nd), {"a": 10, "b": 15})
+ @testing.combinations(
+ ({"user_id": 1, "user_name": "name1"},),
+ ([{"user_id": 1, "user_name": "name1"}],),
+ (({"user_id": 1, "user_name": "name1"},),),
+ (
+ [
+ {"user_id": 1, "user_name": "name1"},
+ {"user_id": 2, "user_name": "name2"},
+ ],
+ ),
+ argnames="parameters",
+ )
+ def test_params_interpretation(self, connection, parameters):
+ users = self.tables.users
- result = connection.execute(
- select(
- bindparam("a", type_=Integer), bindparam("b", type_=Integer)
- ),
- nd,
- )
- eq_(result.first(), (10, 15))
+ connection.execute(users.insert(), parameters)
- def test_row_works_as_mapping(self, connection):
- """ensure the RowMapping object works as a parameter dictionary for
- execute."""
- result = connection.execute(
- select(literal(10).label("a"), literal(15).label("b"))
- )
- row = result.first()
- eq_(row, (10, 15))
- eq_(row._mapping, {"a": 10, "b": 15})
+class ConvenienceExecuteTest(fixtures.TablesTest):
- result = connection.execute(
- select(
- bindparam("a", type_=Integer).label("a"),
- bindparam("b", type_=Integer).label("b"),
- ),
- row._mapping,
+ @classmethod
+ def define_tables(cls, metadata):
+ cls.table = Table(
+ "exec_test",
+ metadata,
+ Column("a", Integer),
+ Column("b", Integer),
+ test_needs_acid=True,
)
- row = result.first()
- eq_(row, (10, 15))
- eq_(row._mapping, {"a": 10, "b": 15})
- def test_dialect_has_table_assertion(self):
- with expect_raises_message(
- tsa.exc.ArgumentError,
- r"The argument passed to Dialect.has_table\(\) should be a",
- ):
- testing.db.dialect.has_table(testing.db, "some_table")
+ def _trans_fn(self, is_transaction=False):
+ def go(conn, x, value=None):
+ if is_transaction:
+ conn = conn.connection
+ conn.execute(self.table.insert().values(a=x, b=value))
- def test_exception_wrapping_dbapi(self):
- with testing.db.connect() as conn:
- assert_raises_message(
- tsa.exc.DBAPIError,
- r"not_a_valid_statement",
- conn.exec_driver_sql,
- "not_a_valid_statement",
- )
+ return go
- def test_exception_wrapping_orig_accessors(self):
- de = None
+ def _trans_rollback_fn(self, is_transaction=False):
+ def go(conn, x, value=None):
+ if is_transaction:
+ conn = conn.connection
+ conn.execute(self.table.insert().values(a=x, b=value))
+ raise SomeException("breakage")
+ return go
+
+ def _assert_no_data(self):
with testing.db.connect() as conn:
- try:
- conn.exec_driver_sql("not_a_valid_statement")
- except tsa.exc.DBAPIError as de_caught:
- de = de_caught
+ eq_(
+ conn.scalar(select(func.count("*")).select_from(self.table)),
+ 0,
+ )
- assert isinstance(de.orig, conn.dialect.dbapi.Error)
+ def _assert_fn(self, x, value=None):
+ with testing.db.connect() as conn:
+ eq_(conn.execute(self.table.select()).fetchall(), [(x, value)])
- # get the driver module name, the one which we know will provide
- # for exceptions
- top_level_dbapi_module = conn.dialect.dbapi
- if isinstance(top_level_dbapi_module, AsyncAdapt_dbapi_module):
- driver_module = top_level_dbapi_module.exceptions_module
- else:
- driver_module = top_level_dbapi_module
- top_level_dbapi_module = driver_module.__name__.split(".")[0]
+ def test_transaction_engine_ctx_commit(self):
+ fn = self._trans_fn()
+ ctx = testing.db.begin()
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ self._assert_fn(5, value=8)
- # check that it's not us
- ne_(top_level_dbapi_module, "sqlalchemy")
+ def test_transaction_engine_ctx_begin_fails_dont_enter_enter(self):
+ """test #7272"""
+ engine = engines.testing_engine()
- # then make sure driver_exception is from that module
- assert type(de.driver_exception).__module__.startswith(
- top_level_dbapi_module
+ mock_connection = Mock(
+ return_value=Mock(begin=Mock(side_effect=Exception("boom")))
)
+ with mock.patch.object(engine, "_connection_cls", mock_connection):
+ # context manager isn't entered, doesn't actually call
+ # connect() or connection.begin()
+ engine.begin()
- @testing.requires.sqlite
- def test_exception_wrapping_non_dbapi_error(self):
- e = create_engine("sqlite://")
- e.dialect.is_disconnect = is_disconnect = Mock()
-
- with e.connect() as c:
- c.connection.cursor = Mock(
- return_value=Mock(
- execute=Mock(
- side_effect=TypeError("I'm not a DBAPI error")
- )
- )
- )
- assert_raises_message(
- TypeError,
- "I'm not a DBAPI error",
- c.exec_driver_sql,
- "select ",
- )
- eq_(is_disconnect.call_count, 0)
+ eq_(mock_connection.return_value.close.mock_calls, [])
- def test_exception_wrapping_non_standard_dbapi_error(self):
- class DBAPIError(Exception):
- pass
+ def test_transaction_engine_ctx_begin_fails_include_enter(self):
+ """test #7272
- class OperationalError(DBAPIError):
- pass
+ Note this behavior for 2.0 required that we add a new flag to
+ Connection _allow_autobegin=False, so that the first-connect
+ initialization sequence in create.py does not actually run begin()
+ events. previously, the initialize sequence used a future=False
+ connection unconditionally (and I didn't notice this).
- class NonStandardException(OperationalError):
- pass
+ """
+ engine = engines.testing_engine()
- # TODO: this test is assuming too much of arbitrary dialects and would
- # be better suited tested against a single mock dialect that does not
- # have any special behaviors
+ close_mock = Mock()
with (
- patch.object(testing.db.dialect, "dbapi", Mock(Error=DBAPIError)),
- patch.object(
- testing.db.dialect, "loaded_dbapi", Mock(Error=DBAPIError)
- ),
- patch.object(
- testing.db.dialect, "is_disconnect", lambda *arg: False
- ),
- patch.object(
- testing.db.dialect,
- "do_execute",
- Mock(side_effect=NonStandardException),
- ),
- patch.object(
- testing.db.dialect.execution_ctx_cls,
- "handle_dbapi_exception",
- Mock(),
+ mock.patch.object(
+ engine._connection_cls,
+ "begin",
+ Mock(side_effect=Exception("boom")),
),
+ mock.patch.object(engine._connection_cls, "close", close_mock),
):
- with testing.db.connect() as conn:
- assert_raises(
- tsa.exc.OperationalError, conn.exec_driver_sql, "select 1"
- )
-
- def test_exception_wrapping_non_dbapi_statement(self):
- class MyType(TypeDecorator):
- impl = Integer
- cache_ok = True
+ with expect_raises_message(Exception, "boom"):
+ with engine.begin():
+ pass
- def process_bind_param(self, value, dialect):
- raise SomeException("nope")
+ eq_(close_mock.mock_calls, [call()])
- def _go(conn):
- assert_raises_message(
- tsa.exc.StatementError,
- r"\(.*.SomeException\) " r"nope\n\[SQL\: u?SELECT 1 ",
- conn.execute,
- select(1).where(column("foo") == literal("bar", MyType())),
- )
+ def test_transaction_engine_ctx_rollback(self):
+ fn = self._trans_rollback_fn()
+ ctx = testing.db.begin()
+ assert_raises_message(
+ Exception,
+ "breakage",
+ testing.run_as_contextmanager,
+ ctx,
+ fn,
+ 5,
+ value=8,
+ )
+ self._assert_no_data()
+ def test_transaction_connection_ctx_commit(self):
+ fn = self._trans_fn(True)
with testing.db.connect() as conn:
- _go(conn)
-
- def test_not_an_executable(self):
- for obj in (
- Table("foo", MetaData(), Column("x", Integer)),
- Column("x", Integer),
- tsa.and_(True),
- tsa.and_(True).compile(),
- column("foo"),
- column("foo").compile(),
- select(1).cte(),
- # select(1).subquery(),
- MetaData(),
- Integer(),
- tsa.Index(name="foo"),
- tsa.UniqueConstraint("x"),
- ):
- with testing.db.connect() as conn:
- assert_raises_message(
- tsa.exc.ObjectNotExecutableError,
- "Not an executable object",
- conn.execute,
- obj,
- )
+ ctx = conn.begin()
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ self._assert_fn(5, value=8)
- def test_stmt_exception_bytestring_raised(self):
- name = "méil"
- users = self.tables.users
+ def test_transaction_connection_ctx_rollback(self):
+ fn = self._trans_rollback_fn(True)
with testing.db.connect() as conn:
+ ctx = conn.begin()
assert_raises_message(
- tsa.exc.StatementError,
- "A value is required for bind parameter 'uname'\n"
- ".*SELECT users.user_name AS .méil.",
- conn.execute,
- select(users.c.user_name.label(name)).where(
- users.c.user_name == bindparam("uname")
- ),
- {"uname_incorrect": "foo"},
+ Exception,
+ "breakage",
+ testing.run_as_contextmanager,
+ ctx,
+ fn,
+ 5,
+ value=8,
)
+ self._assert_no_data()
- def test_stmt_exception_bytestring_utf8(self):
- # uncommon case for Py3K, bytestring object passed
- # as the error message
- message = "some message méil".encode()
-
- err = tsa.exc.SQLAlchemyError(message)
- eq_(str(err), "some message méil")
-
- def test_stmt_exception_bytestring_latin1(self):
- # uncommon case for Py3K, bytestring object passed
- # as the error message
- message = "some message méil".encode("latin-1")
-
- err = tsa.exc.SQLAlchemyError(message)
- eq_(str(err), "some message m\\xe9il")
-
- def test_stmt_exception_unicode_hook_unicode(self):
- # uncommon case for Py2K, Unicode object passed
- # as the error message
- message = "some message méil"
-
- err = tsa.exc.SQLAlchemyError(message)
- eq_(str(err), "some message méil")
-
- def test_stmt_exception_object_arg(self):
- err = tsa.exc.SQLAlchemyError(Foo())
- eq_(str(err), "foo")
-
- def test_stmt_exception_str_multi_args(self):
- err = tsa.exc.SQLAlchemyError("some message", 206)
- eq_(str(err), "('some message', 206)")
-
- def test_stmt_exception_str_multi_args_bytestring(self):
- message = "some message méil".encode()
-
- err = tsa.exc.SQLAlchemyError(message, 206)
- eq_(str(err), str((message, 206)))
-
- def test_stmt_exception_str_multi_args_unicode(self):
- message = "some message méil"
+ def test_connection_as_ctx(self):
+ fn = self._trans_fn()
+ with testing.db.begin() as conn:
+ fn(conn, 5, value=8)
+ self._assert_fn(5, value=8)
- err = tsa.exc.SQLAlchemyError(message, 206)
- eq_(str(err), str((message, 206)))
- def test_stmt_exception_pickleable_no_dbapi(self):
- self._test_stmt_exception_pickleable(Exception("hello world"))
-
- @testing.crashes(
- "postgresql+psycopg2",
- "Older versions don't support cursor pickling, newer ones do",
- )
- @testing.fails_on(
- "+mysqlconnector",
- "Exception doesn't come back exactly the same from pickle",
- )
- @testing.fails_on(
- "oracle+cx_oracle",
- "cx_oracle exception seems to be having some issue with pickling",
- )
- @testing.fails_on(
- "oracle+oracledb",
- "oracledb exception seems to be having some issue with pickling",
- )
- def test_stmt_exception_pickleable_plus_dbapi(self):
- raw = testing.db.raw_connection()
- the_orig = None
- try:
- try:
- cursor = raw.cursor()
- cursor.execute("SELECTINCORRECT")
- except testing.db.dialect.dbapi.Error as orig:
- # py3k has "orig" in local scope...
- the_orig = orig
- finally:
- raw.close()
- self._test_stmt_exception_pickleable(the_orig)
+class ExecuteDriverTest(fixtures.TablesTest):
+ __backend__ = True
- def _test_stmt_exception_pickleable(self, orig):
- for sa_exc in (
- tsa.exc.StatementError(
- "some error",
- "select * from table",
- {"foo": "bar"},
- orig,
- False,
- ),
- tsa.exc.InterfaceError(
- "select * from table", {"foo": "bar"}, orig, True
- ),
- tsa.exc.NoReferencedTableError("message", "tname"),
- tsa.exc.NoReferencedColumnError("message", "tname", "cname"),
- tsa.exc.CircularDependencyError(
- "some message", [1, 2, 3], [(1, 2), (3, 4)]
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ )
+ Table(
+ "users_autoinc",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
),
- ):
- for loads, dumps in picklers():
- repickled = loads(dumps(sa_exc))
- eq_(repickled.args[0], sa_exc.args[0])
- if isinstance(sa_exc, tsa.exc.StatementError):
- eq_(repickled.params, {"foo": "bar"})
- eq_(repickled.statement, sa_exc.statement)
- if hasattr(sa_exc, "connection_invalidated"):
- eq_(
- repickled.connection_invalidated,
- sa_exc.connection_invalidated,
- )
- eq_(repickled.orig.args[0], orig.args[0])
-
- def test_dont_wrap_mixin(self):
- class MyException(Exception, tsa.exc.DontWrapMixin):
- pass
-
- class MyType(TypeDecorator):
- impl = Integer
- cache_ok = True
-
- def process_bind_param(self, value, dialect):
- raise MyException("nope")
-
- def _go(conn):
- assert_raises_message(
- MyException,
- "nope",
- conn.execute,
- select(1).where(column("foo") == literal("bar", MyType())),
- )
-
- conn = testing.db.connect()
- try:
- _go(conn)
- finally:
- conn.close()
+ Column("user_name", VARCHAR(20)),
+ )
- def test_empty_insert(self, connection):
- """test that execute() interprets [] as a list with no params and
- warns since it has nothing to do with such an executemany.
- """
- users_autoinc = self.tables.users_autoinc
+ def test_no_params_option(self):
+ stmt = (
+ "SELECT '%'"
+ + testing.db.dialect.statement_compiler(
+ testing.db.dialect, None
+ ).default_from()
+ )
- with expect_deprecated(
- r"Empty parameter sequence passed to execute\(\). "
- "This use is deprecated and will raise an exception in a "
- "future SQLAlchemy release"
- ):
- connection.execute(
- users_autoinc.insert().values(
- user_name=bindparam("name", None)
- ),
- [],
+ with testing.db.connect() as conn:
+ result = (
+ conn.execution_options(no_parameters=True)
+ .exec_driver_sql(stmt)
+ .scalar()
)
+ eq_(result, "%")
- eq_(len(connection.execute(users_autoinc.select()).all()), 1)
-
- @testing.only_on("sqlite")
- def test_raw_insert_with_empty_list(self, connection):
- """exec_driver_sql instead does not raise if an empty list is passed.
- Let the driver do that if it wants to.
- """
+ @testing.requires.qmark_paramstyle
+ def test_raw_qmark(self, connection):
conn = connection
- with expect_raises_message(
- tsa.exc.ProgrammingError, "Incorrect number of bindings supplied"
- ):
- conn.exec_driver_sql(
- "insert into users (user_id, user_name) values (?, ?)", []
- )
-
- @testing.requires.ad_hoc_engines
- def test_engine_level_options(self):
- eng = engines.testing_engine(
- options={"execution_options": {"foo": "bar"}}
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (?, ?)",
+ (1, "jack"),
)
- with eng.connect() as conn:
- eq_(conn._execution_options["foo"], "bar")
- eq_(
- conn.execution_options(bat="hoho")._execution_options["foo"],
- "bar",
- )
- eq_(
- conn.execution_options(bat="hoho")._execution_options["bat"],
- "hoho",
- )
- eq_(
- conn.execution_options(foo="hoho")._execution_options["foo"],
- "hoho",
- )
- eng.update_execution_options(foo="hoho")
- conn = eng.connect()
- eq_(conn._execution_options["foo"], "hoho")
-
- @testing.requires.ad_hoc_engines
- def test_generative_engine_execution_options(self):
- eng = engines.testing_engine(
- options={"execution_options": {"base": "x1"}}
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (?, ?)",
+ (2, "fred"),
)
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (?, ?)",
+ [(3, "ed"), (4, "horse")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (?, ?)",
+ [(5, "barney"), (6, "donkey")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (?, ?)",
+ (7, "sally"),
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "fred"),
+ (3, "ed"),
+ (4, "horse"),
+ (5, "barney"),
+ (6, "donkey"),
+ (7, "sally"),
+ ]
- is_(eng.engine, eng)
-
- eng1 = eng.execution_options(foo="b1")
- is_(eng1.engine, eng1)
- eng2 = eng.execution_options(foo="b2")
- eng1a = eng1.execution_options(bar="a1")
- eng2a = eng2.execution_options(foo="b3", bar="a2")
- is_(eng2a.engine, eng2a)
-
- eq_(eng._execution_options, {"base": "x1"})
- eq_(eng1._execution_options, {"base": "x1", "foo": "b1"})
- eq_(eng2._execution_options, {"base": "x1", "foo": "b2"})
- eq_(eng1a._execution_options, {"base": "x1", "foo": "b1", "bar": "a1"})
- eq_(eng2a._execution_options, {"base": "x1", "foo": "b3", "bar": "a2"})
- is_(eng1a.pool, eng.pool)
+ res = conn.exec_driver_sql(
+ "select * from users where user_name=?", ("jack",)
+ )
+ assert res.fetchall() == [(1, "jack")]
- # test pool is shared
- eng2.dispose()
- is_(eng1a.pool, eng2.pool)
- is_(eng.pool, eng2.pool)
+ @testing.requires.format_paramstyle
+ def test_raw_sprintf(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (%s, %s)",
+ (1, "jack"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (%s, %s)",
+ [(2, "ed"), (3, "horse")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (%s, %s)",
+ (4, "sally"),
+ )
+ conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,))
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ (5, None),
+ ]
- @testing.requires.ad_hoc_engines
- def test_autocommit_option_no_issue_first_connect(self):
- eng = create_engine(testing.db.url)
- eng.update_execution_options(autocommit=True)
- conn = eng.connect()
- eq_(conn._execution_options, {"autocommit": True})
- conn.close()
+ res = conn.exec_driver_sql(
+ "select * from users where user_name=%s", ("jack",)
+ )
+ assert res.fetchall() == [(1, "jack")]
- def test_initialize_rollback(self):
- """test a rollback happens during first connect"""
- eng = create_engine(testing.db.url)
- with patch.object(eng.dialect, "do_rollback") as do_rollback:
- assert do_rollback.call_count == 0
- connection = eng.connect()
- assert do_rollback.call_count == 1
- connection.close()
+ @testing.requires.pyformat_paramstyle
+ def test_raw_python(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 1, "name": "jack"},
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ dict(id=4, name="sally"),
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
- @testing.requires.ad_hoc_engines
- def test_dialect_init_uses_options(self):
- eng = create_engine(testing.db.url)
+ @testing.requires.named_paramstyle
+ def test_raw_named(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (:id, :name)",
+ {"id": 1, "name": "jack"},
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (:id, :name)",
+ [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (:id, :name)",
+ {"id": 4, "name": "sally"},
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
- def my_init(connection):
- connection.execution_options(foo="bar").execute(select(1))
+ def test_raw_tuple_params(self, connection):
+ """test #7820
- with patch.object(eng.dialect, "initialize", my_init):
- conn = eng.connect()
- eq_(conn._execution_options, {})
- conn.close()
+ There was an apparent improvement in the distill params
+ methodology used in exec_driver_sql which allows raw tuples to
+ pass through. In 1.4 there seems to be a _distill_cursor_params()
+ function that says it can handle this kind of parameter, but it isn't
+ used and when I tried to substitute it in for exec_driver_sql(),
+ things still fail.
- @testing.requires.ad_hoc_engines
- def test_generative_engine_event_dispatch_hasevents(self):
- def l1(*arg, **kw):
- pass
+ In any case, add coverage here for the use case of passing
+ direct tuple params to exec_driver_sql including as the first
+ param, to note that it isn't mis-interpreted the way it is
+ in 1.x.
- eng = create_engine(testing.db.url)
- assert not eng._has_events
- event.listen(eng, "before_execute", l1)
- eng2 = eng.execution_options(foo="bar")
- assert eng2._has_events
+ """
- def test_works_after_dispose(self):
- eng = create_engine(testing.db.url)
- for i in range(3):
- with eng.connect() as conn:
- eq_(conn.scalar(select(1)), 1)
- eng.dispose()
+ with patch.object(connection.dialect, "do_execute") as do_exec:
+ connection.exec_driver_sql(
+ "UPDATE users SET user_name = 'query_one' WHERE "
+ "user_id = %s OR user_id IN %s",
+ (3, (1, 2)),
+ )
- def test_works_after_dispose_testing_engine(self):
- eng = engines.testing_engine()
- for i in range(3):
- with eng.connect() as conn:
- eq_(conn.scalar(select(1)), 1)
- eng.dispose()
+ connection.exec_driver_sql(
+ "UPDATE users SET user_name = 'query_two' WHERE "
+ "user_id IN %s OR user_id = %s",
+ ((1, 2), 3),
+ )
- def test_scalar(self, connection):
- conn = connection
- users = self.tables.users
- conn.execute(
- users.insert(),
+ eq_(
+ do_exec.mock_calls,
[
- {"user_id": 1, "user_name": "sandy"},
- {"user_id": 2, "user_name": "spongebob"},
+ call(
+ mock.ANY,
+ "UPDATE users SET user_name = 'query_one' "
+ "WHERE user_id = %s OR user_id IN %s",
+ connection.dialect.execute_sequence_format((3, (1, 2))),
+ mock.ANY,
+ ),
+ call(
+ mock.ANY,
+ "UPDATE users SET user_name = 'query_two' "
+ "WHERE user_id IN %s OR user_id = %s",
+ connection.dialect.execute_sequence_format(((1, 2), 3)),
+ mock.ANY,
+ ),
],
)
- res = conn.scalar(select(users.c.user_name).order_by(users.c.user_id))
- eq_(res, "sandy")
- def test_scalars(self, connection):
- conn = connection
- users = self.tables.users
- conn.execute(
- users.insert(),
- [
- {"user_id": 1, "user_name": "sandy"},
- {"user_id": 2, "user_name": "spongebob"},
- ],
- )
- res = conn.scalars(select(users.c.user_name).order_by(users.c.user_id))
- eq_(res.all(), ["sandy", "spongebob"])
+ def test_non_dict_mapping(self, connection):
+ """ensure arbitrary Mapping works for execute()"""
- @testing.combinations(
- ({}, {}, {}),
- ({"a": "b"}, {}, {"a": "b"}),
- ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
- argnames="conn_opts, exec_opts, expected",
- )
- def test_execution_opts_per_invoke(
- self, connection, conn_opts, exec_opts, expected
- ):
- opts = []
+ class NotADict(collections_abc.Mapping):
+ def __init__(self, _data):
+ self._data = _data
- @event.listens_for(connection, "before_cursor_execute")
- def before_cursor_execute(
- conn, cursor, statement, parameters, context, executemany
- ):
- opts.append(context.execution_options)
+ def __iter__(self):
+ return iter(self._data)
- if conn_opts:
- connection = connection.execution_options(**conn_opts)
+ def __len__(self):
+ return len(self._data)
- if exec_opts:
- connection.execute(select(1), execution_options=exec_opts)
- else:
- connection.execute(select(1))
+ def __getitem__(self, key):
+ return self._data[key]
- eq_(opts, [expected])
+ def keys(self):
+ return self._data.keys()
- @testing.combinations(
- ({}, {}, {}, {}),
- ({}, {"a": "b"}, {}, {"a": "b"}),
- ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
- (
- {"q": "z", "p": "r"},
- {"a": "b", "p": "x", "d": "e"},
- {"a": "c"},
- {"q": "z", "p": "x", "a": "c", "d": "e"},
- ),
- argnames="stmt_opts, conn_opts, exec_opts, expected",
- )
- def test_execution_opts_per_invoke_execute_events(
- self, connection, stmt_opts, conn_opts, exec_opts, expected
- ):
- opts = []
+ nd = NotADict({"a": 10, "b": 15})
+ eq_(dict(nd), {"a": 10, "b": 15})
+
+ result = connection.execute(
+ select(
+ bindparam("a", type_=Integer), bindparam("b", type_=Integer)
+ ),
+ nd,
+ )
+ eq_(result.first(), (10, 15))
+
+ def test_row_works_as_mapping(self, connection):
+ """ensure the RowMapping object works as a parameter dictionary for
+ execute."""
+
+ result = connection.execute(
+ select(literal(10).label("a"), literal(15).label("b"))
+ )
+ row = result.first()
+ eq_(row, (10, 15))
+ eq_(row._mapping, {"a": 10, "b": 15})
- @event.listens_for(connection, "before_execute")
- def before_execute(
- conn, clauseelement, multiparams, params, execution_options
- ):
- opts.append(("before", execution_options))
+ result = connection.execute(
+ select(
+ bindparam("a", type_=Integer).label("a"),
+ bindparam("b", type_=Integer).label("b"),
+ ),
+ row._mapping,
+ )
+ row = result.first()
+ eq_(row, (10, 15))
+ eq_(row._mapping, {"a": 10, "b": 15})
- @event.listens_for(connection, "after_execute")
- def after_execute(
- conn,
- clauseelement,
- multiparams,
- params,
- execution_options,
- result,
- ):
- opts.append(("after", execution_options))
+ def test_exception_wrapping_dbapi(self):
+ with testing.db.connect() as conn:
+ assert_raises_message(
+ tsa.exc.DBAPIError,
+ r"not_a_valid_statement",
+ conn.exec_driver_sql,
+ "not_a_valid_statement",
+ )
- stmt = select(1)
+ def test_exception_wrapping_orig_accessors(self):
+ de = None
- if stmt_opts:
- stmt = stmt.execution_options(**stmt_opts)
+ with testing.db.connect() as conn:
+ try:
+ conn.exec_driver_sql("not_a_valid_statement")
+ except tsa.exc.DBAPIError as de_caught:
+ de = de_caught
- if conn_opts:
- connection = connection.execution_options(**conn_opts)
+ assert isinstance(de.orig, conn.dialect.dbapi.Error)
- if exec_opts:
- connection.execute(stmt, execution_options=exec_opts)
+ # get the driver module name, the one which we know will provide
+ # for exceptions
+ top_level_dbapi_module = conn.dialect.dbapi
+ if isinstance(top_level_dbapi_module, AsyncAdapt_dbapi_module):
+ driver_module = top_level_dbapi_module.exceptions_module
else:
- connection.execute(stmt)
+ driver_module = top_level_dbapi_module
+ top_level_dbapi_module = driver_module.__name__.split(".")[0]
- eq_(opts, [("before", expected), ("after", expected)])
+ # check that it's not us
+ ne_(top_level_dbapi_module, "sqlalchemy")
- @testing.combinations(
- ({"user_id": 1, "user_name": "name1"},),
- ([{"user_id": 1, "user_name": "name1"}],),
- (({"user_id": 1, "user_name": "name1"},),),
- (
- [
- {"user_id": 1, "user_name": "name1"},
- {"user_id": 2, "user_name": "name2"},
- ],
- ),
- argnames="parameters",
- )
- def test_params_interpretation(self, connection, parameters):
- users = self.tables.users
+ # then make sure driver_exception is from that module
+ assert type(de.driver_exception).__module__.startswith(
+ top_level_dbapi_module
+ )
- connection.execute(users.insert(), parameters)
+ @testing.requires.sqlite
+ def test_exception_wrapping_non_dbapi_error(self):
+ e = create_engine("sqlite://")
+ e.dialect.is_disconnect = is_disconnect = Mock()
+ with e.connect() as c:
+ c.connection.cursor = Mock(
+ return_value=Mock(
+ execute=Mock(
+ side_effect=TypeError("I'm not a DBAPI error")
+ )
+ )
+ )
+ assert_raises_message(
+ TypeError,
+ "I'm not a DBAPI error",
+ c.exec_driver_sql,
+ "select ",
+ )
+ eq_(is_disconnect.call_count, 0)
-class ConvenienceExecuteTest(fixtures.TablesTest):
- __sparse_driver_backend__ = True
+ def test_exception_wrapping_non_standard_dbapi_error(self):
+ class DBAPIError(Exception):
+ pass
- @classmethod
- def define_tables(cls, metadata):
- cls.table = Table(
- "exec_test",
- metadata,
- Column("a", Integer),
- Column("b", Integer),
- test_needs_acid=True,
- )
+ class OperationalError(DBAPIError):
+ pass
- def _trans_fn(self, is_transaction=False):
- def go(conn, x, value=None):
- if is_transaction:
- conn = conn.connection
- conn.execute(self.table.insert().values(a=x, b=value))
+ class NonStandardException(OperationalError):
+ pass
- return go
+ # TODO: this test is assuming too much of arbitrary dialects and would
+ # be better suited tested against a single mock dialect that does not
+ # have any special behaviors
+ with (
+ patch.object(testing.db.dialect, "dbapi", Mock(Error=DBAPIError)),
+ patch.object(
+ testing.db.dialect, "loaded_dbapi", Mock(Error=DBAPIError)
+ ),
+ patch.object(
+ testing.db.dialect, "is_disconnect", lambda *arg: False
+ ),
+ patch.object(
+ testing.db.dialect,
+ "do_execute",
+ Mock(side_effect=NonStandardException),
+ ),
+ patch.object(
+ testing.db.dialect.execution_ctx_cls,
+ "handle_dbapi_exception",
+ Mock(),
+ ),
+ ):
+ with testing.db.connect() as conn:
+ assert_raises(
+ tsa.exc.OperationalError, conn.exec_driver_sql, "select 1"
+ )
- def _trans_rollback_fn(self, is_transaction=False):
- def go(conn, x, value=None):
- if is_transaction:
- conn = conn.connection
- conn.execute(self.table.insert().values(a=x, b=value))
- raise SomeException("breakage")
+ def test_exception_wrapping_non_dbapi_statement(self):
+ class MyType(TypeDecorator):
+ impl = Integer
+ cache_ok = True
- return go
+ def process_bind_param(self, value, dialect):
+ raise SomeException("nope")
- def _assert_no_data(self):
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count("*")).select_from(self.table)),
- 0,
+ def _go(conn):
+ assert_raises_message(
+ tsa.exc.StatementError,
+ r"\(.*.SomeException\) " r"nope\n\[SQL\: u?SELECT 1 ",
+ conn.execute,
+ select(1).where(column("foo") == literal("bar", MyType())),
)
- def _assert_fn(self, x, value=None):
with testing.db.connect() as conn:
- eq_(conn.execute(self.table.select()).fetchall(), [(x, value)])
+ _go(conn)
- def test_transaction_engine_ctx_commit(self):
- fn = self._trans_fn()
- ctx = testing.db.begin()
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- self._assert_fn(5, value=8)
+ def test_stmt_exception_pickleable_no_dbapi(self):
+ self._test_stmt_exception_pickleable(Exception("hello world"))
- def test_transaction_engine_ctx_begin_fails_dont_enter_enter(self):
- """test #7272"""
- engine = engines.testing_engine()
+ @testing.crashes(
+ "postgresql+psycopg2",
+ "Older versions don't support cursor pickling, newer ones do",
+ )
+ @testing.fails_on(
+ "+mysqlconnector",
+ "Exception doesn't come back exactly the same from pickle",
+ )
+ @testing.fails_on(
+ "oracle+cx_oracle",
+ "cx_oracle exception seems to be having some issue with pickling",
+ )
+ @testing.fails_on(
+ "oracle+oracledb",
+ "oracledb exception seems to be having some issue with pickling",
+ )
+ def test_stmt_exception_pickleable_plus_dbapi(self):
+ raw = testing.db.raw_connection()
+ the_orig = None
+ try:
+ try:
+ cursor = raw.cursor()
+ cursor.execute("SELECTINCORRECT")
+ except testing.db.dialect.dbapi.Error as orig:
+ # py3k has "orig" in local scope...
+ the_orig = orig
+ finally:
+ raw.close()
+ self._test_stmt_exception_pickleable(the_orig)
- mock_connection = Mock(
- return_value=Mock(begin=Mock(side_effect=Exception("boom")))
- )
- with mock.patch.object(engine, "_connection_cls", mock_connection):
- # context manager isn't entered, doesn't actually call
- # connect() or connection.begin()
- engine.begin()
+ def _test_stmt_exception_pickleable(self, orig):
+ for sa_exc in (
+ tsa.exc.StatementError(
+ "some error",
+ "select * from table",
+ {"foo": "bar"},
+ orig,
+ False,
+ ),
+ tsa.exc.InterfaceError(
+ "select * from table", {"foo": "bar"}, orig, True
+ ),
+ tsa.exc.NoReferencedTableError("message", "tname"),
+ tsa.exc.NoReferencedColumnError("message", "tname", "cname"),
+ tsa.exc.CircularDependencyError(
+ "some message", [1, 2, 3], [(1, 2), (3, 4)]
+ ),
+ ):
+ for loads, dumps in picklers():
+ repickled = loads(dumps(sa_exc))
+ eq_(repickled.args[0], sa_exc.args[0])
+ if isinstance(sa_exc, tsa.exc.StatementError):
+ eq_(repickled.params, {"foo": "bar"})
+ eq_(repickled.statement, sa_exc.statement)
+ if hasattr(sa_exc, "connection_invalidated"):
+ eq_(
+ repickled.connection_invalidated,
+ sa_exc.connection_invalidated,
+ )
+ eq_(repickled.orig.args[0], orig.args[0])
+
+ def test_dont_wrap_mixin(self):
+ class MyException(Exception, tsa.exc.DontWrapMixin):
+ pass
+
+ class MyType(TypeDecorator):
+ impl = Integer
+ cache_ok = True
- eq_(mock_connection.return_value.close.mock_calls, [])
+ def process_bind_param(self, value, dialect):
+ raise MyException("nope")
- def test_transaction_engine_ctx_begin_fails_include_enter(self):
- """test #7272
+ def _go(conn):
+ assert_raises_message(
+ MyException,
+ "nope",
+ conn.execute,
+ select(1).where(column("foo") == literal("bar", MyType())),
+ )
- Note this behavior for 2.0 required that we add a new flag to
- Connection _allow_autobegin=False, so that the first-connect
- initialization sequence in create.py does not actually run begin()
- events. previously, the initialize sequence used a future=False
- connection unconditionally (and I didn't notice this).
+ conn = testing.db.connect()
+ try:
+ _go(conn)
+ finally:
+ conn.close()
+ def test_empty_insert(self, connection):
+ """test that execute() interprets [] as a list with no params and
+ warns since it has nothing to do with such an executemany.
"""
- engine = engines.testing_engine()
+ users_autoinc = self.tables.users_autoinc
- close_mock = Mock()
- with (
- mock.patch.object(
- engine._connection_cls,
- "begin",
- Mock(side_effect=Exception("boom")),
- ),
- mock.patch.object(engine._connection_cls, "close", close_mock),
+ with expect_deprecated(
+ r"Empty parameter sequence passed to execute\(\). "
+ "This use is deprecated and will raise an exception in a "
+ "future SQLAlchemy release"
):
- with expect_raises_message(Exception, "boom"):
- with engine.begin():
- pass
-
- eq_(close_mock.mock_calls, [call()])
-
- def test_transaction_engine_ctx_rollback(self):
- fn = self._trans_rollback_fn()
- ctx = testing.db.begin()
- assert_raises_message(
- Exception,
- "breakage",
- testing.run_as_contextmanager,
- ctx,
- fn,
- 5,
- value=8,
- )
- self._assert_no_data()
+ connection.execute(
+ users_autoinc.insert().values(
+ user_name=bindparam("name", None)
+ ),
+ [],
+ )
- def test_transaction_connection_ctx_commit(self):
- fn = self._trans_fn(True)
- with testing.db.connect() as conn:
- ctx = conn.begin()
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- self._assert_fn(5, value=8)
+ eq_(len(connection.execute(users_autoinc.select()).all()), 1)
- def test_transaction_connection_ctx_rollback(self):
- fn = self._trans_rollback_fn(True)
- with testing.db.connect() as conn:
- ctx = conn.begin()
- assert_raises_message(
- Exception,
- "breakage",
- testing.run_as_contextmanager,
- ctx,
- fn,
- 5,
- value=8,
+ @testing.only_on("sqlite")
+ def test_raw_insert_with_empty_list(self, connection):
+ """exec_driver_sql instead does not raise if an empty list is passed.
+ Let the driver do that if it wants to.
+ """
+ conn = connection
+ with expect_raises_message(
+ tsa.exc.ProgrammingError, "Incorrect number of bindings supplied"
+ ):
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) values (?, ?)", []
)
- self._assert_no_data()
- def test_connection_as_ctx(self):
- fn = self._trans_fn()
- with testing.db.begin() as conn:
- fn(conn, 5, value=8)
- self._assert_fn(5, value=8)
+ def test_works_after_dispose_testing_engine(self):
+ eng = engines.testing_engine()
+ for i in range(3):
+ with eng.connect() as conn:
+ eq_(conn.scalar(select(1)), 1)
+ eng.dispose()
class CompiledCacheTest(fixtures.TestBase):
class ExecutionOptionsTest(fixtures.TestBase):
+ def test_engine_level_options(self):
+ eng = engines.testing_engine(
+ options={"execution_options": {"foo": "bar"}}
+ )
+ with eng.connect() as conn:
+ eq_(conn._execution_options["foo"], "bar")
+ eq_(
+ conn.execution_options(bat="hoho")._execution_options["foo"],
+ "bar",
+ )
+ eq_(
+ conn.execution_options(bat="hoho")._execution_options["bat"],
+ "hoho",
+ )
+ eq_(
+ conn.execution_options(foo="hoho")._execution_options["foo"],
+ "hoho",
+ )
+ eng.update_execution_options(foo="hoho")
+ conn = eng.connect()
+ eq_(conn._execution_options["foo"], "hoho")
+
+ def test_generative_engine_execution_options(self):
+ eng = engines.testing_engine(
+ options={"execution_options": {"base": "x1"}}
+ )
+
+ is_(eng.engine, eng)
+
+ eng1 = eng.execution_options(foo="b1")
+ is_(eng1.engine, eng1)
+ eng2 = eng.execution_options(foo="b2")
+ eng1a = eng1.execution_options(bar="a1")
+ eng2a = eng2.execution_options(foo="b3", bar="a2")
+ is_(eng2a.engine, eng2a)
+
+ eq_(eng._execution_options, {"base": "x1"})
+ eq_(eng1._execution_options, {"base": "x1", "foo": "b1"})
+ eq_(eng2._execution_options, {"base": "x1", "foo": "b2"})
+ eq_(eng1a._execution_options, {"base": "x1", "foo": "b1", "bar": "a1"})
+ eq_(eng2a._execution_options, {"base": "x1", "foo": "b3", "bar": "a2"})
+ is_(eng1a.pool, eng.pool)
+
+ # test pool is shared
+ eng2.dispose()
+ is_(eng1a.pool, eng2.pool)
+ is_(eng.pool, eng2.pool)
+
+ def test_autocommit_option_preserved_first_connect(self, testing_engine):
+ eng = testing_engine()
+ eng.update_execution_options(autocommit=True)
+ conn = eng.connect()
+ eq_(conn._execution_options, {"autocommit": True})
+ conn.close()
+
+ def test_initialize_rollback(self, testing_engine):
+ """test a rollback happens during first connect"""
+ eng = testing_engine()
+ with patch.object(eng.dialect, "do_rollback") as do_rollback:
+ assert do_rollback.call_count == 0
+ connection = eng.connect()
+ assert do_rollback.call_count == 1
+ connection.close()
+
+ def test_dialect_init_uses_options(self, testing_engine):
+ eng = testing_engine()
+
+ def my_init(connection):
+ connection.execution_options(foo="bar").execute(select(1))
+
+ with patch.object(eng.dialect, "initialize", my_init):
+ conn = eng.connect()
+ eq_(conn._execution_options, {})
+ conn.close()
+
+ @testing.combinations(
+ ({}, {}, {}),
+ ({"a": "b"}, {}, {"a": "b"}),
+ ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+ argnames="conn_opts, exec_opts, expected",
+ )
+ def test_execution_opts_per_invoke(
+ self, connection, conn_opts, exec_opts, expected
+ ):
+ opts = []
+
+ @event.listens_for(connection, "before_cursor_execute")
+ def before_cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+ opts.append(context.execution_options)
+
+ if conn_opts:
+ connection = connection.execution_options(**conn_opts)
+
+ if exec_opts:
+ connection.execute(select(1), execution_options=exec_opts)
+ else:
+ connection.execute(select(1))
+
+ eq_(opts, [expected])
+
+ @testing.combinations(
+ ({}, {}, {}, {}),
+ ({}, {"a": "b"}, {}, {"a": "b"}),
+ ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+ (
+ {"q": "z", "p": "r"},
+ {"a": "b", "p": "x", "d": "e"},
+ {"a": "c"},
+ {"q": "z", "p": "x", "a": "c", "d": "e"},
+ ),
+ argnames="stmt_opts, conn_opts, exec_opts, expected",
+ )
+ def test_execution_opts_per_invoke_execute_events(
+ self, connection, stmt_opts, conn_opts, exec_opts, expected
+ ):
+ opts = []
+
+ @event.listens_for(connection, "before_execute")
+ def before_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
+ opts.append(("before", execution_options))
+
+ @event.listens_for(connection, "after_execute")
+ def after_execute(
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ execution_options,
+ result,
+ ):
+ opts.append(("after", execution_options))
+
+ stmt = select(1)
+
+ if stmt_opts:
+ stmt = stmt.execution_options(**stmt_opts)
+
+ if conn_opts:
+ connection = connection.execution_options(**conn_opts)
+
+ if exec_opts:
+ connection.execute(stmt, execution_options=exec_opts)
+ else:
+ connection.execute(stmt)
+
+ eq_(opts, [("before", expected), ("after", expected)])
+
def test_dialect_conn_options(self, testing_engine):
engine = testing_engine("sqlite://", options=dict(_initialize=False))
engine.dialect = Mock()
class EngineEventsTest(fixtures.TestBase):
- __requires__ = ("ad_hoc_engines",)
- __sparse_driver_backend__ = True
def teardown_test(self):
Engine.dispatch._clear()
eq_(canary.be2.call_count, 1)
eq_(canary.be3.call_count, 2)
- @testing.requires.ad_hoc_engines
def test_option_engine_registration_issue_one(self):
"""test #12289"""
{"foo": "bar", "isolation_level": "AUTOCOMMIT"},
)
- @testing.requires.ad_hoc_engines
def test_option_engine_registration_issue_two(self):
"""test #12289"""
# new feature as of #2978
canary = Mock()
- e1 = testing_engine(config.db_url, future=False)
+ e1 = testing_engine(config.db_url)
assert not e1._has_events
conn = e1.connect()
def test_force_conn_events_false(self, testing_engine):
canary = Mock()
- e1 = testing_engine(config.db_url, future=False)
+ e1 = testing_engine(config.db_url)
assert not e1._has_events
event.listen(e1, "before_execute", canary.be1)
eq_(c3._execution_options, {"foo": "bar", "bar": "bat"})
eq_(canary, ["execute", "cursor_execute"])
- @testing.requires.ad_hoc_engines
def test_generative_engine_event_dispatch(self):
canary = []
eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
- @testing.requires.ad_hoc_engines
def test_clslevel_engine_event_options(self):
canary = []
conn.execute(select(1))
eq_(canary, ["l2"])
- @testing.requires.ad_hoc_engines
def test_cant_listen_to_option_engine(self):
from sqlalchemy.engine import base
evt,
)
- @testing.requires.ad_hoc_engines
def test_dispose_event(self, testing_engine):
canary = Mock()
eng = testing_engine(testing.db.url)
eq_(canary.mock_calls, [call(eng), call(eng)])
- @testing.requires.ad_hoc_engines
@testing.combinations(True, False, argnames="close")
def test_close_parameter(self, testing_engine, close):
eng = testing_engine(
class HandleErrorTest(fixtures.TestBase):
- __requires__ = ("ad_hoc_engines",)
__sparse_driver_backend__ = True
def teardown_test(self):