h1.counter = 19
ctx.current.flush()
self.assert_(h1.foober == 'im the update')
-
+
+class OneToManyTest(UnitOfWorkTest):
+ def setUpAll(self):
+ UnitOfWorkTest.setUpAll(self)
+ tables.create()
+ def tearDownAll(self):
+ tables.drop()
+ UnitOfWorkTest.tearDownAll(self)
+ def tearDown(self):
+ tables.delete()
+ UnitOfWorkTest.tearDown(self)
+
+ def testonetomany_1(self):
+ """test basic save of one to many."""
+ m = mapper(User, users, properties = dict(
+ addresses = relation(mapper(Address, addresses), lazy = True)
+ ))
+ u = User()
+ u.user_name = 'one2manytester'
+ u.addresses = []
+ a = Address()
+ a.email_address = 'one2many@test.org'
+ u.addresses.append(a)
+ a2 = Address()
+ a2.email_address = 'lala@test.org'
+ u.addresses.append(a2)
+ self.echo( repr(u.addresses))
+ self.echo( repr(u.addresses.added_items()))
+ ctx.current.flush()
+
+ usertable = users.select(users.c.user_id.in_(u.user_id)).execute().fetchall()
+ self.assertEqual(usertable[0].values(), [u.user_id, 'one2manytester'])
+ addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id), order_by=[addresses.c.email_address]).execute().fetchall()
+ self.assertEqual(addresstable[0].values(), [a2.address_id, u.user_id, 'lala@test.org'])
+ self.assertEqual(addresstable[1].values(), [a.address_id, u.user_id, 'one2many@test.org'])
+
+ userid = u.user_id
+ addressid = a2.address_id
+
+ a2.email_address = 'somethingnew@foo.com'
+
+ ctx.current.flush()
+
+ addresstable = addresses.select(addresses.c.address_id == addressid).execute().fetchall()
+ self.assertEqual(addresstable[0].values(), [addressid, userid, 'somethingnew@foo.com'])
+ self.assert_(u.user_id == userid and a2.address_id == addressid)
+
+ def testonetomany_2(self):
+ """digs deeper into modifying the child items of an object to insure the correct
+ updates take place"""
+ m = mapper(User, users, properties = dict(
+ addresses = relation(mapper(Address, addresses), lazy = True)
+ ))
+ u1 = User()
+ u1.user_name = 'user1'
+ u1.addresses = []
+ a1 = Address()
+ a1.email_address = 'emailaddress1'
+ u1.addresses.append(a1)
+ u2 = User()
+ u2.user_name = 'user2'
+ u2.addresses = []
+ a2 = Address()
+ a2.email_address = 'emailaddress2'
+ u2.addresses.append(a2)
+
+ a3 = Address()
+ a3.email_address = 'emailaddress3'
+
+ ctx.current.flush()
+
+ # modify user2 directly, append an address to user1.
+ # upon commit, user2 should be updated, user1 should not
+ # both address1 and address3 should be updated
+ u2.user_name = 'user2modified'
+ u1.addresses.append(a3)
+ del u1.addresses[0]
+ self.assert_sql(db, lambda: ctx.current.flush(),
+ [
+ (
+ "UPDATE users SET user_name=:user_name WHERE users.user_id = :users_user_id",
+ {'users_user_id': u2.user_id, 'user_name': 'user2modified'}
+ ),
+ ("UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id",
+ {'user_id': None, 'email_addresses_address_id': a1.address_id}
+ ),
+ (
+ "UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id",
+ {'user_id': u1.user_id, 'email_addresses_address_id': a3.address_id}
+ ),
+ ])
+
+ def testchildmove(self):
+ """tests moving a child from one parent to the other, then deleting the first parent, properly
+ updates the child with the new parent. this tests the 'trackparent' option in the attributes module."""
+ m = mapper(User, users, properties = dict(
+ addresses = relation(mapper(Address, addresses), lazy = True)
+ ))
+ u1 = User()
+ u1.user_name = 'user1'
+ u2 = User()
+ u2.user_name = 'user2'
+ a = Address()
+ a.email_address = 'address1'
+ u1.addresses.append(a)
+ ctx.current.flush()
+ del u1.addresses[0]
+ u2.addresses.append(a)
+ ctx.current.delete(u1)
+ ctx.current.flush()
+ ctx.current.clear()
+ u2 = ctx.current.get(User, u2.user_id)
+ assert len(u2.addresses) == 1
+
+ def testchildmove_2(self):
+ m = mapper(User, users, properties = dict(
+ addresses = relation(mapper(Address, addresses), lazy = True)
+ ))
+ u1 = User()
+ u1.user_name = 'user1'
+ u2 = User()
+ u2.user_name = 'user2'
+ a = Address()
+ a.email_address = 'address1'
+ u1.addresses.append(a)
+ ctx.current.flush()
+ del u1.addresses[0]
+ u2.addresses.append(a)
+ ctx.current.flush()
+ ctx.current.clear()
+ u2 = ctx.current.get(User, u2.user_id)
+ assert len(u2.addresses) == 1
+
+ def testo2mdeleteparent(self):
+ m = mapper(User, users, properties = dict(
+ address = relation(mapper(Address, addresses), lazy = True, uselist = False, private = False)
+ ))
+ u = User()
+ a = Address()
+ u.user_name = 'one2onetester'
+ u.address = a
+ u.address.email_address = 'myonlyaddress@foo.com'
+ ctx.current.flush()
+ ctx.current.delete(u)
+ ctx.current.flush()
+ self.assert_(a.address_id is not None and a.user_id is None and not ctx.current.identity_map.has_key(u._instance_key) and ctx.current.identity_map.has_key(a._instance_key))
+
+ def testonetoone(self):
+ m = mapper(User, users, properties = dict(
+ address = relation(mapper(Address, addresses), lazy = True, uselist = False)
+ ))
+ u = User()
+ u.user_name = 'one2onetester'
+ u.address = Address()
+ u.address.email_address = 'myonlyaddress@foo.com'
+ ctx.current.flush()
+ u.user_name = 'imnew'
+ ctx.current.flush()
+ u.address.email_address = 'imnew@foo.com'
+ ctx.current.flush()
+
+ def testbidirectional(self):
+ m1 = mapper(User, users)
+
+ m2 = mapper(Address, addresses, properties = dict(
+ user = relation(m1, lazy = False, backref='addresses')
+ ))
+
+
+ u = User()
+ print repr(u.addresses)
+ u.user_name = 'test'
+ a = Address()
+ a.email_address = 'testaddress'
+ a.user = u
+ ctx.current.flush()
+ print repr(u.addresses)
+ x = False
+ try:
+ u.addresses.append('hi')
+ x = True
+ except:
+ pass
+
+ if x:
+ self.assert_(False, "User addresses element should be scalar based")
+
+ ctx.current.delete(u)
+ ctx.current.flush()
+
+ def testdoublerelation(self):
+ m2 = mapper(Address, addresses)
+ m = mapper(User, users, properties={
+ 'boston_addresses' : relation(m2, primaryjoin=
+ and_(users.c.user_id==Address.c.user_id,
+ Address.c.email_address.like('%boston%'))),
+ 'newyork_addresses' : relation(m2, primaryjoin=
+ and_(users.c.user_id==Address.c.user_id,
+ Address.c.email_address.like('%newyork%'))),
+ })
+ u = User()
+ a = Address()
+ a.email_address = 'foo@boston.com'
+ b = Address()
+ b.email_address = 'bar@newyork.com'
+
+ u.boston_addresses.append(a)
+ u.newyork_addresses.append(b)
+ ctx.current.flush()
+
class SaveTest(UnitOfWorkTest):
def setUpAll(self):
def tearDown(self):
tables.delete()
-
- #self.assert_(len(ctx.current.new) == 0)
- #self.assert_(len(ctx.current.dirty) == 0)
UnitOfWorkTest.tearDown(self)
def testbasic(self):
assert addresses.count().scalar() == 0
- def testm2mmultitable(self):
- # many-to-many join on an association table
- j = join(users, userkeywords,
- users.c.user_id==userkeywords.c.user_id).join(keywords,
- userkeywords.c.keyword_id==keywords.c.keyword_id)
-
- # a class
- class KeywordUser(object):
- pass
-
- # map to it - the identity of a KeywordUser object will be
- # (user_id, keyword_id) since those are the primary keys involved
- m = mapper(KeywordUser, j, properties={
- 'user_id':[users.c.user_id, userkeywords.c.user_id],
- 'keyword_id':[userkeywords.c.keyword_id, keywords.c.keyword_id],
- 'keyword_name':keywords.c.name
-
- })
-
- k = KeywordUser()
- k.user_name = 'keyworduser'
- k.keyword_name = 'a keyword'
- ctx.current.flush()
- print m.instance_key(k)
- id = (k.user_id, k.keyword_id)
- ctx.current.clear()
- k = ctx.current.query(KeywordUser).get(id)
- assert k.user_name == 'keyworduser'
- assert k.keyword_name == 'a keyword'
def testbatchmode(self):
class TestExtension(MapperExtension):
except AssertionError:
assert True
- def testonetoone(self):
- m = mapper(User, users, properties = dict(
- address = relation(mapper(Address, addresses), lazy = True, uselist = False)
- ))
- u = User()
- u.user_name = 'one2onetester'
- u.address = Address()
- u.address.email_address = 'myonlyaddress@foo.com'
- ctx.current.flush()
- u.user_name = 'imnew'
- ctx.current.flush()
- u.address.email_address = 'imnew@foo.com'
- ctx.current.flush()
-
- def testchildmove(self):
- """tests moving a child from one parent to the other, then deleting the first parent, properly
- updates the child with the new parent. this tests the 'trackparent' option in the attributes module."""
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = True)
- ))
- u1 = User()
- u1.user_name = 'user1'
- u2 = User()
- u2.user_name = 'user2'
- a = Address()
- a.email_address = 'address1'
- u1.addresses.append(a)
- ctx.current.flush()
- del u1.addresses[0]
- u2.addresses.append(a)
- ctx.current.delete(u1)
- ctx.current.flush()
- ctx.current.clear()
- u2 = ctx.current.get(User, u2.user_id)
- assert len(u2.addresses) == 1
-
- def testdelete(self):
- m = mapper(User, users, properties = dict(
- address = relation(mapper(Address, addresses), lazy = True, uselist = False, private = False)
- ))
- u = User()
- a = Address()
- u.user_name = 'one2onetester'
- u.address = a
- u.address.email_address = 'myonlyaddress@foo.com'
- ctx.current.flush()
- self.echo("\n\n\n")
- ctx.current.delete(u)
- ctx.current.flush()
- self.assert_(a.address_id is not None and a.user_id is None and not ctx.current.identity_map.has_key(u._instance_key) and ctx.current.identity_map.has_key(a._instance_key))
+class ManyToOneTest(UnitOfWorkTest):
+ def setUpAll(self):
+ UnitOfWorkTest.setUpAll(self)
+ tables.create()
+ def tearDownAll(self):
+ tables.drop()
+ UnitOfWorkTest.tearDownAll(self)
+ def tearDown(self):
+ tables.delete()
+ UnitOfWorkTest.tearDown(self)
- def testbackwardsonetoone(self):
- # test 'backwards'
-# m = mapper(Address, addresses, properties = dict(
-# user = relation(User, users, foreignkey = addresses.c.user_id, primaryjoin = users.c.user_id == addresses.c.user_id, lazy = True, uselist = False)
-# ))
+ def testm2oonetoone(self):
# TODO: put assertion in here !!!
m = mapper(Address, addresses, properties = dict(
user = relation(mapper(User, users), lazy = True, uselist = False)
},
])
- l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.address_id, addresses.c.address_id==a.address_id)).execute()
- self.echo( repr(l.fetchone().values()))
+ l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.user_id, addresses.c.address_id==a.address_id)).execute()
+ assert l.fetchone().values() == [a.user.user_id, 'asdf8d', a.address_id, a.user_id, 'theater@foo.com']
-
- def testonetomany(self):
- """test basic save of one to many."""
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = True)
+ def testmanytoone_1(self):
+ m = mapper(Address, addresses, properties = dict(
+ user = relation(mapper(User, users), lazy = True)
))
- u = User()
- u.user_name = 'one2manytester'
- u.addresses = []
- a = Address()
- a.email_address = 'one2many@test.org'
- u.addresses.append(a)
- a2 = Address()
- a2.email_address = 'lala@test.org'
- u.addresses.append(a2)
- self.echo( repr(u.addresses))
- self.echo( repr(u.addresses.added_items()))
- ctx.current.flush()
-
- usertable = users.select(users.c.user_id.in_(u.user_id)).execute().fetchall()
- self.assertEqual(usertable[0].values(), [u.user_id, 'one2manytester'])
- addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id), order_by=[addresses.c.email_address]).execute().fetchall()
- self.assertEqual(addresstable[0].values(), [a2.address_id, u.user_id, 'lala@test.org'])
- self.assertEqual(addresstable[1].values(), [a.address_id, u.user_id, 'one2many@test.org'])
-
- userid = u.user_id
- addressid = a2.address_id
+ a1 = Address()
+ a1.email_address = 'emailaddress1'
+ u1 = User()
+ u1.user_name='user1'
- a2.email_address = 'somethingnew@foo.com'
-
+ a1.user = u1
ctx.current.flush()
+ ctx.current.clear()
+ a1 = ctx.current.query(Address).get(a1.address_id)
+ u1 = ctx.current.query(User).get(u1.user_id)
+ assert a1.user is u1
-
- addresstable = addresses.select(addresses.c.address_id == addressid).execute().fetchall()
- self.assertEqual(addresstable[0].values(), [addressid, userid, 'somethingnew@foo.com'])
- self.assert_(u.user_id == userid and a2.address_id == addressid)
-
- def testmapperswitch(self):
- """test that, if we change mappers, the new one gets used fully. """
- users.insert().execute(
- dict(user_id = 7, user_name = 'jack'),
- dict(user_id = 8, user_name = 'ed'),
- dict(user_id = 9, user_name = 'fred')
- )
-
- # mapper with just users table
- assign_mapper(User, users)
- ctx.current.query(User).select()
- oldmapper = User.mapper
- # now a mapper with the users table plus a relation to the addresses
- clear_mapper(User.mapper)
- assign_mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = False)
- ))
- self.assert_(oldmapper is not User.mapper)
- u = ctx.current.query(User).select()
- u[0].addresses.append(Address())
- u[0].addresses[0].email_address='hi'
-
- # insure that upon commit, the new mapper with the address relation is used
- self.assert_sql(db, lambda: ctx.current.flush(),
- [
- (
- "INSERT INTO email_addresses (user_id, email_address) VALUES (:user_id, :email_address)",
- {'email_address': 'hi', 'user_id': 7}
- ),
- ],
- with_sequences=[
- (
- "INSERT INTO email_addresses (address_id, user_id, email_address) VALUES (:address_id, :user_id, :email_address)",
- lambda ctx:{'email_address': 'hi', 'user_id': 7, 'address_id':ctx.last_inserted_ids()[0]}
- ),
- ]
- )
+ a1.user = None
+ ctx.current.flush()
+ ctx.current.clear()
+ a1 = ctx.current.query(Address).get(a1.address_id)
+ u1 = ctx.current.query(User).get(u1.user_id)
+ assert a1.user is None
- def testchildmanipulations(self):
- """digs deeper into modifying the child items of an object to insure the correct
- updates take place"""
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = True)
+ def testmanytoone_2(self):
+ m = mapper(Address, addresses, properties = dict(
+ user = relation(mapper(User, users), lazy = True)
))
- u1 = User()
- u1.user_name = 'user1'
- u1.addresses = []
a1 = Address()
a1.email_address = 'emailaddress1'
- u1.addresses.append(a1)
- u2 = User()
- u2.user_name = 'user2'
- u2.addresses = []
a2 = Address()
a2.email_address = 'emailaddress2'
- u2.addresses.append(a2)
-
- a3 = Address()
- a3.email_address = 'emailaddress3'
+ u1 = User()
+ u1.user_name='user1'
+ a1.user = u1
ctx.current.flush()
-
- self.echo("\n\n\n")
- # modify user2 directly, append an address to user1.
- # upon commit, user2 should be updated, user1 should not
- # both address1 and address3 should be updated
- u2.user_name = 'user2modified'
- u1.addresses.append(a3)
- del u1.addresses[0]
- self.assert_sql(db, lambda: ctx.current.flush(),
- [
- (
- "UPDATE users SET user_name=:user_name WHERE users.user_id = :users_user_id",
- {'users_user_id': u2.user_id, 'user_name': 'user2modified'}
- ),
- ("UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id",
- {'user_id': None, 'email_addresses_address_id': a1.address_id}
- ),
- (
- "UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id",
- {'user_id': u1.user_id, 'email_addresses_address_id': a3.address_id}
- ),
- ])
+ ctx.current.clear()
+ a1 = ctx.current.query(Address).get(a1.address_id)
+ a2 = ctx.current.query(Address).get(a2.address_id)
+ u1 = ctx.current.query(User).get(u1.user_id)
+ assert a1.user is u1
+ a1.user = None
+ a2.user = u1
+ ctx.current.flush()
+ ctx.current.clear()
+ a1 = ctx.current.query(Address).get(a1.address_id)
+ a2 = ctx.current.query(Address).get(a2.address_id)
+ u1 = ctx.current.query(User).get(u1.user_id)
+ assert a1.user is None
+ assert a2.user is u1
- def testbackwardsmanipulations(self):
+ def testmanytoone_3(self):
m = mapper(Address, addresses, properties = dict(
- user = relation(mapper(User, users), lazy = True, uselist = False)
+ user = relation(mapper(User, users), lazy = True)
))
a1 = Address()
a1.email_address = 'emailaddress1'
u1 = User()
u1.user_name='user1'
-
+ u2 = User()
+ u2.user_name='user2'
+
a1.user = u1
ctx.current.flush()
-
- self.echo("\n\n\n")
- ctx.current.delete(u1)
- a1.user = None
+ ctx.current.clear()
+ a1 = ctx.current.query(Address).get(a1.address_id)
+ u1 = ctx.current.query(User).get(u1.user_id)
+ u2 = ctx.current.query(User).get(u2.user_id)
+ assert a1.user is u1
+
+ a1.user = u2
ctx.current.flush()
-
+ ctx.current.clear()
+ a1 = ctx.current.query(Address).get(a1.address_id)
+ u1 = ctx.current.query(User).get(u1.user_id)
+ u2 = ctx.current.query(User).get(u2.user_id)
+ assert a1.user is u2
+
+class ManyToManyTest(UnitOfWorkTest):
+ def setUpAll(self):
+ UnitOfWorkTest.setUpAll(self)
+ tables.create()
+ def tearDownAll(self):
+ tables.drop()
+ UnitOfWorkTest.tearDownAll(self)
+ def tearDown(self):
+ tables.delete()
+ UnitOfWorkTest.tearDown(self)
+
def testmanytomany(self):
items = orderitems
l = Query(m).select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name, keywords.c.name])
self.assert_result(l, *data)
- def testbidirectional(self):
- m1 = mapper(User, users)
-
- m2 = mapper(Address, addresses, properties = dict(
- user = relation(m1, lazy = False, backref='addresses')
- ))
-
-
- u = User()
- print repr(u.addresses)
- u.user_name = 'test'
- a = Address()
- a.email_address = 'testaddress'
- a.user = u
- ctx.current.flush()
- print repr(u.addresses)
- x = False
- try:
- u.addresses.append('hi')
- x = True
- except:
+ def testm2mmultitable(self):
+ # many-to-many join on an association table
+ j = join(users, userkeywords,
+ users.c.user_id==userkeywords.c.user_id).join(keywords,
+ userkeywords.c.keyword_id==keywords.c.keyword_id)
+
+ # a class
+ class KeywordUser(object):
pass
-
- if x:
- self.assert_(False, "User addresses element should be scalar based")
-
- ctx.current.delete(u)
- ctx.current.flush()
- def testdoublerelation(self):
- m2 = mapper(Address, addresses)
- m = mapper(User, users, properties={
- 'boston_addresses' : relation(m2, primaryjoin=
- and_(users.c.user_id==Address.c.user_id,
- Address.c.email_address.like('%boston%'))),
- 'newyork_addresses' : relation(m2, primaryjoin=
- and_(users.c.user_id==Address.c.user_id,
- Address.c.email_address.like('%newyork%'))),
+ # map to it - the identity of a KeywordUser object will be
+ # (user_id, keyword_id) since those are the primary keys involved
+ m = mapper(KeywordUser, j, properties={
+ 'user_id':[users.c.user_id, userkeywords.c.user_id],
+ 'keyword_id':[userkeywords.c.keyword_id, keywords.c.keyword_id],
+ 'keyword_name':keywords.c.name
+
})
- u = User()
- a = Address()
- a.email_address = 'foo@boston.com'
- b = Address()
- b.email_address = 'bar@newyork.com'
-
- u.boston_addresses.append(a)
- u.newyork_addresses.append(b)
+
+ k = KeywordUser()
+ k.user_name = 'keyworduser'
+ k.keyword_name = 'a keyword'
ctx.current.flush()
+ print m.instance_key(k)
+ id = (k.user_id, k.keyword_id)
+ ctx.current.clear()
+ k = ctx.current.query(KeywordUser).get(id)
+ assert k.user_name == 'keyworduser'
+ assert k.keyword_name == 'a keyword'
+
class SaveTest2(UnitOfWorkTest):