from sqlalchemy import util, log, topological
from sqlalchemy.orm import attributes, interfaces
from sqlalchemy.orm import util as mapperutil
-from sqlalchemy.orm.mapper import _state_mapper
+from sqlalchemy.orm.util import _state_mapper
# Load lazily
object_session = None
self.key = key
def append(self, state, item, initiator):
- # process "save_update" cascade rules for when an instance is appended to the list of another instance
+ # process "save_update" cascade rules for when
+ # an instance is appended to the list of another instance
sess = _state_session(state)
if sess:
prop = _state_mapper(state).get_property(self.key)
"""Handles the details of organizing and executing transaction
tasks during a UnitOfWork object's flush() operation.
- The central operation is to form a graph of nodes represented by the
- ``UOWTask`` class, which is then traversed by a ``UOWExecutor`` object
- that issues SQL and instance-synchronizing operations via the related
- packages.
"""
def __init__(self, session):
self.session = session
self.mapper_flush_opts = session._mapper_flush_opts
- # stores tuples of mapper/dependent mapper pairs,
- # representing a partial ordering fed into topological sort
- self.dependencies = set()
-
- # dictionary of mappers to UOWTasks
- self.tasks = {}
-
# dictionary used by external actors to store arbitrary state
# information.
self.attributes = {}
- self.processors = set()
-
+ self.recs = []
+ self.states = set()
+ self.dependencies = []
+
+ def _dependency(self, rec1, rec2):
+ self.dependencies.append((rec1, rec2))
+
def get_attribute_history(self, state, key, passive=True):
hashkey = ("history", state, key)
return history.as_state()
def register_object(self, state, isdelete=False,
- listonly=False, postupdate=False, post_update_cols=None):
+ listonly=False, postupdate=False,
+ post_update_cols=None):
# if object is not in the overall session, do nothing
if not self.session._contains_state(state):
return
-
- mapper = _state_mapper(state)
-
- task = self.get_task_by_mapper(mapper)
- if postupdate:
- task.append_postupdate(state, post_update_cols)
- else:
- task.append(state, listonly=listonly, isdelete=isdelete)
-
- # ensure the mapper for this object has had its
- # DependencyProcessors added.
- if mapper not in self.processors:
- mapper._register_processors(self)
- self.processors.add(mapper)
-
- if mapper.base_mapper not in self.processors:
- mapper.base_mapper._register_processors(self)
- self.processors.add(mapper.base_mapper)
-
- def set_row_switch(self, state):
- """mark a deleted object as a 'row switch'.
-
- this indicates that an INSERT statement elsewhere corresponds to this DELETE;
- the INSERT is converted to an UPDATE and the DELETE does not occur.
- """
- mapper = _state_mapper(state)
- task = self.get_task_by_mapper(mapper)
- taskelement = task._objects[state]
- taskelement.isdelete = "rowswitch"
-
- def is_deleted(self, state):
- """return true if the given state is marked as deleted within this UOWTransaction."""
-
+ if state in self.states:
+ return
+
mapper = _state_mapper(state)
- task = self.get_task_by_mapper(mapper)
- return task.is_deleted(state)
-
- def get_task_by_mapper(self, mapper, dontcreate=False):
- """return UOWTask element corresponding to the given mapper.
-
- Will create a new UOWTask, including a UOWTask corresponding to the
- "base" inherited mapper, if needed, unless the dontcreate flag is True.
- """
- try:
- return self.tasks[mapper]
- except KeyError:
- if dontcreate:
- return None
-
- base_mapper = mapper.base_mapper
- if base_mapper in self.tasks:
- base_task = self.tasks[base_mapper]
- else:
- self.tasks[base_mapper] = base_task = UOWTask(self, base_mapper)
- base_mapper._register_dependencies(self)
-
- if mapper not in self.tasks:
- self.tasks[mapper] = task = UOWTask(self, mapper, base_task=base_task)
- mapper._register_dependencies(self)
- else:
- task = self.tasks[mapper]
-
- return task
-
- def register_dependency(self, mapper, dependency):
- """register a dependency between two mappers.
-
- Called by ``mapper.PropertyLoader`` to register the objects
- handled by one mapper being dependent on the objects handled
- by another.
-
- """
- # correct for primary mapper
- # also convert to the "base mapper", the parentmost task at the top of an inheritance chain
- # dependency sorting is done via non-inheriting mappers only, dependencies between mappers
- # in the same inheritance chain is done at the per-object level
- mapper = mapper.primary_mapper().base_mapper
- dependency = dependency.primary_mapper().base_mapper
-
- self.dependencies.add((mapper, dependency))
-
- def register_processor(self, mapper, processor, mapperfrom):
- """register a dependency processor, corresponding to
- operations which occur between two mappers.
+ self.states.add(state)
- """
- # correct for primary mapper
- mapper = mapper.primary_mapper()
- mapperfrom = mapperfrom.primary_mapper()
-
- task = self.get_task_by_mapper(mapper)
- targettask = self.get_task_by_mapper(mapperfrom)
- up = UOWDependencyProcessor(processor, targettask)
- task.dependencies.add(up)
-
- def execute(self):
- """Execute this UOWTransaction.
-
- This will organize all collected UOWTasks into a dependency-sorted
- list which is then traversed using the traversal scheme
- encoded in the UOWExecutor class. Operations to mappers and dependency
- processors are fired off in order to issue SQL to the database and
- synchronize instance attributes with database values and related
- foreign key values."""
-
- # pre-execute dependency processors. this process may
- # result in new tasks, objects and/or dependency processors being added,
- # particularly with 'delete-orphan' cascade rules.
- # keep running through the full list of tasks until all
- # objects have been processed.
- while True:
- ret = False
- for task in self.tasks.values():
- for up in list(task.dependencies):
- if up.preexecute(self):
- ret = True
- if not ret:
- break
-
- tasks = self._sort_dependencies()
- if self._should_log_info():
- self.logger.info("Task dump:\n%s", self._dump(tasks))
- UOWExecutor().execute(self, tasks)
- self.logger.info("Execute Complete")
-
- def _dump(self, tasks):
- from uowdumper import UOWDumper
- return UOWDumper.dump(tasks)
-
- @property
- def elements(self):
- """Iterate UOWTaskElements."""
+ self.recs.extend(
+ mapper.get_flush_actions(self, state)
+ )
- for task in self.tasks.itervalues():
- for elem in task.elements:
- yield elem
+
+ def execute(self):
+ # TODO: figure out a way to get consecutive, "compatible"
+ # records to collapse together, e.g. so that a run of
+ # UPDATEs becomes a single executemany(), even though we
+ # usually need individual executes.
+ for rec in topological.sort(self.dependencies, self.recs):
+ rec.execute()
def finalize_flush_changes(self):
"""mark processed objects as clean / deleted after a successful flush().
elif not elem.listonly:
self.session._register_newly_persistent(elem.state)
- def _sort_dependencies(self):
- nodes = topological.sort_with_cycles(self.dependencies,
- [t.mapper for t in self.tasks.itervalues() if t.base_task is t]
- )
-
- ret = []
- for item, cycles in nodes:
- task = self.get_task_by_mapper(item)
- if cycles:
- for t in task._sort_circular_dependencies(
- self,
- [self.get_task_by_mapper(i) for i in cycles]
- ):
- ret.append(t)
- else:
- ret.append(task)
-
- return ret
-
log.class_logger(UOWTransaction)
-class UOWTask(object):
- """A collection of mapped states corresponding to a particular mapper."""
-
- def __init__(self, uowtransaction, mapper, base_task=None):
- self.uowtransaction = uowtransaction
-
- # base_task is the UOWTask which represents the "base mapper"
- # in our mapper's inheritance chain. if the mapper does not
- # inherit from any other mapper, the base_task is self.
- # the _inheriting_tasks dictionary is a dictionary present only
- # on the "base_task"-holding UOWTask, which maps all mappers within
- # an inheritance hierarchy to their corresponding UOWTask instances.
- if base_task is None:
- self.base_task = self
- self._inheriting_tasks = {mapper:self}
- else:
- self.base_task = base_task
- base_task._inheriting_tasks[mapper] = self
-
- # the Mapper which this UOWTask corresponds to
- self.mapper = mapper
-
- # mapping of InstanceState -> UOWTaskElement
- self._objects = {}
-
- self.dependent_tasks = []
- self.dependencies = set()
- self.cyclical_dependencies = set()
-
- @util.memoized_property
- def inheriting_mappers(self):
- return list(self.mapper.polymorphic_iterator())
-
- @property
- def polymorphic_tasks(self):
- """Return an iterator of UOWTask objects corresponding to the
- inheritance sequence of this UOWTask's mapper.
-
- e.g. if mapper B and mapper C inherit from mapper A, and
- mapper D inherits from B:
-
- mapperA -> mapperB -> mapperD
- -> mapperC
-
- the inheritance sequence starting at mapper A is a depth-first
- traversal:
-
- [mapperA, mapperB, mapperD, mapperC]
-
- this method will therefore return
-
- [UOWTask(mapperA), UOWTask(mapperB), UOWTask(mapperD),
- UOWTask(mapperC)]
-
- The concept of "polymporphic iteration" is adapted into
- several property-based iterators which return object
- instances, UOWTaskElements and UOWDependencyProcessors in an
- order corresponding to this sequence of parent UOWTasks. This
- is used to issue operations related to inheritance-chains of
- mappers in the proper order based on dependencies between
- those mappers.
-
- """
- for mapper in self.inheriting_mappers:
- t = self.base_task._inheriting_tasks.get(mapper, None)
- if t is not None:
- yield t
-
- def is_empty(self):
- """return True if this UOWTask is 'empty', meaning it has no child items.
-
- used only for debugging output.
- """
-
- return not self._objects and not self.dependencies
-
- def append(self, state, listonly=False, isdelete=False):
- if state not in self._objects:
- self._objects[state] = rec = UOWTaskElement(state)
- else:
- rec = self._objects[state]
-
- rec.update(listonly, isdelete)
-
- def append_postupdate(self, state, post_update_cols):
- """issue a 'post update' UPDATE statement via this object's mapper immediately.
-
- this operation is used only with relationships that specify the `post_update=True`
- flag.
- """
-
- # postupdates are UPDATED immeditely (for now)
- # convert post_update_cols list to a Set so that __hash__() is used to compare columns
- # instead of __eq__()
- self.mapper._save_obj([state], self.uowtransaction, postupdate=True, post_update_cols=set(post_update_cols))
-
- def __contains__(self, state):
- """return True if the given object is contained within this UOWTask or inheriting tasks."""
-
- for task in self.polymorphic_tasks:
- if state in task._objects:
- return True
- else:
- return False
-
- def is_deleted(self, state):
- """return True if the given object is marked as to be deleted within this UOWTask."""
-
- try:
- return self._objects[state].isdelete
- except KeyError:
- return False
-
- def _polymorphic_collection(fn):
- """return a property that will adapt the collection returned by the
- given callable into a polymorphic traversal."""
-
- @property
- def collection(self):
- for task in self.polymorphic_tasks:
- for rec in fn(task):
- yield rec
- return collection
-
- def _polymorphic_collection_filtered(fn):
-
- def collection(self, mappers):
- for task in self.polymorphic_tasks:
- if task.mapper in mappers:
- for rec in fn(task):
- yield rec
- return collection
-
- @property
- def elements(self):
- return self._objects.values()
-
- @_polymorphic_collection
- def polymorphic_elements(self):
- return self.elements
-
- @_polymorphic_collection_filtered
- def filter_polymorphic_elements(self):
- return self.elements
-
- @property
- def polymorphic_tosave_elements(self):
- return [rec for rec in self.polymorphic_elements if not rec.isdelete]
-
- @property
- def polymorphic_todelete_elements(self):
- return [rec for rec in self.polymorphic_elements if rec.isdelete]
-
- @property
- def polymorphic_tosave_objects(self):
- return [
- rec.state for rec in self.polymorphic_elements
- if rec.state is not None and not rec.listonly and rec.isdelete is False
- ]
-
- @property
- def polymorphic_todelete_objects(self):
- return [
- rec.state for rec in self.polymorphic_elements
- if rec.state is not None and not rec.listonly and rec.isdelete is True
- ]
-
- @_polymorphic_collection
- def polymorphic_dependencies(self):
- return self.dependencies
-
- @_polymorphic_collection
- def polymorphic_cyclical_dependencies(self):
- return self.cyclical_dependencies
-
- def _sort_circular_dependencies(self, trans, cycles):
- """Topologically sort individual entities with row-level dependencies.
-
- Builds a modified UOWTask structure, and is invoked when the
- per-mapper topological structure is found to have cycles.
-
- """
- dependencies = {}
- def set_processor_for_state(state, depprocessor, target_state, isdelete):
- if state not in dependencies:
- dependencies[state] = {}
- tasks = dependencies[state]
- if depprocessor not in tasks:
- tasks[depprocessor] = UOWDependencyProcessor(
- depprocessor.processor,
- UOWTask(self.uowtransaction, depprocessor.targettask.mapper)
- )
- tasks[depprocessor].targettask.append(target_state, isdelete=isdelete)
-
- cycles = set(cycles)
- def dependency_in_cycles(dep):
- proctask = trans.get_task_by_mapper(dep.processor.mapper.base_mapper, True)
- targettask = trans.get_task_by_mapper(dep.targettask.mapper.base_mapper, True)
- return targettask in cycles and (proctask is not None and proctask in cycles)
-
- deps_by_targettask = {}
- extradeplist = []
- for task in cycles:
- for dep in task.polymorphic_dependencies:
- if not dependency_in_cycles(dep):
- extradeplist.append(dep)
- for t in dep.targettask.polymorphic_tasks:
- l = deps_by_targettask.setdefault(t, [])
- l.append(dep)
-
- object_to_original_task = {}
- tuples = []
-
- for task in cycles:
- for subtask in task.polymorphic_tasks:
- for taskelement in subtask.elements:
- state = taskelement.state
- object_to_original_task[state] = subtask
- if subtask not in deps_by_targettask:
- continue
- for dep in deps_by_targettask[subtask]:
- if not dep.processor.has_dependencies or not dependency_in_cycles(dep):
- continue
- (processor, targettask) = (dep.processor, dep.targettask)
- isdelete = taskelement.isdelete
-
- # list of dependent objects from this object
- (added, unchanged, deleted) = dep.get_object_dependencies(state, trans, passive=True)
- if not added and not unchanged and not deleted:
- continue
-
- # the task corresponding to saving/deleting of those dependent objects
- childtask = trans.get_task_by_mapper(processor.mapper)
-
- childlist = added + unchanged + deleted
-
- for o in childlist:
- if o is None:
- continue
-
- if o not in childtask:
- childtask.append(o, listonly=True)
- object_to_original_task[o] = childtask
-
- whosdep = dep.whose_dependent_on_who(state, o)
- if whosdep is not None:
- tuples.append(whosdep)
-
- if whosdep[0] is state:
- set_processor_for_state(whosdep[0], dep, whosdep[0], isdelete=isdelete)
- else:
- set_processor_for_state(whosdep[0], dep, whosdep[1], isdelete=isdelete)
- else:
- # TODO: no test coverage here
- set_processor_for_state(state, dep, state, isdelete=isdelete)
-
- t = UOWTask(self.uowtransaction, self.mapper)
- t.dependencies.update(extradeplist)
-
- used_tasks = set()
-
- # rationale for "tree" sort as opposed to a straight
- # dependency - keep non-dependent objects
- # grouped together, so that insert ordering as determined
- # by session.add() is maintained.
- # An alternative might be to represent the "insert order"
- # as part of the topological sort itself, which would
- # eliminate the need for this step (but may make the original
- # topological sort more expensive)
- head = topological.sort_as_tree(tuples, object_to_original_task.iterkeys())
- if head is not None:
- original_to_tasks = {}
- stack = [(head, t)]
- while stack:
- ((state, cycles, children), parenttask) = stack.pop()
-
- originating_task = object_to_original_task[state]
- used_tasks.add(originating_task)
-
- if (parenttask, originating_task) not in original_to_tasks:
- task = UOWTask(self.uowtransaction, originating_task.mapper)
- original_to_tasks[(parenttask, originating_task)] = task
- parenttask.dependent_tasks.append(task)
- else:
- task = original_to_tasks[(parenttask, originating_task)]
-
- task.append(state, originating_task._objects[state].listonly, isdelete=originating_task._objects[state].isdelete)
-
- if state in dependencies:
- task.cyclical_dependencies.update(dependencies[state].itervalues())
-
- stack += [(n, task) for n in children]
-
- ret = [t]
-
- # add tasks that were in the cycle, but didnt get assembled
- # into the cyclical tree, to the start of the list
- for t2 in cycles:
- if t2 not in used_tasks and t2 is not self:
- localtask = UOWTask(self.uowtransaction, t2.mapper)
- for state in t2.elements:
- localtask.append(state, t2.listonly, isdelete=t2._objects[state].isdelete)
- for dep in t2.dependencies:
- localtask.dependencies.add(dep)
- ret.insert(0, localtask)
-
- return ret
-
- def __repr__(self):
- return ("UOWTask(%s) Mapper: '%r'" % (hex(id(self)), self.mapper))
-
-class UOWTaskElement(object):
- """Corresponds to a single InstanceState to be saved, deleted,
- or otherwise marked as having dependencies. A collection of
- UOWTaskElements are held by a UOWTask.
-
- """
- def __init__(self, state):
- self.state = state
- self.listonly = True
- self.isdelete = False
- self.preprocessed = set()
-
- def update(self, listonly, isdelete):
- if not listonly and self.listonly:
- self.listonly = False
- self.preprocessed.clear()
- if isdelete and not self.isdelete:
- self.isdelete = True
- self.preprocessed.clear()
-
- def __repr__(self):
- return "UOWTaskElement/%d: %s/%d %s" % (
- id(self),
- self.state.class_.__name__,
- id(self.state.obj()),
- (self.listonly and 'listonly' or (self.isdelete and 'delete' or 'save'))
- )
-
-class UOWDependencyProcessor(object):
- """In between the saving and deleting of objects, process
- dependent data, such as filling in a foreign key on a child item
- from a new primary key, or deleting association rows before a
- delete. This object acts as a proxy to a DependencyProcessor.
-
- """
- def __init__(self, processor, targettask):
- self.processor = processor
- self.targettask = targettask
- prop = processor.prop
-
- # define a set of mappers which
- # will filter the lists of entities
- # this UOWDP processes. this allows
- # MapperProperties to be overridden
- # at least for concrete mappers.
- self._mappers = set([
- m
- for m in self.processor.parent.polymorphic_iterator()
- if m._props[prop.key] is prop
- ]).union(self.processor.mapper.polymorphic_iterator())
-
- def __repr__(self):
- return "UOWDependencyProcessor(%s, %s)" % (str(self.processor), str(self.targettask))
-
- def __eq__(self, other):
- return other.processor is self.processor and other.targettask is self.targettask
-
- def __hash__(self):
- return hash((self.processor, self.targettask))
-
- def preexecute(self, trans):
- """preprocess all objects contained within this ``UOWDependencyProcessor``s target task.
-
- This may locate additional objects which should be part of the
- transaction, such as those affected deletes, orphans to be
- deleted, etc.
-
- Once an object is preprocessed, its ``UOWTaskElement`` is marked as processed. If subsequent
- changes occur to the ``UOWTaskElement``, its processed flag is reset, and will require processing
- again.
-
- Return True if any objects were preprocessed, or False if no
- objects were preprocessed. If True is returned, the parent ``UOWTransaction`` will
- ultimately call ``preexecute()`` again on all processors until no new objects are processed.
- """
-
- def getobj(elem):
- elem.preprocessed.add(self)
- return elem.state
-
- ret = False
- elements = [getobj(elem) for elem in
- self.targettask.filter_polymorphic_elements(self._mappers)
- if self not in elem.preprocessed and not elem.isdelete]
- if elements:
- ret = True
- self.processor.preprocess_dependencies(self.targettask, elements, trans, delete=False)
-
- elements = [getobj(elem) for elem in
- self.targettask.filter_polymorphic_elements(self._mappers)
- if self not in elem.preprocessed and elem.isdelete]
- if elements:
- ret = True
- self.processor.preprocess_dependencies(self.targettask, elements, trans, delete=True)
- return ret
-
- def execute(self, trans, delete):
- """process all objects contained within this ``UOWDependencyProcessor``s target task."""
-
-
- elements = [e for e in
- self.targettask.filter_polymorphic_elements(self._mappers)
- if bool(e.isdelete)==delete]
-
- self.processor.process_dependencies(
- self.targettask,
- [elem.state for elem in elements],
- trans,
- delete=delete)
-
- def get_object_dependencies(self, state, trans, passive):
- return trans.get_attribute_history(state, self.processor.key, passive=passive)
-
- def whose_dependent_on_who(self, state1, state2):
- """establish which object is operationally dependent amongst a parent/child
- using the semantics stated by the dependency processor.
-
- This method is used to establish a partial ordering (set of dependency tuples)
- when toplogically sorting on a per-instance basis.
-
- """
- return self.processor.whose_dependent_on_who(state1, state2)
-
-class UOWExecutor(object):
- """Encapsulates the execution traversal of a UOWTransaction structure."""
-
- def execute(self, trans, tasks, isdelete=None):
- if isdelete is not True:
- for task in tasks:
- self.execute_save_steps(trans, task)
- if isdelete is not False:
- for task in reversed(tasks):
- self.execute_delete_steps(trans, task)
-
- def save_objects(self, trans, task):
- task.mapper._save_obj(task.polymorphic_tosave_objects, trans)
-
- def delete_objects(self, trans, task):
- task.mapper._delete_obj(task.polymorphic_todelete_objects, trans)
-
- def execute_dependency(self, trans, dep, isdelete):
- dep.execute(trans, isdelete)
-
- def execute_save_steps(self, trans, task):
- self.save_objects(trans, task)
- for dep in task.polymorphic_cyclical_dependencies:
- self.execute_dependency(trans, dep, False)
- for dep in task.polymorphic_cyclical_dependencies:
- self.execute_dependency(trans, dep, True)
- self.execute_cyclical_dependencies(trans, task, False)
- self.execute_dependencies(trans, task)
-
- def execute_delete_steps(self, trans, task):
- self.execute_cyclical_dependencies(trans, task, True)
- self.delete_objects(trans, task)
-
- def execute_dependencies(self, trans, task):
- polymorphic_dependencies = list(task.polymorphic_dependencies)
- for dep in polymorphic_dependencies:
- self.execute_dependency(trans, dep, False)
- for dep in reversed(polymorphic_dependencies):
- self.execute_dependency(trans, dep, True)
+# TODO: don't know what these should be.
+# it's very hard not to use subclasses to define behavior here.
+class Rec(object):
+ pass
- def execute_cyclical_dependencies(self, trans, task, isdelete):
- for t in task.dependent_tasks:
- self.execute(trans, [t], isdelete)
__all__ = ['sort', 'sort_with_cycles', 'sort_as_tree']
-def sort(tuples, allitems):
- """sort the given list of items by dependency.
-
- 'tuples' is a list of tuples representing a partial ordering.
- """
-
- return [n.item for n in _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=True)]
-
-def sort_with_cycles(tuples, allitems):
- """sort the given list of items by dependency, cutting out cycles.
-
- returns results as an iterable of 2-tuples, containing the item,
- and a list containing items involved in a cycle with this item, if any.
-
- 'tuples' is a list of tuples representing a partial ordering.
- """
-
- return [(n.item, [n.item for n in n.cycles or []]) for n in _sort(tuples, allitems, allow_cycles=True)]
-
-def sort_as_tree(tuples, allitems, with_cycles=False):
- """sort the given list of items by dependency, and return results
- as a hierarchical tree structure.
-
- returns results as an iterable of 3-tuples, containing the item,
- a list containing items involved in a cycle with this item, if any,
- and a list of child tuples.
+# TODO: obviate the need for a _Node class.
+# a straight tuple should be used.
+class _Node(tuple):
+ """Represent each item in the sort."""
- if with_cycles is False, the returned structure is of the same form
- but the second element of each tuple, i.e. the 'cycles', is an empty list.
+ def __new__(cls, item):
+ children = []
+ t = tuple.__new__(cls, [item, children])
+ t.item = item
+ t.children = children
+ return t
- 'tuples' is a list of tuples representing a partial ordering.
- """
-
- return _organize_as_tree(_sort(tuples, allitems, allow_cycles=with_cycles))
-
-
-class _Node(object):
- """Represent each item in the sort."""
-
- def __init__(self, item):
- self.item = item
- self.dependencies = set()
- self.children = []
- self.cycles = None
-
- def __str__(self):
- return self.safestr()
+ def __hash__(self):
+ return id(self)
- def safestr(self, indent=0):
- return (' ' * indent * 2) + \
- str(self.item) + \
- (self.cycles is not None and (" (cycles: " + repr([x for x in self.cycles]) + ")") or "") + \
- "\n" + \
- ''.join(str(n) for n in self.children)
-
- def __repr__(self):
- return str(self.item)
-
- def all_deps(self):
- """Return a set of dependencies for this node and all its cycles."""
-
- deps = set(self.dependencies)
- if self.cycles is not None:
- for c in self.cycles:
- deps.update(c.dependencies)
- return deps
-
class _EdgeCollection(object):
"""A collection of directed edges."""
parentnode, childnode = edge
self.parent_to_children[parentnode].add(childnode)
self.child_to_parents[childnode].add(parentnode)
- parentnode.dependencies.add(childnode)
def remove(self, edge):
"""Remove an edge from this collection.
def __repr__(self):
return repr(list(self))
-def _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=False):
+def sort(tuples, allitems):
+ """sort the given list of items by dependency.
+
+ 'tuples' is a list of tuples representing a partial ordering.
+ """
nodes = {}
edges = _EdgeCollection()
for t in tuples:
id0, id1 = id(t[0]), id(t[1])
if t[0] is t[1]:
- if allow_cycles:
- n = nodes[id0]
- n.cycles = set([n])
- elif not ignore_self_cycles:
- raise CircularDependencyError("Self-referential dependency detected " + repr(t))
continue
childnode = nodes[id1]
parentnode = nodes[id0]
output = []
while nodes:
if not queue:
- # edges remain but no edgeless nodes to remove; this indicates
- # a cycle
- if allow_cycles:
- for cycle in _find_cycles(edges):
- lead = cycle[0][0]
- lead.cycles = set()
- for edge in cycle:
- n = edges.remove(edge)
- lead.cycles.add(edge[0])
- lead.cycles.add(edge[1])
- if n is not None:
- queue.append(n)
- for n in lead.cycles:
- if n is not lead:
- n._cyclical = True
- for (n, k) in list(edges.edges_by_parent(n)):
- edges.add((lead, k))
- edges.remove((n, k))
- continue
- else:
- # long cycles not allowed
- raise CircularDependencyError("Circular dependency detected " + repr(edges) + repr(queue))
+ raise CircularDependencyError("Circular dependency detected " +
+ repr(edges) + repr(queue))
node = queue.pop()
- if not hasattr(node, '_cyclical'):
- output.append(node)
+ output.append(node.item)
del nodes[id(node.item)]
for childnode in edges.pop_node(node):
queue.append(childnode)
return output
-def _organize_as_tree(nodes):
- """Given a list of nodes from a topological sort, organize the
- nodes into a tree structure, with as many non-dependent nodes
- set as siblings to each other as possible.
-
- returns nodes as 3-tuples (item, cycles, children).
- """
-
- if not nodes:
- return None
- # a list of all currently independent subtrees as a tuple of
- # (root_node, set_of_all_tree_nodes, set_of_all_cycle_nodes_in_tree)
- # order of the list has no semantics for the algorithmic
- independents = []
- # in reverse topological order
- for node in reversed(nodes):
- # nodes subtree and cycles contain the node itself
- subtree = set([node])
- if node.cycles is not None:
- cycles = set(node.cycles)
- else:
- cycles = set()
- # get a set of dependent nodes of node and its cycles
- nodealldeps = node.all_deps()
- if nodealldeps:
- # iterate over independent node indexes in reverse order so we can efficiently remove them
- for index in xrange(len(independents) - 1, -1, -1):
- child, childsubtree, childcycles = independents[index]
- # if there is a dependency between this node and an independent node
- if (childsubtree.intersection(nodealldeps) or childcycles.intersection(node.dependencies)):
- # prepend child to nodes children
- # (append should be fine, but previous implemetation used prepend)
- node.children[0:0] = [(child.item, [n.item for n in child.cycles or []], child.children)]
- # merge childs subtree and cycles
- subtree.update(childsubtree)
- cycles.update(childcycles)
- # remove the child from list of independent subtrees
- independents[index:index+1] = []
- # add node as a new independent subtree
- independents.append((node, subtree, cycles))
- # choose an arbitrary node from list of all independent subtrees
- head = independents.pop()[0]
- # add all other independent subtrees as a child of the chosen root
- # used prepend [0:0] instead of extend to maintain exact behaviour of previous implementation
- head.children[0:0] = [(i[0].item, [n.item for n in i[0].cycles or []], i[0].children) for i in independents]
- return (head.item, [n.item for n in head.cycles or []], head.children)
-
-def _find_cycles(edges):
- involved_in_cycles = set()
- cycles = {}
- def traverse(node, goal=None, cycle=None):
- if goal is None:
- goal = node
- cycle = []
- elif node is goal:
- return True
-
- for (n, key) in edges.edges_by_parent(node):
- if key in cycle:
- continue
- cycle.append(key)
- if traverse(key, goal, cycle):
- cycset = set(cycle)
- for x in cycle:
- involved_in_cycles.add(x)
- if x in cycles:
- existing_set = cycles[x]
- [existing_set.add(y) for y in cycset]
- for y in existing_set:
- cycles[y] = existing_set
- cycset = existing_set
- else:
- cycles[x] = cycset
- cycle.pop()
-
- for parent in edges.get_parents():
- traverse(parent)
-
- unique_cycles = set(tuple(s) for s in cycles.values())
-
- for cycle in unique_cycles:
- edgecollection = [edge for edge in edges
- if edge[0] in cycle and edge[1] in cycle]
- yield edgecollection
import sqlalchemy.topological as topological
from sqlalchemy.test import TestBase
-
+from sqlalchemy.test.testing import assert_raises
+from sqlalchemy import exc
+import collections
class DependencySortTest(TestBase):
- def assert_sort(self, tuples, node, collection=None):
- print str(node)
- def assert_tuple(tuple, node):
- if node[1]:
- cycles = node[1]
- else:
- cycles = []
- if tuple[0] is node[0] or tuple[0] in cycles:
- tuple.pop()
- if tuple[0] is node[0] or tuple[0] in cycles:
- return
- elif len(tuple) > 1 and tuple[1] is node[0]:
- assert False, "Tuple not in dependency tree: " + str(tuple) + " " + str(node)
- for c in node[2]:
- assert_tuple(tuple, c)
-
- for tuple in tuples:
- assert_tuple(list(tuple), node)
-
- if collection is None:
- collection = set()
- items = set()
- def assert_unique(n):
- for item in [i for i in n[1] or [n[0]]]:
- assert item not in items, node
- items.add(item)
- if item in collection:
- collection.remove(item)
- for item in n[2]:
- assert_unique(item)
- assert_unique(node)
- assert len(collection) == 0
+ def assert_sort(self, tuples, result):
+
+ deps = collections.defaultdict(set)
+ for parent, child in tuples:
+ deps[parent].add(child)
+
+ for i, node in enumerate(result):
+ for n in result[i:]:
+ assert node not in deps[n]
def testsort(self):
rootnode = 'root'
(node4, subnode3),
(node4, subnode4)
]
- head = topological.sort_as_tree(tuples, [])
+ head = topological.sort(tuples, [])
self.assert_sort(tuples, head)
def testsort2(self):
(node5, node6),
(node6, node2)
]
- head = topological.sort_as_tree(tuples, [node7])
- self.assert_sort(tuples, head, [node7])
+ head = topological.sort(tuples, [node7])
+ self.assert_sort(tuples, head)
def testsort3(self):
['Mapper|Keyword|keywords,Mapper|IKAssociation|itemkeywords', 'Mapper|Item|items,Mapper|IKAssociation|itemkeywords']
(node3, node2),
(node1,node3)
]
- head1 = topological.sort_as_tree(tuples, [node1, node2, node3])
- head2 = topological.sort_as_tree(tuples, [node3, node1, node2])
- head3 = topological.sort_as_tree(tuples, [node3, node2, node1])
+ head1 = topological.sort(tuples, [node1, node2, node3])
+ head2 = topological.sort(tuples, [node3, node1, node2])
+ head3 = topological.sort(tuples, [node3, node2, node1])
# TODO: figure out a "node == node2" function
#self.assert_(str(head1) == str(head2) == str(head3))
(node1, node3),
(node3, node2)
]
- head = topological.sort_as_tree(tuples, [])
- self.assert_sort(tuples, head)
-
- def testsort5(self):
- # this one, depenending on the weather,
- node1 = 'node1' #'00B94190'
- node2 = 'node2' #'00B94990'
- node3 = 'node3' #'00B9A9B0'
- node4 = 'node4' #'00B4F210'
- tuples = [
- (node4, node1),
- (node1, node2),
- (node4, node3),
- (node2, node3),
- (node4, node2),
- (node3, node3)
- ]
- allitems = [
- node1,
- node2,
- node3,
- node4
- ]
- head = topological.sort_as_tree(tuples, allitems, with_cycles=True)
+ head = topological.sort(tuples, [])
self.assert_sort(tuples, head)
def testcircular(self):
(node4, node1)
]
allitems = [node1, node2, node3, node4]
- head = topological.sort_as_tree(tuples, allitems, with_cycles=True)
- self.assert_sort(tuples, head)
+ assert_raises(exc.CircularDependencyError, topological.sort, tuples, allitems)
def testcircular2(self):
# this condition was arising from ticket:362
(node3, node2),
(node2, node3)
]
- head = topological.sort_as_tree(tuples, [], with_cycles=True)
- self.assert_sort(tuples, head)
+ assert_raises(exc.CircularDependencyError, topological.sort, tuples, [])
def testcircular3(self):
question, issue, providerservice, answer, provider = "Question", "Issue", "ProviderService", "Answer", "Provider"
tuples = [(question, issue), (providerservice, issue), (provider, question), (question, provider), (providerservice, question), (provider, providerservice), (question, answer), (issue, question)]
- head = topological.sort_as_tree(tuples, [], with_cycles=True)
- self.assert_sort(tuples, head)
+ assert_raises(exc.CircularDependencyError, topological.sort, tuples, [])
def testbigsort(self):
tuples = [(i, i + 1) for i in range(0, 1500, 2)]
- head = topological.sort_as_tree(tuples, [])
+ head = topological.sort(tuples, [])
def testids(self):