import weakref
import string
import StringIO
+from sets import *
__all__ = ['get_id_key', 'get_row_key', 'commit', 'update', 'clear', 'delete',
'begin', 'has_key', 'has_instance', 'UnitOfWork']
mapper = object_mapper(obj)
self.mappers.append(mapper)
task = self.get_task_by_mapper(mapper)
+
+ # for a cyclical task, things need to be sorted out already,
+ # so this object should have already been added to the appropriate sub-task
+ # can put an assertion here to make sure....
+ if task.circular:
+ return
+
+ if obj not in task.objects:
+ self.__modified = True
task.append(obj, listonly, isdelete=isdelete, **kwargs)
def unregister_object(self, obj):
mapper = object_mapper(obj)
task = self.get_task_by_mapper(mapper)
task.delete(obj)
+ self.__modified = True
def get_task_by_mapper(self, mapper):
"""every individual mapper involved in the transaction has a single
"""called by mapper.PropertyLoader to register the objects handled by
one mapper being dependent on the objects handled by another."""
self.dependencies[(mapper, dependency)] = True
+ self.__modified = True
def register_processor(self, mapper, processor, mapperfrom, isdeletefrom):
"""called by mapper.PropertyLoader to register itself as a "processor", which
# when the task from "mapper" executes, take the objects from the task corresponding
# to "mapperfrom"'s list of save/delete objects, and send them to "processor"
# for dependency processing
+ #print "registerprocessor", str(mapper), repr(processor.key), str(mapperfrom), repr(isdeletefrom)
task = self.get_task_by_mapper(mapper)
targettask = self.get_task_by_mapper(mapperfrom)
task.dependencies.append(UOWDependencyProcessor(processor, targettask, isdeletefrom))
+ self.__modified = True
def register_saved_history(self, listobj):
self.saved_histories.append(listobj)
task.mapper.register_dependencies(self)
head = self._sort_dependencies()
+ self.__modified = False
if LOG or echo:
if head is None:
print "Task dump: None"
if head is not None:
head.execute(self)
if LOG or echo:
- if head is not None:
+ if self.__modified and head is not None:
print "\nAfter Execute:\n" + head.dump()
+ else:
+ print "\nExecute complete (no post-exec changes)\n"
def post_exec(self):
"""after an execute/commit is completed, all of the objects and lists that have
def _sort_dependencies(self):
"""creates a hierarchical tree of dependent tasks. the root node is returned.
when the root node is executed, it also executes its child tasks recursively."""
- bymapper = {}
-
def sort_hier(node):
if node is None:
return None
- task = bymapper.get(node.item, None)
- if task is None:
- task = UOWTask(self, node.item)
- bymapper[node.item] = task
- if node.circular:
- task.circular = task._sort_circular_dependencies(self)
- task.iscircular = True
+ task = self.get_task_by_mapper(node.item)
+ if node.cycles is not None:
+ tasks = []
+ for n in node.cycles:
+ tasks.append(self.get_task_by_mapper(n.item))
+ task.circular = task._sort_circular_dependencies(self, tasks)
for child in node.children:
t = sort_hier(child)
if t is not None:
mappers = util.HashSet()
for task in self.tasks.values():
mappers.append(task.mapper)
- bymapper[task.mapper] = task
- head = DependencySorter(self.dependencies, mappers).sort()
+ head = DependencySorter(self.dependencies, mappers).sort(allow_all_cycles=True)
+ #print str(head)
task = sort_hier(head)
return task
def __init__(self, obj):
self.obj = obj
self.listonly = True
- self.childtask = None
+ self.childtasks = []
self.isdelete = False
+ self.mapper = None
def __repr__(self):
return "UOWTaskElement/%d: %s/%d %s" % (id(self), self.obj.__class__.__name__, id(self.obj), (self.listonly and 'listonly' or (self.isdelete and 'delete' or 'save')) )
def execute(self, trans, delete):
if not delete:
- self.processor.process_dependencies(self.targettask, [elem.obj for elem in self.targettask.tosave_elements()], trans, delete = delete)
+ self.processor.process_dependencies(self.targettask, [elem.obj for elem in self.targettask.tosave_elements() if elem.obj is not None], trans, delete = delete)
else:
- self.processor.process_dependencies(self.targettask, [elem.obj for elem in self.targettask.todelete_elements()], trans, delete = delete)
+ self.processor.process_dependencies(self.targettask, [elem.obj for elem in self.targettask.todelete_elements() if elem.obj is not None], trans, delete = delete)
def get_object_dependencies(self, obj, trans, passive):
return self.processor.get_object_dependencies(obj, trans, passive=passive)
def branch(self, task):
return UOWDependencyProcessor(self.processor, task, self.isdeletefrom)
-
+
class UOWTask(object):
def __init__(self, uowtransaction, mapper):
if uowtransaction is not None:
self.mapper = mapper
self.objects = util.OrderedDict()
self.dependencies = []
- self.iscircular = False
+ self.cyclical_dependencies = []
self.circular = None
+ self.postcircular = None
self.childtasks = []
- #print "NEW TASK", repr(self)
+# print "NEW TASK", repr(self)
def is_empty(self):
return len(self.objects) == 0 and len(self.dependencies) == 0 and len(self.childtasks) == 0
only be processed as a dependency and not actually saved/deleted. if the object
already exists with a 'listonly' flag of False, it is kept as is. 'childtask' is used
internally when creating a hierarchical list of self-referential tasks, to assign
- dependent operations at the per-object instead of per-task level."""
+ dependent operations at the per-object instead of per-task level. """
try:
rec = self.objects[obj]
except KeyError:
if not listonly:
rec.listonly = False
if childtask:
- rec.childtask = childtask
+ rec.childtasks.append(childtask)
if isdelete:
rec.isdelete = True
return
self.mapper.save_obj(self.tosave_objects(), trans)
- for dep in self.save_dependencies():
+ for dep in self.cyclical_save_dependencies():
dep.execute(trans, delete=False)
for element in self.tosave_elements():
- if element.childtask is not None:
- element.childtask.execute(trans)
+ for task in element.childtasks:
+ task.execute(trans)
+ for dep in self.save_dependencies():
+ dep.execute(trans, delete=False)
for dep in self.delete_dependencies():
dep.execute(trans, delete=True)
+ for dep in self.cyclical_delete_dependencies():
+ dep.execute(trans, delete=True)
for child in self.childtasks:
child.execute(trans)
for element in self.todelete_elements():
- if element.childtask is not None:
- element.childtask.execute(trans)
+ for task in element.childtasks:
+ task.execute(trans)
self.mapper.delete_obj(self.todelete_objects(), trans)
def tosave_elements(self):
def todelete_elements(self):
return [rec for rec in self.objects.values() if rec.isdelete]
def tosave_objects(self):
- return [o for o, rec in self.objects.iteritems() if not rec.listonly and not rec.isdelete]
+ return [rec.obj for rec in self.objects.values() if rec.obj is not None and not rec.listonly and rec.isdelete is False]
def todelete_objects(self):
- return [o for o, rec in self.objects.iteritems() if not rec.listonly and rec.isdelete]
+ return [rec.obj for rec in self.objects.values() if rec.obj is not None and not rec.listonly and rec.isdelete is True]
def save_dependencies(self):
return [dep for dep in self.dependencies if not dep.isdeletefrom]
+ def cyclical_save_dependencies(self):
+ return [dep for dep in self.cyclical_dependencies if not dep.isdeletefrom]
def delete_dependencies(self):
return [dep for dep in self.dependencies if dep.isdeletefrom]
+ def cyclical_delete_dependencies(self):
+ return [dep for dep in self.cyclical_dependencies if dep.isdeletefrom]
- def _sort_circular_dependencies(self, trans):
+ def _sort_circular_dependencies(self, trans, cycles):
"""for a single task, creates a hierarchical tree of "subtasks" which associate
specific dependency actions with individual objects. This is used for a
- "circular" task, or a task where elements
+ "cyclical" task, or a task where elements
of its object list contain dependencies on each other.
this is not the normal case; this logic only kicks in when something like
a hierarchical tree is being represented."""
-
- allobjects = self.objects.keys()
+
+ allobjects = []
+ for task in cycles:
+ allobjects += task.objects.keys()
tuples = []
objecttotask = {}
- # dependency processors that arent part of the "circular" thing
+ cycles = Set(cycles)
+
+ # dependency processors that arent part of the cyclical thing
# get put here
- extradep = util.HashSet()
+ extradeplist = []
- def get_task(obj):
+ def get_object_task(parent, obj):
try:
return objecttotask[obj]
except KeyError:
- t = UOWTask(None, self.mapper)
+ t = UOWTask(None, parent.mapper)
+ t.parent = parent
objecttotask[obj] = t
return t
try:
l = dp[depprocessor]
except KeyError:
- l = UOWTask(None, None)
+ l = UOWTask(None, depprocessor.targettask.mapper)
dp[depprocessor] = l
return l
- for taskelement in self.objects.values():
- # go through all of the dependencies on this task, which are
- # self-referring, and organize them
- # into a hash where we isolate individual objects that depend on each
- # other. then those individual object relationships will be grabbed
- # back into a hierarchical tree thing down below via make_task_tree.
- obj = taskelement.obj
- parenttask = get_task(obj)
- for dep in self.dependencies:
- (processor, targettask, isdelete) = (dep.processor, dep.targettask, dep.isdeletefrom)
- if dep.targettask is not self:
- extradep.append(dep)
- continue
- elif taskelement.isdelete is not dep.isdeletefrom:
- continue
- #print "GETING LIST OFF PROC", processor.key, "OBJ", repr(obj)
- childlist = dep.get_object_dependencies(obj, trans, passive = True)
- if isdelete:
- childlist = childlist.unchanged_items() + childlist.deleted_items()
- else:
- childlist = childlist.added_items()
- for o in childlist:
- if not self.objects.has_key(o):
+ # work out a list of all the "dependency processors" that
+ # represent objects that have to be dependency sorted at the
+ # per-object level. all other dependency processors go in
+ # "extradep."
+ deps_by_targettask = {}
+ for task in cycles:
+ for dep in task.dependencies:
+ if dep.targettask not in cycles or trans.get_task_by_mapper(dep.processor.mapper) not in cycles:
+ extradeplist.append(dep)
+ l = deps_by_targettask.setdefault(dep.targettask, [])
+ l.append(dep)
+
+ for task in cycles:
+ for taskelement in task.objects.values():
+ obj = taskelement.obj
+ #print "OBJ", repr(obj), "TASK", repr(task)
+
+ # create a placeholder UOWTask that may be built into the final
+ # task tree
+ get_object_task(task, obj)
+ for dep in deps_by_targettask[task]:
+ (processor, targettask, isdelete) = (dep.processor, dep.targettask, dep.isdeletefrom)
+ if taskelement.isdelete is not dep.isdeletefrom:
continue
- whosdep = dep.whose_dependent_on_who(obj, o)
- if whosdep is not None:
- tuples.append(whosdep)
- if whosdep[0] is obj:
- get_dependency_task(whosdep[0], dep).append(whosdep[0], isdelete=isdelete)
- else:
- get_dependency_task(whosdep[0], dep).append(whosdep[1], isdelete=isdelete)
+ #print "GETING LIST OFF PROC", processor.key, "OBJ", repr(obj)
+
+ # traverse through the modified child items of each object. normally this
+ # is done via PropertyLoader in properties.py, but we need all the info
+ # up front here to do the object-level topological sort.
+
+ # list of dependent objects from this object
+ childlist = dep.get_object_dependencies(obj, trans, passive = True)
+ # the task corresponding to the processor's objects
+ childtask = trans.get_task_by_mapper(processor.mapper)
+ # is this dependency involved in one of the cycles ?
+ cyclicaldep = dep.targettask in cycles and trans.get_task_by_mapper(dep.processor.mapper) in cycles
+ if isdelete:
+ childlist = childlist.unchanged_items() + childlist.deleted_items()
else:
- get_dependency_task(obj, dep).append(obj, isdelete=isdelete)
+ childlist = childlist.added_items()
+
+ for o in childlist:
+ if not o in childtask.objects:
+ # item needs to be saved since its added, or attached to a deleted object
+ childtask.append(o, isdelete=isdelete and dep.processor.private)
+ if cyclicaldep:
+ # cyclical, so create a placeholder UOWTask that may be built into the
+ # final task tree
+ t = get_object_task(childtask, o)
+ if not cyclicaldep:
+ # not cyclical, so we are done with this
+ continue
+ # cyclical, so create an ordered pair for the dependency sort
+ whosdep = dep.whose_dependent_on_who(obj, o)
+ if whosdep is not None:
+ tuples.append(whosdep)
+ # then locate a UOWDependencyProcessor to add the object onto, which
+ # will handle the modifications between saves/deletes
+ if whosdep[0] is obj:
+ get_dependency_task(whosdep[0], dep).append(whosdep[0], isdelete=isdelete)
+ else:
+ get_dependency_task(whosdep[0], dep).append(whosdep[1], isdelete=isdelete)
+ else:
+ get_dependency_task(obj, dep).append(obj, isdelete=isdelete)
head = DependencySorter(tuples, allobjects).sort()
if head is None:
return None
+ #print str(head)
+
def make_task_tree(node, parenttask):
+ """takes a dependency-sorted tree of objects and creates a tree of UOWTasks"""
t = objecttotask[node.item]
- parenttask.append(node.item, self.objects[node.item].listonly, t, isdelete=self.objects[node.item].isdelete)
+ can_add_to_parent = t.mapper is parenttask.mapper
+ if can_add_to_parent:
+ parenttask.append(node.item, t.parent.objects[node.item].listonly, isdelete=t.parent.objects[node.item].isdelete, childtask=t)
+ else:
+ t.append(node.item, t.parent.objects[node.item].listonly, isdelete=t.parent.objects[node.item].isdelete)
+ parenttask.append(None, listonly=False, isdelete=t.parent.objects[node.item].isdelete, childtask=t)
if dependencies.has_key(node.item):
for depprocessor, deptask in dependencies[node.item].iteritems():
- parenttask.dependencies.append(depprocessor.branch(deptask))
+ if can_add_to_parent:
+ parenttask.cyclical_dependencies.append(depprocessor.branch(deptask))
+ else:
+ t.cyclical_dependencies.append(depprocessor.branch(deptask))
for n in node.children:
t2 = make_task_tree(n, t)
return t
-
- # this is the new "circular" UOWTask which will execute in place of "self"
+
+ # this is the new "circular" UOWTask which will execute in place of "self"
t = UOWTask(None, self.mapper)
# stick the non-circular dependencies and child tasks onto the new
# circular UOWTask
- t.dependencies += [d for d in extradep]
+ t.dependencies += [d for d in extradeplist]
t.childtasks = self.childtasks
make_task_tree(head, t)
return t
def _indent():
return " | " * indent
+
+ headers = {}
+ def header(buf, text):
+ """writes a given header just once"""
+ try:
+ headers[text]
+ except KeyError:
+ buf.write(_indent() + " |\n")
+ buf.write(text)
+ headers[text] = True
def _dump_processor(proc):
if proc.isdeletefrom:
else:
val = [t for t in proc.targettask.objects.values() if not t.isdelete]
- buf.write(_indent() + " |\n")
- buf.write(_indent() + " |- UOWDependencyProcessor(%d) %s on %s %s\n" % (id(proc), repr(proc.processor.key), (proc.isdeletefrom and "to delete" or "saved"), _repr_task(proc.targettask)))
+ buf.write(_indent() + " |- UOWDependencyProcessor(%d) %s attribute on %s (%s)\n" % (
+ id(proc),
+ repr(proc.processor.key),
+ (proc.isdeletefrom and
+ "%s's to be deleted" % _repr_task_class(proc.targettask)
+ or "saved %s's" % _repr_task_class(proc.targettask)),
+ _repr_task(proc.targettask))
+ )
+
if len(val) == 0:
buf.write(_indent() + " | |-" + "(no objects)\n")
for v in val:
buf.write(_indent() + " | |-" + _repr_task_element(v) + "\n")
- buf.write(_indent() + " |\n")
def _repr_task_element(te):
- return "UOWTaskElement(%d): %s(%d) %s" % (id(te), te.obj.__class__.__name__, id(te.obj), (te.listonly and '(listonly)' or (te.isdelete and '(delete)' or '(save)')) )
+ if te.obj is None:
+ objid = "(placeholder)"
+ else:
+ objid = "%s(%d)" % (te.obj.__class__.__name__, id(te.obj))
+ return "UOWTaskElement(%d): %s %s%s" % (id(te), objid, (te.listonly and '(listonly)' or (te.isdelete and '(delete' or '(save')),
+ (te.mapper is not None and " w/ " + str(te.mapper) + ")" or ")")
+ )
def _repr_task(task):
if task.mapper is not None:
else:
name = '(none)'
return ("UOWTask(%d) '%s'" % (id(task), name))
-
+ def _repr_task_class(task):
+ if task.mapper is not None and task.mapper.__class__.__name__ == 'Mapper':
+ return task.mapper.class_.__name__
+ else:
+ return '(none)'
def _repr(obj):
return "%s(%d)" % (obj.__class__.__name__, id(obj))
for rec in self.tosave_elements():
if rec.listonly:
continue
+ header(buf, _indent() + " |- Save elements\n")
buf.write(_indent() + " |- Save: " + _repr_task_element(rec) + "\n")
-
- for dep in self.save_dependencies():
+ for dep in self.cyclical_save_dependencies():
+ header(buf, _indent() + " |- Cyclical Save dependencies\n")
_dump_processor(dep)
for element in self.tosave_elements():
- if element.childtask is not None:
- element.childtask._dump(buf, indent + 1)
+ for task in element.childtasks:
+ header(buf, _indent() + " |- Save subelements of UOWTaskElement(%s)\n" % id(element))
+ task._dump(buf, indent + 1)
+ for dep in self.save_dependencies():
+ header(buf, _indent() + " |- Save dependencies\n")
+ _dump_processor(dep)
for dep in self.delete_dependencies():
+ header(buf, _indent() + " |- Delete dependencies\n")
+ _dump_processor(dep)
+ for dep in self.cyclical_delete_dependencies():
+ header(buf, _indent() + " |- Cyclical Delete dependencies\n")
_dump_processor(dep)
for child in self.childtasks:
+ header(buf, _indent() + " |- Child tasks\n")
child._dump(buf, indent + 1)
for element in self.todelete_elements():
- if element.childtask is not None:
- element.childtask._dump(buf, indent + 1)
+ for task in element.childtasks:
+ header(buf, _indent() + " |- Delete subelements of UOWTaskElement(%s)\n" % id(element))
+ task._dump(buf, indent + 1)
for rec in self.todelete_elements():
if rec.listonly:
continue
+ header(buf, _indent() + " |- Delete elements\n")
buf.write(_indent() + " |- Delete: " + _repr_task_element(rec) + "\n")
buf.write(_indent() + " |----\n")
def object_mapper(obj):
return sqlalchemy.mapperlib.object_mapper(obj)
-
global_attributes = UOWAttributeManager()
uow = util.ScopedRegistry(lambda: UnitOfWork(), "thread")
uow.register_deleted(child)
class MapperStub(object):
- """poses as a Mapper representing the association table in a many-to-many
- join, when performing a commit().
-
- The Task objects in the objectstore module treat it just like
- any other Mapper, but in fact it only serves as a "dependency" placeholder
- for the many-to-many update task."""
- def save_obj(self, *args, **kwargs):
- pass
- def delete_obj(self, *args, **kwargs):
- pass
+ """poses as a Mapper representing the association table in a many-to-many
+ join, when performing a commit().
+
+ The Task objects in the objectstore module treat it just like
+ any other Mapper, but in fact it only serves as a "dependency" placeholder
+ for the many-to-many update task."""
+ def __init__(self, mapper):
+ self.mapper = mapper
+ def save_obj(self, *args, **kwargs):
+ pass
+ def delete_obj(self, *args, **kwargs):
+ pass
def register_dependencies(self, uowcommit):
"""tells a UOWTransaction what mappers are dependent on which, with regards
# association/parent->stub->self, then we process the child
# elments after the 'stub' save, which is before our own
# mapper's save.
- stub = PropertyLoader.MapperStub()
+ stub = PropertyLoader.MapperStub(self.association)
uowcommit.register_dependency(self.parent, stub)
uowcommit.register_dependency(self.association, stub)
uowcommit.register_dependency(stub, self.mapper)
# if we are the "backref" half of a two-way backref
# relationship, let the other mapper handle inserting the rows
return
- stub = PropertyLoader.MapperStub()
+ stub = PropertyLoader.MapperStub(self.mapper)
uowcommit.register_dependency(self.parent, stub)
uowcommit.register_dependency(self.mapper, stub)
uowcommit.register_processor(stub, self, self.parent, False)
return uowcommit.uow.attributes.get_history(obj, self.key, passive = passive)
def whose_dependent_on_who(self, obj1, obj2):
+ """given an object pair assuming obj2 is a child of obj1, returns a tuple
+ with the dependent object second, or None if they are equal.
+ used by objectstore's object-level topoligical sort."""
if obj1 is obj2:
return None
elif self.direction == PropertyLoader.ONETOMANY:
return (obj1, obj2)
else:
return (obj2, obj1)
-
+
def process_dependencies(self, task, deplist, uowcommit, delete = False):
#print self.mapper.table.name + " " + self.key + " " + repr(len(deplist)) + " process_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
for child in childlist.added_items():
self._synchronize(obj, child, None, False)
if self.direction == PropertyLoader.ONETOMANY:
+ # for a cyclical task, this registration is handled by the objectstore
uowcommit.register_object(child)
if self.direction != PropertyLoader.MANYTOONE or len(childlist.added_items()) == 0:
for child in childlist.deleted_items():
if not self.private:
self._synchronize(obj, child, None, True)
if self.direction == PropertyLoader.ONETOMANY:
+ # for a cyclical task, this registration is handled by the objectstore
uowcommit.register_object(child, isdelete=self.private)
-
def _synchronize(self, obj, child, associationrow, clearkeys):
if self.direction == PropertyLoader.ONETOMANY:
source = obj
if self.secondaryjoin is not None:
self.eagersecondary = self.secondaryjoin.copy_container()
self.eagersecondary.accept_visitor(aliasizer)
- self.eagerpriamry = self.primaryjoin
+ self.eagerprimary = self.primaryjoin.copy_container()
+ self.eagerprimary.accept_visitor(aliasizer)
else:
self.eagerprimary = self.primaryjoin.copy_container()
self.eagerprimary.accept_visitor(aliasizer)
towrap = self.parent.table
if self.secondaryjoin is not None:
- statement._outerjoin = sql.outerjoin(towrap, self.secondary, self.primaryjoin).outerjoin(self.eagertarget, self.eagersecondary)
+ statement._outerjoin = sql.outerjoin(towrap, self.secondary, self.eagerprimary).outerjoin(self.eagertarget, self.eagersecondary)
if self.order_by is False and self.secondary.default_order_by() is not None:
statement.order_by(*self.secondary.default_order_by())
else: