def execute(self, statement, parameters, connection = None, echo = None, typemap = None, **kwargs):
if parameters is None:
parameters = {}
-
if echo is True or self.echo:
self.log(statement)
self.log(repr(parameters))
else:
return uowcommit.uow.attributes.get_history(obj, self.key)
- def whose_dependent_on_who(self, obj1, obj2, uowcommit):
+ def whose_dependent_on_who(self, obj1, obj2):
if obj1 is obj2:
return None
elif self.thiscol.primary_key:
return (obj2, obj1)
def process_dependencies(self, task, deplist, uowcommit, delete = False):
- print self.mapper.table.name + " " + repr(len(deplist)) + " process_dep isdelete " + repr(delete)
+ #print self.mapper.table.name + " " + repr(len(deplist)) + " process_dep isdelete " + repr(delete)
# fucntion to set properties across a parent/child object plus an "association row",
# based on a join condition
uowcommit.register_deleted_list(childlist)
else:
for obj in deplist:
+ print "obj: " + repr(obj)
childlist = getlist(obj)
- if childlist is None: return
+ if childlist is None: continue
clearkeys = False
for child in childlist.added_items():
associationrow = {}
if self.direction == PropertyLoader.RIGHT:
uowcommit.register_object(obj)
childlist = getlist(obj)
- if childlist is None: return
+ if childlist is None: continue
uowcommit.register_saved_list(childlist)
clearkeys = False
for child in childlist.added_items():
self.mapper.save_obj(self.tosave_objects(), trans)
for dep in self.save_dependencies():
(processor, targettask, isdelete) = dep
- processor.process_dependencies(targettask, targettask.tosave_objects(), trans, delete = False)
+ processor.process_dependencies(targettask, [elem.obj for elem in targettask.tosave_elements()], trans, delete = False)
for element in self.tosave_elements():
if element.childtask is not None:
element.childtask.execute(trans)
for dep in self.delete_dependencies():
(processor, targettask, isdelete) = dep
- processor.process_dependencies(targettask, targettask.todelete_objects(), trans, delete = True)
+ processor.process_dependencies(targettask, [elem.obj for elem in targettask.todelete_elements()], trans, delete = True)
for child in self.childtasks:
child.execute(trans)
for element in self.todelete_elements():
dp[(processor, isdelete)] = l
return l
- # TODO: rework, its putting too many things in places they shouldnt be
for taskelement in self.objects.values():
# go through all of the dependencies on this task, and organize them
# into a hash where we isolate individual objects that depend on each
for o in childlist:
if not self.objects.has_key(o):
continue
- whosdep = processor.whose_dependent_on_who(obj, o, trans)
+ whosdep = processor.whose_dependent_on_who(obj, o)
if whosdep is not None:
tuples.append(whosdep)
if whosdep[0] is obj:
def make_task_tree(node, parenttask):
circ = objecttotask[node.item]
- #if len(circ.objects) == 0 and len(circ.dependencies) == 0:
- # circ = None
parenttask.append(node.item, self.objects[node.item].listonly, circ, isdelete=self.objects[node.item].isdelete)
if dependencies.has_key(node.item):
for tup, deptask in dependencies[node.item].iteritems():
return t
def dump(self, indent=""):
- # TODO: what a mess !
s = "\n" + indent + repr(self)
if self.circular is not None:
s += " Circular Representation:"
s = ""
for dt in dep:
s += "\n " + indent + "process " + repr(dt[0].key) + " on:"
-# s += "\n " + indent + repr(dt[0].key) + "/" + (dt[2] and 'items to be deleted' or 'saved items')
if dt[2]:
val = [t for t in dt[1].objects.values() if t.isdelete]
else:
def object_mapper(obj):
return sqlalchemy.mapper.object_mapper(obj)
-
-
uow = util.ScopedRegistry(lambda: UnitOfWork(), "thread")
k = Keyword()
k.name = 'yellow'
objects[5].keywords.append(k)
-
- objectstore.uow().commit()
+ db.set_assert_list(self, [
+ (
+ "INSERT INTO keywords (keyword_id, name) VALUES (:keyword_id, :name)",
+ {'keyword_id': None, 'name': 'yellow'}
+ ),
+ (
+ "UPDATE items SET order_id=:order_id, item_name=:item_name WHERE items.item_id = :items_item_id",
+ [{'item_name': 'item4updated', 'order_id': None, 'items_item_id': objects[4].item_id}]
+ ),
+ ("INSERT INTO itemkeywords (item_id, keyword_id) VALUES (:item_id, :keyword_id)",
+ [{'item_id': objects[5].item_id, 'keyword_id': 11}]
+ )
+ ])
+ objectstore.commit()
+ db.set_assert_list(None, None)
+
objects[2].keywords.append(k)
self.echo("added: " + repr(objects[2].keywords.added_items()))
objectstore.uow().commit()
elif DBTYPE == 'postgres':
pass
+db = testbase.EngineAssert(db)
+
users = Table('users', db,
Column('user_id', Integer, primary_key = True),
Column('user_name', String(20)),
import unittest
import StringIO
+import sqlalchemy.engine as engine
echo = True
self.assert_(getattr(rowobj, key) == value, "attribute %s value %s does not match %s" % (key, getattr(rowobj, key), value))
def assert_enginesql(self, db, callable_, result):
self.assert_(self.capture_exec(db, callable_) == result, result)
+
+class EngineAssert(object):
+ def __init__(self, engine):
+ self.__dict__['engine'] = engine
+ self.__dict__['realexec'] = engine.execute
+ self.set_assert_list(None, None)
+ def __getattr__(self, key):
+ return getattr(self.engine, key)
+ def __setattr__(self, key, value):
+ setattr(self.__dict__['engine'], key, value)
+ def set_assert_list(self, unittest, list):
+ self.__dict__['unittest'] = unittest
+ self.__dict__['assert_list'] = list
+ def execute(self, statement, parameters, **kwargs):
+ # TODO: get this to work
+ if self.assert_list is None:
+ return
+ item = self.assert_list.pop()
+ (query, params) = item
+ self.unittest.assert_(statement == query and params == parameters)
+ return self.realexec(statement, parameters, **kwargs)
def runTests(suite):
runner = unittest.TextTestRunner(verbosity = 2, descriptions =1)