update(..., mysql_limit=10, mariadb_limit=10)
+* DELETE with LIMIT::
+
+ delete(..., mysql_limit=10, mariadb_limit=10)
+
* optimizer hints, use :meth:`_expression.Select.prefix_with` and
:meth:`_query.Query.prefix_with`::
else:
return None
+ def delete_limit_clause(self, delete_stmt):
+ limit = delete_stmt.kwargs.get("%s_limit" % self.dialect.name, None)
+ if limit:
+ return "LIMIT %s" % limit
+ else:
+ return None
+
def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
kw["asfrom"] = True
return ", ".join(
)
def delete(
- self, synchronize_session: SynchronizeSessionArgument = "auto"
+ self,
+ synchronize_session: SynchronizeSessionArgument = "auto",
+ delete_args: Optional[Dict[Any, Any]] = None,
) -> int:
r"""Perform a DELETE with an arbitrary WHERE clause.
:ref:`orm_expression_update_delete` for a discussion of these
strategies.
+ :param delete_args: Optional dictionary, if present will be passed
+ to the underlying :func:`_expression.delete`
+ construct as the ``**kw`` for
+ the object. May be used to pass dialect-specific arguments such
+ as ``mysql_limit``.
+
:return: the count of rows matched as returned by the database's
"row count" feature.
"""
- bulk_del = BulkDelete(self)
+ delete_args = delete_args or {}
+
+ bulk_del = BulkDelete(self, delete_args)
if self.dispatch.before_compile_delete:
for fn in self.dispatch.before_compile_delete:
new_query = fn(bulk_del.query, bulk_del)
self = bulk_del.query
delete_ = sql.delete(*self._raw_columns) # type: ignore
+
+ if delete_args:
+ delete_ = delete_.with_dialect_options(**delete_args)
+
delete_._where_criteria = self._where_criteria
result: CursorResult[Any] = self.session.execute(
delete_,
class BulkDelete(BulkUD):
"""BulkUD which handles DELETEs."""
+ def __init__(
+ self,
+ query: Query[Any],
+ delete_kwargs: Optional[Dict[Any, Any]],
+ ):
+ super().__init__(query)
+ self.delete_kwargs = delete_kwargs
+
class RowReturningQuery(Query[Row[Unpack[_Ts]]]):
if TYPE_CHECKING:
"""Provide a hook for MySQL to add LIMIT to the UPDATE"""
return None
+ def delete_limit_clause(self, delete_stmt):
+ """Provide a hook for MySQL to add LIMIT to the DELETE"""
+ return None
+
def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
"""Provide a hook to override the initial table clause
in an UPDATE statement.
if t:
text += " WHERE " + t
+ limit_clause = self.delete_limit_clause(delete_stmt)
+ if limit_clause:
+ text += " " + limit_clause
+
if (
self.implicit_returning or delete_stmt._returning
) and not self.returning_precedes_values:
"UPDATE t SET col1=%s WHERE t.col2 = %s LIMIT 1",
)
+ def test_delete_limit(self):
+ t = sql.table("t", sql.column("col1"), sql.column("col2"))
+
+ self.assert_compile(t.delete(), "DELETE FROM t")
+ self.assert_compile(
+ t.delete().with_dialect_options(mysql_limit=5),
+ "DELETE FROM t SET col1=%s LIMIT 5",
+ )
+ self.assert_compile(
+ t.delete().with_dialect_options(mysql_limit=None),
+ "DELETE FROM t",
+ )
+ self.assert_compile(
+ t.delete()
+ .where(t.c.col2 == 456)
+ .with_dialect_options(mysql_limit=1),
+ "DELETE FRM t WHERE t.col2 = %s LIMIT 1",
+ )
+
def test_utc_timestamp(self):
self.assert_compile(func.utc_timestamp(), "utc_timestamp()")
],
)
+ def test_delete_args(self):
+ Data = self.classes.Data
+ session = fixture_session()
+ delete_args = {"mysql_limit": 1}
+
+ m1 = testing.mock.Mock()
+
+ @event.listens_for(session, "after_bulk_update")
+ def do_orm_execute(bulk_ud):
+ delete_stmt = (
+ bulk_ud.result.context.compiled.compile_state.statement
+ )
+ m1(delete_stmt)
+
+ q = session.query(Data)
+ q.delete(delete_args=delete_args)
+
+ delete_stmt = m1.mock_calls[0][1][0]
+
+ eq_(delete_stmt.dialect_kwargs, delete_args)
+
class ExpressionUpdateTest(fixtures.MappedTest):
@classmethod