--- /dev/null
+.. change::
+ :tags: engine, change
+
+ The private method ``Connection._execute_compiled`` is removed. This method may
+ have been used for some special purposes however the :class:`.SQLCompiler`
+ object has lots of special state that should be set up for an execute call,
+ which we don't support.
)
return ret
- def _execute_compiled(
- self,
- compiled: Compiled,
- distilled_parameters: _CoreMultiExecuteParams,
- execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
- ) -> CursorResult[Unpack[TupleAny]]:
- """Execute a sql.Compiled object.
-
- TODO: why do we have this? likely deprecate or remove
-
- """
-
- exec_opts = compiled.execution_options.merge_with(
- self._execution_options, execution_options
- )
-
- if self._has_events or self.engine._has_events:
- (
- compiled,
- distilled_parameters,
- event_multiparams,
- event_params,
- ) = self._invoke_before_exec_event(
- compiled, distilled_parameters, exec_opts
- )
-
- dialect = self.dialect
-
- ret = self._execute_context(
- dialect,
- dialect.execution_ctx_cls._init_compiled,
- compiled,
- distilled_parameters,
- exec_opts,
- compiled,
- distilled_parameters,
- None,
- None,
- )
- if self._has_events or self.engine._has_events:
- self.dispatch.after_execute(
- self,
- compiled,
- event_multiparams,
- event_params,
- exec_opts,
- ret,
- )
- return ret
-
def exec_driver_sql(
self,
statement: str,
def _init_compiler_cls(cls):
pass
- def _execute_on_connection(
- self, connection, distilled_params, execution_options
- ):
- if self.can_execute:
- return connection._execute_compiled(
- self, distilled_params, execution_options
- )
- else:
- raise exc.ObjectNotExecutableError(self.statement)
-
def visit_unsupported_compilation(self, element, err, **kw):
raise exc.UnsupportedCompilationError(self, type(element)) from err
from sqlalchemy import VARCHAR
from sqlalchemy.connectors.asyncio import AsyncAdapt_dbapi_module
from sqlalchemy.engine import BindTyping
-from sqlalchemy.engine import default
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.base import Engine
from sqlalchemy.pool import AsyncAdaptedQueuePool
"insert into users (user_id, user_name) values (?, ?)", []
)
- @testing.only_on("sqlite")
- def test_execute_compiled_favors_compiled_paramstyle(self):
- users = self.tables.users
-
- with patch.object(testing.db.dialect, "do_execute") as do_exec:
- stmt = users.update().values(user_id=1, user_name="foo")
-
- d1 = default.DefaultDialect(paramstyle="format")
- d2 = default.DefaultDialect(paramstyle="pyformat")
-
- with testing.db.begin() as conn:
- conn.execute(stmt.compile(dialect=d1))
- conn.execute(stmt.compile(dialect=d2))
-
- eq_(
- do_exec.mock_calls,
- [
- call(
- mock.ANY,
- "UPDATE users SET user_id=%s, user_name=%s",
- (1, "foo"),
- mock.ANY,
- ),
- call(
- mock.ANY,
- "UPDATE users SET user_id=%(user_id)s, "
- "user_name=%(user_name)s",
- {"user_name": "foo", "user_id": 1},
- mock.ANY,
- ),
- ],
- )
-
@testing.requires.ad_hoc_engines
def test_engine_level_options(self):
eng = engines.testing_engine(
with e1.connect() as conn:
conn.execute(select(1))
conn.execute(select(1).compile(dialect=e1.dialect).statement)
- conn.execute(select(1).compile(dialect=e1.dialect))
-
- conn._execute_compiled(
- select(1).compile(dialect=e1.dialect), (), {}
- )
@testing.emits_warning("The garbage collector is trying to clean up")
def test_execute_events(self):
from sqlalchemy import intersect
from sqlalchemy import literal
from sqlalchemy import literal_column
-from sqlalchemy import MetaData
from sqlalchemy import not_
from sqlalchemy import or_
from sqlalchemy import select
[],
)
- def test_compiled_execute(self, connection):
- users = self.tables.users
- connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
- s = (
- select(users)
- .where(users.c.user_id == bindparam("id"))
- .compile(connection)
- )
- eq_(connection.execute(s, dict(id=7)).first()._mapping["user_id"], 7)
-
- def test_compiled_insert_execute(self, connection):
- users = self.tables.users
- connection.execute(
- users.insert().compile(connection),
- dict(user_id=7, user_name="jack"),
- )
- s = (
- select(users)
- .where(users.c.user_id == bindparam("id"))
- .compile(connection)
- )
- eq_(connection.execute(s, dict(id=7)).first()._mapping["user_id"], 7)
-
def test_repeated_bindparams(self, connection):
"""Tests that a BindParam can be used more than once.
[(7, "jack"), (8, "fred")],
)
- def test_expanding_in_dont_alter_compiled(self, connection):
- """test for issue #5048"""
-
- class NameWithProcess(TypeDecorator):
- impl = String
- cache_ok = True
-
- def process_bind_param(self, value, dialect):
- return value[3:]
-
- users = Table(
- "users",
- MetaData(),
- Column("user_id", Integer, primary_key=True),
- Column("user_name", NameWithProcess()),
- )
-
- connection.execute(
- users.insert(),
- [
- dict(user_id=7, user_name="AB jack"),
- dict(user_id=8, user_name="BE fred"),
- dict(user_id=9, user_name="GP ed"),
- ],
- )
-
- stmt = (
- select(users)
- .where(users.c.user_name.in_(bindparam("uname", expanding=True)))
- .order_by(users.c.user_id)
- )
-
- compiled = stmt.compile(testing.db)
- eq_(len(compiled._bind_processors), 1)
-
- eq_(
- connection.execute(
- compiled, {"uname": ["HJ jack", "RR fred"]}
- ).fetchall(),
- [(7, "jack"), (8, "fred")],
- )
-
- eq_(len(compiled._bind_processors), 1)
-
@testing.skip_if(["mssql"])
def test_bind_in(self, connection):
"""test calling IN against a bind parameter.
compiled = select(cast(literal(util.b("foo")), LargeBinary)).compile(
dialect=testing.db.dialect, compile_kwargs={"literal_binds": True}
)
- result = connection.execute(compiled)
+ result = connection.execute(compiled.statement)
eq_(result.scalar(), util.b("foo"))
def test_bind_processor_no_dbapi(self):