]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Send deterministic ordering into unit of work topological
author Mike Bayer <mike_mp@zzzcomputing.com>
Mon, 30 Nov 2020 17:50:51 +0000 (12:50 -0500)
committer Mike Bayer <mike_mp@zzzcomputing.com>
Sat, 12 Dec 2020 00:19:17 +0000 (19:19 -0500)
Improved the unit of work topological sorting system such that the
topological sort is now deterministic based on the sorting of the input set,
which itself is now sorted at the level of mappers, so that the same inputs
of affected mappers should produce the same output every time, among
mappers / tables that don't have any dependency on each other. This further
reduces the chance of deadlocks as can be observed in a flush that UPDATEs
among multiple, unrelated tables such that row locks are generated.

topological.sort() has been made "deterministic" in all cases by
using a separate list + set.

Fixes: #5735
Change-Id: I073103df414dba549e46605b394f8ccae6e80d0e

doc/build/changelog/unreleased_14/5735.rst [new file with mode: 0644]
lib/sqlalchemy/orm/dependency.py
lib/sqlalchemy/orm/mapper.py
lib/sqlalchemy/orm/unitofwork.py
lib/sqlalchemy/sql/ddl.py
lib/sqlalchemy/util/topological.py
test/base/test_dependency.py
test/orm/test_unitofwork.py
test/orm/test_unitofworkv2.py

diff --git a/doc/build/changelog/unreleased_14/5735.rst b/doc/build/changelog/unreleased_14/5735.rst
new file mode 100644 (file)
index 0000000..8c9bc78
--- /dev/null
@@ -0,0 +1,12 @@
+.. change::
+    :tags: bug, orm, unitofwork
+    :tickets: 5735
+
+    Improved the unit of work topological sorting system such that the
+    topological sort is now deterministic based on the sorting of the input set,
+    which itself is now sorted at the level of mappers, so that the same inputs
+    of affected mappers should produce the same output every time, among
+    mappers / tables that don't have any dependency on each other. This further
+    reduces the chance of deadlocks as can be observed in a flush that UPDATEs
+    among multiple, unrelated tables such that row locks are generated.
+
index d4680e394db4d28d25bb608eefb1b5aba47840fb..9c2c5ade3f4c16cfb224de0abc1cec9a3a606b58 100644 (file)
@@ -43,6 +43,7 @@ class DependencyProcessor(object):
         else:
             self._passive_update_flag = attributes.PASSIVE_OFF
 
