-"""tests general mapper operations with an emphasis on selecting/loading"""
+"""General mapper operations with an emphasis on selecting/loading."""
import testenv; testenv.configure_for_tests()
-from sqlalchemy import *
-from sqlalchemy import exc as sa_exc, sql
-from sqlalchemy.orm import *
-from testlib import *
-from testlib import fixtures
-from testlib.tables import *
-import testlib.tables as tables
-
-
-class MapperSuperTest(TestBase, AssertsExecutionResults):
- def setUpAll(self):
- tables.create()
- tables.data()
- def tearDownAll(self):
- tables.drop()
- def tearDown(self):
- clear_mappers()
- def setUp(self):
- pass
-
-class MapperTest(MapperSuperTest):
-
- def test_propconflict(self):
- """test that a backref created against an existing mapper with a property name
- conflict raises a decent error message"""
+from testlib import sa, testing
+from testlib.sa import MetaData, Table, Column, Integer, String, ForeignKey
+from testlib.sa.orm import mapper, relation, backref, create_session
+from testlib.sa.orm import defer, deferred, synonym
+from testlib.testing import eq_
+from testlib.compat import set
+import pickleable
+from orm import _base, _fixtures
+
+
+class MapperTest(_fixtures.FixtureTest):
+
+ @testing.resolve_artifact_names
+ def test_prop_shadow(self):
+ """A backref name may not shadow an existing property name."""
+
mapper(Address, addresses)
mapper(User, users,
properties={
'addresses':relation(Address, backref='email_address')
})
- self.assertRaises(sa_exc.ArgumentError, compile_mappers)
+ self.assertRaises(sa.exc.ArgumentError, sa.orm.compile_mappers)
+ @testing.resolve_artifact_names
def test_prop_accessor(self):
mapper(User, users)
- self.assertRaises(NotImplementedError, getattr, class_mapper(User), 'properties')
+ self.assertRaises(NotImplementedError,
+ getattr, sa.orm.class_mapper(User), 'properties')
@testing.uses_deprecated(
'Call to deprecated function _instance_key',
'Call to deprecated function _sa_session_id',
'Call to deprecated function _entity_name')
- def test_legacy_accesors(self):
- u1 = User()
+ @testing.resolve_artifact_names
+ def test_legacy_accessors(self):
+ u1 = User(name='u1')
assert not hasattr(u1, '_instance_key')
assert not hasattr(u1, '_sa_session_id')
assert not hasattr(u1, '_entity_name')
mapper(User, users)
- u1 = User()
+ u1 = User(name='u2')
assert not hasattr(u1, '_instance_key')
assert not hasattr(u1, '_sa_session_id')
assert u1._entity_name is None
sess = create_session()
sess.save(u1)
assert not hasattr(u1, '_instance_key')
- assert u1._sa_session_id == sess.hash_key
+ eq_(u1._sa_session_id, sess.hash_key)
assert u1._entity_name is None
sess.flush()
- assert u1._instance_key == class_mapper(u1).identity_key_from_instance(u1)
- assert u1._sa_session_id == sess.hash_key
+ eq_(u1._instance_key,
+ sa.orm.class_mapper(u1).identity_key_from_instance(u1))
+ eq_(u1._sa_session_id, sess.hash_key)
assert u1._entity_name is None
sess.delete(u1)
sess.flush()
- def test_badcascade(self):
+ @testing.resolve_artifact_names
+ def test_bad_cascade(self):
mapper(Address, addresses)
- self.assertRaises(sa_exc.ArgumentError, relation, Address, cascade="fake, all, delete-orphan")
+ self.assertRaises(sa.exc.ArgumentError,
+ relation, Address, cascade="fake, all, delete-orphan")
- def test_columnprefix(self):
+ @testing.resolve_artifact_names
+ def test_column_prefix(self):
mapper(User, users, column_prefix='_', properties={
- 'user_name':synonym('_user_name')
+ 'user_name': synonym('_name')
})
s = create_session()
u = s.get(User, 7)
- assert u._user_name=='jack'
- assert u._user_id ==7
+ eq_(u._name, 'jack')
+ eq_(u._id,7)
u2 = s.query(User).filter_by(user_name='jack').one()
assert u is u2
- def test_no_pks(self):
- s = select([users.c.user_name]).alias('foo')
- self.assertRaises(sa_exc.ArgumentError, mapper, User, s)
-
- def test_recompile_on_othermapper(self):
- """test the global '_new_mappers' flag such that a compile
- trigger on an already-compiled mapper still triggers a check against all mappers."""
-
- from sqlalchemy.orm import mapperlib
-
+ @testing.resolve_artifact_names
+ def test_no_pks_1(self):
+ s = sa.select([users.c.name]).alias('foo')
+ self.assertRaises(sa.exc.ArgumentError, mapper, User, s)
+
+ @testing.emits_warning(
+ 'mapper Mapper|User|Select object creating an alias for '
+ 'the given selectable - use Class attributes for queries')
+ @testing.resolve_artifact_names
+ def test_no_pks_2(self):
+ s = sa.select([users.c.name])
+ self.assertRaises(sa.exc.ArgumentError, mapper, User, s)
+
+ @testing.resolve_artifact_names
+ def test_recompile_on_other_mapper(self):
+ """A compile trigger on an already-compiled mapper still triggers a check against all mappers."""
mapper(User, users)
- compile_mappers()
- assert mapperlib._new_mappers is False
+ sa.orm.compile_mappers()
+ assert sa.orm.mapperlib._new_mappers is False
m = mapper(Address, addresses, properties={
'user': relation(User, backref="addresses")})
assert m.compiled is False
- assert mapperlib._new_mappers is True
+ assert sa.orm.mapperlib._new_mappers is True
u = User()
assert User.addresses
- assert mapperlib._new_mappers is False
+ assert sa.orm.mapperlib._new_mappers is False
- def test_compileonsession(self):
+ @testing.resolve_artifact_names
+ def test_compile_on_session(self):
m = mapper(User, users)
session = create_session()
session.connection(m)
- def test_incompletecolumns(self):
- """test loading from a select which does not contain all columns"""
+ @testing.resolve_artifact_names
+ def test_incomplete_columns(self):
+ """Loading from a select which does not contain all columns"""
mapper(Address, addresses)
s = create_session()
- a = s.query(Address).from_statement(select([addresses.c.address_id, addresses.c.user_id])).first()
- assert a.user_id == 7
- assert a.address_id == 1
+ a = s.query(Address).from_statement(
+ sa.select([addresses.c.id, addresses.c.user_id])).first()
+ eq_(a.user_id, 7)
+ eq_(a.id, 1)
# email address auto-defers
assert 'email_addres' not in a.__dict__
- assert a.email_address == 'jack@bean.com'
+ eq_(a.email_address, 'jack@bean.com')
- def test_badconstructor(self):
- """test that if the construction of a mapped class fails, the instnace does not get placed in the session"""
+ @testing.resolve_artifact_names
+ def test_bad_constructor(self):
+ """If the construction of a mapped class fails, the instance does not get placed in the session"""
class Foo(object):
def __init__(self, one, two, _sa_session=None):
pass
- mapper(Foo, users)
+
+ mapper(Foo, users, extension=sa.orm.scoped_session(
+ create_session).extension)
+
sess = create_session()
self.assertRaises(TypeError, Foo, 'one', _sa_session=sess)
- assert len(list(sess)) == 0
+ eq_(len(list(sess)), 0)
self.assertRaises(TypeError, Foo, 'one')
+ Foo('one', 'two', _sa_session=sess)
+ eq_(len(list(sess)), 1)
- def test_constructorexc(self):
- """test that exceptions raised in the mapped class are not masked by sa decorations"""
+ @testing.resolve_artifact_names
+ def test_constructor_exc_1(self):
+ """Exceptions raised in the mapped class are not masked by sa decorations"""
ex = AssertionError('oops')
sess = create_session()
except Exception, e:
assert e is ex
- clear_mappers()
- mapper(Foo, users, extension=scoped_session(create_session).extension)
+ sa.orm.clear_mappers()
+ mapper(Foo, users, extension=sa.orm.scoped_session(
+ create_session).extension)
def bad_expunge(foo):
raise Exception("this exception should be stated as a warning")
Foo(_sa_session=sess)
assert False
except Exception, e:
- assert isinstance(e, sa_exc.SAWarning), e
+ assert isinstance(e, sa.exc.SAWarning), e
- clear_mappers()
+ @testing.resolve_artifact_names
+ def test_constructor_exc_2(self):
+ """TypeError is raised for illegal constructor args, whether or not explicit __init__ is present [ticket:908]."""
- # test that TypeError is raised for illegal constructor args,
- # whether or not explicit __init__ is present [ticket:908]
class Foo(object):
def __init__(self):
pass
except TypeError:
assert True
+ @testing.resolve_artifact_names
def test_props(self):
m = mapper(User, users, properties = {
'addresses' : relation(mapper(Address, addresses))
}).compile()
- self.assert_(User.addresses.property is m.get_property('addresses'))
+ assert User.addresses.property is m.get_property('addresses')
- def test_compileonprop(self):
+ @testing.resolve_artifact_names
+ def test_compile_on_prop_1(self):
mapper(User, users, properties = {
'addresses' : relation(mapper(Address, addresses))
})
User.addresses.any(Address.email_address=='foo@bar.com')
- clear_mappers()
+ @testing.resolve_artifact_names
+ def test_compile_on_prop_2(self):
mapper(User, users, properties = {
'addresses' : relation(mapper(Address, addresses))
})
- self.assertEquals((User.user_id==3).__str__(), (users.c.user_id==3).__str__())
-
- clear_mappers()
+ eq_(str(User.id == 3), str(users.c.id==3))
+ @testing.resolve_artifact_names
+ def test_compile_on_prop_3(self):
class Foo(User):pass
mapper(User, users)
mapper(Foo, addresses, inherits=User)
- assert getattr(Foo().__class__, 'user_name').impl is not None
+ assert getattr(Foo().__class__, 'name').impl is not None
- def test_compileon_getprops(self):
+ @testing.resolve_artifact_names
+ def test_compile_on_get_props_1(self):
m =mapper(User, users)
-
assert not m.compiled
assert list(m.iterate_properties)
assert m.compiled
- clear_mappers()
+ @testing.resolve_artifact_names
+ def test_compile_on_get_props_2(self):
m= mapper(User, users)
assert not m.compiled
- assert m.get_property('user_name')
+ assert m.get_property('name')
assert m.compiled
+ @testing.resolve_artifact_names
def test_add_property(self):
assert_col = []
- class User(object):
- def _get_user_name(self):
- assert_col.append(('get', self._user_name))
- return self._user_name
- def _set_user_name(self, name):
+ class User(_base.ComparableEntity):
+ def _get_name(self):
+ assert_col.append(('get', self._name))
+ return self._name
+ def _set_name(self, name):
assert_col.append(('set', name))
- self._user_name = name
- user_name = property(_get_user_name, _set_user_name)
+ self._name = name
+ name = property(_get_name, _set_name)
- def _uc_user_name(self):
- if self._user_name is None:
+ def _uc_name(self):
+ if self._name is None:
return None
- return self._user_name.upper()
- uc_user_name = property(_uc_user_name)
- uc_user_name2 = property(_uc_user_name)
+ return self._name.upper()
+ uc_name = property(_uc_name)
+ uc_name2 = property(_uc_name)
m = mapper(User, users)
mapper(Address, addresses)
- class UCComparator(PropComparator):
+ class UCComparator(sa.orm.PropComparator):
def __eq__(self, other):
cls = self.prop.parent.class_
- col = getattr(cls, 'user_name')
+ col = getattr(cls, 'name')
if other is None:
return col == None
else:
- return func.upper(col) == func.upper(other)
+ return sa.func.upper(col) == sa.func.upper(other)
- m.add_property('_user_name', deferred(users.c.user_name))
- m.add_property('user_name', synonym('_user_name'))
+ m.add_property('_name', deferred(users.c.name))
+ m.add_property('name', synonym('_name'))
m.add_property('addresses', relation(Address))
- m.add_property('uc_user_name', comparable_property(UCComparator))
- m.add_property('uc_user_name2', comparable_property(
- UCComparator, User.uc_user_name2))
+ m.add_property('uc_name', sa.orm.comparable_property(UCComparator))
+ m.add_property('uc_name2', sa.orm.comparable_property(
+ UCComparator, User.uc_name2))
sess = create_session(autocommit=False)
assert sess.query(User).get(7)
- u = sess.query(User).filter_by(user_name='jack').one()
+ u = sess.query(User).filter_by(name='jack').one()
def go():
- self.assert_result([u], User, user_address_result[0])
- assert u.user_name == 'jack'
- assert u.uc_user_name == 'JACK'
- assert u.uc_user_name2 == 'JACK'
- assert assert_col == [('get', 'jack')], str(assert_col)
- self.assert_sql_count(testing.db, go, 2)
+ eq_(len(u.addresses),
+ len(self.static.user_address_result[0].addresses))
+ eq_(u.name, 'jack')
+ eq_(u.uc_name, 'JACK')
+ eq_(u.uc_name2, 'JACK')
+ eq_(assert_col, [('get', 'jack')], str(assert_col))
+ self.sql_count_(2, go)
u.name = 'ed'
u3 = User()
- u3.user_name = 'some user'
+ u3.name = 'some user'
sess.save(u3)
sess.flush()
sess.rollback()
+ @testing.resolve_artifact_names
def test_replace_property(self):
m = mapper(User, users)
- m.add_property('_user_name',users.c.user_name)
- m.add_property('user_name', synonym('_user_name', proxy=True))
+ m.add_property('_name',users.c.name)
+ m.add_property('name', synonym('_name', proxy=True))
sess = create_session()
- u = sess.query(User).filter_by(user_name='jack').one()
- assert u._user_name == 'jack'
- assert u.user_name == 'jack'
- u.user_name = 'jacko'
- assert m._columntoproperty[users.c.user_name] is m.get_property('_user_name')
+ u = sess.query(User).filter_by(name='jack').one()
+ eq_(u._name, 'jack')
+ eq_(u.name, 'jack')
+ u.name = 'jacko'
+ assert m._columntoproperty[users.c.name] is m.get_property('_name')
- clear_mappers()
+ sa.orm.clear_mappers()
m = mapper(User, users)
- m.add_property('user_name', synonym('_user_name', map_column=True))
+ m.add_property('name', synonym('_name', map_column=True))
sess.clear()
- u = sess.query(User).filter_by(user_name='jack').one()
- assert u._user_name == 'jack'
- assert u.user_name == 'jack'
- u.user_name = 'jacko'
- assert m._columntoproperty[users.c.user_name] is m.get_property('_user_name')
+ u = sess.query(User).filter_by(name='jack').one()
+ eq_(u._name, 'jack')
+ eq_(u.name, 'jack')
+ u.name = 'jacko'
+ assert m._columntoproperty[users.c.name] is m.get_property('_name')
+ @testing.resolve_artifact_names
def test_synonym_replaces_backref(self):
assert_calls = []
class Address(object):
mapper(Address, addresses, properties={
'user':synonym('_user')
})
- compile_mappers()
+ sa.orm.compile_mappers()
# later, backref sets up the prop
mapper(User, users, properties={
u2 = sess.query(User).get(8)
# comparaison ops need to work
a1 = sess.query(Address).filter(Address.user==u1).one()
- assert a1.address_id == 1
+ eq_(a1.id, 1)
a1.user = u2
assert a1.user is u2
- self.assertEquals(assert_calls, ["set", "get"])
+ eq_(assert_calls, ["set", "get"])
- def test_self_ref_syn(self):
+ @testing.resolve_artifact_names
+ def test_self_ref_synonym(self):
t = Table('nodes', MetaData(),
Column('id', Integer, primary_key=True),
Column('parent_id', Integer, ForeignKey('nodes.id')))
n1.children.append(n2)
assert n2.parent is n2._parent is n1
assert n1.children[0] is n1._children[0] is n2
- self.assertEquals(str(Node.parent == n2), ":param_1 = nodes.parent_id")
+ eq_(str(Node.parent == n2), ":param_1 = nodes.parent_id")
+ @testing.resolve_artifact_names
def test_illegal_non_primary(self):
mapper(User, users)
mapper(Address, addresses)
'addresses':relation(Address)
}).compile()
assert False
- except sa_exc.ArgumentError, e:
+ except sa.exc.ArgumentError, e:
assert "Attempting to assign a new relation 'addresses' to a non-primary mapper on class 'User'" in str(e)
+ @testing.resolve_artifact_names
def test_illegal_non_primary_2(self):
try:
mapper(User, users, non_primary=True)
assert False
- except sa_exc.InvalidRequestError, e:
+ except sa.exc.InvalidRequestError, e:
assert "Configure a primary mapper first" in str(e)
- def test_propfilters(self):
+ @testing.resolve_artifact_names
+ def test_prop_filters(self):
t = Table('person', MetaData(),
Column('id', Integer, primary_key=True),
Column('type', String(128)),
column_prefix="p_")
p_m.compile()
- #compile_mappers()
+ #sa.orm.compile_mappers()
def assert_props(cls, want):
have = set([n for n in dir(cls) if not n.startswith('_')])
want = set(want)
- self.assert_(have == want, repr(have) + " " + repr(want))
+ eq_(have, want)
assert_props(Person, ['id', 'name', 'type'])
assert_props(Employee, ['boss', 'boss_id', 'employee_number',
assert_props(Hoho, ['id', 'name', 'type'])
assert_props(Lala, ['p_employee_number', 'p_id', 'p_name', 'p_type'])
- def test_mappingtojoin(self):
- """test mapping to a join"""
- usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id)
- m = mapper(User, usersaddresses, primary_key=[users.c.user_id])
- q = create_session().query(m)
- l = q.all()
- self.assert_result(l, User, *user_result[0:2])
-
- def test_mappingtojoinnopk(self):
- metadata = MetaData()
- account_ids_table = Table('account_ids', metadata,
- Column('account_id', Integer, Sequence('seq_account_id'), primary_key=True),
- Column('username', String(20)))
- account_stuff_table = Table('account_stuff', metadata,
- Column('account_id', Integer, ForeignKey('account_ids.account_id')),
- Column('credit', Numeric))
- class A(object):pass
- m = mapper(A, account_ids_table.join(account_stuff_table))
+ @testing.resolve_artifact_names
+ def test_mapping_to_join(self):
+ """Mapping to a join"""
+ usersaddresses = sa.join(users, addresses,
+ users.c.id == addresses.c.user_id)
+ mapper(User, usersaddresses, primary_key=[users.c.id])
+ l = create_session().query(User).order_by(users.c.id).all()
+ eq_(l, self.static.user_result[:3])
+
+ @testing.resolve_artifact_names
+ def test_mapping_to_join_no_pk(self):
+ m = mapper(Address, addresses.join(email_bounces))
m.compile()
- assert account_ids_table in m._pks_by_table
- assert account_stuff_table not in m._pks_by_table
- metadata.create_all(testing.db)
- try:
- sess = create_session(bind=testing.db)
- a = A()
- sess.save(a)
- sess.flush()
- assert testing.db.execute(account_ids_table.count()).scalar() == 1
- assert testing.db.execute(account_stuff_table.count()).scalar() == 0
- finally:
- metadata.drop_all(testing.db)
-
- def test_mappingtoouterjoin(self):
- """test mapping to an outer join, with a composite primary key that allows nulls"""
- result = [
- {'user_id' : 7, 'address_id' : 1},
- {'user_id' : 8, 'address_id' : 2},
- {'user_id' : 8, 'address_id' : 3},
- {'user_id' : 8, 'address_id' : 4},
- {'user_id' : 9, 'address_id':None}
- ]
-
- j = join(users, addresses, isouter=True)
- m = mapper(User, j, allow_null_pks=True, primary_key=[users.c.user_id, addresses.c.address_id])
- q = create_session().query(m)
- l = q.all()
- self.assert_result(l, User, *result)
+ assert addresses in m._pks_by_table
+ assert email_bounces not in m._pks_by_table
+ sess = create_session()
+ a = Address(email_address='e1')
+ sess.add(a)
+ sess.flush()
- def test_customjoin(self):
- """Tests that select_from totally replace the FROM parameters."""
+ eq_(addresses.count().scalar(), 6)
+ eq_(email_bounces.count().scalar(), 5)
- m = mapper(User, users, properties={
- 'orders':relation(mapper(Order, orders, properties={
- 'items':relation(mapper(Item, orderitems))
- }))
- })
+ @testing.resolve_artifact_names
+ def test_mapping_to_outerjoin(self):
+ """Mapping to an outer join with a nullable composite primary key."""
- q = create_session().query(m)
- l = (q.select_from(users.join(orders).join(orderitems)).
- filter(orderitems.c.item_name=='item 4'))
- self.assert_result(l, User, user_result[0])
+ mapper(User, users.outerjoin(addresses),
+ allow_null_pks=True,
+ primary_key=[users.c.id, addresses.c.id],
+ properties=dict(
+ address_id=addresses.c.id))
- def test_orderby(self):
- """test ordering at the mapper and query level"""
+ session = create_session()
+ l = session.query(User).order_by(User.id, User.address_id).all()
+
+ eq_(l, [
+ User(id=7, address_id=1),
+ User(id=8, address_id=2),
+ User(id=8, address_id=3),
+ User(id=8, address_id=4),
+ User(id=9, address_id=5),
+ User(id=10, address_id=None)])
+
+ @testing.resolve_artifact_names
+ def test_custom_join(self):
+ """select_from totally replace the FROM parameters."""
+
+ mapper(Item, items)
+
+ mapper(Order, orders, properties=dict(
+ items=relation(Item, order_items)))
+
+ mapper(User, users, properties=dict(
+ orders=relation(Order)))
+
+ session = create_session()
+ l = (session.query(User).
+ select_from(users.join(orders).
+ join(order_items).
+ join(items)).
+ filter(items.c.description == 'item 4')).all()
+
+ eq_(l, [self.static.user_result[0]])
+
+ @testing.resolve_artifact_names
+ def test_order_by(self):
+ """Ordering at the mapper and query level"""
# TODO: make a unit test out of these various combinations
- #m = mapper(User, users, order_by=desc(users.c.user_name))
+ #m = mapper(User, users, order_by=desc(users.c.name))
mapper(User, users, order_by=None)
#mapper(User, users)
- #l = create_session().query(User).select(order_by=[desc(users.c.user_name), asc(users.c.user_id)])
+ #l = create_session().query(User).select(order_by=[desc(users.c.name), asc(users.c.user_id)])
l = create_session().query(User).all()
#l = create_session().query(User).select(order_by=[])
#l = create_session().query(User).select(order_by=None)
- @testing.unsupported('firebird', 'Raises a "expression evaluation not supported" error at prepare time')
+ # 'Raises a "expression evaluation not supported" error at prepare time
+ @testing.fails_on('firebird')
+ @testing.resolve_artifact_names
def test_function(self):
- """Test mapping to a SELECT statement that has functions in it."""
+ """Mapping to a SELECT statement that has functions in it."""
- s = select([users,
- (users.c.user_id * 2).label('concat'),
- func.count(addresses.c.address_id).label('count')],
- users.c.user_id == addresses.c.user_id,
- group_by=[c for c in users.c]).alias('myselect')
+ s = sa.select([users,
+ (users.c.id * 2).label('concat'),
+ sa.func.count(addresses.c.id).label('count')],
+ users.c.id == addresses.c.user_id,
+ group_by=[c for c in users.c]).alias('myselect')
mapper(User, s, order_by=s.default_order_by())
sess = create_session()
l = sess.query(User).all()
- for u in l:
- print "User", u.user_id, u.user_name, u.concat, u.count
- assert l[0].concat == l[0].user_id * 2 == 14
- assert l[1].concat == l[1].user_id * 2 == 16
+ for idx, total in enumerate((14, 16)):
+ eq_(l[idx].concat, l[idx].id * 2)
+ eq_(l[idx].concat, total)
+
+ @testing.resolve_artifact_names
def test_count(self):
- """Test the count function on Query."""
+ """The count function on Query."""
mapper(User, users)
- q = create_session().query(User)
- self.assert_(q.count()==3)
- self.assert_(q.filter(users.c.user_id.in_([8,9])).count()==2)
-
- def test_manytomany_count(self):
- mapper(Item, orderitems, properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True),
- ))
- q = create_session().query(Item)
- assert q.join('keywords').distinct().filter(Keyword.name=="red").count() == 2
-
- def test_override(self):
- # assert that overriding a column raises an error
- try:
- m = mapper(User, users, properties = {
- 'user_name' : relation(mapper(Address, addresses)),
- }).compile()
- self.assert_(False, "should have raised ArgumentError")
- except sa_exc.ArgumentError, e:
- self.assert_(True)
-
- clear_mappers()
- # assert that allow_column_override cancels the error
- m = mapper(User, users, properties = {
- 'user_name' : relation(mapper(Address, addresses))
- }, allow_column_override=True)
- clear_mappers()
- # assert that the column being named else where also cancels the error
- m = mapper(User, users, properties = {
- 'user_name' : relation(mapper(Address, addresses)),
- 'foo' : users.c.user_name,
- })
+ session = create_session()
+ q = session.query(User)
+
+ eq_(q.count(), 4)
+ eq_(q.filter(User.id.in_([8,9])).count(), 2)
+ eq_(q.filter(users.c.id.in_([8,9])).count(), 2)
+
+ eq_(session.query(User.id).count(), 4)
+ eq_(session.query(User.id).filter(User.id.in_((8, 9))).count(), 2)
+
+ @testing.resolve_artifact_names
+ def test_many_to_many_count(self):
+ mapper(Keyword, keywords)
+ mapper(Item, items, properties=dict(
+ keywords = relation(Keyword, item_keywords, lazy=True)))
+
+ session = create_session()
+ q = (session.query(Item).
+ join('keywords').
+ distinct().
+ filter(Keyword.name == "red"))
+ eq_(q.count(), 2)
+
+ @testing.resolve_artifact_names
+ def test_override_1(self):
+ """Overriding a column raises an error."""
+ def go():
+ mapper(User, users,
+ properties=dict(
+ name=relation(mapper(Address, addresses))))
+ self.assertRaises(sa.exc.ArgumentError, go)
+
+ @testing.resolve_artifact_names
+ def test_override_2(self):
+ """allow_column_override cancels the error."""
+ mapper(User, users,
+ allow_column_override=True,
+ properties=dict(
+ name=relation(mapper(Address, addresses))))
+
+ @testing.resolve_artifact_names
+ def test_override_3(self):
+ """The column being named elsewhere also cancels the error,"""
+ mapper(User, users,
+ properties=dict(
+ name=relation(mapper(Address, addresses)),
+ foo=users.c.name))
+
+ @testing.resolve_artifact_names
def test_synonym(self):
- sess = create_session()
assert_col = []
class extendedproperty(property):
return 'value'
class User(object):
- def _get_user_name(self):
- assert_col.append(('get', self.user_name))
- return self.user_name
- def _set_user_name(self, name):
+ def _get_name(self):
+ assert_col.append(('get', self.name))
+ return self.name
+ def _set_name(self, name):
assert_col.append(('set', name))
- self.user_name = name
- uname = extendedproperty(_get_user_name, _set_user_name)
+ self.name = name
+ uname = extendedproperty(_get_name, _set_name)
- mapper(User, users, properties = dict(
+ mapper(User, users, properties=dict(
addresses = relation(mapper(Address, addresses), lazy=True),
- uname = synonym('user_name'),
+ uname = synonym('name'),
adlist = synonym('addresses', proxy=True),
adname = synonym('addresses')
))
assert hasattr(User, 'adlist')
- assert hasattr(User, 'adname') # as of 0.4.2, synonyms always create a property
+ # as of 0.4.2, synonyms always create a property
+ assert hasattr(User, 'adname')
# test compile
assert not isinstance(User.uname == 'jack', bool)
+ sess = create_session()
u = sess.query(User).filter(User.uname=='jack').one()
- self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1]))
- addr = sess.query(Address).filter_by(address_id=user_address_result[0]['addresses'][1][0]['address_id']).one()
+ fixture = self.static.user_address_result[0].addresses
+ eq_(u.adlist, fixture)
+
+ addr = sess.query(Address).filter_by(id=fixture[0].id).one()
u = sess.query(User).filter(User.adname.contains(addr)).one()
u2 = sess.query(User).filter(User.adlist.contains(addr)).one()
assert u not in sess.dirty
u.uname = "some user name"
assert len(assert_col) > 0
- assert assert_col == [('set', 'some user name')], str(assert_col)
- assert u.uname == "some user name"
- assert assert_col == [('set', 'some user name'), ('get', 'some user name')], str(assert_col)
- assert u.user_name == "some user name"
+ eq_(assert_col, [('set', 'some user name')])
+ eq_(u.uname, "some user name")
+ eq_(assert_col, [('set', 'some user name'), ('get', 'some user name')])
+ eq_(u.name, "some user name")
assert u in sess.dirty
- assert User.uname.attribute == 123
- assert User.uname['key'] == 'value'
+ eq_(User.uname.attribute, 123)
+ eq_(User.uname['key'], 'value')
+
+ @testing.resolve_artifact_names
+ def test_synonym_column_location(self):
+ def go():
+ mapper(User, users, properties={
+ 'not_name':synonym('_name', map_column=True)})
+
+ self.assertRaisesMessage(
+ sa.exc.ArgumentError,
+ ("Can't compile synonym '_name': no column on table "
+ "'users' named 'not_name'"),
+ go)
+ @testing.resolve_artifact_names
def test_column_synonyms(self):
- """test new-style synonyms which automatically instrument properties, set up aliased column, etc."""
+ """Synonyms which automatically instrument properties, set up aliased column, etc."""
- sess = create_session()
assert_col = []
class User(object):
- def _get_user_name(self):
- assert_col.append(('get', self._user_name))
- return self._user_name
- def _set_user_name(self, name):
+ def _get_name(self):
+ assert_col.append(('get', self._name))
+ return self._name
+ def _set_name(self, name):
assert_col.append(('set', name))
- self._user_name = name
- user_name = property(_get_user_name, _set_user_name)
-
- mapper(Address, addresses)
- try:
- mapper(User, users, properties = {
- 'addresses':relation(Address, lazy=True),
- 'not_user_name':synonym('_user_name', map_column=True)
- })
- User.not_user_name
- assert False
- except sa_exc.ArgumentError, e:
- assert str(e) == "Can't compile synonym '_user_name': no column on table 'users' named 'not_user_name'"
-
- clear_mappers()
+ self._name = name
+ name = property(_get_name, _set_name)
mapper(Address, addresses)
mapper(User, users, properties = {
'addresses':relation(Address, lazy=True),
- 'user_name':synonym('_user_name', map_column=True)
+ 'name':synonym('_name', map_column=True)
})
# test compile
- assert not isinstance(User.user_name == 'jack', bool)
+ assert not isinstance(User.name == 'jack', bool)
- assert hasattr(User, 'user_name')
- assert hasattr(User, '_user_name')
+ assert hasattr(User, 'name')
+ assert hasattr(User, '_name')
- u = sess.query(User).filter(User.user_name == 'jack').one()
- assert u.user_name == 'jack'
- u.user_name = 'foo'
- assert u.user_name == 'foo'
- assert assert_col == [('get', 'jack'), ('set', 'foo'), ('get', 'foo')]
+ sess = create_session()
+ u = sess.query(User).filter(User.name == 'jack').one()
+ eq_(u.name, 'jack')
+ u.name = 'foo'
+ eq_(u.name, 'foo')
+ eq_(assert_col, [('get', 'jack'), ('set', 'foo'), ('get', 'foo')])
+ @testing.resolve_artifact_names
def test_comparable(self):
class extendedproperty(property):
attribute = 123
def __getitem__(self, key):
return 'value'
- class UCComparator(PropComparator):
+ class UCComparator(sa.orm.PropComparator):
def __eq__(self, other):
cls = self.prop.parent.class_
- col = getattr(cls, 'user_name')
+ col = getattr(cls, 'name')
if other is None:
return col == None
else:
- return func.upper(col) == func.upper(other)
+ return sa.func.upper(col) == sa.func.upper(other)
def map_(with_explicit_property):
class User(object):
@extendedproperty
- def uc_user_name(self):
- if self.user_name is None:
+ def uc_name(self):
+ if self.name is None:
return None
- return self.user_name.upper()
+ return self.name.upper()
if with_explicit_property:
- args = (UCComparator, User.uc_user_name)
+ args = (UCComparator, User.uc_name)
else:
args = (UCComparator,)
mapper(User, users, properties=dict(
- uc_user_name = comparable_property(*args)))
+ uc_name = sa.orm.comparable_property(*args)))
return User
for User in (map_(True), map_(False)):
sess.begin()
q = sess.query(User)
- assert hasattr(User, 'user_name')
- assert hasattr(User, 'uc_user_name')
+ assert hasattr(User, 'name')
+ assert hasattr(User, 'uc_name')
# test compile
- assert not isinstance(User.uc_user_name == 'jack', bool)
- u = q.filter(User.uc_user_name=='JACK').one()
+ assert not isinstance(User.uc_name == 'jack', bool)
+ u = q.filter(User.uc_name=='JACK').one()
- assert u.uc_user_name == "JACK"
+ assert u.uc_name == "JACK"
assert u not in sess.dirty
- u.user_name = "some user name"
- assert u.user_name == "some user name"
+ u.name = "some user name"
+ eq_(u.name, "some user name")
assert u in sess.dirty
- assert u.uc_user_name == "SOME USER NAME"
+ eq_(u.uc_name, "SOME USER NAME")
sess.flush()
sess.clear()
q = sess.query(User)
- u2 = q.filter(User.user_name=='some user name').one()
- u3 = q.filter(User.uc_user_name=='SOME USER NAME').one()
+ u2 = q.filter(User.name=='some user name').one()
+ u3 = q.filter(User.uc_name=='SOME USER NAME').one()
assert u2 is u3
- assert User.uc_user_name.attribute == 123
- assert User.uc_user_name['key'] == 'value'
+ eq_(User.uc_name.attribute, 123)
+ eq_(User.uc_name['key'], 'value')
sess.rollback()
-class OptionsTest(MapperSuperTest):
+class OptionsTest(_fixtures.FixtureTest):
+
@testing.fails_on('maxdb')
- def test_synonymoptions(self):
- sess = create_session()
- mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = True, order_by=addresses.c.address_id),
- adlist = synonym('addresses', proxy=True)
- ), order_by=users.c.user_id)
+ @testing.resolve_artifact_names
+ def test_synonym_options(self):
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=True,
+ order_by=addresses.c.id),
+ adlist = synonym('addresses', proxy=True)))
+
def go():
- u = sess.query(User).options(eagerload('adlist')).filter_by(user_name='jack').one()
- self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1]))
+ sess = create_session()
+ u = (sess.query(User).
+ order_by(User.id).
+ options(sa.orm.eagerload('adlist')).
+ filter_by(name='jack')).one()
+ eq_(u.adlist,
+ [self.static.user_address_result[0].addresses[0]])
self.assert_sql_count(testing.db, go, 1)
+ @testing.resolve_artifact_names
+ def test_eager_options(self):
+ """A lazy relation can be upgraded to an eager relation."""
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses),
+ order_by=addresses.c.id)))
- def test_eageroptions(self):
- """tests that a lazy relation can be upgraded to an eager relation via the options method"""
sess = create_session()
- mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses))
- ))
- l = sess.query(User).options(eagerload('addresses')).all()
+ l = (sess.query(User).
+ order_by(User.id).
+ options(sa.orm.eagerload('addresses'))).all()
def go():
- self.assert_result(l, User, *user_address_result)
- self.assert_sql_count(testing.db, go, 0)
+ eq_(l, self.static.user_address_result)
+ self.sql_count_(0, go)
@testing.fails_on('maxdb')
- def test_eageroptionswithlimit(self):
+ @testing.resolve_artifact_names
+ def test_eager_options_with_limit(self):
+ mapper(User, users, properties=dict(
+ addresses=relation(mapper(Address, addresses), lazy=True)))
+
sess = create_session()
- mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = True)
- ))
- u = sess.query(User).options(eagerload('addresses')).filter_by(user_id=8).one()
+ u = (sess.query(User).
+ options(sa.orm.eagerload('addresses')).
+ filter_by(id=8)).one()
def go():
- assert u.user_id == 8
- assert len(u.addresses) == 3
- self.assert_sql_count(testing.db, go, 0)
+ eq_(u.id, 8)
+ eq_(len(u.addresses), 3)
+ self.sql_count_(0, go)
sess.clear()
# test that eager loading doesnt modify parent mapper
def go():
- u = sess.query(User).filter_by(user_id=8).one()
- assert u.user_id == 8
- assert len(u.addresses) == 3
+ u = sess.query(User).filter_by(id=8).one()
+ eq_(u.id, 8)
+ eq_(len(u.addresses), 3)
assert "tbl_row_count" not in self.capture_sql(testing.db, go)
@testing.fails_on('maxdb')
- def test_lazyoptionswithlimit(self):
+ @testing.resolve_artifact_names
+ def test_lazy_options_with_limit(self):
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=False)))
+
sess = create_session()
- mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy=False)
- ))
- u = sess.query(User).options(lazyload('addresses')).filter_by(user_id=8).one()
+ u = (sess.query(User).
+ options(sa.orm.lazyload('addresses')).
+ filter_by(id=8)).one()
def go():
- assert u.user_id == 8
- assert len(u.addresses) == 3
- self.assert_sql_count(testing.db, go, 1)
+ eq_(u.id, 8)
+ eq_(len(u.addresses), 3)
+ self.sql_count_(1, go)
- def test_eagerdegrade(self):
- """tests that an eager relation automatically degrades to a lazy relation if eager columns are not available"""
- sess = create_session()
- usermapper = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy=False)
- ))
+ @testing.resolve_artifact_names
+ def test_eager_degrade(self):
+ """An eager relation automatically degrades to a lazy relation if eager columns are not available"""
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=False)))
+ sess = create_session()
# first test straight eager load, 1 statement
def go():
- l = sess.query(usermapper).all()
- self.assert_result(l, User, *user_address_result)
- self.assert_sql_count(testing.db, go, 1)
+ l = sess.query(User).order_by(User.id).all()
+ eq_(l, self.static.user_address_result)
+ self.sql_count_(1, go)
sess.clear()
# then select just from users. run it into instances.
# then assert the data, which will launch 3 more lazy loads
- # (previous users in session fell out of scope and were removed from session's identity map)
+ # (previous users in session fell out of scope and were removed from
+ # session's identity map)
+ r = users.select().order_by(users.c.id).execute()
def go():
- r = users.select().execute()
- l = sess.query(usermapper).instances(r)
- self.assert_result(l, User, *user_address_result)
- self.assert_sql_count(testing.db, go, 4)
-
- clear_mappers()
+ l = sess.query(User).instances(r)
+ eq_(l, self.static.user_address_result)
+ self.sql_count_(4, go)
- sess.clear()
+ @testing.resolve_artifact_names
+ def test_eager_degrade_deep(self):
# test with a deeper set of eager loads. when we first load the three
- # users, they will have no addresses or orders. the number of lazy loads when
- # traversing the whole thing will be three for the addresses and three for the
- # orders.
- # (previous users in session fell out of scope and were removed from session's identity map)
- usermapper = mapper(User, users,
- properties = {
- 'addresses':relation(mapper(Address, addresses), lazy=False, order_by=addresses.default_order_by()),
- 'orders': relation(mapper(Order, orders, properties = {
- 'items' : relation(mapper(Item, orderitems, properties = {
- 'keywords' : relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=itemkeywords.default_order_by())
- }), order_by=orderitems.default_order_by(), lazy=False)
- }), order_by=orders.default_order_by(), lazy=False)
- }, order_by=users.default_order_by())
+ # users, they will have no addresses or orders. the number of lazy
+ # loads when traversing the whole thing will be three for the
+ # addresses and three for the orders.
+ mapper(Address, addresses)
- sess.clear()
+ mapper(Keyword, keywords)
+
+ mapper(Item, items, properties=dict(
+ keywords=relation(Keyword, secondary=item_keywords,
+ lazy=False,
+ order_by=item_keywords.c.keyword_id)))
+
+ mapper(Order, orders, properties=dict(
+ items=relation(Item, secondary=order_items, lazy=False,
+ order_by=order_items.c.item_id)))
+
+ mapper(User, users, properties=dict(
+ addresses=relation(Address, lazy=False,
+ order_by=addresses.default_order_by()),
+ orders=relation(Order, lazy=False,
+ order_by=orders.default_order_by())))
+
+ sess = create_session()
# first test straight eager load, 1 statement
def go():
- l = sess.query(usermapper).all()
- self.assert_result(l, User, *user_all_result)
+ l = sess.query(User).order_by(User.id).all()
+ eq_(l, self.static.user_all_result)
self.assert_sql_count(testing.db, go, 1)
sess.clear()
# then select just from users. run it into instances.
# then assert the data, which will launch 6 more lazy loads
+ r = users.select().execute()
def go():
- r = users.select().execute()
- l = sess.query(usermapper).instances(r)
- self.assert_result(l, User, *user_all_result)
- self.assert_sql_count(testing.db, go, 7)
-
-
- def test_lazyoptions(self):
- """tests that an eager relation can be upgraded to a lazy relation via the options method"""
- sess = create_session()
- mapper(User, users, properties = dict(
+ l = sess.query(User).instances(r)
+ eq_(l, self.static.user_all_result)
+ self.assert_sql_count(testing.db, go, 6)
+
+ @testing.resolve_artifact_names
+ def test_lazy_options(self):
+ """An eager relation can be upgraded to a lazy relation."""
+ mapper(User, users, properties=dict(
addresses = relation(mapper(Address, addresses), lazy=False)
))
- l = sess.query(User).options(lazyload('addresses')).all()
+
+ sess = create_session()
+ l = (sess.query(User).
+ order_by(User.id).
+ options(sa.orm.lazyload('addresses'))).all()
+
def go():
- self.assert_result(l, User, *user_address_result)
- self.assert_sql_count(testing.db, go, 3)
+ eq_(l, self.static.user_address_result)
+ self.sql_count_(4, go)
- def test_deepoptions(self):
- mapper(User, users, order_by=users.default_order_by(),
- properties = {
- 'orders': relation(mapper(Order, orders, properties = {
- 'items' : relation(mapper(Item, orderitems, properties = {
- 'keywords' : relation(mapper(Keyword, keywords), itemkeywords, order_by=itemkeywords.default_order_by())
- }), order_by=orderitems.default_order_by())
- }), order_by=orders.default_order_by())
- })
+class DeepOptionsTest(_fixtures.FixtureTest):
+ @testing.resolve_artifact_names
+ def setup_mappers(self):
+ mapper(Keyword, keywords)
+
+ mapper(Item, items, properties=dict(
+ keywords=relation(Keyword, item_keywords,
+ order_by=item_keywords.default_order_by())))
+
+ mapper(Order, orders, properties=dict(
+ items=relation(Item, order_items,
+ order_by=order_items.default_order_by())))
+
+ mapper(User, users, order_by=users.default_order_by(), properties=dict(
+ orders=relation(Order, order_by=orders.default_order_by())))
+
+ @testing.resolve_artifact_names
+ def test_deep_options_1(self):
sess = create_session()
# eagerload nothing.
u = sess.query(User).all()
def go():
- print u[0].orders[1].items[0].keywords[1]
+ x = u[0].orders[1].items[0].keywords[1]
self.assert_sql_count(testing.db, go, 3)
- sess.clear()
- # eagerload orders.items.keywords; eagerload_all() implies eager load of orders, orders.items
- q2 = sess.query(User).options(eagerload_all('orders.items.keywords'))
- u = q2.all()
+ @testing.resolve_artifact_names
+ def test_deep_options_2(self):
+ sess = create_session()
+
+ # eagerload orders.items.keywords; eagerload_all() implies eager load
+ # of orders, orders.items
+ l = (sess.query(User).
+ options(sa.orm.eagerload_all('orders.items.keywords'))).all()
def go():
- print u[0].orders[1].items[0].keywords[1]
- self.assert_sql_count(testing.db, go, 0)
+ x = l[0].orders[1].items[0].keywords[1]
+ self.sql_count_(0, go)
- sess.clear()
+
+ @testing.resolve_artifact_names
+ def test_deep_options_3(self):
+ sess = create_session()
# same thing, with separate options calls
- q2 = sess.query(User).options(eagerload('orders')).options(eagerload('orders.items')).options(eagerload('orders.items.keywords'))
+ q2 = (sess.query(User).
+ options(sa.orm.eagerload('orders')).
+ options(sa.orm.eagerload('orders.items')).
+ options(sa.orm.eagerload('orders.items.keywords')))
u = q2.all()
def go():
- print u[0].orders[1].items[0].keywords[1]
- self.assert_sql_count(testing.db, go, 0)
+ x = u[0].orders[1].items[0].keywords[1]
+ self.sql_count_(0, go)
- sess.clear()
+ @testing.resolve_artifact_names
+ def test_deep_options_4(self):
+ sess = create_session()
- self.assertRaisesMessage(sa_exc.ArgumentError,
- r"Can't find entity Mapper\|Order\|orders in Query. Current list: \['Mapper\|User\|users'\]",
- sess.query(User).options, eagerload(Order.items)
- )
+ self.assertRaisesMessage(
+ sa.exc.ArgumentError,
+ r"Can't find entity Mapper\|Order\|orders in Query. "
+ r"Current list: \['Mapper\|User\|users'\]",
+ sess.query(User).options, sa.orm.eagerload(Order.items))
- # eagerload "keywords" on items. it will lazy load "orders", then lazy load
- # the "items" on the order, but on "items" it will eager load the "keywords"
- q3 = sess.query(User).options(eagerload('orders.items.keywords'))
+ # eagerload "keywords" on items. it will lazy load "orders", then
+ # lazy load the "items" on the order, but on "items" it will eager
+ # load the "keywords"
+ q3 = sess.query(User).options(sa.orm.eagerload('orders.items.keywords'))
u = q3.all()
- self.assert_sql_count(testing.db, go, 2)
+ def go():
+ x = u[0].orders[1].items[0].keywords[1]
+ self.sql_count_(2, go)
-class DeferredTest(MapperSuperTest):
+class DeferredTest(_fixtures.FixtureTest):
+ @testing.resolve_artifact_names
def test_basic(self):
- """tests a basic "deferred" load"""
+ """A basic deferred load."""
- m = mapper(Order, orders, properties={
- 'description':deferred(orders.c.description)
- }, order_by=orders.c.order_id)
+ mapper(Order, orders, order_by=orders.c.id, properties={
+ 'description': deferred(orders.c.description)})
o = Order()
self.assert_(o.description is None)
- q = create_session().query(m)
+ q = create_session().query(Order)
def go():
l = q.all()
o2 = l[2]
- print o2.description
+ x = o2.description
+
+ self.sql_eq_(go, [
+ ("SELECT orders.id AS orders_id, "
+ "orders.user_id AS orders_user_id, "
+ "orders.address_id AS orders_address_id, "
+ "orders.isopen AS orders_isopen "
+ "FROM orders ORDER BY orders.id", {}),
+ ("SELECT orders.description AS orders_description "
+ "FROM orders WHERE orders.id = :param_1",
+ {'param_1':3})])
+
+ @testing.resolve_artifact_names
+ def test_unsaved(self):
+ """Deferred loading does not kick in when just PK cols are set."""
- self.assert_sql(testing.db, go, [
- ("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.isopen AS orders_isopen FROM orders ORDER BY orders.order_id", {}),
- ("SELECT orders.description AS orders_description FROM orders WHERE orders.order_id = :param_1", {'param_1':3})
- ])
+ mapper(Order, orders, properties={
+ 'description': deferred(orders.c.description)})
- def test_unsaved(self):
- """test that deferred loading doesnt kick in when just PK cols are set"""
- m = mapper(Order, orders, properties={
- 'description':deferred(orders.c.description)
- }, order_by=orders.c.order_id)
+ sess = create_session()
+ o = Order()
+ sess.save(o)
+ o.id = 7
+ def go():
+ o.description = "some description"
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_unsaved_2(self):
+ mapper(Order, orders, properties={
+ 'description': deferred(orders.c.description)})
+
+ sess = create_session()
+ o = Order()
+ sess.add(o)
+ def go():
+ o.description = "some description"
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_unsaved_group(self):
+ """Deferred loading doesnt kick in when just PK cols are set"""
+
+ mapper(Order, orders, order_by=orders.c.id, properties=dict(
+ description=deferred(orders.c.description, group='primary'),
+ opened=deferred(orders.c.isopen, group='primary')))
sess = create_session()
o = Order()
sess.save(o)
- o.order_id = 7
+ o.id = 7
def go():
o.description = "some description"
- self.assert_sql_count(testing.db, go, 0)
+ self.sql_count_(0, go)
- def test_unsavedgroup(self):
- """test that deferred loading doesnt kick in when just PK cols are set"""
- m = mapper(Order, orders, properties={
- 'description':deferred(orders.c.description, group='primary'),
- 'opened':deferred(orders.c.isopen, group='primary')
- }, order_by=orders.c.order_id)
+ @testing.resolve_artifact_names
+ def test_unsaved_group_2(self):
+ mapper(Order, orders, order_by=orders.c.id, properties=dict(
+ description=deferred(orders.c.description, group='primary'),
+ opened=deferred(orders.c.isopen, group='primary')))
sess = create_session()
o = Order()
sess.save(o)
- o.order_id = 7
def go():
o.description = "some description"
- self.assert_sql_count(testing.db, go, 0)
+ self.sql_count_(0, go)
+ @testing.resolve_artifact_names
def test_save(self):
m = mapper(Order, orders, properties={
- 'description':deferred(orders.c.description)
- }, order_by=orders.c.order_id)
+ 'description': deferred(orders.c.description)})
sess = create_session()
- q = sess.query(m)
- l = q.all()
- o2 = l[2]
+ o2 = sess.query(Order).get(2)
o2.isopen = 1
sess.flush()
+ @testing.resolve_artifact_names
def test_group(self):
- """tests deferred load with a group"""
- m = mapper(Order, orders, properties = {
- 'userident':deferred(orders.c.user_id, group='primary'),
- 'description':deferred(orders.c.description, group='primary'),
- 'opened':deferred(orders.c.isopen, group='primary')
- }, order_by=orders.c.order_id)
+ """Deferred load with a group"""
+ mapper(Order, orders, properties={
+ 'userident': deferred(orders.c.user_id, group='primary'),
+ 'addrident': deferred(orders.c.address_id, group='primary'),
+ 'description': deferred(orders.c.description, group='primary'),
+ 'opened': deferred(orders.c.isopen, group='primary')
+ })
+
sess = create_session()
- q = sess.query(m)
+ q = sess.query(Order).order_by(Order.id)
def go():
l = q.all()
o2 = l[2]
- print o2.opened, o2.description, o2.userident
- assert o2.opened == 1
- assert o2.userident == 7
- assert o2.description == 'order 3'
-
- self.assert_sql(testing.db, go, [
- ("SELECT orders.order_id AS orders_order_id FROM orders ORDER BY orders.order_id", {}),
- ("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders WHERE orders.order_id = :param_1", {'param_1':3})
- ])
+ eq_(o2.opened, 1)
+ eq_(o2.userident, 7)
+ eq_(o2.description, 'order 3')
+
+ self.sql_eq_(go, [
+ ("SELECT orders.id AS orders_id "
+ "FROM orders ORDER BY orders.id", {}),
+ ("SELECT orders.user_id AS orders_user_id, "
+ "orders.address_id AS orders_address_id, "
+ "orders.description AS orders_description, "
+ "orders.isopen AS orders_isopen "
+ "FROM orders WHERE orders.id = :param_1",
+ {'param_1':3})])
o2 = q.all()[2]
- assert o2.description == 'order 3'
+ eq_(o2.description, 'order 3')
assert o2 not in sess.dirty
o2.description = 'order 3'
def go():
sess.flush()
- self.assert_sql_count(testing.db, go, 0)
+ self.sql_count_(0, go)
+ @testing.resolve_artifact_names
def test_preserve_changes(self):
- """test that the deferred load operation doesn't revert modifications on attributes"""
-
+ """A deferred load operation doesn't revert modifications on attributes"""
mapper(Order, orders, properties = {
- 'userident':deferred(orders.c.user_id, group='primary'),
- 'description':deferred(orders.c.description, group='primary'),
- 'opened':deferred(orders.c.isopen, group='primary')
- }, order_by=orders.c.order_id)
+ 'userident': deferred(orders.c.user_id, group='primary'),
+ 'description': deferred(orders.c.description, group='primary'),
+ 'opened': deferred(orders.c.isopen, group='primary')
+ })
sess = create_session()
o = sess.query(Order).get(3)
assert 'userident' not in o.__dict__
o.description = 'somenewdescription'
- assert o.description == 'somenewdescription'
+ eq_(o.description, 'somenewdescription')
def go():
- assert o.opened == 1
+ eq_(o.opened, 1)
self.assert_sql_count(testing.db, go, 1)
- assert o.description == 'somenewdescription'
+ eq_(o.description, 'somenewdescription')
assert o in sess.dirty
+ @testing.resolve_artifact_names
+ def test_commits_state(self):
+ """
+ When deferred elements are loaded via a group, they get the proper
+ CommittedState and don't result in changes being committed
- def test_commitsstate(self):
- """test that when deferred elements are loaded via a group, they get the proper CommittedState
- and dont result in changes being committed"""
-
- m = mapper(Order, orders, properties = {
+ """
+ mapper(Order, orders, properties = {
'userident':deferred(orders.c.user_id, group='primary'),
'description':deferred(orders.c.description, group='primary'),
- 'opened':deferred(orders.c.isopen, group='primary')
- })
+ 'opened':deferred(orders.c.isopen, group='primary')})
+
sess = create_session()
- q = sess.query(m)
- o2 = q.all()[2]
+ o2 = sess.query(Order).get(3)
+
# this will load the group of attributes
- assert o2.description == 'order 3'
+ eq_(o2.description, 'order 3')
assert o2 not in sess.dirty
# this will mark it as 'dirty', but nothing actually changed
o2.description = 'order 3'
- def go():
- # therefore the flush() shouldnt actually issue any SQL
- sess.flush()
- self.assert_sql_count(testing.db, go, 0)
+ # therefore the flush() shouldnt actually issue any SQL
+ self.assert_sql_count(testing.db, sess.flush, 0)
+ @testing.resolve_artifact_names
def test_options(self):
- """tests using options on a mapper to create deferred and undeferred columns"""
- m = mapper(Order, orders, order_by=orders.c.order_id)
- sess = create_session()
- q = sess.query(m)
- q2 = q.options(defer('user_id'))
- def go():
- l = q2.all()
- print l[2].user_id
+ """Options on a mapper to create deferred and undeferred columns"""
- self.assert_sql(testing.db, go, [
- ("SELECT orders.order_id AS orders_order_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY orders.order_id", {}),
- ("SELECT orders.user_id AS orders_user_id FROM orders WHERE orders.order_id = :param_1", {'param_1':3})
- ])
- sess.clear()
- q3 = q2.options(undefer('user_id'))
- def go():
- l = q3.all()
- print l[3].user_id
- self.assert_sql(testing.db, go, [
- ("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY orders.order_id", {}),
- ])
+ mapper(Order, orders)
- def test_undefergroup(self):
- """tests undefer_group()"""
+ sess = create_session()
+ q = sess.query(Order).order_by(Order.id).options(defer('user_id'))
+
+ self.sql_eq_(q.all, [
+ ("SELECT orders.id AS orders_id, "
+ "orders.address_id AS orders_address_id, "
+ "orders.description AS orders_description, "
+ "orders.isopen AS orders_isopen "
+ "FROM orders ORDER BY orders.id", {}),
+ ("SELECT orders.user_id AS orders_user_id "
+ "FROM orders WHERE orders.id = :param_1",
+ {'param_1':3})])
+ sess.clear()
- m = mapper(Order, orders, properties = {
+ q2 = q.options(sa.orm.undefer('user_id'))
+ self.sql_eq_(q2.all, [
+ ("SELECT orders.id AS orders_id, "
+ "orders.user_id AS orders_user_id, "
+ "orders.address_id AS orders_address_id, "
+ "orders.description AS orders_description, "
+ "orders.isopen AS orders_isopen "
+ "FROM orders ORDER BY orders.id",
+ {})])
+
+ @testing.resolve_artifact_names
+ def test_undefer_group(self):
+ mapper(Order, orders, properties={
'userident':deferred(orders.c.user_id, group='primary'),
'description':deferred(orders.c.description, group='primary'),
- 'opened':deferred(orders.c.isopen, group='primary')
- }, order_by=orders.c.order_id)
+ 'opened':deferred(orders.c.isopen, group='primary')})
+
sess = create_session()
- q = sess.query(m)
+ q = sess.query(Order).order_by(Order.id)
def go():
- l = q.options(undefer_group('primary')).all()
+ l = q.options(sa.orm.undefer_group('primary')).all()
o2 = l[2]
- print o2.opened, o2.description, o2.userident
- assert o2.opened == 1
- assert o2.userident == 7
- assert o2.description == 'order 3'
-
- self.assert_sql(testing.db, go, [
- ("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen, orders.order_id AS orders_order_id FROM orders ORDER BY orders.order_id", {}),
- ])
-
+ eq_(o2.opened, 1)
+ eq_(o2.userident, 7)
+ eq_(o2.description, 'order 3')
+
+ self.sql_eq_(go, [
+ ("SELECT orders.user_id AS orders_user_id, "
+ "orders.description AS orders_description, "
+ "orders.isopen AS orders_isopen, "
+ "orders.id AS orders_id, "
+ "orders.address_id AS orders_address_id "
+ "FROM orders ORDER BY orders.id",
+ {})])
+
+ @testing.resolve_artifact_names
def test_locates_col(self):
- """test that manually adding a col to the result undefers the column"""
+ """Manually adding a column to the result undefers the column."""
mapper(Order, orders, properties={
- 'description':deferred(orders.c.description)
- }, order_by=orders.c.order_id)
+ 'description':deferred(orders.c.description)})
sess = create_session()
- o1 = sess.query(Order).first()
+ o1 = sess.query(Order).order_by(Order.id).first()
def go():
- assert o1.description == 'order 1'
- self.assert_sql_count(testing.db, go, 1)
+ eq_(o1.description, 'order 1')
+ self.sql_count_(1, go)
sess = create_session()
- o1 = sess.query(Order).add_column(orders.c.description).first()[0]
+ o1 = (sess.query(Order).
+ order_by(Order.id).
+ add_column(orders.c.description).first())[0]
def go():
- assert o1.description == 'order 1'
- self.assert_sql_count(testing.db, go, 0)
-
- def test_deepoptions(self):
- m = mapper(User, users, properties={
- 'orders':relation(mapper(Order, orders, properties={
- 'items':relation(mapper(Item, orderitems, properties={
- 'item_name':deferred(orderitems.c.item_name)
- }))
- }))
- })
+ eq_(o1.description, 'order 1')
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_deep_options(self):
+ mapper(Item, items, properties=dict(
+ description=deferred(items.c.description)))
+ mapper(Order, orders, properties=dict(
+ items=relation(Item, secondary=order_items)))
+ mapper(User, users, properties=dict(
+ orders=relation(Order, order_by=orders.c.id)))
+
sess = create_session()
- q = sess.query(m)
+ q = sess.query(User).order_by(User.id)
l = q.all()
item = l[0].orders[1].items[1]
def go():
- print item.item_name
- self.assert_sql_count(testing.db, go, 1)
- self.assert_(item.item_name == 'item 4')
+ eq_(item.description, 'item 4')
+ self.sql_count_(1, go)
+ eq_(item.description, 'item 4')
+
sess.clear()
- q2 = q.options(undefer('orders.items.item_name'))
- l = q2.all()
+ l = q.options(sa.orm.undefer('orders.items.description')).all()
item = l[0].orders[1].items[1]
def go():
- print item.item_name
- self.assert_sql_count(testing.db, go, 0)
- self.assert_(item.item_name == 'item 4')
+ eq_(item.description, 'item 4')
+ self.sql_count_(0, go)
+ eq_(item.description, 'item 4')
+
+
+class CompositeTypesTest(_base.MappedTest):
-class CompositeTypesTest(ORMTest):
def define_tables(self, metadata):
- global graphs, edges
- graphs = Table('graphs', metadata,
+ Table('graphs', metadata,
Column('id', Integer, primary_key=True),
Column('version_id', Integer, primary_key=True),
Column('name', String(30)))
- edges = Table('edges', metadata,
- Column('id', Integer, Sequence('seq_edges_id'), primary_key=True),
+ Table('edges', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('graph_id', Integer, nullable=False),
Column('graph_version_id', Integer, nullable=False),
Column('x1', Integer),
Column('y1', Integer),
Column('x2', Integer),
Column('y2', Integer),
- ForeignKeyConstraint(['graph_id', 'graph_version_id'], ['graphs.id', 'graphs.version_id'])
- )
+ sa.ForeignKeyConstraint(
+ ['graph_id', 'graph_version_id'],
+ ['graphs.id', 'graphs.version_id']))
+ @testing.resolve_artifact_names
def test_basic(self):
class Point(object):
def __init__(self, x, y):
'edges':relation(Edge)
})
mapper(Edge, edges, properties={
- 'start':composite(Point, edges.c.x1, edges.c.y1),
- 'end':composite(Point, edges.c.x2, edges.c.y2)
+ 'start':sa.orm.composite(Point, edges.c.x1, edges.c.y1),
+ 'end': sa.orm.composite(Point, edges.c.x2, edges.c.y2)
})
sess = create_session()
sess.clear()
g2 = sess.query(Graph).get([g.id, g.version_id])
for e1, e2 in zip(g.edges, g2.edges):
- assert e1.start == e2.start
- assert e1.end == e2.end
+ eq_(e1.start, e2.start)
+ eq_(e1.end, e2.end)
g2.edges[1].end = Point(18, 4)
sess.flush()
sess.clear()
e = sess.query(Edge).get(g2.edges[1].id)
- assert e.end == Point(18, 4)
+ eq_(e.end, Point(18, 4))
e.end.x = 19
e.end.y = 5
sess.flush()
sess.clear()
- assert sess.query(Edge).get(g2.edges[1].id).end == Point(19, 5)
+ eq_(sess.query(Edge).get(g2.edges[1].id).end, Point(19, 5))
g.edges[1].end = Point(19, 5)
sess.clear()
def go():
- g2 = sess.query(Graph).options(eagerload('edges')).get([g.id, g.version_id])
+ g2 = (sess.query(Graph).
+ options(sa.orm.eagerload('edges'))).get([g.id, g.version_id])
for e1, e2 in zip(g.edges, g2.edges):
- assert e1.start == e2.start
- assert e1.end == e2.end
+ eq_(e1.start, e2.start)
+ eq_(e1.end, e2.end)
self.assert_sql_count(testing.db, go, 1)
# test comparison of CompositeProperties to their object instances
assert sess.query(Edge).filter(Edge.start!=Point(3, 4)).first() is g.edges[1]
- assert sess.query(Edge).filter(Edge.start==None).all() == []
-
+ eq_(sess.query(Edge).filter(Edge.start==None).all(), [])
+ @testing.resolve_artifact_names
def test_pk(self):
- """test using a composite type as a primary key"""
+ """Using a composite type as a primary key"""
class Version(object):
def __init__(self, id, version):
self.version = version
mapper(Graph, graphs, properties={
- 'version':composite(Version, graphs.c.id, graphs.c.version_id)
- })
+ 'version':sa.orm.composite(Version, graphs.c.id,
+ graphs.c.version_id)})
sess = create_session()
g = Graph(Version(1, 1))
sess.clear()
g2 = sess.query(Graph).get([1, 1])
- assert g.version == g2.version
+ eq_(g.version, g2.version)
sess.clear()
g2 = sess.query(Graph).get(Version(1, 1))
- assert g.version == g2.version
+ eq_(g.version, g2.version)
+class NoLoadTest(_fixtures.FixtureTest):
+ run_inserts = 'once'
+ run_deletes = None
-class NoLoadTest(MapperSuperTest):
+ @testing.resolve_artifact_names
def test_basic(self):
- """tests a basic one-to-many lazy load"""
- m = mapper(User, users, properties = dict(
+ """A basic one-to-many lazy load"""
+ m = mapper(User, users, properties=dict(
addresses = relation(mapper(Address, addresses), lazy=None)
))
q = create_session().query(m)
l = [None]
def go():
- x = q.filter(users.c.user_id == 7).all()
+ x = q.filter(User.id == 7).all()
x[0].addresses
l[0] = x
self.assert_sql_count(testing.db, go, 1)
self.assert_result(l[0], User,
- {'user_id' : 7, 'addresses' : (Address, [])},
+ {'id' : 7, 'addresses' : (Address, [])},
)
+ @testing.resolve_artifact_names
def test_options(self):
- m = mapper(User, users, properties = dict(
+ m = mapper(User, users, properties=dict(
addresses = relation(mapper(Address, addresses), lazy=None)
))
- q = create_session().query(m).options(lazyload('addresses'))
+ q = create_session().query(m).options(sa.orm.lazyload('addresses'))
l = [None]
def go():
- x = q.filter(users.c.user_id == 7).all()
+ x = q.filter(User.id == 7).all()
x[0].addresses
l[0] = x
- self.assert_sql_count(testing.db, go, 2)
+ self.sql_count_(2, go)
self.assert_result(l[0], User,
- {'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])},
+ {'id' : 7, 'addresses' : (Address, [{'id' : 1}])},
)
-class MapperExtensionTest(TestBase):
- def setUpAll(self):
- tables.create()
-
- def tearDown(self):
- clear_mappers()
- tables.delete()
- def tearDownAll(self):
- tables.drop()
+class MapperExtensionTest(_fixtures.FixtureTest):
def extension(self):
methods = []
- class Ext(MapperExtension):
+ class Ext(sa.orm.MapperExtension):
def instrument_class(self, mapper, cls):
methods.append('instrument_class')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def init_instance(self, mapper, class_, oldinit, instance, args, kwargs):
methods.append('init_instance')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def init_failed(self, mapper, class_, oldinit, instance, args, kwargs):
methods.append('init_failed')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def load(self, query, *args, **kwargs):
methods.append('load')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def get(self, query, *args, **kwargs):
methods.append('get')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def translate_row(self, mapper, context, row):
methods.append('translate_row')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def create_instance(self, mapper, selectcontext, row, class_):
methods.append('create_instance')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def append_result(self, mapper, selectcontext, row, instance, result, **flags):
methods.append('append_result')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def populate_instance(self, mapper, selectcontext, row, instance, **flags):
methods.append('populate_instance')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def before_insert(self, mapper, connection, instance):
methods.append('before_insert')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def after_insert(self, mapper, connection, instance):
methods.append('after_insert')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def before_update(self, mapper, connection, instance):
methods.append('before_update')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def after_update(self, mapper, connection, instance):
methods.append('after_update')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def before_delete(self, mapper, connection, instance):
methods.append('before_delete')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
def after_delete(self, mapper, connection, instance):
methods.append('after_delete')
- return EXT_CONTINUE
+ return sa.orm.EXT_CONTINUE
return Ext, methods
+ @testing.resolve_artifact_names
def test_basic(self):
"""test that common user-defined methods get called."""
Ext, methods = self.extension()
mapper(User, users, extension=Ext())
sess = create_session()
- u = User()
+ u = User(name='u1')
sess.save(u)
sess.flush()
- u = sess.query(User).load(u.user_id)
+ u = sess.query(User).load(u.id)
sess.clear()
- u = sess.query(User).get(u.user_id)
- u.user_name = 'foobar'
+ u = sess.query(User).get(u.id)
+ u.name = 'u1 changed'
sess.flush()
sess.delete(u)
sess.flush()
- self.assertEquals(methods,
+ eq_(methods,
['instrument_class', 'init_instance', 'before_insert',
'after_insert', 'load', 'translate_row', 'populate_instance',
'append_result', 'get', 'translate_row', 'create_instance',
'populate_instance', 'append_result', 'before_update',
'after_update', 'before_delete', 'after_delete'])
-
+ @testing.resolve_artifact_names
def test_inheritance(self):
Ext, methods = self.extension()
mapper(AdminUser, addresses, inherits=User)
sess = create_session()
- am = AdminUser()
+ am = AdminUser(name='au1', email_address='au1@e1')
sess.save(am)
sess.flush()
- am = sess.query(AdminUser).load(am.user_id)
+ am = sess.query(AdminUser).load(am.id)
sess.clear()
- am = sess.query(AdminUser).get(am.user_id)
- am.user_name = 'foobar'
+ am = sess.query(AdminUser).get(am.id)
+ am.name = 'au1 changed'
sess.flush()
sess.delete(am)
sess.flush()
- self.assertEquals(methods,
+ eq_(methods,
['instrument_class', 'instrument_class', 'init_instance',
'before_insert', 'after_insert', 'load', 'translate_row',
'populate_instance', 'append_result', 'get', 'translate_row',
'create_instance', 'populate_instance', 'append_result',
'before_update', 'after_update', 'before_delete', 'after_delete'])
+ @testing.resolve_artifact_names
def test_after_with_no_changes(self):
- # test that after_update is called even if no cols were updated
+ """after_update is called even if no columns were updated."""
Ext, methods = self.extension()
- mapper(Item, orderitems, extension=Ext() , properties={
- 'keywords':relation(Keyword, secondary=itemkeywords)
- })
- mapper(Keyword, keywords, extension=Ext() )
+ mapper(Item, items, extension=Ext() , properties={
+ 'keywords': relation(Keyword, secondary=item_keywords)})
+ mapper(Keyword, keywords, extension=Ext())
sess = create_session()
- i1 = Item()
- k1 = Keyword()
+ i1 = Item(description="i1")
+ k1 = Keyword(name="k1")
sess.save(i1)
sess.save(k1)
sess.flush()
- self.assertEquals(methods,
+ eq_(methods,
['instrument_class', 'instrument_class', 'init_instance',
'init_instance', 'before_insert', 'after_insert',
'before_insert', 'after_insert'])
del methods[:]
i1.keywords.append(k1)
sess.flush()
- self.assertEquals(methods, ['before_update', 'after_update'])
+ eq_(methods, ['before_update', 'after_update'])
+ @testing.resolve_artifact_names
def test_inheritance_with_dupes(self):
+ """Inheritance with the same extension instance on both mappers."""
Ext, methods = self.extension()
- # test using inheritance, same extension on both mappers
class AdminUser(User):
pass
mapper(AdminUser, addresses, inherits=User, extension=ext)
sess = create_session()
- am = AdminUser()
+ am = AdminUser(name="au1", email_address="au1@e1")
sess.save(am)
sess.flush()
- am = sess.query(AdminUser).load(am.user_id)
+ am = sess.query(AdminUser).load(am.id)
sess.clear()
- am = sess.query(AdminUser).get(am.user_id)
- am.user_name = 'foobar'
+ am = sess.query(AdminUser).get(am.id)
+ am.name = 'au1 changed'
sess.flush()
sess.delete(am)
sess.flush()
- self.assertEquals(methods,
+ eq_(methods,
['instrument_class', 'instrument_class', 'init_instance',
'before_insert', 'after_insert', 'load', 'translate_row',
'populate_instance', 'append_result', 'get', 'translate_row',
'create_instance', 'populate_instance', 'append_result',
'before_update', 'after_update', 'before_delete', 'after_delete'])
+ @testing.resolve_artifact_names
def test_single_instrumentor(self):
ext_None, methods_None = self.extension()
ext_x, methods_x = self.extension()
def reset():
- clear_mappers()
+ sa.orm.clear_mappers()
del methods_None[:]
del methods_x[:]
mapper(User, users, extension=ext_x(), entity_name='x')
User()
- self.assertEquals(methods_None, ['instrument_class', 'init_instance'])
- self.assertEquals(methods_x, [])
+ eq_(methods_None, ['instrument_class', 'init_instance'])
+ eq_(methods_x, [])
reset()
mapper(User, users, extension=ext_None())
User()
- self.assertEquals(methods_x, ['instrument_class', 'init_instance'])
- self.assertEquals(methods_None, [])
+ eq_(methods_x, ['instrument_class', 'init_instance'])
+ eq_(methods_None, [])
reset()
mapper(User, users, extension=ext_y(), entity_name='y')
User()
- self.assertEquals(methods_x, ['instrument_class', 'init_instance'])
- self.assertEquals(methods_y, [])
+ eq_(methods_x, ['instrument_class', 'init_instance'])
+ eq_(methods_y, [])
-class RequirementsTest(ORMTest):
+class RequirementsTest(_base.MappedTest):
"""Tests the contract for user classes."""
def define_tables(self, metadata):
- global t1, t2, t3, t4, t5, t6
-
- t1 = Table('ht1', metadata,
- Column('id', Integer, Sequence('seq_ht1_id'), primary_key=True),
- Column('value', String(10)))
- t2 = Table('ht2', metadata,
- Column('id', Integer, Sequence('seq_ht2_id'), primary_key=True),
- Column('ht1_id', Integer, ForeignKey('ht1.id')),
- Column('value', String(10)))
- t3 = Table('ht3', metadata,
- Column('id', Integer, Sequence('seq_ht3_id'), primary_key=True),
- Column('value', String(10)))
- t4 = Table('ht4', metadata,
- Column('ht1_id', Integer, ForeignKey('ht1.id'),
- primary_key=True),
- Column('ht3_id', Integer, ForeignKey('ht3.id'),
- primary_key=True))
- t5 = Table('ht5', metadata,
- Column('ht1_id', Integer, ForeignKey('ht1.id'),
- primary_key=True),
- )
- t6 = Table('ht6', metadata,
- Column('ht1a_id', Integer, ForeignKey('ht1.id'),
- primary_key=True),
- Column('ht1b_id', Integer, ForeignKey('ht1.id'),
- primary_key=True),
- Column('value', String(10)))
-
+ Table('ht1', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('value', String(10)))
+ Table('ht2', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('ht1_id', Integer, ForeignKey('ht1.id')),
+ Column('value', String(10)))
+ Table('ht3', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('value', String(10)))
+ Table('ht4', metadata,
+ Column('ht1_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True),
+ Column('ht3_id', Integer, ForeignKey('ht3.id'),
+ primary_key=True))
+ Table('ht5', metadata,
+ Column('ht1_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True))
+ Table('ht6', metadata,
+ Column('ht1a_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True),
+ Column('ht1b_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True),
+ Column('value', String(10)))
+
+ @testing.resolve_artifact_names
def test_baseclass(self):
class OldStyle:
pass
- self.assertRaises(sa_exc.ArgumentError, mapper, OldStyle, t1)
+ self.assertRaises(sa.exc.ArgumentError, mapper, OldStyle, ht1)
class NoWeakrefSupport(str):
pass
# TODO: is weakref support detectable without an instance?
- #self.assertRaises(sa_exc.ArgumentError, mapper, NoWeakrefSupport, t2)
+ #self.assertRaises(sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2)
+ @testing.resolve_artifact_names
def test_comparison_overrides(self):
"""Simple tests to ensure users can supply comparison __methods__.
# adding these methods directly to each class to avoid decoration
# by the testlib decorators.
- class H1(object):
- def __init__(self, value='abc'):
- self.value = value
- def __nonzero__(self):
- return False
- def __hash__(self):
- return hash(self.value)
- def __eq__(self, other):
- if isinstance(other, type(self)):
- return self.value == other.value
- return False
- class H2(object):
- def __init__(self, value='abc'):
- self.value = value
- def __nonzero__(self):
- return False
- def __hash__(self):
- return hash(self.value)
- def __eq__(self, other):
- if isinstance(other, type(self)):
- return self.value == other.value
- return False
- class H3(object):
- def __init__(self, value='abc'):
- self.value = value
- def __nonzero__(self):
- return False
- def __hash__(self):
- return hash(self.value)
- def __eq__(self, other):
- if isinstance(other, type(self)):
- return self.value == other.value
- return False
- class H6(object):
+ class _Base(object):
def __init__(self, value='abc'):
self.value = value
def __nonzero__(self):
return self.value == other.value
return False
- mapper(H1, t1, properties={
+ class H1(_Base):
+ pass
+ class H2(_Base):
+ pass
+ class H3(_Base):
+ pass
+ class H6(_Base):
+ pass
+
+ mapper(H1, ht1, properties={
'h2s': relation(H2, backref='h1'),
- 'h3s': relation(H3, secondary=t4, backref='h1s'),
- 'h1s': relation(H1, secondary=t5, backref='parent_h1'),
+ 'h3s': relation(H3, secondary=ht4, backref='h1s'),
+ 'h1s': relation(H1, secondary=ht5, backref='parent_h1'),
't6a': relation(H6, backref='h1a',
- primaryjoin=t1.c.id==t6.c.ht1a_id),
+ primaryjoin=ht1.c.id==ht6.c.ht1a_id),
't6b': relation(H6, backref='h1b',
- primaryjoin=t1.c.id==t6.c.ht1b_id),
+ primaryjoin=ht1.c.id==ht6.c.ht1b_id),
})
- mapper(H2, t2)
- mapper(H3, t3)
- mapper(H6, t6)
+ mapper(H2, ht2)
+ mapper(H3, ht3)
+ mapper(H6, ht6)
s = create_session()
for i in range(3):
h1.h1s.append(H1())
s.flush()
- self.assertEquals(t1.count().scalar(), 4)
+ eq_(ht1.count().scalar(), 4)
h6 = H6()
h6.h1a = h1
h1.h2s.extend([H2(), H2()])
s.flush()
- h1s = s.query(H1).options(eagerload('h2s')).all()
- self.assertEqual(len(h1s), 5)
+ h1s = s.query(H1).options(sa.orm.eagerload('h2s')).all()
+ eq_(len(h1s), 5)
self.assert_unordered_result(h1s, H1,
{'h2s': []},
{'h2s': []},
{'h2s': (H2, [{'value': 'abc'}])})
- h1s = s.query(H1).options(eagerload('h3s')).all()
+ h1s = s.query(H1).options(sa.orm.eagerload('h3s')).all()
- self.assertEqual(len(h1s), 5)
- h1s = s.query(H1).options(eagerload_all('t6a.h1b'),
- eagerload('h2s'),
- eagerload_all('h3s.h1s')).all()
- self.assertEqual(len(h1s), 5)
+ eq_(len(h1s), 5)
+ h1s = s.query(H1).options(sa.orm.eagerload_all('t6a.h1b'),
+ sa.orm.eagerload('h2s'),
+ sa.orm.eagerload_all('h3s.h1s')).all()
+ eq_(len(h1s), 5)
-class NoEqFoo(object):
- def __init__(self, data):
- self.data = data
- def __eq__(self, other):
- raise NotImplementedError()
- def __ne__(self, other):
- raise NotImplementedError()
-class MagicNamesTest(ORMTest):
+class MagicNamesTest(_base.MappedTest):
def define_tables(self, metadata):
Table('cartographers', metadata,
Column('cart_id', Integer,
ForeignKey('cartographers.id')),
Column('state', String(2)),
- Column('data', Text))
+ Column('data', sa.Text))
- def tables(self):
- cat = testing._otest_metadata.tables
- return cat['cartographers'], cat['maps']
-
- def classes(self):
- class Base(object):
- def __init__(self, **kw):
- for key, value in kw.iteritems():
- setattr(self, key, value)
- class Cartographer(Base): pass
- class Map(Base): pass
+ def setup_classes(self):
+ class Cartographer(_base.BasicEntity):
+ pass
- return Cartographer, Map
+ class Map(_base.BasicEntity):
+ pass
- @testing.future
+ @testing.resolve_artifact_names
def test_mappish(self):
- t1, t2 = self.tables()
- Cartographer, Map = self.classes()
- mapper(Cartographer, t1, properties=dict(
- query=t1.c.quip))
- mapper(Map, t2, properties=dict(
+ mapper(Cartographer, cartographers, properties=dict(
+ query=cartographers.c.quip))
+ mapper(Map, maps, properties=dict(
mapper=relation(Cartographer, backref='maps')))
c = Cartographer(name='Lenny', alias='The Dude',
sess.flush()
sess.clear()
- for C, M in ((Cartographer, Map), (aliased(Cartographer), aliased(Map))):
- print C, M
+ for C, M in ((Cartographer, Map),
+ (sa.orm.aliased(Cartographer), sa.orm.aliased(Map))):
c1 = (sess.query(C).
filter(C.alias=='The Dude').
filter(C.query=='Where be dragons?')).one()
m1 = sess.query(M).filter(M.mapper==c1).one()
- @testing.future
- def test_stateish(self):
- from sqlalchemy.orm import attributes
- if hasattr(attributes, 'ClassManager'):
- syn1 = attributes.ClassManager.STATE_ATTR
- syn2 = attributes.ClassManager.MANAGER_ATTR
- else:
- syn1 = '_state'
- syn2 = '_class_state'
-
+ @testing.resolve_artifact_names
+ def test_direct_stateish(self):
+ for reserved in (sa.orm.attributes.ClassManager.STATE_ATTR,
+ sa.orm.attributes.ClassManager.MANAGER_ATTR):
+ t = Table('t', sa.MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column(reserved, Integer))
+ class T(object):
+ pass
- t1, t2 = self.tables()
- Cartographer, Map = self.classes()
- mapper(Map, t2, properties=dict(
- syn1=t2.c.state,
- syn2=t2.c.data))
+ self.assertRaisesMessage(
+ KeyError,
+ ('%r: requested attribute name conflicts with '
+ 'instrumentation attribute of the same name.' % reserved),
+ mapper, T, t)
+
+ @testing.resolve_artifact_names
+ def test_indirect_stateish(self):
+ for reserved in (sa.orm.attributes.ClassManager.STATE_ATTR,
+ sa.orm.attributes.ClassManager.MANAGER_ATTR):
+ class M(object):
+ pass
- m = Map()
- setattr(m, syn1, 'AK')
- setattr(m, syn2, '10x10')
+ self.assertRaisesMessage(
+ KeyError,
+ ('requested attribute name conflicts with '
+ 'instrumentation attribute of the same name'),
+ mapper, M, maps, properties={
+ reserved: maps.c.state})
- sess = create_session()
- sess.save(m)
- sess.flush()
- sess.clear()
- for M in (Map, aliased(Map)):
- print M
- sess.query(M).filter(getattr(M, syn1) == 'AK').one()
- sess.query(M).filter(getattr(M, syn2) == '10x10').one()
+class ScalarRequirementsTest(_base.MappedTest):
+ # TODO: is this needed here?
+ # what does this suite excercise that unitofwork doesn't?
-class ScalarRequirementsTest(ORMTest):
def define_tables(self, metadata):
- import pickle
- global t1
- t1 = Table('t1', metadata,
- Column('id', Integer, Sequence('seq_t1_id'), primary_key=True),
- Column('data', PickleType(pickler=pickle)) # dont use cPickle due to import weirdness
- )
+ Table('t1', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', sa.PickleType()))
+
+ def setup_classes(self):
+ class Foo(_base.ComparableEntity):
+ pass
+ @testing.resolve_artifact_names
def test_correct_comparison(self):
+ mapper(Foo, t1)
- class H1(fixtures.Base):
- pass
+ f1 = Foo(data=pickleable.NotComparable('12345'))
- mapper(H1, t1)
+ session = create_session()
+ session.add(f1)
+ session.flush()
+ session.clear()
- h1 = H1(data=NoEqFoo('12345'))
- s = create_session()
- s.save(h1)
- s.flush()
- s.clear()
- h1 = s.get(H1, h1.id)
- assert h1.data.data == '12345'
+ f1 = session.get(Foo, f1.id)
+ eq_(f1.data.data, '12345')
+
+ f2 = Foo(data=pickleable.BrokenComparable('abc'))
+
+ session.add(f2)
+ session.flush()
+ session.clear()
+
+ f2 = session.get(Foo, f2.id)
+ eq_(f2.data.data, 'abc')
if __name__ == "__main__":