def _setattrbycolumn(self, obj, column, value):
self.columntoproperty[column][0].setattr(obj, value)
- def save(self, obj, traverse = True, refetch = False):
+ def save(self, obj, traverse = True):
"""saves the object across all its primary tables.
based on the existence of the primary key for each table, either inserts or updates.
primary key is determined by the underlying database engine's sequence methodology.
if getattr(obj, 'dirty', True):
def foo():
+ insert_statement = None
+ update_statement = None
for table in self.tables:
params = {}
# TODO: prepare the insert() and update() - (1) within the code or
obj.dirty = False
for prop in self.props.values():
if not isinstance(prop, ColumnProperty):
- prop.save(obj, traverse, refetch)
+ prop.save(obj, traverse)
self.transaction(foo)
else:
for prop in self.props.values():
- prop.save(obj, traverse, refetch)
+ prop.save(obj, traverse)
def transaction(self, f):
return self.table.engine.multi_transaction(self.tables, f)
"""removes the object. traverse indicates attached objects should be removed as well."""
pass
- def delete(self, obj):
- """deletes the object's row from its table unconditionally. this is a lower-level
- operation than remove."""
- # delete dependencies ?
- # delete row
- # remove primary keys
- # unset dirty flag
- pass
-
def _compile(self, whereclause = None, **options):
statement = sql.select([self.selectable], whereclause)
for key, value in self.props.iteritems():
"""called when the MapperProperty is first attached to a new parent Mapper."""
pass
- def save(self, object, traverse, refetch):
+ def save(self, object, traverse):
"""called when the instance is being saved"""
pass
if self.primaryjoin is None:
self.primaryjoin = match_primaries(parent.selectable, self.target)
- def save(self, obj, traverse, refetch):
+ def save(self, obj, traverse):
# saves child objects
- # TODO: put association table inserts/deletes into one batch
- #if self.secondary is not None:
- # secondary_delete = self.secondary.delete(sql.and_([c == bindparam(c.key) for c in setter.secondary.c]))
-
+ if self.secondary is not None:
+ secondary_delete = []
+ secondary_insert = []
+
setter = ForeignKeySetter(self.parent, self.mapper, self.parent.table, self.target, self.secondary, obj)
childlist = getattr(obj, self.key)
if not isinstance(childlist, util.HistoryArraySet):
childlist = util.HistoryArraySet(childlist)
clean_setattr(obj, self.key, childlist)
+
for child in childlist.deleted_items():
setter.child = child
+ setter.associationrow = {}
setter.clearkeys = True
self.primaryjoin.accept_visitor(setter)
child.dirty = True
- self.mapper.save(child)
+ self.mapper.save(child, traverse)
if self.secondary is not None:
self.secondaryjoin.accept_visitor(setter)
- # TODO: prepare this above
- statement = self.secondary.delete(sql.and_(*[c == setter.associationrow[c.key] for c in self.secondary.c]))
- statement.echo = self.mapper.echo
- statement.execute()
+ secondary_delete.append(setter.associationrow)
+
for child in childlist.added_items():
setter.child = child
+ setter.associationrow = {}
self.primaryjoin.accept_visitor(setter)
child.dirty = True
- self.mapper.save(child)
+ self.mapper.save(child, traverse)
if self.secondary is not None:
self.secondaryjoin.accept_visitor(setter)
- # TODO: prepare this above
+ secondary_insert.append(setter.associationrow)
+
+ if self.secondary is not None:
+ if len(secondary_delete):
+ statement = self.secondary.delete(sql.and_(*[c == sql.bindparam(c.key) for c in self.secondary.c]))
+ statement.echo = self.mapper.echo
+ statement.execute(*secondary_delete)
+ if len(secondary_insert):
statement = self.secondary.insert()
statement.echo = self.mapper.echo
- statement.execute(**setter.associationrow)
+ statement.execute(*secondary_insert)
+
for child in childlist.unchanged_items():
- self.mapper.save(child)
+ self.mapper.save(child, traverse)
# TODO: if transaction fails state is invalid
# use unit of work ?
childlist.clear_history()
-
def delete(self):
self.mapper.delete()
import unittest, sys, os
from sqlalchemy.mapper import *
+ECHO = False
execfile("test/tables.py")
class User(object):
#globalidentity().clear()
def testget(self):
- m = mapper(User, users, scope = "thread", echo = True)
+ m = mapper(User, users, scope = "thread")
self.assert_(m.get(19) is None)
u = m.get(7)
u2 = m.get(7)
"""tests that a lazy relation can be upgraded to an eager relation via the options method"""
m = mapper(User, users, properties = dict(
addresses = relation(Address, addresses, lazy = True)
- ), echo = True)
+ ))
l = m.options(eagerload('addresses')).select()
self.assert_result(l, User,
{'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])},
"""tests that an eager relation can be upgraded to a lazy relation via the options method"""
m = mapper(User, users, properties = dict(
addresses = relation(Address, addresses, lazy = False)
- ), echo = True)
+ ))
l = m.options(lazyload('addresses')).select()
self.assert_result(l, User,
{'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])},
"""tests a basic one-to-many lazy load"""
m = mapper(User, users, properties = dict(
addresses = relation(Address, addresses, lazy = True)
- ), echo = True)
+ ))
l = m.select(users.c.user_id == 7)
self.assert_result(l, User,
{'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])},
m = mapper(Item, items, properties = dict(
keywords = relation(Keyword, keywords, itemkeywords, lazy = True),
- ), echo = True)
+ ))
l = m.select()
self.assert_result(l, Item,
{'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
m = mapper(User, users, properties = dict(
#addresses = relation(Address, addresses, lazy = False),
addresses = relation(m, lazy = False),
- ), echo = True)
+ ))
l = m.select()
print repr(l)
criterion doesnt interfere with the eager load criterion."""
m = mapper(User, users, properties = dict(
addresses = relation(Address, addresses, primaryjoin = users.c.user_id==addresses.c.user_id, lazy = False)
- ), echo = True)
+ ))
l = m.select(and_(addresses.c.email_address == 'ed@lala.com', addresses.c.user_id==users.c.user_id))
print repr(l)
m = mapper(User, users, properties = dict(
orders_open = relation(Order, openorders, primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = False),
orders_closed = relation(Order, closedorders, primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = False)
- ), echo = True)
+ ))
l = m.select()
print repr(l)
m = mapper(User, users, properties = dict(
addresses = relation(Address, addresses, lazy = False),
orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False),
- ), echo = True)
+ ))
l = m.select()
print repr(l)
m = mapper(Item, items, properties = dict(
keywords = relation(Keyword, keywords, itemkeywords, lazy = False),
- ), echo = True)
+ ))
l = m.select()
print repr(l)
m = mapper(Item, items,
properties = dict(
keywords = relation(Keyword, keywords, itemkeywords, lazy = False),
- ),
- echo = True)
+ ))
m = mapper(Order, orders, properties = dict(
items = relation(m, lazy = False)
- ), echo = True)
+ ))
l = m.select("orders.order_id in (1,2,3)")
#l = m.select()
print repr(l)
-class SaveTest(PersistTest):
+class SaveTest(AssertMixin):
def testbasic(self):
# save two users
u.user_name = 'savetester'
u2 = User()
u2.user_name = 'savetester2'
- m = mapper(User, users, echo=True)
+ m = mapper(User, users)
m.save(u)
m.save(u2)
"""tests a save of an object where each instance spans two tables. also tests
redefinition of the keynames for the column properties."""
usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id)
- m = mapper(User, usersaddresses, table = users, echo = True,
+ m = mapper(User, usersaddresses, table = users,
properties = dict(
email = ColumnProperty(addresses.c.email_address),
foo_id = ColumnProperty(users.c.user_id, addresses.c.user_id)
"""test basic save of one to many."""
m = mapper(User, users, properties = dict(
addresses = relation(Address, addresses, lazy = True)
- ), echo = True)
+ ))
u = User()
u.user_name = 'one2manytester'
u.addresses = []
"""tests that an alias of a table can be used in a mapper.
the mapper has to locate the original table and columns to keep it all straight."""
ualias = Alias(users, 'ualias')
- m = mapper(User, ualias, echo = True)
+ m = mapper(User, ualias)
u = User()
u.user_name = 'testalias'
m.save(u)
def testremove(self):
m = mapper(User, users, properties = dict(
addresses = relation(Address, addresses, lazy = True)
- ), echo = True)
+ ))
u = User()
u.user_name = 'one2manytester'
u.addresses = []
m = mapper(Item, items, properties = dict(
keywords = relation(Keyword, keywords, itemkeywords, lazy = False),
- ), echo = True)
+ ))
keywordmapper = mapper(Keyword, keywords)
for k in klist:
item.keywords.append(k)
m.save(item)
- print repr(m.select(items.c.item_id == item.item_id))
+ l = m.select(items.c.item_id == item.item_id)
+
+ self.assert_result(l, Item,
+ {'item_id' : item.item_id, 'keywords' : (Keyword, [
+ {'name' : 'purple'},
+ {'name' : 'blue'},
+ {'name' : 'big'},
+ {'name' : 'round'}
+ ])})
del item.keywords[2]
del item.keywords[2]
m.save(item)
- print repr(m.select(items.c.item_id == item.item_id))
+ l = m.select(items.c.item_id == item.item_id)
+ self.assert_result(l, Item,
+ {'item_id' : item.item_id, 'keywords' : (Keyword, [
+ {'name' : 'purple'},
+ {'name' : 'blue'},
+ ])})
if __name__ == "__main__":
unittest.main()