def _exec(
self,
construct: Union[Executable, str],
- execution_options: Optional[dict[str, Any]] = None,
- multiparams: Sequence[dict] = (),
- params: Dict[str, Any] = util.immutabledict(),
+ execution_options: Optional[Mapping[str, Any]] = None,
+ multiparams: Optional[Sequence[Mapping[str, Any]]] = None,
+ params: Mapping[str, Any] = util.immutabledict(),
) -> Optional[CursorResult]:
if isinstance(construct, str):
construct = text(construct)
if self.as_sql:
- if multiparams or params:
- # TODO: coverage
- raise Exception("Execution arguments not allowed with as_sql")
+ if multiparams is not None or params:
+ raise TypeError("SQL parameters not allowed with as_sql")
compile_kw: dict[str, Any]
if self.literal_binds and not isinstance(
assert conn is not None
if execution_options:
conn = conn.execution_options(**execution_options)
- if params:
- assert isinstance(multiparams, tuple)
- multiparams += (params,)
- return conn.execute(construct, multiparams)
+ if params and multiparams is not None:
+ raise TypeError(
+ "Can't send params and multiparams at the same time"
+ )
+
+ if multiparams:
+ return conn.execute(construct, multiparams)
+ else:
+ return conn.execute(construct, params)
def execute(
self,
connection, opts=dict(transaction_per_migration=True)
)
+ @testing.fixture
+ def as_sql_migration_context(self, connection):
+ return MigrationContext.configure(
+ connection, opts=dict(transaction_per_migration=True, as_sql=True)
+ )
+
@testing.fixture
def connection(self):
with config.db.connect() as conn:
--- /dev/null
+.. change::
+ :tags: bug, execution
+ :tickets: 1394
+
+ Fixed internal issue where Alembic would call ``connection.execute()``
+ sending an empty tuple to indicate "no params". In SQLAlchemy 2.1 this
+ case will be deprecated as "empty sequence" is ambiguous as to its intent.
+
with migration_context.begin_transaction(_per_migration=True):
yield migration_context.impl
+ @testing.fixture
+ def as_sql_impl(self, as_sql_migration_context):
+ with as_sql_migration_context.begin_transaction(_per_migration=True):
+ yield as_sql_migration_context.impl
+
def test_execute_params(self, impl):
result = impl._exec(text("select :my_param"), params={"my_param": 5})
eq_(result.scalar(), 5)
[(1, 2), (2, 3), (5, 7)],
)
+ def test_dont_send_both(self, impl):
+ with testing.expect_raises_message(
+ TypeError, "Can't send params and multiparams at the same time"
+ ):
+ impl._exec(
+ text("select :my_param"),
+ params={"my_param": 5},
+ multiparams=[],
+ )
+
+ def test_no_params_w_as_sql(self, as_sql_impl):
+ with testing.expect_raises_message(
+ TypeError, "SQL parameters not allowed with as_sql"
+ ):
+ as_sql_impl._exec(text("select :my_param"), params={"my_param": 5})
+
+ def test_no_multiparams_w_as_sql(self, as_sql_impl):
+ with testing.expect_raises_message(
+ TypeError, "SQL parameters not allowed with as_sql"
+ ):
+ as_sql_impl._exec(text("select :my_param"), multiparams=[])
+
class FutureImplTest(FutureEngineMixin, ImplTest):
pass