+        self.sort_key = "%s_%s" % (self.parent._sort_key, prop.key)
         self.key = prop.key
         if not self.prop.synchronize_pairs:
             raise sa_exc.ArgumentError(
index e73993e5061aa44c03fffe68ac0820d5b86015bd..e501838940bcfedcd6146db2d2fcc8e3be4985a0 100644 (file)
@@ -574,6 +574,10 @@ class Mapper(
         """
 
         self.class_ = util.assert_arg_type(class_, type, "class_")
+        self._sort_key = "%s.%s" % (
+            self.class_.__module__,
+            self.class_.__name__,
+        )
 
         self.class_manager = None
 
index 9c67130ced4d192e9e4d5d0d6437cb49939a64a5..868f8e087fa10f15c957d7c75d1151f7a130ed9b 100644 (file)
@@ -422,6 +422,10 @@ class UOWTransaction(object):
     def execute(self):
         postsort_actions = self._generate_actions()
 
+        postsort_actions = sorted(
+            postsort_actions,
+            key=lambda item: item.sort_key,
+        )
         # sort = topological.sort(self.dependencies, postsort_actions)
         # print "--------------"
         # print "\ndependencies:", self.dependencies
@@ -431,9 +435,10 @@ class UOWTransaction(object):
 
         # execute
         if self.cycles:
-            for set_ in topological.sort_as_subsets(
+            for subset in topological.sort_as_subsets(
                 self.dependencies, postsort_actions
             ):
+                set_ = set(subset)
                 while set_:
                     n = set_.pop()
                     n.execute_aggregate(self, set_)
@@ -542,10 +547,15 @@ class PostSortRec(object):
 
 
 class ProcessAll(IterateMappersMixin, PostSortRec):
-    __slots__ = "dependency_processor", "isdelete", "fromparent"
+    __slots__ = "dependency_processor", "isdelete", "fromparent", "sort_key"
 
     def __init__(self, uow, dependency_processor, isdelete, fromparent):
         self.dependency_processor = dependency_processor
+        self.sort_key = (
+            "ProcessAll",
+            self.dependency_processor.sort_key,
+            isdelete,
+        )
         self.isdelete = isdelete
         self.fromparent = fromparent
         uow.deps[dependency_processor.parent.base_mapper].add(
@@ -582,11 +592,12 @@ class ProcessAll(IterateMappersMixin, PostSortRec):
 
 
 class PostUpdateAll(PostSortRec):
-    __slots__ = "mapper", "isdelete"
+    __slots__ = "mapper", "isdelete", "sort_key"
 
     def __init__(self, uow, mapper, isdelete):
         self.mapper = mapper
         self.isdelete = isdelete
+        self.sort_key = ("PostUpdateAll", mapper._sort_key, isdelete)
 
     @util.preload_module("sqlalchemy.orm.persistence")
     def execute(self, uow):
@@ -598,10 +609,11 @@ class PostUpdateAll(PostSortRec):
 
 
 class SaveUpdateAll(PostSortRec):
-    __slots__ = ("mapper",)
+    __slots__ = ("mapper", "sort_key")
 
     def __init__(self, uow, mapper):
         self.mapper = mapper
+        self.sort_key = ("SaveUpdateAll", mapper._sort_key)
         assert mapper is mapper.base_mapper
 
     @util.preload_module("sqlalchemy.orm.persistence")
@@ -634,10 +646,11 @@ class SaveUpdateAll(PostSortRec):
 
 
 class DeleteAll(PostSortRec):
-    __slots__ = ("mapper",)
+    __slots__ = ("mapper", "sort_key")
 
     def __init__(self, uow, mapper):
         self.mapper = mapper
+        self.sort_key = ("DeleteAll", mapper._sort_key)
         assert mapper is mapper.base_mapper
 
     @util.preload_module("sqlalchemy.orm.persistence")
@@ -670,10 +683,11 @@ class DeleteAll(PostSortRec):
 
 
 class ProcessState(PostSortRec):
-    __slots__ = "dependency_processor", "isdelete", "state"
+    __slots__ = "dependency_processor", "isdelete", "state", "sort_key"
 
     def __init__(self, uow, dependency_processor, isdelete, state):
         self.dependency_processor = dependency_processor
+        self.sort_key = ("ProcessState", dependency_processor.sort_key)
         self.isdelete = isdelete
         self.state = state
 
@@ -705,11 +719,12 @@ class ProcessState(PostSortRec):
 
 
 class SaveUpdateState(PostSortRec):
-    __slots__ = "state", "mapper"
+    __slots__ = "state", "mapper", "sort_key"
 
     def __init__(self, uow, state):
         self.state = state
         self.mapper = state.mapper.base_mapper
+        self.sort_key = ("ProcessState", self.mapper._sort_key)
 
     @util.preload_module("sqlalchemy.orm.persistence")
     def execute_aggregate(self, uow, recs):
@@ -732,11 +747,12 @@ class SaveUpdateState(PostSortRec):
 
 
 class DeleteState(PostSortRec):
-    __slots__ = "state", "mapper"
+    __slots__ = "state", "mapper", "sort_key"
 
     def __init__(self, uow, state):
         self.state = state
         self.mapper = state.mapper.base_mapper
+        self.sort_key = ("DeleteState", self.mapper._sort_key)
 
     @util.preload_module("sqlalchemy.orm.persistence")
     def execute_aggregate(self, uow, recs):
index f1012292b89f0768c2e70aee2c5d17d71ca3068e..e0dd6faf7ed5d7e9e11f9afbc8bcd3bea2e4d48f 100644 (file)
@@ -1171,7 +1171,6 @@ def sort_tables_and_constraints(
             topological.sort(
                 fixed_dependencies.union(mutable_dependencies),
                 tables,
-                deterministic_order=True,
             )
         )
     except exc.CircularDependencyError as err:
@@ -1203,7 +1202,6 @@ def sort_tables_and_constraints(
             topological.sort(
                 fixed_dependencies.union(mutable_dependencies),
                 tables,
-                deterministic_order=True,
             )
         )
 
index 4d6ef22ecf53489f0129da8ab9217dcf4fded3f0..b009a8ce2f4832b9f0370c144cae59f51218156b 100644 (file)
 from .. import util
 from ..exc import CircularDependencyError
 
-
 __all__ = ["sort", "sort_as_subsets", "find_cycles"]
 
 
-def sort_as_subsets(tuples, allitems, deterministic_order=False):
+def sort_as_subsets(tuples, allitems):
 
     edges = util.defaultdict(set)
     for parent, child in tuples:
         edges[child].add(parent)
 
-    Set = util.OrderedSet if deterministic_order else set
-
-    todo = Set(allitems)
+    todo = list(allitems)
+    todo_set = set(allitems)
 
-    while todo:
-        output = Set()
+    while todo_set:
+        output = []
         for node in todo:
-            if todo.isdisjoint(edges[node]):
-                output.add(node)
+            if todo_set.isdisjoint(edges[node]):
+                output.append(node)
 
         if not output:
             raise CircularDependencyError(
@@ -37,18 +35,23 @@ def sort_as_subsets(tuples, allitems, deterministic_order=False):
                 _gen_edges(edges),
             )
 
-        todo.difference_update(output)
+        todo_set.difference_update(output)
+        todo = [t for t in todo if t in todo_set]
         yield output
 
 
-def sort(tuples, allitems, deterministic_order=False):
+def sort(tuples, allitems, deterministic_order=True):
     """sort the given list of items by dependency.
 
     'tuples' is a list of tuples representing a partial ordering.
-    'deterministic_order' keeps items within a dependency tier in list order.
+
+    deterministic_order is no longer used; the order is now always
+    deterministic given the order of "allitems".  The flag is retained
+    for backwards compatibility with Alembic.
+
     """
 
-    for set_ in sort_as_subsets(tuples, allitems, deterministic_order):
+    for set_ in sort_as_subsets(tuples, allitems):
         for s in set_:
             yield s
 
index 7af7fdc7111cd641c05901dd373f5a60326d2cd2..9250c3334136398427565a04cb54efe6227b47e7 100644 (file)
@@ -16,9 +16,7 @@ class DependencySortTest(fixtures.TestBase):
         assert conforms_partial_ordering(tuples, result)
 
     def assert_sort_deterministic(self, tuples, allitems, expected):
-        result = list(
-            topological.sort(tuples, allitems, deterministic_order=True)
-        )
+        result = list(topological.sort(tuples, allitems))
         assert conforms_partial_ordering(tuples, result)
         assert result == expected
 
index eb5530ef4229f45d192e5162f465b885492d041d..7583b9d22c73edb7ad4d9ab5aee76401c6ebaa29 100644 (file)
@@ -2531,16 +2531,14 @@ class ManyToManyTest(_fixtures.FixtureTest):
         self.assert_sql_execution(
             testing.db,
             session.flush,
-            AllOf(
-                CompiledSQL(
-                    "UPDATE items SET description=:description "
-                    "WHERE items.id = :items_id",
-                    {"description": "item4updated", "items_id": objects[4].id},
-                ),
-                CompiledSQL(
-                    "INSERT INTO keywords (name) " "VALUES (:name)",
-                    {"name": "yellow"},
-                ),
+            CompiledSQL(
+                "UPDATE items SET description=:description "
+                "WHERE items.id = :items_id",
+                {"description": "item4updated", "items_id": objects[4].id},
+            ),
+            CompiledSQL(
+                "INSERT INTO keywords (name) " "VALUES (:name)",
+                {"name": "yellow"},
             ),
             CompiledSQL(
                 "INSERT INTO item_keywords (item_id, keyword_id) "
index ed320db10426b89baed301b11a575981616633c6..b8701cb6a8e0f72d1654a0990fcf9ec39317dc17 100644 (file)
@@ -681,15 +681,13 @@ class RudimentaryFlushTest(UOWTest):
         self.assert_sql_execution(
             testing.db,
             sess.flush,
-            AllOf(
-                CompiledSQL(
-                    "INSERT INTO keywords (name) VALUES (:name)",
-                    {"name": "k1"},
-                ),
-                CompiledSQL(
-                    "INSERT INTO items (description) VALUES (:description)",
-                    {"description": "i1"},
-                ),
+            CompiledSQL(
+                "INSERT INTO items (description) VALUES (:description)",
+                {"description": "i1"},
+            ),
+            CompiledSQL(
+                "INSERT INTO keywords (name) VALUES (:name)",
+                {"name": "k1"},
             ),
             CompiledSQL(
                 "INSERT INTO item_keywords (item_id, keyword_id) "
@@ -873,15 +871,13 @@ class SingleCycleTest(UOWTest):
         self.assert_sql_execution(
             testing.db,
             sess.flush,
-            AllOf(
-                CompiledSQL(
-                    "UPDATE nodes SET parent_id=:parent_id "
-                    "WHERE nodes.id = :nodes_id",
-                    lambda ctx: [
-                        {"nodes_id": n3.id, "parent_id": None},
-                        {"nodes_id": n2.id, "parent_id": None},
-                    ],
-                )
+            CompiledSQL(
+                "UPDATE nodes SET parent_id=:parent_id "
+                "WHERE nodes.id = :nodes_id",
+                lambda ctx: [
+                    {"nodes_id": n3.id, "parent_id": None},
+                    {"nodes_id": n2.id, "parent_id": None},
+                ],
             ),
             CompiledSQL(
                 "DELETE FROM nodes WHERE nodes.id = :id",