--- /dev/null
+.. change::
+ :tags: bug, orm, unitofwork
+ :tickets: 5735
+
+ Improved the unit of work topological sorting system such that the
+ toplogical sort is now deterministic based on the sorting of the input set,
+ which itself is now sorted at the level of mappers, so that the same inputs
+ of affected mappers should produce the same output every time, among
+ mappers / tables that don't have any dependency on each other. This further
+ reduces the chance of deadlocks as can be observed in a flush that UPDATEs
+ among multiple, unrelated tables such that row locks are generated.
+
else:
self._passive_update_flag = attributes.PASSIVE_OFF
+ self.sort_key = "%s_%s" % (self.parent._sort_key, prop.key)
self.key = prop.key
if not self.prop.synchronize_pairs:
raise sa_exc.ArgumentError(
"""
self.class_ = util.assert_arg_type(class_, type, "class_")
+ self._sort_key = "%s.%s" % (
+ self.class_.__module__,
+ self.class_.__name__,
+ )
self.class_manager = None
def execute(self):
postsort_actions = self._generate_actions()
+ postsort_actions = sorted(
+ postsort_actions,
+ key=lambda item: item.sort_key,
+ )
# sort = topological.sort(self.dependencies, postsort_actions)
# print "--------------"
# print "\ndependencies:", self.dependencies
# execute
if self.cycles:
- for set_ in topological.sort_as_subsets(
+ for subset in topological.sort_as_subsets(
self.dependencies, postsort_actions
):
+ set_ = set(subset)
while set_:
n = set_.pop()
n.execute_aggregate(self, set_)
class ProcessAll(IterateMappersMixin, PostSortRec):
- __slots__ = "dependency_processor", "isdelete", "fromparent"
+ __slots__ = "dependency_processor", "isdelete", "fromparent", "sort_key"
def __init__(self, uow, dependency_processor, isdelete, fromparent):
self.dependency_processor = dependency_processor
+ self.sort_key = (
+ "ProcessAll",
+ self.dependency_processor.sort_key,
+ isdelete,
+ )
self.isdelete = isdelete
self.fromparent = fromparent
uow.deps[dependency_processor.parent.base_mapper].add(
class PostUpdateAll(PostSortRec):
- __slots__ = "mapper", "isdelete"
+ __slots__ = "mapper", "isdelete", "sort_key"
def __init__(self, uow, mapper, isdelete):
self.mapper = mapper
self.isdelete = isdelete
+ self.sort_key = ("PostUpdateAll", mapper._sort_key, isdelete)
@util.preload_module("sqlalchemy.orm.persistence")
def execute(self, uow):
class SaveUpdateAll(PostSortRec):
- __slots__ = ("mapper",)
+ __slots__ = ("mapper", "sort_key")
def __init__(self, uow, mapper):
self.mapper = mapper
+ self.sort_key = ("SaveUpdateAll", mapper._sort_key)
assert mapper is mapper.base_mapper
@util.preload_module("sqlalchemy.orm.persistence")
class DeleteAll(PostSortRec):
- __slots__ = ("mapper",)
+ __slots__ = ("mapper", "sort_key")
def __init__(self, uow, mapper):
self.mapper = mapper
+ self.sort_key = ("DeleteAll", mapper._sort_key)
assert mapper is mapper.base_mapper
@util.preload_module("sqlalchemy.orm.persistence")
class ProcessState(PostSortRec):
- __slots__ = "dependency_processor", "isdelete", "state"
+ __slots__ = "dependency_processor", "isdelete", "state", "sort_key"
def __init__(self, uow, dependency_processor, isdelete, state):
self.dependency_processor = dependency_processor
+ self.sort_key = ("ProcessState", dependency_processor.sort_key)
self.isdelete = isdelete
self.state = state
class SaveUpdateState(PostSortRec):
- __slots__ = "state", "mapper"
+ __slots__ = "state", "mapper", "sort_key"
def __init__(self, uow, state):
self.state = state
self.mapper = state.mapper.base_mapper
+ self.sort_key = ("ProcessState", self.mapper._sort_key)
@util.preload_module("sqlalchemy.orm.persistence")
def execute_aggregate(self, uow, recs):
class DeleteState(PostSortRec):
- __slots__ = "state", "mapper"
+ __slots__ = "state", "mapper", "sort_key"
def __init__(self, uow, state):
self.state = state
self.mapper = state.mapper.base_mapper
+ self.sort_key = ("DeleteState", self.mapper._sort_key)
@util.preload_module("sqlalchemy.orm.persistence")
def execute_aggregate(self, uow, recs):
topological.sort(
fixed_dependencies.union(mutable_dependencies),
tables,
- deterministic_order=True,
)
)
except exc.CircularDependencyError as err:
topological.sort(
fixed_dependencies.union(mutable_dependencies),
tables,
- deterministic_order=True,
)
)
from .. import util
from ..exc import CircularDependencyError
-
__all__ = ["sort", "sort_as_subsets", "find_cycles"]
-def sort_as_subsets(tuples, allitems, deterministic_order=False):
+def sort_as_subsets(tuples, allitems):
edges = util.defaultdict(set)
for parent, child in tuples:
edges[child].add(parent)
- Set = util.OrderedSet if deterministic_order else set
-
- todo = Set(allitems)
+ todo = list(allitems)
+ todo_set = set(allitems)
- while todo:
- output = Set()
+ while todo_set:
+ output = []
for node in todo:
- if todo.isdisjoint(edges[node]):
- output.add(node)
+ if todo_set.isdisjoint(edges[node]):
+ output.append(node)
if not output:
raise CircularDependencyError(
_gen_edges(edges),
)
- todo.difference_update(output)
+ todo_set.difference_update(output)
+ todo = [t for t in todo if t in todo_set]
yield output
-def sort(tuples, allitems, deterministic_order=False):
+def sort(tuples, allitems, deterministic_order=True):
"""sort the given list of items by dependency.
'tuples' is a list of tuples representing a partial ordering.
- 'deterministic_order' keeps items within a dependency tier in list order.
+
+ deterministic_order is no longer used, the order is now always
+ deterministic given the order of "allitems". the flag is there
+ for backwards compatibility with Alembic.
+
"""
- for set_ in sort_as_subsets(tuples, allitems, deterministic_order):
+ for set_ in sort_as_subsets(tuples, allitems):
for s in set_:
yield s
assert conforms_partial_ordering(tuples, result)
def assert_sort_deterministic(self, tuples, allitems, expected):
- result = list(
- topological.sort(tuples, allitems, deterministic_order=True)
- )
+ result = list(topological.sort(tuples, allitems))
assert conforms_partial_ordering(tuples, result)
assert result == expected
self.assert_sql_execution(
testing.db,
session.flush,
- AllOf(
- CompiledSQL(
- "UPDATE items SET description=:description "
- "WHERE items.id = :items_id",
- {"description": "item4updated", "items_id": objects[4].id},
- ),
- CompiledSQL(
- "INSERT INTO keywords (name) " "VALUES (:name)",
- {"name": "yellow"},
- ),
+ CompiledSQL(
+ "UPDATE items SET description=:description "
+ "WHERE items.id = :items_id",
+ {"description": "item4updated", "items_id": objects[4].id},
+ ),
+ CompiledSQL(
+ "INSERT INTO keywords (name) " "VALUES (:name)",
+ {"name": "yellow"},
),
CompiledSQL(
"INSERT INTO item_keywords (item_id, keyword_id) "
self.assert_sql_execution(
testing.db,
sess.flush,
- AllOf(
- CompiledSQL(
- "INSERT INTO keywords (name) VALUES (:name)",
- {"name": "k1"},
- ),
- CompiledSQL(
- "INSERT INTO items (description) VALUES (:description)",
- {"description": "i1"},
- ),
+ CompiledSQL(
+ "INSERT INTO items (description) VALUES (:description)",
+ {"description": "i1"},
+ ),
+ CompiledSQL(
+ "INSERT INTO keywords (name) VALUES (:name)",
+ {"name": "k1"},
),
CompiledSQL(
"INSERT INTO item_keywords (item_id, keyword_id) "
self.assert_sql_execution(
testing.db,
sess.flush,
- AllOf(
- CompiledSQL(
- "UPDATE nodes SET parent_id=:parent_id "
- "WHERE nodes.id = :nodes_id",
- lambda ctx: [
- {"nodes_id": n3.id, "parent_id": None},
- {"nodes_id": n2.id, "parent_id": None},
- ],
- )
+ CompiledSQL(
+ "UPDATE nodes SET parent_id=:parent_id "
+ "WHERE nodes.id = :nodes_id",
+ lambda ctx: [
+ {"nodes_id": n3.id, "parent_id": None},
+ {"nodes_id": n2.id, "parent_id": None},
+ ],
),
CompiledSQL(
"DELETE FROM nodes WHERE nodes.id = :id",