Propose using RETURNING for bulk updates, deletes
author     Mike Bayer <mike_mp@zzzcomputing.com>
           Sun, 21 Jun 2020 16:21:21 +0000 (12:21 -0400)
committer  Mike Bayer <mike_mp@zzzcomputing.com>
           Tue, 23 Jun 2020 14:41:39 +0000 (10:41 -0400)
This patch makes several improvements in the area of
bulk updates and deletes as well as the new session mechanics.

RETURNING is now used for an UPDATE or DELETE statement
emitted against a dialect that supports "full returning"
in order to satisfy the "fetch" strategy; this currently
includes PostgreSQL and SQL Server.  The Oracle dialect
does not support RETURNING for more than one row,
so a new dialect capability "full_returning" is added
in addition to the existing "implicit_returning", indicating
that the dialect supports RETURNING for zero or more rows,
not just a single identity row.
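
As a rough sketch, the new capability sits alongside the existing flag
roughly as follows (class and attribute names as they appear in this
patch; not the complete dialect source):

    # illustrative sketch only
    class DefaultDialect:
        implicit_returning = False   # RETURNING usable for a single identity row
        full_returning = False       # RETURNING usable for zero or more rows

    class PGDialect(DefaultDialect):
        implicit_returning = True
        full_returning = True

        def initialize(self, connection):
            # very old PostgreSQL versions (<= 8.2) have no usable RETURNING
            if self.server_version_info <= (8, 2):
                self.full_returning = self.implicit_returning = False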

The "fetch" strategy will gracefully degrade to
the previous SELECT mechanics for dialects that do not
support RETURNING.

Additionally, the "fetch" strategy will attempt to use
evaluation for the VALUES that were UPDATEd, rather
than just expiring the updated attributes.   Values should
be evaluatable in all cases where the value is not
a SQL expression.
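
For example, assuming a hypothetical User mapping and an active Session
named "session", a simple arithmetic value is now applied in place, while
a SQL function expression still expires the attribute:

    from sqlalchemy import func

    # evaluatable in Python: assigned directly to the matched objects
    session.query(User).filter(User.age > 29).update(
        {"age": User.age - 10}, synchronize_session="fetch"
    )

    # a SQL expression that can't be evaluated in Python: the "name"
    # attribute on matched objects is expired instead
    session.query(User).filter(User.name == "jill").update(
        {"name": func.lower(User.name)}, synchronize_session="fetch"
    )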

The new approach also involves some changes in the
session.execute mechanics, where do_orm_execute() event
handlers can now be chained, with each one able to return
results; this is in turn used by the "fetch" strategy's
handler to detect on a per-bind basis whether it needs to
do a SELECT or whether it can use RETURNING.  A test suite is
added to test_horizontal_shard that breaks up a single
UPDATE or DELETE operation among multiple backends
where some are SQLite and don't support RETURNING and
others are PostgreSQL and do.
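
The per-bind decision is made by an internal do_orm_execute() handler
along the lines of the following sketch (adapted from the persistence.py
change below; _result refers to sqlalchemy.engine.result):

    def skip_for_full_returning(orm_context):
        # runs once per bind when the "fetch" pre-sync SELECT is invoked;
        # if this bind can RETURN multiple rows, return an empty result so
        # the SELECT is skipped for that backend
        bind = orm_context.session.get_bind(**orm_context.bind_arguments)
        if bind.dialect.full_returning:
            return _result.null_result()
        else:
            return None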

The session event mechanics are corrected
in terms of the "orm pre execute" hook, which now
receives a flag "is_reentrant_invoke" so that the two
ORM implementations of this hook can skip their work
if they are being called inside of
ORMExecuteState.invoke_statement(), where previously the
bulk update/delete SELECT was being emitted a second time.
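
Sketched from the ORMCompileState change below, the hook now
short-circuits re-entrant calls:

    @classmethod
    def orm_pre_session_exec(
        cls, session, statement, params,
        execution_options, bind_arguments, is_reentrant_invoke,
    ):
        if is_reentrant_invoke:
            # called re-entrantly from ORMExecuteState.invoke_statement();
            # autoflush and the pre-sync SELECT already ran in the outer call
            return statement, execution_options
        # ... normal pre-execute work continues here ...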

In order for "fetch" to get the correct identity when
called as pre-execute, it also requests the identity_token
for each mapped instance which is now added as an optional
capability of a SELECT for ORM columns.   the identity_token
that's placed by horizontal_sharding is now made available
within each result row, so that even when fetching a
merged result of plain rows we can tell which row belongs
to which identity token.
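
Internally this uses a new memoized Mapper attribute that emits a NULL
column annotated as the identity token, sketched from the mapper.py
change below:

    @util.memoized_property
    def select_identity_token(self):
        # an annotated NULL expression; the ORM column-entity machinery
        # recognizes the "identity_token" annotation and returns the
        # per-shard identity token for each row rather than NULL
        return expression.null()._annotate(
            {
                "entity_namespace": self,
                "parententity": self,
                "parentmapper": self,
                "identity_token": True,
            }
        )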

The evaluator used by the ORM bulk update and delete for
synchronize_session="evaluate" now supports the IN and NOT IN operators.
Tuple IN is also supported.
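
For example, assuming a hypothetical User mapping, criteria such as these
can now be evaluated against in-memory objects:

    from sqlalchemy import tuple_

    # single-column IN, evaluated in Python against loaded objects
    session.query(User).filter(User.name.in_(["jack", "jill"])).update(
        {"age": User.age + 1}, synchronize_session="evaluate"
    )

    # tuple IN is evaluated as well
    session.query(User).filter(
        tuple_(User.id, User.name).in_([(1, "jack"), (2, "jill")])
    ).delete(synchronize_session="evaluate")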

Fixes: #1653
Change-Id: I2292b56ae004b997cef0ba4d3fc350ae1dd5efc1

21 files changed:
doc/build/changelog/migration_14.rst
doc/build/changelog/unreleased_14/1653.rst [new file with mode: 0644]
doc/build/changelog/unreleased_14/orm_update_delete.rst [new file with mode: 0644]
examples/dogpile_caching/caching_query.py
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/dialects/postgresql/base.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/engine/result.py
lib/sqlalchemy/ext/horizontal_shard.py
lib/sqlalchemy/orm/context.py
lib/sqlalchemy/orm/evaluator.py
lib/sqlalchemy/orm/mapper.py
lib/sqlalchemy/orm/persistence.py
lib/sqlalchemy/orm/session.py
lib/sqlalchemy/testing/requirements.py
test/aaa_profiling/test_orm.py
test/engine/test_reflection.py
test/ext/test_horizontal_shard.py
test/orm/test_evaluator.py
test/orm/test_update_delete.py
test/sql/test_returning.py

index 6702f86f0cd7a8771eaed993dfc57e0363067d58..97b94087ffa478ec9dcdf3c6d1d820078f57a66e 100644 (file)
@@ -349,7 +349,7 @@ New Result object
 -----------------
 
 The ``ResultProxy`` object has been replaced with the 2.0 -style
-:class:`.Result` object discussed at :ref:`change_result_20_core`.  This result object
+:class:`_result.Result` object discussed at :ref:`change_result_20_core`.  This result object
 is fully compatible with ``ResultProxy`` and includes many new features,
 that are now applied to both Core and ORM results equally, including methods
 such as:
@@ -366,7 +366,7 @@ such as:
 
 When using Core, the object returned is an instance of :class:`.CursorResult`,
 which continues to feature the same API features as ``ResultProxy`` regarding
-inserted primary keys, defaults, rowcounts, etc.   For ORM, a :class:`.Result`
+inserted primary keys, defaults, rowcounts, etc.   For ORM, a :class:`_result.Result`
 subclass will be returned that performs translation of Core rows into
 ORM rows, and then allows all the same operations to take place.
 
@@ -594,6 +594,55 @@ as was present previously.
 
 :ticket:`4826`
 
+.. _change_orm_update_returning_14:
+
+ORM Bulk Update and Delete use RETURNING for "fetch" strategy when available
+----------------------------------------------------------------------------
+
+An ORM bulk update or delete that uses the "fetch" strategy::
+
+    sess.query(User).filter(User.age > 29).update(
+        {"age": User.age - 10}, synchronize_session="fetch"
+    )
+
+Will now use RETURNING if the backend database supports it; this currently
+includes PostgreSQL and SQL Server (the Oracle dialect does not support RETURNING
+of multiple rows)::
+
+    UPDATE users SET age_int=(users.age_int - %(age_int_1)s) WHERE users.age_int > %(age_int_2)s RETURNING users.id
+    [generated in 0.00060s] {'age_int_1': 10, 'age_int_2': 29}
+    Col ('id',)
+    Row (2,)
+    Row (4,)
+
+For backends that do not support RETURNING of multiple rows, the previous approach
+of emitting SELECT for the primary keys beforehand is still used::
+
+    SELECT users.id FROM users WHERE users.age_int > %(age_int_1)s
+    [generated in 0.00043s] {'age_int_1': 29}
+    Col ('id',)
+    Row (2,)
+    Row (4,)
+    UPDATE users SET age_int=(users.age_int - %(age_int_1)s) WHERE users.age_int > %(age_int_2)s
+    [generated in 0.00102s] {'age_int_1': 10, 'age_int_2': 29}
+
+One of the intricate challenges of this change is to support cases such as the
+horizontal sharding extension, where a single bulk update or delete may be
+multiplexed among backends, some of which support RETURNING and some don't.   The
+new 1.4 execution architecture supports this case so that the "fetch" strategy
+can be left intact with a graceful degrade to using a SELECT, rather than having
+to add a new "returning" strategy that would not be backend-agnostic.
+
+As part of this change, the "fetch" strategy is also made much more efficient
+in that it will no longer expire the matched objects for SET clause values
+that can be evaluated in Python; these values are instead assigned
+directly onto the object in the same way as the "evaluate" strategy.  Only
+for SQL expressions that can't be evaluated does it fall back to expiring
+the attributes.   The "evaluate" strategy has also been enhanced to fall back
+to "expire" for a value that cannot be evaluated.
+
+
 Behavioral Changes - ORM
 ========================
 
diff --git a/doc/build/changelog/unreleased_14/1653.rst b/doc/build/changelog/unreleased_14/1653.rst
new file mode 100644 (file)
index 0000000..e351212
--- /dev/null
@@ -0,0 +1,8 @@
+.. change::
+    :tags: usecase, orm
+    :tickets: 1653
+
+    The evaluator used by the ORM bulk update and delete for
+    synchronize_session="evaluate" now supports the IN and NOT IN operators.
+    Tuple IN is also supported.
+
diff --git a/doc/build/changelog/unreleased_14/orm_update_delete.rst b/doc/build/changelog/unreleased_14/orm_update_delete.rst
new file mode 100644 (file)
index 0000000..e16ab62
--- /dev/null
@@ -0,0 +1,18 @@
+.. change::
+    :tags: orm, performance
+
+    The bulk update and delete methods :meth:`.Query.update` and
+    :meth:`.Query.delete`, as well as their 2.0-style counterparts, now make
+    use of RETURNING when the "fetch" strategy is used in order to fetch the
+    list of affected primary key identities, rather than emitting a separate
+    SELECT, when the backend in use supports RETURNING.  Additionally, the
+    "fetch" strategy will in ordinary cases not expire the attributes that have
+    been updated, and will instead apply the updated values directly in the
+    same way that the "evaluate" strategy does, to avoid having to refresh the
+    object.   The "evaluate" strategy will also fall back to expiring
+    attributes that were updated to a SQL expression that was not evaluable in
+    Python.
+
+    .. seealso::
+
+        :ref:`change_orm_update_returning_14`
\ No newline at end of file
index 54f712a11bd4ae87f4335acaf45e2a71282bd326..f994473612b8bed39938e5f974a558b7c33d3be6 100644 (file)
@@ -130,11 +130,14 @@ class FromCache(UserDefinedOption):
         self.expiration_time = expiration_time
         self.ignore_expiration = ignore_expiration
 
+    def _gen_cache_key(self, anon_map, bindparams):
+        return None
+
     def _generate_cache_key(self, statement, parameters, orm_cache):
         statement_cache_key = statement._generate_cache_key()
 
         key = statement_cache_key.to_offline_string(
-            orm_cache._statement_cache, parameters
+            orm_cache._statement_cache, statement, parameters
         ) + repr(self.cache_key)
 
         # print("here's our key...%s" % key)
index 5aaecf23a82aaa4fbaf1745cff9536b793354360..4b211bde7ad394e01ac69e3e09ee2df8cb1b173c 100644 (file)
@@ -2402,6 +2402,9 @@ class MSDialect(default.DefaultDialect):
     max_identifier_length = 128
     schema_name = "dbo"
 
+    implicit_returning = True
+    full_returning = True
+
     colspecs = {
         sqltypes.DateTime: _MSDateTime,
         sqltypes.Date: _MSDate,
@@ -2567,11 +2570,10 @@ class MSDialect(default.DefaultDialect):
                 "features may not function properly."
                 % ".".join(str(x) for x in self.server_version_info)
             )
-        if (
-            self.server_version_info >= MS_2005_VERSION
-            and "implicit_returning" not in self.__dict__
-        ):
-            self.implicit_returning = True
+
+        if self.server_version_info < MS_2005_VERSION:
+            self.implicit_returning = self.full_returning = False
+
         if self.server_version_info >= MS_2008_VERSION:
             self.supports_multivalues_insert = True
         if self.deprecate_large_types is None:
index f3e775354a87a24165932d7a47ad4127def33125..c2d9af4d21a1a1e9cfdff84c2ca1a775fc228698 100644 (file)
@@ -2510,6 +2510,9 @@ class PGDialect(default.DefaultDialect):
     inspector = PGInspector
     isolation_level = None
 
+    implicit_returning = True
+    full_returning = True
+
     construct_arguments = [
         (
             schema.Index,
@@ -2555,10 +2558,10 @@ class PGDialect(default.DefaultDialect):
 
     def initialize(self, connection):
         super(PGDialect, self).initialize(connection)
-        self.implicit_returning = self.server_version_info > (
-            8,
-            2,
-        ) and self.__dict__.get("implicit_returning", True)
+
+        if self.server_version_info <= (8, 2):
+            self.full_returning = self.implicit_returning = False
+
         self.supports_native_enum = self.server_version_info >= (8, 3)
         if not self.supports_native_enum:
             self.colspecs = self.colspecs.copy()
index 4d516e97c573d86dafa36fa2584d9903b5e60950..1a8dbb4cda76d7882920b733d4de8b89f9b0186a 100644 (file)
@@ -67,6 +67,7 @@ class DefaultDialect(interfaces.Dialect):
     preexecute_autoincrement_sequences = False
     postfetch_lastrowid = True
     implicit_returning = False
+    full_returning = False
 
     cte_follows_insert = False
 
index b29bc22d44fe6a1c264d4884669acfccbbb491de..ead52a3f844876fab829f3dde3f2724b9c10c91e 100644 (file)
@@ -1259,6 +1259,10 @@ class IteratorResult(Result):
         return list(itertools.islice(self.iterator, 0, size))
 
 
+def null_result():
+    return IteratorResult(SimpleResultMetaData([]), iter([]))
+
+
 class ChunkedIteratorResult(IteratorResult):
     """An :class:`.IteratorResult` that works from an iterator-producing callable.
 
index 0983807cb9664172fdc89e09bb81fd487c05f19a..9d7266d1aebd6b4e9b703722c406458a9a101c4f 100644 (file)
@@ -220,7 +220,6 @@ def execute_and_instances(orm_context):
         update_options = active_options = orm_context.update_delete_options
 
     session = orm_context.session
-    # orm_query = orm_context.orm_query
 
     def iter_for_shard(shard_id, load_options, update_options):
         execution_options = dict(orm_context.local_execution_options)
index f380229e1d0f8606c345e414f6aaf7090542089d..77237f089d3b26e45c4dc660cdd480789f602b85 100644 (file)
@@ -193,8 +193,17 @@ class ORMCompileState(CompileState):
 
     @classmethod
     def orm_pre_session_exec(
-        cls, session, statement, params, execution_options, bind_arguments
+        cls,
+        session,
+        statement,
+        params,
+        execution_options,
+        bind_arguments,
+        is_reentrant_invoke,
     ):
+        if is_reentrant_invoke:
+            return statement, execution_options
+
         load_options = execution_options.get(
             "_sa_orm_load_options", QueryContext.default_load_options
         )
@@ -220,7 +229,7 @@ class ORMCompileState(CompileState):
         if load_options._autoflush:
             session._autoflush()
 
-        return execution_options
+        return statement, execution_options
 
     @classmethod
     def orm_setup_cursor_result(
@@ -2259,9 +2268,20 @@ class _ColumnEntity(_QueryEntity):
                 )
 
             if _entity:
-                _ORMColumnEntity(
-                    compile_state, column, _entity, parent_bundle=parent_bundle
-                )
+                if "identity_token" in column._annotations:
+                    _IdentityTokenEntity(
+                        compile_state,
+                        column,
+                        _entity,
+                        parent_bundle=parent_bundle,
+                    )
+                else:
+                    _ORMColumnEntity(
+                        compile_state,
+                        column,
+                        _entity,
+                        parent_bundle=parent_bundle,
+                    )
             else:
                 _RawColumnEntity(
                     compile_state, column, parent_bundle=parent_bundle
@@ -2462,3 +2482,14 @@ class _ORMColumnEntity(_ColumnEntity):
 
         compile_state.primary_columns.append(column)
         self._fetch_column = column
+
+
+class _IdentityTokenEntity(_ORMColumnEntity):
+    def setup_compile_state(self, compile_state):
+        pass
+
+    def row_processor(self, context, result):
+        def getter(row):
+            return context.load_options._refresh_identity_token
+
+        return getter, self._label_name, self._extra_entities
index 51bc8e42601d52a3bc08e4daffc16e84de985427..caa9ffe108a554f0ca794608f989798faee4224f 100644 (file)
@@ -35,6 +35,10 @@ _straight_ops = set(
     )
 )
 
+_extended_ops = {
+    operators.in_op: (lambda a, b: a in b),
+    operators.notin_op: (lambda a, b: a not in b),
+}
 
 _notimplemented_ops = set(
     getattr(operators, op)
@@ -43,9 +47,8 @@ _notimplemented_ops = set(
         "notlike_op",
         "ilike_op",
         "notilike_op",
+        "startswith_op",
         "between_op",
-        "in_op",
-        "notin_op",
         "endswith_op",
         "concat_op",
     )
@@ -136,6 +139,17 @@ class EvaluatorCompiler(object):
                         return False
                 return True
 
+        elif clause.operator is operators.comma_op:
+
+            def evaluate(obj):
+                values = []
+                for sub_evaluate in evaluators:
+                    value = sub_evaluate(obj)
+                    if value is None:
+                        return None
+                    values.append(value)
+                return tuple(values)
+
         else:
             raise UnevaluatableError(
                 "Cannot evaluate clauselist with operator %s" % clause.operator
@@ -158,6 +172,16 @@ class EvaluatorCompiler(object):
             def evaluate(obj):
                 return eval_left(obj) != eval_right(obj)
 
+        elif operator in _extended_ops:
+
+            def evaluate(obj):
+                left_val = eval_left(obj)
+                right_val = eval_right(obj)
+                if left_val is None or right_val is None:
+                    return None
+
+                return _extended_ops[operator](left_val, right_val)
+
         elif operator in _straight_ops:
 
             def evaluate(obj):
index bec6da74d464d230e85604cfb43b0988b169594d..ef0e9a49b602f8cf36d8f4fce97cb1f4cc38bcd7 100644 (file)
@@ -2240,7 +2240,6 @@ class Mapper(
             "entity_namespace": self,
             "parententity": self,
             "parentmapper": self,
-            "compile_state_plugin": "orm",
         }
         if self.persist_selectable is not self.local_table:
             # joined table inheritance, with polymorphic selectable,
@@ -2250,7 +2249,6 @@ class Mapper(
                     "entity_namespace": self,
                     "parententity": self,
                     "parentmapper": self,
-                    "compile_state_plugin": "orm",
                 }
             )._set_propagate_attrs(
                 {"compile_state_plugin": "orm", "plugin_subject": self}
@@ -2260,6 +2258,23 @@ class Mapper(
             {"compile_state_plugin": "orm", "plugin_subject": self}
         )
 
+    @util.memoized_property
+    def select_identity_token(self):
+        return (
+            expression.null()
+            ._annotate(
+                {
+                    "entity_namespace": self,
+                    "parententity": self,
+                    "parentmapper": self,
+                    "identity_token": True,
+                }
+            )
+            ._set_propagate_attrs(
+                {"compile_state_plugin": "orm", "plugin_subject": self}
+            )
+        )
+
     @property
     def selectable(self):
         """The :func:`_expression.select` construct this
index 8393eaf74d0ea2672dddede0980ac8380bf18525..bd8efe77f04e203bc438fadee194fd85c0be27d1 100644 (file)
@@ -28,6 +28,7 @@ from .. import exc as sa_exc
 from .. import future
 from .. import sql
 from .. import util
+from ..engine import result as _result
 from ..future import select as future_select
 from ..sql import coercions
 from ..sql import expression
@@ -1672,8 +1673,17 @@ class BulkUDCompileState(CompileState):
 
     @classmethod
     def orm_pre_session_exec(
-        cls, session, statement, params, execution_options, bind_arguments
+        cls,
+        session,
+        statement,
+        params,
+        execution_options,
+        bind_arguments,
+        is_reentrant_invoke,
     ):
+        if is_reentrant_invoke:
+            return statement, execution_options
+
         sync = execution_options.get("synchronize_session", None)
         if sync is None:
             sync = statement._execution_options.get(
@@ -1706,6 +1716,17 @@ class BulkUDCompileState(CompileState):
         if update_options._autoflush:
             session._autoflush()
 
+        statement = statement._annotate(
+            {"synchronize_session": update_options._synchronize_session}
+        )
+
+        # this stage of the execution is called before the do_orm_execute event
+        # hook.  meaning for an extension like horizontal sharding, this step
+        # happens before the extension splits out into multiple backends and
+        # runs only once.  if we do pre_sync_fetch, we execute a SELECT
+        # statement, which the horizontal sharding extension splits amongst the
+        # shards and combines the results together.
+
         if update_options._synchronize_session == "evaluate":
             update_options = cls._do_pre_synchronize_evaluate(
                 session,
@@ -1725,19 +1746,31 @@ class BulkUDCompileState(CompileState):
                 update_options,
             )
 
-        return util.immutabledict(execution_options).union(
-            dict(_sa_orm_update_options=update_options)
+        return (
+            statement,
+            util.immutabledict(execution_options).union(
+                dict(_sa_orm_update_options=update_options)
+            ),
         )
 
     @classmethod
     def orm_setup_cursor_result(
         cls, session, statement, execution_options, bind_arguments, result
     ):
+
+        # this stage of the execution is called after the
+        # do_orm_execute event hook.  meaning for an extension like
+        # horizontal sharding, this step happens *within* the horizontal
+        # sharding event handler which calls session.execute() re-entrantly
+        # and will occur for each backend individually.
+        # the sharding extension then returns its own merged result from the
+        # individual ones we return here.
+
         update_options = execution_options["_sa_orm_update_options"]
         if update_options._synchronize_session == "evaluate":
-            cls._do_post_synchronize_evaluate(session, update_options)
+            cls._do_post_synchronize_evaluate(session, result, update_options)
         elif update_options._synchronize_session == "fetch":
-            cls._do_post_synchronize_fetch(session, update_options)
+            cls._do_post_synchronize_fetch(session, result, update_options)
 
         return result
 
@@ -1767,18 +1800,6 @@ class BulkUDCompileState(CompileState):
                 def eval_condition(obj):
                     return True
 
-            # TODO: something more robust for this conditional
-            if statement.__visit_name__ == "update":
-                resolved_values = cls._get_resolved_values(mapper, statement)
-                value_evaluators = {}
-                resolved_keys_as_propnames = cls._resolved_keys_as_propnames(
-                    mapper, resolved_values
-                )
-                for key, value in resolved_keys_as_propnames:
-                    value_evaluators[key] = evaluator_compiler.process(
-                        coercions.expect(roles.ExpressionElementRole, value)
-                    )
-
         except evaluator.UnevaluatableError as err:
             util.raise_(
                 sa_exc.InvalidRequestError(
@@ -1789,13 +1810,35 @@ class BulkUDCompileState(CompileState):
                 from_=err,
             )
 
-        # TODO: detect when the where clause is a trivial primary key match
+        if statement.__visit_name__ == "update":
+            resolved_values = cls._get_resolved_values(mapper, statement)
+            value_evaluators = {}
+            resolved_keys_as_propnames = cls._resolved_keys_as_propnames(
+                mapper, resolved_values
+            )
+            for key, value in resolved_keys_as_propnames:
+                try:
+                    _evaluator = evaluator_compiler.process(
+                        coercions.expect(roles.ExpressionElementRole, value)
+                    )
+                except evaluator.UnevaluatableError:
+                    pass
+                else:
+                    value_evaluators[key] = _evaluator
+
+        # TODO: detect when the where clause is a trivial primary key match.
         matched_objects = [
             obj
             for (cls, pk, identity_token,), obj in session.identity_map.items()
             if issubclass(cls, target_cls)
             and eval_condition(obj)
-            and identity_token == update_options._refresh_identity_token
+            and (
+                update_options._refresh_identity_token is None
+                # TODO: coverage for the case where horizontal sharding
+                # invokes an update() or delete() given an explicit identity
+                # token up front
+                or identity_token == update_options._refresh_identity_token
+            )
         ]
         return update_options + {
             "_matched_objects": matched_objects,
@@ -1868,29 +1911,56 @@ class BulkUDCompileState(CompileState):
     ):
         mapper = update_options._subject_mapper
 
-        if mapper:
-            primary_table = mapper.local_table
-        else:
-            primary_table = statement._raw_columns[0]
-
-        # note this creates a Select() *without* the ORM plugin.
-        # we don't want that here.
-        select_stmt = future_select(*primary_table.primary_key)
+        select_stmt = future_select(
+            *(mapper.primary_key + (mapper.select_identity_token,))
+        )
         select_stmt._where_criteria = statement._where_criteria
 
-        matched_rows = session.execute(
-            select_stmt, params, execution_options, bind_arguments
-        ).fetchall()
+        def skip_for_full_returning(orm_context):
+            bind = orm_context.session.get_bind(**orm_context.bind_arguments)
+            if bind.dialect.full_returning:
+                return _result.null_result()
+            else:
+                return None
+
+        result = session.execute(
+            select_stmt,
+            params,
+            execution_options,
+            bind_arguments,
+            _add_event=skip_for_full_returning,
+        )
+        matched_rows = result.fetchall()
+
+        value_evaluators = _EMPTY_DICT
 
         if statement.__visit_name__ == "update":
+            target_cls = mapper.class_
+            evaluator_compiler = evaluator.EvaluatorCompiler(target_cls)
             resolved_values = cls._get_resolved_values(mapper, statement)
             resolved_keys_as_propnames = cls._resolved_keys_as_propnames(
                 mapper, resolved_values
             )
+
+            value_evaluators = {}
+            for key, value in resolved_keys_as_propnames:
+                try:
+                    _evaluator = evaluator_compiler.process(
+                        coercions.expect(roles.ExpressionElementRole, value)
+                    )
+                except evaluator.UnevaluatableError:
+                    pass
+                else:
+                    value_evaluators[key] = _evaluator
+
         else:
             resolved_keys_as_propnames = _EMPTY_DICT
 
         return update_options + {
+            "_value_evaluators": value_evaluators,
             "_matched_rows": matched_rows,
             "_resolved_keys_as_propnames": resolved_keys_as_propnames,
         }
@@ -1925,15 +1995,23 @@ class BulkORMUpdate(UpdateDMLState, BulkUDCompileState):
         elif statement._values:
             new_stmt._values = self._resolved_values
 
+        if (
+            statement._annotations.get("synchronize_session", None) == "fetch"
+            and compiler.dialect.full_returning
+        ):
+            new_stmt = new_stmt.returning(*mapper.primary_key)
+
         UpdateDMLState.__init__(self, new_stmt, compiler, **kw)
 
         return self
 
     @classmethod
-    def _do_post_synchronize_evaluate(cls, session, update_options):
+    def _do_post_synchronize_evaluate(cls, session, result, update_options):
 
         states = set()
         evaluated_keys = list(update_options._value_evaluators.keys())
+        values = update_options._resolved_keys_as_propnames
+        attrib = set(k for k, v in values)
         for obj in update_options._matched_objects:
 
             state, dict_ = (
@@ -1941,9 +2019,15 @@ class BulkORMUpdate(UpdateDMLState, BulkUDCompileState):
                 attributes.instance_dict(obj),
             )
 
-            assert (
-                state.identity_token == update_options._refresh_identity_token
-            )
+            # the evaluated states were gathered across all identity tokens.
+            # however the post_sync events are called per identity token,
+            # so filter.
+            if (
+                update_options._refresh_identity_token is not None
+                and state.identity_token
+                != update_options._refresh_identity_token
+            ):
+                continue
 
             # only evaluate unmodified attributes
             to_evaluate = state.unmodified.intersection(evaluated_keys)
@@ -1954,38 +2038,64 @@ class BulkORMUpdate(UpdateDMLState, BulkUDCompileState):
 
             state._commit(dict_, list(to_evaluate))
 
-            # expire attributes with pending changes
-            # (there was no autoflush, so they are overwritten)
-            state._expire_attributes(
-                dict_, set(evaluated_keys).difference(to_evaluate)
-            )
+            to_expire = attrib.intersection(dict_).difference(to_evaluate)
+            if to_expire:
+                state._expire_attributes(dict_, to_expire)
+
             states.add(state)
         session._register_altered(states)
 
     @classmethod
-    def _do_post_synchronize_fetch(cls, session, update_options):
+    def _do_post_synchronize_fetch(cls, session, result, update_options):
         target_mapper = update_options._subject_mapper
 
-        states = set(
-            [
-                attributes.instance_state(session.identity_map[identity_key])
-                for identity_key in [
-                    target_mapper.identity_key_from_primary_key(
-                        list(primary_key),
-                        identity_token=update_options._refresh_identity_token,
-                    )
-                    for primary_key in update_options._matched_rows
+        states = set()
+        evaluated_keys = list(update_options._value_evaluators.keys())
+
+        if result.returns_rows:
+            matched_rows = [
+                tuple(row) + (update_options._refresh_identity_token,)
+                for row in result.all()
+            ]
+        else:
+            matched_rows = update_options._matched_rows
+
+        objs = [
+            session.identity_map[identity_key]
+            for identity_key in [
+                target_mapper.identity_key_from_primary_key(
+                    list(primary_key), identity_token=identity_token,
+                )
+                for primary_key, identity_token in [
+                    (row[0:-1], row[-1]) for row in matched_rows
                 ]
-                if identity_key in session.identity_map
+                if update_options._refresh_identity_token is None
+                or identity_token == update_options._refresh_identity_token
             ]
-        )
+            if identity_key in session.identity_map
+        ]
 
         values = update_options._resolved_keys_as_propnames
         attrib = set(k for k, v in values)
-        for state in states:
-            to_expire = attrib.intersection(state.dict)
+
+        for obj in objs:
+            state, dict_ = (
+                attributes.instance_state(obj),
+                attributes.instance_dict(obj),
+            )
+
+            to_evaluate = state.unmodified.intersection(evaluated_keys)
+            for key in to_evaluate:
+                dict_[key] = update_options._value_evaluators[key](obj)
+            state.manager.dispatch.refresh(state, None, to_evaluate)
+
+            state._commit(dict_, list(to_evaluate))
+
+            to_expire = attrib.intersection(dict_).difference(to_evaluate)
             if to_expire:
-                session._expire_state(state, to_expire)
+                state._expire_attributes(dict_, to_expire)
+
+            states.add(state)
         session._register_altered(states)
 
 
@@ -1995,14 +2105,24 @@ class BulkORMDelete(DeleteDMLState, BulkUDCompileState):
     def create_for_statement(cls, statement, compiler, **kw):
         self = cls.__new__(cls)
 
-        self.mapper = statement.table._annotations.get("parentmapper", None)
+        self.mapper = mapper = statement.table._annotations.get(
+            "parentmapper", None
+        )
+
+        if (
+            mapper
+            and statement._annotations.get("synchronize_session", None)
+            == "fetch"
+            and compiler.dialect.full_returning
+        ):
+            statement = statement.returning(*mapper.primary_key)
 
         DeleteDMLState.__init__(self, statement, compiler, **kw)
 
         return self
 
     @classmethod
-    def _do_post_synchronize_evaluate(cls, session, update_options):
+    def _do_post_synchronize_evaluate(cls, session, result, update_options):
 
         session._remove_newly_deleted(
             [
@@ -2012,15 +2132,25 @@ class BulkORMDelete(DeleteDMLState, BulkUDCompileState):
         )
 
     @classmethod
-    def _do_post_synchronize_fetch(cls, session, update_options):
+    def _do_post_synchronize_fetch(cls, session, result, update_options):
         target_mapper = update_options._subject_mapper
 
-        for primary_key in update_options._matched_rows:
+        if result.returns_rows:
+            matched_rows = [
+                tuple(row) + (update_options._refresh_identity_token,)
+                for row in result.all()
+            ]
+        else:
+            matched_rows = update_options._matched_rows
+
+        for row in matched_rows:
+            primary_key = row[0:-1]
+            identity_token = row[-1]
+
             # TODO: inline this and call remove_newly_deleted
             # once
             identity_key = target_mapper.identity_key_from_primary_key(
-                list(primary_key),
-                identity_token=update_options._refresh_identity_token,
+                list(primary_key), identity_token=identity_token,
             )
             if identity_key in session.identity_map:
                 session._remove_newly_deleted(
index 5ad8bcf2f2ffa78f20828ecdf39c31dcf86f0398..a398da7937dcd4ea597bc3f9e0b9e927100acc03 100644 (file)
@@ -116,6 +116,8 @@ class ORMExecuteState(util.MemoizedSlots):
         "_merged_execution_options",
         "bind_arguments",
         "_compile_state_cls",
+        "_starting_event_idx",
+        "_events_todo",
     )
 
     def __init__(
@@ -126,6 +128,7 @@ class ORMExecuteState(util.MemoizedSlots):
         execution_options,
         bind_arguments,
         compile_state_cls,
+        events_todo,
     ):
         self.session = session
         self.statement = statement
@@ -133,6 +136,10 @@ class ORMExecuteState(util.MemoizedSlots):
         self._execution_options = execution_options
         self.bind_arguments = bind_arguments
         self._compile_state_cls = compile_state_cls
+        self._events_todo = list(events_todo)
+
+    def _remaining_events(self):
+        return self._events_todo[self._starting_event_idx + 1 :]
 
     def invoke_statement(
         self,
@@ -200,7 +207,11 @@ class ORMExecuteState(util.MemoizedSlots):
             _execution_options = self._execution_options
 
         return self.session.execute(
-            statement, _params, _execution_options, _bind_arguments
+            statement,
+            _params,
+            _execution_options,
+            _bind_arguments,
+            _parent_execute_state=self,
         )
 
     @property
@@ -1376,6 +1387,8 @@ class Session(_SessionClassMethods):
         params=None,
         execution_options=util.immutabledict(),
         bind_arguments=None,
+        _parent_execute_state=None,
+        _add_event=None,
         **kw
     ):
         r"""Execute a SQL expression construct or string statement within
@@ -1521,8 +1534,16 @@ class Session(_SessionClassMethods):
             compile_state_cls = None
 
         if compile_state_cls is not None:
-            execution_options = compile_state_cls.orm_pre_session_exec(
-                self, statement, params, execution_options, bind_arguments
+            (
+                statement,
+                execution_options,
+            ) = compile_state_cls.orm_pre_session_exec(
+                self,
+                statement,
+                params,
+                execution_options,
+                bind_arguments,
+                _parent_execute_state is not None,
             )
         else:
             bind_arguments.setdefault("clause", statement)
@@ -1531,22 +1552,28 @@ class Session(_SessionClassMethods):
                     execution_options, {"future_result": True}
                 )
 
-        if self.dispatch.do_orm_execute:
-            # run this event whether or not we are in ORM mode
-            skip_events = bind_arguments.get("_sa_skip_events", False)
-            if not skip_events:
-                orm_exec_state = ORMExecuteState(
-                    self,
-                    statement,
-                    params,
-                    execution_options,
-                    bind_arguments,
-                    compile_state_cls,
-                )
-                for fn in self.dispatch.do_orm_execute:
-                    result = fn(orm_exec_state)
-                    if result:
-                        return result
+        if _parent_execute_state:
+            events_todo = _parent_execute_state._remaining_events()
+        else:
+            events_todo = self.dispatch.do_orm_execute
+            if _add_event:
+                events_todo = list(events_todo) + [_add_event]
+
+        if events_todo:
+            orm_exec_state = ORMExecuteState(
+                self,
+                statement,
+                params,
+                execution_options,
+                bind_arguments,
+                compile_state_cls,
+                events_todo,
+            )
+            for idx, fn in enumerate(events_todo):
+                orm_exec_state._starting_event_idx = idx
+                result = fn(orm_exec_state)
+                if result:
+                    return result
 
         bind = self.get_bind(**bind_arguments)
 
@@ -1729,7 +1756,12 @@ class Session(_SessionClassMethods):
         self._add_bind(table, bind)
 
     def get_bind(
-        self, mapper=None, clause=None, bind=None, _sa_skip_events=None
+        self,
+        mapper=None,
+        clause=None,
+        bind=None,
+        _sa_skip_events=None,
+        _sa_skip_for_implicit_returning=False,
     ):
         """Return a "bind" to which this :class:`.Session` is bound.
 
index 2d51e7c9b3555a5c9836fe4c9c3de0a17717cb57..163276ca9847a0b9845f15ec23f8c7b8527df6d8 100644 (file)
@@ -311,13 +311,31 @@ class SuiteRequirements(Requirements):
 
         return exclusions.open()
 
+    @property
+    def full_returning(self):
+        """target platform supports RETURNING completely, including
+        multiple rows returned.
+
+        """
+
+        return exclusions.only_if(
+            lambda config: config.db.dialect.full_returning,
+            "%(database)s %(does_support)s 'RETURNING of multiple rows'",
+        )
+
     @property
     def returning(self):
-        """target platform supports RETURNING."""
+        """target platform supports RETURNING for at least one row.
+
+        .. seealso::
+
+            :attr:`.Requirements.full_returning`
+
+        """
 
         return exclusions.only_if(
             lambda config: config.db.dialect.implicit_returning,
-            "%(database)s %(does_support)s 'returning'",
+            "%(database)s %(does_support)s 'RETURNING of a single row'",
         )
 
     @property
index 8f06220e2876ee07b1fe907775d53da43603090c..13e92f5c4dc65778ec1ff88d752ec5cbe019357a 100644 (file)
@@ -878,6 +878,7 @@ class JoinedEagerLoadTest(NoCache, fixtures.MappedTest):
                     {},
                     exec_opts,
                     bind_arguments,
+                    is_reentrant_invoke=False,
                 )
 
                 r = sess.connection().execute(
@@ -888,7 +889,7 @@ class JoinedEagerLoadTest(NoCache, fixtures.MappedTest):
 
                 r.context.compiled.compile_state = compile_state
                 obj = ORMCompileState.orm_setup_cursor_result(
-                    sess, compile_state.statement, exec_opts, {}, r
+                    sess, compile_state.statement, exec_opts, {}, r,
                 )
                 list(obj)
                 sess.close()
index 0fea029feae5faa82f1467c8eaf36f35340cf3c7..f1b54cb8fee868ddcd407d78c46ae21304953543 100644 (file)
@@ -1197,7 +1197,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
     @testing.provide_metadata
     def test_reflect_all(self):
         existing = inspect(testing.db).get_table_names()
-
         names = ["rt_%s" % name for name in ("a", "b", "c", "d", "e")]
         nameset = set(names)
         for name in names:
index c0029fbb6389b0b693fdef5c678ca5e97208c9fa..9855cd5ab1bfb68824c53cdcc96912dfb8274432 100644 (file)
@@ -35,8 +35,6 @@ from sqlalchemy.testing import provision
 from sqlalchemy.testing.engines import testing_engine
 from sqlalchemy.testing.engines import testing_reaper
 
-# TODO: ShardTest can be turned into a base for further subclasses
-
 
 class ShardTest(object):
     __skip_if__ = (lambda: util.win32,)
@@ -47,9 +45,9 @@ class ShardTest(object):
     def setUp(self):
         global db1, db2, db3, db4, weather_locations, weather_reports
 
-        db1, db2, db3, db4 = self._init_dbs()
+        db1, db2, db3, db4 = self._dbs = self._init_dbs()
 
-        meta = MetaData()
+        meta = self.metadata = MetaData()
         ids = Table("ids", meta, Column("nextid", Integer, nullable=False))
 
         def id_generator(ctx):
@@ -578,9 +576,11 @@ class ShardTest(object):
         temps = sess.execute(future_select(Report)).scalars().all()
         eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
 
+        # omitting the criteria so that the UPDATE affects three out of
+        # four shards
         sess.execute(
             update(Report)
-            .filter(Report.temperature >= 80)
             .values({"temperature": Report.temperature + 6},)
             .execution_options(synchronize_session="fetch")
         )
@@ -590,11 +590,11 @@ class ShardTest(object):
                 row.temperature
                 for row in sess.execute(future_select(Report.temperature))
             ),
-            {86.0, 75.0, 91.0},
+            {86.0, 81.0, 91.0},
         )
 
         # test synchronize session as well
-        eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0})
+        eq_(set(t.temperature for t in temps), {86.0, 81.0, 91.0})
 
     def test_bulk_delete_future_synchronize_evaluate(self):
         sess = self._fixture_data()
@@ -711,9 +711,8 @@ class TableNameConventionShardTest(ShardTest, fixtures.TestBase):
     This used to be called "AttachedFileShardTest" but I didn't see any
     ATTACH going on.
 
-    The approach taken by this test is awkward and I wouldn't recommend  using
-    this pattern in a real situation.  I'm not sure of the history of this test
-    but it likely predates when we knew how to use real ATTACH in SQLite.
+    A more modern approach here would be to use the schema_translate_map
+    option.
 
     """
 
@@ -742,6 +741,49 @@ class TableNameConventionShardTest(ShardTest, fixtures.TestBase):
         return db1, db2, db3, db4
 
 
+class MultipleDialectShardTest(ShardTest, fixtures.TestBase):
+    __only_on__ = "postgresql"
+
+    schema = "changeme"
+
+    def _init_dbs(self):
+        e1 = testing_engine("sqlite://")
+        with e1.connect() as conn:
+            for i in [1, 3]:
+                conn.exec_driver_sql(
+                    'ATTACH DATABASE "shard%s_%s.db" AS shard%s'
+                    % (i, provision.FOLLOWER_IDENT, i)
+                )
+
+        e2 = testing_engine()
+        with e2.connect() as conn:
+            for i in [2, 4]:
+                conn.exec_driver_sql(
+                    "CREATE SCHEMA IF NOT EXISTS shard%s" % (i,)
+                )
+
+        db1 = e1.execution_options(schema_translate_map={"changeme": "shard1"})
+        db2 = e2.execution_options(schema_translate_map={"changeme": "shard2"})
+        db3 = e1.execution_options(schema_translate_map={"changeme": "shard3"})
+        db4 = e2.execution_options(schema_translate_map={"changeme": "shard4"})
+
+        self.sqlite_engine = e1
+        self.postgresql_engine = e2
+        return db1, db2, db3, db4
+
+    def teardown(self):
+        clear_mappers()
+
+        self.sqlite_engine.connect().invalidate()
+        for i in [1, 3]:
+            os.remove("shard%d_%s.db" % (i, provision.FOLLOWER_IDENT))
+
+        with self.postgresql_engine.connect() as conn:
+            self.metadata.drop_all(conn)
+            for i in [2, 4]:
+                conn.exec_driver_sql("DROP SCHEMA shard%s CASCADE" % (i,))
+
+
 class SelectinloadRegressionTest(fixtures.DeclarativeMappedTest):
     """test #4175
     """
index 5bc05448653bff60a37a1284f638308b4094eb2c..20577d8e62f353f009b232d77817fefbf6591779 100644 (file)
@@ -8,6 +8,7 @@ from sqlalchemy import Integer
 from sqlalchemy import not_
 from sqlalchemy import or_
 from sqlalchemy import String
+from sqlalchemy import tuple_
 from sqlalchemy.orm import evaluator
 from sqlalchemy.orm import mapper
 from sqlalchemy.orm import relationship
@@ -19,7 +20,6 @@ from sqlalchemy.testing import is_
 from sqlalchemy.testing.schema import Column
 from sqlalchemy.testing.schema import Table
 
-
 compiler = evaluator.EvaluatorCompiler()
 
 
@@ -191,6 +191,54 @@ class EvaluateTest(fixtures.MappedTest):
             ],
         )
 
+    def test_in(self):
+        User = self.classes.User
+
+        eval_eq(
+            User.name.in_(["foo", "bar"]),
+            testcases=[
+                (User(id=1, name="foo"), True),
+                (User(id=2, name="bat"), False),
+                (User(id=1, name="bar"), True),
+                (User(id=1, name=None), None),
+            ],
+        )
+
+        eval_eq(
+            User.name.notin_(["foo", "bar"]),
+            testcases=[
+                (User(id=1, name="foo"), False),
+                (User(id=2, name="bat"), True),
+                (User(id=1, name="bar"), False),
+                (User(id=1, name=None), None),
+            ],
+        )
+
+    def test_in_tuples(self):
+        User = self.classes.User
+
+        eval_eq(
+            tuple_(User.id, User.name).in_([(1, "foo"), (2, "bar")]),
+            testcases=[
+                (User(id=1, name="foo"), True),
+                (User(id=2, name="bat"), False),
+                (User(id=1, name="bar"), False),
+                (User(id=2, name="bar"), True),
+                (User(id=1, name=None), None),
+            ],
+        )
+
+        eval_eq(
+            tuple_(User.id, User.name).notin_([(1, "foo"), (2, "bar")]),
+            testcases=[
+                (User(id=1, name="foo"), False),
+                (User(id=2, name="bat"), True),
+                (User(id=1, name="bar"), True),
+                (User(id=2, name="bar"), False),
+                (User(id=1, name=None), None),
+            ],
+        )
+
     def test_null_propagation(self):
         User = self.classes.User
 
index 12a8417ba4b29da4078e39b5aee38eb2261b0956..310b170479483a23a1a8c923eec1a878ec1b9029 100644 (file)
@@ -23,6 +23,9 @@ from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import in_
+from sqlalchemy.testing import not_in_
+from sqlalchemy.testing.assertsql import CompiledSQL
 from sqlalchemy.testing.schema import Column
 from sqlalchemy.testing.schema import Table
 
@@ -144,6 +147,50 @@ class UpdateDeleteTest(fixtures.MappedTest):
                 q.delete,
             )
 
+    def test_update_w_unevaluatable_value_evaluate(self):
+        """test that the "evaluate" strategy falls back to 'expire' for an
+        update SET that is not evaluable in Python."""
+
+        User = self.classes.User
+
+        s = Session()
+
+        jill = s.query(User).filter(User.name == "jill").one()
+
+        s.execute(
+            update(User)
+            .filter(User.name == "jill")
+            .values({"name": User.name + User.name}),
+            execution_options={"synchronize_session": "evaluate"},
+        )
+
+        eq_(jill.name, "jilljill")
+
+    def test_update_w_unevaluatable_value_fetch(self):
+        """test that the "fetch" strategy falls back to 'expire' for an
+        update SET that is not evaluable in Python.
+
+        Prior to 1.4 the "fetch" strategy used expire for everything
+        but now it tries to evaluate a SET clause to avoid a round
+        trip.
+
+        """
+
+        User = self.classes.User
+
+        s = Session()
+
+        jill = s.query(User).filter(User.name == "jill").one()
+
+        s.execute(
+            update(User)
+            .filter(User.name == "jill")
+            .values({"name": User.name + User.name}),
+            execution_options={"synchronize_session": "fetch"},
+        )
+
+        eq_(jill.name, "jilljill")
+
     def test_evaluate_clauseelement(self):
         User = self.classes.User
 
@@ -479,6 +526,87 @@ class UpdateDeleteTest(fixtures.MappedTest):
             list(zip([25, 37, 29, 27])),
         )
 
+    def test_update_fetch_returning(self):
+        User = self.classes.User
+
+        sess = Session()
+
+        john, jack, jill, jane = sess.query(User).order_by(User.id).all()
+
+        with self.sql_execution_asserter() as asserter:
+            sess.query(User).filter(User.age > 29).update(
+                {"age": User.age - 10}, synchronize_session="fetch"
+            )
+
+            # these are simple values, these are now evaluated even with
+            # the "fetch" strategy, new in 1.4, so there is no expiry
+            eq_([john.age, jack.age, jill.age, jane.age], [25, 37, 29, 27])
+
+        if testing.db.dialect.full_returning:
+            asserter.assert_(
+                CompiledSQL(
+                    "UPDATE users SET age_int=(users.age_int - %(age_int_1)s) "
+                    "WHERE users.age_int > %(age_int_2)s RETURNING users.id",
+                    [{"age_int_1": 10, "age_int_2": 29}],
+                    dialect="postgresql",
+                ),
+            )
+        else:
+            asserter.assert_(
+                CompiledSQL(
+                    "SELECT users.id FROM users "
+                    "WHERE users.age_int > :age_int_1",
+                    [{"age_int_1": 29}],
+                ),
+                CompiledSQL(
+                    "UPDATE users SET age_int=(users.age_int - :age_int_1) "
+                    "WHERE users.age_int > :age_int_2",
+                    [{"age_int_1": 10, "age_int_2": 29}],
+                ),
+            )
+
+    def test_delete_fetch_returning(self):
+        User = self.classes.User
+
+        sess = Session()
+
+        john, jack, jill, jane = sess.query(User).order_by(User.id).all()
+
+        in_(john, sess)
+        in_(jack, sess)
+
+        with self.sql_execution_asserter() as asserter:
+            sess.query(User).filter(User.age > 29).delete(
+                synchronize_session="fetch"
+            )
+
+        if testing.db.dialect.full_returning:
+            asserter.assert_(
+                CompiledSQL(
+                    "DELETE FROM users WHERE users.age_int > %(age_int_1)s "
+                    "RETURNING users.id",
+                    [{"age_int_1": 29}],
+                    dialect="postgresql",
+                ),
+            )
+        else:
+            asserter.assert_(
+                CompiledSQL(
+                    "SELECT users.id FROM users "
+                    "WHERE users.age_int > :age_int_1",
+                    [{"age_int_1": 29}],
+                ),
+                CompiledSQL(
+                    "DELETE FROM users WHERE users.age_int > :age_int_1",
+                    [{"age_int_1": 29}],
+                ),
+            )
+
+        in_(john, sess)
+        not_in_(jack, sess)
+        in_(jill, sess)
+        not_in_(jane, sess)
+
     def test_update_without_load(self):
         User = self.classes.User
 
index f856c15a4c978e9163e0b64239e76d30a8f37560..90c21ed4548650344c53c52bc702e2b2965ab04f 100644 (file)
@@ -129,6 +129,32 @@ class ReturningTest(fixtures.TestBase, AssertsExecutionResults):
         )
         eq_(result2.fetchall(), [(1, True), (2, False)])
 
+    @testing.requires.full_returning
+    def test_update_full_returning(self, connection):
+        connection.execute(
+            table.insert(),
+            [{"persons": 5, "full": False}, {"persons": 3, "full": False}],
+        )
+
+        result = connection.execute(
+            table.update(table.c.persons > 2)
+            .values(full=True)
+            .returning(table.c.id, table.c.full)
+        )
+        eq_(result.fetchall(), [(1, True), (2, True)])
+
+    @testing.requires.full_returning
+    def test_delete_full_returning(self, connection):
+        connection.execute(
+            table.insert(),
+            [{"persons": 5, "full": False}, {"persons": 3, "full": False}],
+        )
+
+        result = connection.execute(
+            table.delete().returning(table.c.id, table.c.full)
+        )
+        eq_(result.fetchall(), [(1, False), (2, False)])
+
     def test_insert_returning(self, connection):
         result = connection.execute(
             table.insert().returning(table.c.id), {"persons": 1, "full": False}
@@ -474,13 +500,6 @@ class ImplicitReturningFlag(fixtures.TestBase):
         testing.requires.returning(go)()
         e = engines.testing_engine()
 
-        # starts as False.  This is because all of Firebird,
-        # PostgreSQL, Oracle, SQL Server started supporting RETURNING
-        # as of a certain version, and the flag is not set until
-        # version detection occurs.  If some DB comes along that has
-        # RETURNING in all cases, this test can be adjusted.
-        assert e.dialect.implicit_returning is False
-
         # version detection on connect sets it
         c = e.connect()
         c.close()