From: Jason Kirtland Date: Tue, 13 May 2008 16:39:47 +0000 (+0000) Subject: - Reworked test/orm/mapper X-Git-Tag: rel_0_5beta1~92 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b97c8f2d60713ff26833e30871607ab29ff347cc;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - Reworked test/orm/mapper - Exposed some uncovered (and broken) functionality - Fixed [ticket:1038] --- diff --git a/test/orm/_base.py b/test/orm/_base.py index 444d4b88cc..b952952d76 100644 --- a/test/orm/_base.py +++ b/test/orm/_base.py @@ -234,6 +234,10 @@ class MappedTest(ORMTest): def sql_count_(self, count, fn): self.assert_sql_count(self.metadata.bind, fn, count) + def sql_eq_(self, callable_, statements, with_sequences=None): + self.assert_sql(self.metadata.bind, + callable_, statements, with_sequences) + @classmethod def _register_new_class_artifacts(cls, baseline): for class_ in subclasses(BasicEntity) - baseline: diff --git a/test/orm/_fixtures.py b/test/orm/_fixtures.py index 09fab5dbaa..5f133bf1cb 100644 --- a/test/orm/_fixtures.py +++ b/test/orm/_fixtures.py @@ -45,6 +45,17 @@ addresses = fixture_table( (4, 8, "ed@lala.com"), (5, 9, "fred@fred.com")) +email_bounces = fixture_table( + Table('email_bounces', fixture_metadata, + Column('id', Integer, ForeignKey('addresses.id')), + Column('bounces', Integer)), + ('id', 'bounces'), + (1, 1), + (2, 0), + (3, 5), + (4, 0), + (5, 0)) + orders = fixture_table( Table('orders', fixture_metadata, Column('id', Integer, primary_key=True), @@ -285,6 +296,14 @@ class FixtureTest(_base.MappedTest): class CannedResults(object): """Built on demand, instances use mappers in effect at time of call.""" + @property + def user_result(self): + return [ + User(id=7), + User(id=8), + User(id=9), + User(id=10)] + @property def user_address_result(self): return [ diff --git a/test/orm/deprecations.py b/test/orm/deprecations.py index 3bc38da2a8..71d8070598 100644 --- a/test/orm/deprecations.py +++ b/test/orm/deprecations.py @@ -148,12 +148,12 @@ class QueryAlternativesTest(_base.MappedTest): """ session = create_session() - avgs = list(session.query(Address).values(func.avg(Address.bounces))) + avgs = list(session.query(Address).values(func.sum(Address.bounces))) avg = avgs[0][0] - assert avg > 0 and avg < 10 + assert avg == 11 - avg = session.query(func.avg(Address.bounces)).one()[0] - assert avg > 0 and avg < 10 + avg = session.query(func.sum(Address.bounces)).one()[0] + assert avg == 11 @testing.resolve_artifact_names def test_count_by(self): diff --git a/test/orm/mapper.py b/test/orm/mapper.py index 34bfb451a7..b397a8b4c7 100644 --- a/test/orm/mapper.py +++ b/test/orm/mapper.py @@ -1,54 +1,48 @@ -"""tests general mapper operations with an emphasis on selecting/loading""" +"""General mapper operations with an emphasis on selecting/loading.""" import testenv; testenv.configure_for_tests() -from sqlalchemy import * -from sqlalchemy import exc as sa_exc, sql -from sqlalchemy.orm import * -from testlib import * -from testlib import fixtures -from testlib.tables import * -import testlib.tables as tables - - -class MapperSuperTest(TestBase, AssertsExecutionResults): - def setUpAll(self): - tables.create() - tables.data() - def tearDownAll(self): - tables.drop() - def tearDown(self): - clear_mappers() - def setUp(self): - pass - -class MapperTest(MapperSuperTest): - - def test_propconflict(self): - """test that a backref created against an existing mapper with a property name - conflict raises a decent error message""" +from testlib import sa, testing +from testlib.sa import MetaData, Table, Column, Integer, String, ForeignKey +from testlib.sa.orm import mapper, relation, backref, create_session +from testlib.sa.orm import defer, deferred, synonym +from testlib.testing import eq_ +from testlib.compat import set +import pickleable +from orm import _base, _fixtures + + +class MapperTest(_fixtures.FixtureTest): + + @testing.resolve_artifact_names + def test_prop_shadow(self): + """A backref name may not shadow an existing property name.""" + mapper(Address, addresses) mapper(User, users, properties={ 'addresses':relation(Address, backref='email_address') }) - self.assertRaises(sa_exc.ArgumentError, compile_mappers) + self.assertRaises(sa.exc.ArgumentError, sa.orm.compile_mappers) + @testing.resolve_artifact_names def test_prop_accessor(self): mapper(User, users) - self.assertRaises(NotImplementedError, getattr, class_mapper(User), 'properties') + self.assertRaises(NotImplementedError, + getattr, sa.orm.class_mapper(User), 'properties') @testing.uses_deprecated( 'Call to deprecated function _instance_key', 'Call to deprecated function _sa_session_id', 'Call to deprecated function _entity_name') - def test_legacy_accesors(self): - u1 = User() + @testing.resolve_artifact_names + def test_legacy_accessors(self): + u1 = User(name='u1') assert not hasattr(u1, '_instance_key') assert not hasattr(u1, '_sa_session_id') assert not hasattr(u1, '_entity_name') mapper(User, users) - u1 = User() + u1 = User(name='u2') assert not hasattr(u1, '_instance_key') assert not hasattr(u1, '_sa_session_id') assert u1._entity_name is None @@ -56,84 +50,104 @@ class MapperTest(MapperSuperTest): sess = create_session() sess.save(u1) assert not hasattr(u1, '_instance_key') - assert u1._sa_session_id == sess.hash_key + eq_(u1._sa_session_id, sess.hash_key) assert u1._entity_name is None sess.flush() - assert u1._instance_key == class_mapper(u1).identity_key_from_instance(u1) - assert u1._sa_session_id == sess.hash_key + eq_(u1._instance_key, + sa.orm.class_mapper(u1).identity_key_from_instance(u1)) + eq_(u1._sa_session_id, sess.hash_key) assert u1._entity_name is None sess.delete(u1) sess.flush() - def test_badcascade(self): + @testing.resolve_artifact_names + def test_bad_cascade(self): mapper(Address, addresses) - self.assertRaises(sa_exc.ArgumentError, relation, Address, cascade="fake, all, delete-orphan") + self.assertRaises(sa.exc.ArgumentError, + relation, Address, cascade="fake, all, delete-orphan") - def test_columnprefix(self): + @testing.resolve_artifact_names + def test_column_prefix(self): mapper(User, users, column_prefix='_', properties={ - 'user_name':synonym('_user_name') + 'user_name': synonym('_name') }) s = create_session() u = s.get(User, 7) - assert u._user_name=='jack' - assert u._user_id ==7 + eq_(u._name, 'jack') + eq_(u._id,7) u2 = s.query(User).filter_by(user_name='jack').one() assert u is u2 - def test_no_pks(self): - s = select([users.c.user_name]).alias('foo') - self.assertRaises(sa_exc.ArgumentError, mapper, User, s) - - def test_recompile_on_othermapper(self): - """test the global '_new_mappers' flag such that a compile - trigger on an already-compiled mapper still triggers a check against all mappers.""" - - from sqlalchemy.orm import mapperlib - + @testing.resolve_artifact_names + def test_no_pks_1(self): + s = sa.select([users.c.name]).alias('foo') + self.assertRaises(sa.exc.ArgumentError, mapper, User, s) + + @testing.emits_warning( + 'mapper Mapper|User|Select object creating an alias for ' + 'the given selectable - use Class attributes for queries') + @testing.resolve_artifact_names + def test_no_pks_2(self): + s = sa.select([users.c.name]) + self.assertRaises(sa.exc.ArgumentError, mapper, User, s) + + @testing.resolve_artifact_names + def test_recompile_on_other_mapper(self): + """A compile trigger on an already-compiled mapper still triggers a check against all mappers.""" mapper(User, users) - compile_mappers() - assert mapperlib._new_mappers is False + sa.orm.compile_mappers() + assert sa.orm.mapperlib._new_mappers is False m = mapper(Address, addresses, properties={ 'user': relation(User, backref="addresses")}) assert m.compiled is False - assert mapperlib._new_mappers is True + assert sa.orm.mapperlib._new_mappers is True u = User() assert User.addresses - assert mapperlib._new_mappers is False + assert sa.orm.mapperlib._new_mappers is False - def test_compileonsession(self): + @testing.resolve_artifact_names + def test_compile_on_session(self): m = mapper(User, users) session = create_session() session.connection(m) - def test_incompletecolumns(self): - """test loading from a select which does not contain all columns""" + @testing.resolve_artifact_names + def test_incomplete_columns(self): + """Loading from a select which does not contain all columns""" mapper(Address, addresses) s = create_session() - a = s.query(Address).from_statement(select([addresses.c.address_id, addresses.c.user_id])).first() - assert a.user_id == 7 - assert a.address_id == 1 + a = s.query(Address).from_statement( + sa.select([addresses.c.id, addresses.c.user_id])).first() + eq_(a.user_id, 7) + eq_(a.id, 1) # email address auto-defers assert 'email_addres' not in a.__dict__ - assert a.email_address == 'jack@bean.com' + eq_(a.email_address, 'jack@bean.com') - def test_badconstructor(self): - """test that if the construction of a mapped class fails, the instnace does not get placed in the session""" + @testing.resolve_artifact_names + def test_bad_constructor(self): + """If the construction of a mapped class fails, the instance does not get placed in the session""" class Foo(object): def __init__(self, one, two, _sa_session=None): pass - mapper(Foo, users) + + mapper(Foo, users, extension=sa.orm.scoped_session( + create_session).extension) + sess = create_session() self.assertRaises(TypeError, Foo, 'one', _sa_session=sess) - assert len(list(sess)) == 0 + eq_(len(list(sess)), 0) self.assertRaises(TypeError, Foo, 'one') + Foo('one', 'two', _sa_session=sess) + eq_(len(list(sess)), 1) - def test_constructorexc(self): - """test that exceptions raised in the mapped class are not masked by sa decorations""" + @testing.resolve_artifact_names + def test_constructor_exc_1(self): + """Exceptions raised in the mapped class are not masked by sa decorations""" ex = AssertionError('oops') sess = create_session() @@ -148,8 +162,9 @@ class MapperTest(MapperSuperTest): except Exception, e: assert e is ex - clear_mappers() - mapper(Foo, users, extension=scoped_session(create_session).extension) + sa.orm.clear_mappers() + mapper(Foo, users, extension=sa.orm.scoped_session( + create_session).extension) def bad_expunge(foo): raise Exception("this exception should be stated as a warning") @@ -158,12 +173,12 @@ class MapperTest(MapperSuperTest): Foo(_sa_session=sess) assert False except Exception, e: - assert isinstance(e, sa_exc.SAWarning), e + assert isinstance(e, sa.exc.SAWarning), e - clear_mappers() + @testing.resolve_artifact_names + def test_constructor_exc_2(self): + """TypeError is raised for illegal constructor args, whether or not explicit __init__ is present [ticket:908].""" - # test that TypeError is raised for illegal constructor args, - # whether or not explicit __init__ is present [ticket:908] class Foo(object): def __init__(self): pass @@ -184,126 +199,134 @@ class MapperTest(MapperSuperTest): except TypeError: assert True + @testing.resolve_artifact_names def test_props(self): m = mapper(User, users, properties = { 'addresses' : relation(mapper(Address, addresses)) }).compile() - self.assert_(User.addresses.property is m.get_property('addresses')) + assert User.addresses.property is m.get_property('addresses') - def test_compileonprop(self): + @testing.resolve_artifact_names + def test_compile_on_prop_1(self): mapper(User, users, properties = { 'addresses' : relation(mapper(Address, addresses)) }) User.addresses.any(Address.email_address=='foo@bar.com') - clear_mappers() + @testing.resolve_artifact_names + def test_compile_on_prop_2(self): mapper(User, users, properties = { 'addresses' : relation(mapper(Address, addresses)) }) - self.assertEquals((User.user_id==3).__str__(), (users.c.user_id==3).__str__()) - - clear_mappers() + eq_(str(User.id == 3), str(users.c.id==3)) + @testing.resolve_artifact_names + def test_compile_on_prop_3(self): class Foo(User):pass mapper(User, users) mapper(Foo, addresses, inherits=User) - assert getattr(Foo().__class__, 'user_name').impl is not None + assert getattr(Foo().__class__, 'name').impl is not None - def test_compileon_getprops(self): + @testing.resolve_artifact_names + def test_compile_on_get_props_1(self): m =mapper(User, users) - assert not m.compiled assert list(m.iterate_properties) assert m.compiled - clear_mappers() + @testing.resolve_artifact_names + def test_compile_on_get_props_2(self): m= mapper(User, users) assert not m.compiled - assert m.get_property('user_name') + assert m.get_property('name') assert m.compiled + @testing.resolve_artifact_names def test_add_property(self): assert_col = [] - class User(object): - def _get_user_name(self): - assert_col.append(('get', self._user_name)) - return self._user_name - def _set_user_name(self, name): + class User(_base.ComparableEntity): + def _get_name(self): + assert_col.append(('get', self._name)) + return self._name + def _set_name(self, name): assert_col.append(('set', name)) - self._user_name = name - user_name = property(_get_user_name, _set_user_name) + self._name = name + name = property(_get_name, _set_name) - def _uc_user_name(self): - if self._user_name is None: + def _uc_name(self): + if self._name is None: return None - return self._user_name.upper() - uc_user_name = property(_uc_user_name) - uc_user_name2 = property(_uc_user_name) + return self._name.upper() + uc_name = property(_uc_name) + uc_name2 = property(_uc_name) m = mapper(User, users) mapper(Address, addresses) - class UCComparator(PropComparator): + class UCComparator(sa.orm.PropComparator): def __eq__(self, other): cls = self.prop.parent.class_ - col = getattr(cls, 'user_name') + col = getattr(cls, 'name') if other is None: return col == None else: - return func.upper(col) == func.upper(other) + return sa.func.upper(col) == sa.func.upper(other) - m.add_property('_user_name', deferred(users.c.user_name)) - m.add_property('user_name', synonym('_user_name')) + m.add_property('_name', deferred(users.c.name)) + m.add_property('name', synonym('_name')) m.add_property('addresses', relation(Address)) - m.add_property('uc_user_name', comparable_property(UCComparator)) - m.add_property('uc_user_name2', comparable_property( - UCComparator, User.uc_user_name2)) + m.add_property('uc_name', sa.orm.comparable_property(UCComparator)) + m.add_property('uc_name2', sa.orm.comparable_property( + UCComparator, User.uc_name2)) sess = create_session(autocommit=False) assert sess.query(User).get(7) - u = sess.query(User).filter_by(user_name='jack').one() + u = sess.query(User).filter_by(name='jack').one() def go(): - self.assert_result([u], User, user_address_result[0]) - assert u.user_name == 'jack' - assert u.uc_user_name == 'JACK' - assert u.uc_user_name2 == 'JACK' - assert assert_col == [('get', 'jack')], str(assert_col) - self.assert_sql_count(testing.db, go, 2) + eq_(len(u.addresses), + len(self.static.user_address_result[0].addresses)) + eq_(u.name, 'jack') + eq_(u.uc_name, 'JACK') + eq_(u.uc_name2, 'JACK') + eq_(assert_col, [('get', 'jack')], str(assert_col)) + self.sql_count_(2, go) u.name = 'ed' u3 = User() - u3.user_name = 'some user' + u3.name = 'some user' sess.save(u3) sess.flush() sess.rollback() + @testing.resolve_artifact_names def test_replace_property(self): m = mapper(User, users) - m.add_property('_user_name',users.c.user_name) - m.add_property('user_name', synonym('_user_name', proxy=True)) + m.add_property('_name',users.c.name) + m.add_property('name', synonym('_name', proxy=True)) sess = create_session() - u = sess.query(User).filter_by(user_name='jack').one() - assert u._user_name == 'jack' - assert u.user_name == 'jack' - u.user_name = 'jacko' - assert m._columntoproperty[users.c.user_name] is m.get_property('_user_name') + u = sess.query(User).filter_by(name='jack').one() + eq_(u._name, 'jack') + eq_(u.name, 'jack') + u.name = 'jacko' + assert m._columntoproperty[users.c.name] is m.get_property('_name') - clear_mappers() + sa.orm.clear_mappers() m = mapper(User, users) - m.add_property('user_name', synonym('_user_name', map_column=True)) + m.add_property('name', synonym('_name', map_column=True)) sess.clear() - u = sess.query(User).filter_by(user_name='jack').one() - assert u._user_name == 'jack' - assert u.user_name == 'jack' - u.user_name = 'jacko' - assert m._columntoproperty[users.c.user_name] is m.get_property('_user_name') + u = sess.query(User).filter_by(name='jack').one() + eq_(u._name, 'jack') + eq_(u.name, 'jack') + u.name = 'jacko' + assert m._columntoproperty[users.c.name] is m.get_property('_name') + @testing.resolve_artifact_names def test_synonym_replaces_backref(self): assert_calls = [] class Address(object): @@ -319,7 +342,7 @@ class MapperTest(MapperSuperTest): mapper(Address, addresses, properties={ 'user':synonym('_user') }) - compile_mappers() + sa.orm.compile_mappers() # later, backref sets up the prop mapper(User, users, properties={ @@ -331,12 +354,13 @@ class MapperTest(MapperSuperTest): u2 = sess.query(User).get(8) # comparaison ops need to work a1 = sess.query(Address).filter(Address.user==u1).one() - assert a1.address_id == 1 + eq_(a1.id, 1) a1.user = u2 assert a1.user is u2 - self.assertEquals(assert_calls, ["set", "get"]) + eq_(assert_calls, ["set", "get"]) - def test_self_ref_syn(self): + @testing.resolve_artifact_names + def test_self_ref_synonym(self): t = Table('nodes', MetaData(), Column('id', Integer, primary_key=True), Column('parent_id', Integer, ForeignKey('nodes.id'))) @@ -355,8 +379,9 @@ class MapperTest(MapperSuperTest): n1.children.append(n2) assert n2.parent is n2._parent is n1 assert n1.children[0] is n1._children[0] is n2 - self.assertEquals(str(Node.parent == n2), ":param_1 = nodes.parent_id") + eq_(str(Node.parent == n2), ":param_1 = nodes.parent_id") + @testing.resolve_artifact_names def test_illegal_non_primary(self): mapper(User, users) mapper(Address, addresses) @@ -365,17 +390,19 @@ class MapperTest(MapperSuperTest): 'addresses':relation(Address) }).compile() assert False - except sa_exc.ArgumentError, e: + except sa.exc.ArgumentError, e: assert "Attempting to assign a new relation 'addresses' to a non-primary mapper on class 'User'" in str(e) + @testing.resolve_artifact_names def test_illegal_non_primary_2(self): try: mapper(User, users, non_primary=True) assert False - except sa_exc.InvalidRequestError, e: + except sa.exc.InvalidRequestError, e: assert "Configure a primary mapper first" in str(e) - def test_propfilters(self): + @testing.resolve_artifact_names + def test_prop_filters(self): t = Table('person', MetaData(), Column('id', Integer, primary_key=True), Column('type', String(128)), @@ -409,12 +436,12 @@ class MapperTest(MapperSuperTest): column_prefix="p_") p_m.compile() - #compile_mappers() + #sa.orm.compile_mappers() def assert_props(cls, want): have = set([n for n in dir(cls) if not n.startswith('_')]) want = set(want) - self.assert_(have == want, repr(have) + " " + repr(want)) + eq_(have, want) assert_props(Person, ['id', 'name', 'type']) assert_props(Employee, ['boss', 'boss_id', 'employee_number', @@ -425,142 +452,165 @@ class MapperTest(MapperSuperTest): assert_props(Hoho, ['id', 'name', 'type']) assert_props(Lala, ['p_employee_number', 'p_id', 'p_name', 'p_type']) - def test_mappingtojoin(self): - """test mapping to a join""" - usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id) - m = mapper(User, usersaddresses, primary_key=[users.c.user_id]) - q = create_session().query(m) - l = q.all() - self.assert_result(l, User, *user_result[0:2]) - - def test_mappingtojoinnopk(self): - metadata = MetaData() - account_ids_table = Table('account_ids', metadata, - Column('account_id', Integer, Sequence('seq_account_id'), primary_key=True), - Column('username', String(20))) - account_stuff_table = Table('account_stuff', metadata, - Column('account_id', Integer, ForeignKey('account_ids.account_id')), - Column('credit', Numeric)) - class A(object):pass - m = mapper(A, account_ids_table.join(account_stuff_table)) + @testing.resolve_artifact_names + def test_mapping_to_join(self): + """Mapping to a join""" + usersaddresses = sa.join(users, addresses, + users.c.id == addresses.c.user_id) + mapper(User, usersaddresses, primary_key=[users.c.id]) + l = create_session().query(User).order_by(users.c.id).all() + eq_(l, self.static.user_result[:3]) + + @testing.resolve_artifact_names + def test_mapping_to_join_no_pk(self): + m = mapper(Address, addresses.join(email_bounces)) m.compile() - assert account_ids_table in m._pks_by_table - assert account_stuff_table not in m._pks_by_table - metadata.create_all(testing.db) - try: - sess = create_session(bind=testing.db) - a = A() - sess.save(a) - sess.flush() - assert testing.db.execute(account_ids_table.count()).scalar() == 1 - assert testing.db.execute(account_stuff_table.count()).scalar() == 0 - finally: - metadata.drop_all(testing.db) - - def test_mappingtoouterjoin(self): - """test mapping to an outer join, with a composite primary key that allows nulls""" - result = [ - {'user_id' : 7, 'address_id' : 1}, - {'user_id' : 8, 'address_id' : 2}, - {'user_id' : 8, 'address_id' : 3}, - {'user_id' : 8, 'address_id' : 4}, - {'user_id' : 9, 'address_id':None} - ] - - j = join(users, addresses, isouter=True) - m = mapper(User, j, allow_null_pks=True, primary_key=[users.c.user_id, addresses.c.address_id]) - q = create_session().query(m) - l = q.all() - self.assert_result(l, User, *result) + assert addresses in m._pks_by_table + assert email_bounces not in m._pks_by_table + sess = create_session() + a = Address(email_address='e1') + sess.add(a) + sess.flush() - def test_customjoin(self): - """Tests that select_from totally replace the FROM parameters.""" + eq_(addresses.count().scalar(), 6) + eq_(email_bounces.count().scalar(), 5) - m = mapper(User, users, properties={ - 'orders':relation(mapper(Order, orders, properties={ - 'items':relation(mapper(Item, orderitems)) - })) - }) + @testing.resolve_artifact_names + def test_mapping_to_outerjoin(self): + """Mapping to an outer join with a nullable composite primary key.""" - q = create_session().query(m) - l = (q.select_from(users.join(orders).join(orderitems)). - filter(orderitems.c.item_name=='item 4')) - self.assert_result(l, User, user_result[0]) + mapper(User, users.outerjoin(addresses), + allow_null_pks=True, + primary_key=[users.c.id, addresses.c.id], + properties=dict( + address_id=addresses.c.id)) - def test_orderby(self): - """test ordering at the mapper and query level""" + session = create_session() + l = session.query(User).order_by(User.id, User.address_id).all() + + eq_(l, [ + User(id=7, address_id=1), + User(id=8, address_id=2), + User(id=8, address_id=3), + User(id=8, address_id=4), + User(id=9, address_id=5), + User(id=10, address_id=None)]) + + @testing.resolve_artifact_names + def test_custom_join(self): + """select_from totally replace the FROM parameters.""" + + mapper(Item, items) + + mapper(Order, orders, properties=dict( + items=relation(Item, order_items))) + + mapper(User, users, properties=dict( + orders=relation(Order))) + + session = create_session() + l = (session.query(User). + select_from(users.join(orders). + join(order_items). + join(items)). + filter(items.c.description == 'item 4')).all() + + eq_(l, [self.static.user_result[0]]) + + @testing.resolve_artifact_names + def test_order_by(self): + """Ordering at the mapper and query level""" # TODO: make a unit test out of these various combinations - #m = mapper(User, users, order_by=desc(users.c.user_name)) + #m = mapper(User, users, order_by=desc(users.c.name)) mapper(User, users, order_by=None) #mapper(User, users) - #l = create_session().query(User).select(order_by=[desc(users.c.user_name), asc(users.c.user_id)]) + #l = create_session().query(User).select(order_by=[desc(users.c.name), asc(users.c.user_id)]) l = create_session().query(User).all() #l = create_session().query(User).select(order_by=[]) #l = create_session().query(User).select(order_by=None) - @testing.unsupported('firebird', 'Raises a "expression evaluation not supported" error at prepare time') + # 'Raises a "expression evaluation not supported" error at prepare time + @testing.fails_on('firebird') + @testing.resolve_artifact_names def test_function(self): - """Test mapping to a SELECT statement that has functions in it.""" + """Mapping to a SELECT statement that has functions in it.""" - s = select([users, - (users.c.user_id * 2).label('concat'), - func.count(addresses.c.address_id).label('count')], - users.c.user_id == addresses.c.user_id, - group_by=[c for c in users.c]).alias('myselect') + s = sa.select([users, + (users.c.id * 2).label('concat'), + sa.func.count(addresses.c.id).label('count')], + users.c.id == addresses.c.user_id, + group_by=[c for c in users.c]).alias('myselect') mapper(User, s, order_by=s.default_order_by()) sess = create_session() l = sess.query(User).all() - for u in l: - print "User", u.user_id, u.user_name, u.concat, u.count - assert l[0].concat == l[0].user_id * 2 == 14 - assert l[1].concat == l[1].user_id * 2 == 16 + for idx, total in enumerate((14, 16)): + eq_(l[idx].concat, l[idx].id * 2) + eq_(l[idx].concat, total) + + @testing.resolve_artifact_names def test_count(self): - """Test the count function on Query.""" + """The count function on Query.""" mapper(User, users) - q = create_session().query(User) - self.assert_(q.count()==3) - self.assert_(q.filter(users.c.user_id.in_([8,9])).count()==2) - - def test_manytomany_count(self): - mapper(Item, orderitems, properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True), - )) - q = create_session().query(Item) - assert q.join('keywords').distinct().filter(Keyword.name=="red").count() == 2 - - def test_override(self): - # assert that overriding a column raises an error - try: - m = mapper(User, users, properties = { - 'user_name' : relation(mapper(Address, addresses)), - }).compile() - self.assert_(False, "should have raised ArgumentError") - except sa_exc.ArgumentError, e: - self.assert_(True) - - clear_mappers() - # assert that allow_column_override cancels the error - m = mapper(User, users, properties = { - 'user_name' : relation(mapper(Address, addresses)) - }, allow_column_override=True) - clear_mappers() - # assert that the column being named else where also cancels the error - m = mapper(User, users, properties = { - 'user_name' : relation(mapper(Address, addresses)), - 'foo' : users.c.user_name, - }) + session = create_session() + q = session.query(User) + + eq_(q.count(), 4) + eq_(q.filter(User.id.in_([8,9])).count(), 2) + eq_(q.filter(users.c.id.in_([8,9])).count(), 2) + + eq_(session.query(User.id).count(), 4) + eq_(session.query(User.id).filter(User.id.in_((8, 9))).count(), 2) + + @testing.resolve_artifact_names + def test_many_to_many_count(self): + mapper(Keyword, keywords) + mapper(Item, items, properties=dict( + keywords = relation(Keyword, item_keywords, lazy=True))) + + session = create_session() + q = (session.query(Item). + join('keywords'). + distinct(). + filter(Keyword.name == "red")) + eq_(q.count(), 2) + + @testing.resolve_artifact_names + def test_override_1(self): + """Overriding a column raises an error.""" + def go(): + mapper(User, users, + properties=dict( + name=relation(mapper(Address, addresses)))) + self.assertRaises(sa.exc.ArgumentError, go) + + @testing.resolve_artifact_names + def test_override_2(self): + """allow_column_override cancels the error.""" + mapper(User, users, + allow_column_override=True, + properties=dict( + name=relation(mapper(Address, addresses)))) + + @testing.resolve_artifact_names + def test_override_3(self): + """The column being named elsewhere also cancels the error,""" + mapper(User, users, + properties=dict( + name=relation(mapper(Address, addresses)), + foo=users.c.name)) + + @testing.resolve_artifact_names def test_synonym(self): - sess = create_session() assert_col = [] class extendedproperty(property): @@ -569,31 +619,35 @@ class MapperTest(MapperSuperTest): return 'value' class User(object): - def _get_user_name(self): - assert_col.append(('get', self.user_name)) - return self.user_name - def _set_user_name(self, name): + def _get_name(self): + assert_col.append(('get', self.name)) + return self.name + def _set_name(self, name): assert_col.append(('set', name)) - self.user_name = name - uname = extendedproperty(_get_user_name, _set_user_name) + self.name = name + uname = extendedproperty(_get_name, _set_name) - mapper(User, users, properties = dict( + mapper(User, users, properties=dict( addresses = relation(mapper(Address, addresses), lazy=True), - uname = synonym('user_name'), + uname = synonym('name'), adlist = synonym('addresses', proxy=True), adname = synonym('addresses') )) assert hasattr(User, 'adlist') - assert hasattr(User, 'adname') # as of 0.4.2, synonyms always create a property + # as of 0.4.2, synonyms always create a property + assert hasattr(User, 'adname') # test compile assert not isinstance(User.uname == 'jack', bool) + sess = create_session() u = sess.query(User).filter(User.uname=='jack').one() - self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1])) - addr = sess.query(Address).filter_by(address_id=user_address_result[0]['addresses'][1][0]['address_id']).one() + fixture = self.static.user_address_result[0].addresses + eq_(u.adlist, fixture) + + addr = sess.query(Address).filter_by(id=fixture[0].id).one() u = sess.query(User).filter(User.adname.contains(addr)).one() u2 = sess.query(User).filter(User.adlist.contains(addr)).one() @@ -602,90 +656,91 @@ class MapperTest(MapperSuperTest): assert u not in sess.dirty u.uname = "some user name" assert len(assert_col) > 0 - assert assert_col == [('set', 'some user name')], str(assert_col) - assert u.uname == "some user name" - assert assert_col == [('set', 'some user name'), ('get', 'some user name')], str(assert_col) - assert u.user_name == "some user name" + eq_(assert_col, [('set', 'some user name')]) + eq_(u.uname, "some user name") + eq_(assert_col, [('set', 'some user name'), ('get', 'some user name')]) + eq_(u.name, "some user name") assert u in sess.dirty - assert User.uname.attribute == 123 - assert User.uname['key'] == 'value' + eq_(User.uname.attribute, 123) + eq_(User.uname['key'], 'value') + + @testing.resolve_artifact_names + def test_synonym_column_location(self): + def go(): + mapper(User, users, properties={ + 'not_name':synonym('_name', map_column=True)}) + + self.assertRaisesMessage( + sa.exc.ArgumentError, + ("Can't compile synonym '_name': no column on table " + "'users' named 'not_name'"), + go) + @testing.resolve_artifact_names def test_column_synonyms(self): - """test new-style synonyms which automatically instrument properties, set up aliased column, etc.""" + """Synonyms which automatically instrument properties, set up aliased column, etc.""" - sess = create_session() assert_col = [] class User(object): - def _get_user_name(self): - assert_col.append(('get', self._user_name)) - return self._user_name - def _set_user_name(self, name): + def _get_name(self): + assert_col.append(('get', self._name)) + return self._name + def _set_name(self, name): assert_col.append(('set', name)) - self._user_name = name - user_name = property(_get_user_name, _set_user_name) - - mapper(Address, addresses) - try: - mapper(User, users, properties = { - 'addresses':relation(Address, lazy=True), - 'not_user_name':synonym('_user_name', map_column=True) - }) - User.not_user_name - assert False - except sa_exc.ArgumentError, e: - assert str(e) == "Can't compile synonym '_user_name': no column on table 'users' named 'not_user_name'" - - clear_mappers() + self._name = name + name = property(_get_name, _set_name) mapper(Address, addresses) mapper(User, users, properties = { 'addresses':relation(Address, lazy=True), - 'user_name':synonym('_user_name', map_column=True) + 'name':synonym('_name', map_column=True) }) # test compile - assert not isinstance(User.user_name == 'jack', bool) + assert not isinstance(User.name == 'jack', bool) - assert hasattr(User, 'user_name') - assert hasattr(User, '_user_name') + assert hasattr(User, 'name') + assert hasattr(User, '_name') - u = sess.query(User).filter(User.user_name == 'jack').one() - assert u.user_name == 'jack' - u.user_name = 'foo' - assert u.user_name == 'foo' - assert assert_col == [('get', 'jack'), ('set', 'foo'), ('get', 'foo')] + sess = create_session() + u = sess.query(User).filter(User.name == 'jack').one() + eq_(u.name, 'jack') + u.name = 'foo' + eq_(u.name, 'foo') + eq_(assert_col, [('get', 'jack'), ('set', 'foo'), ('get', 'foo')]) + @testing.resolve_artifact_names def test_comparable(self): class extendedproperty(property): attribute = 123 def __getitem__(self, key): return 'value' - class UCComparator(PropComparator): + class UCComparator(sa.orm.PropComparator): def __eq__(self, other): cls = self.prop.parent.class_ - col = getattr(cls, 'user_name') + col = getattr(cls, 'name') if other is None: return col == None else: - return func.upper(col) == func.upper(other) + return sa.func.upper(col) == sa.func.upper(other) def map_(with_explicit_property): class User(object): @extendedproperty - def uc_user_name(self): - if self.user_name is None: + def uc_name(self): + if self.name is None: return None - return self.user_name.upper() + return self.name.upper() if with_explicit_property: - args = (UCComparator, User.uc_user_name) + args = (UCComparator, User.uc_name) else: args = (UCComparator,) mapper(User, users, properties=dict( - uc_user_name = comparable_property(*args))) + uc_name = sa.orm.comparable_property(*args))) return User for User in (map_(True), map_(False)): @@ -693,462 +748,569 @@ class MapperTest(MapperSuperTest): sess.begin() q = sess.query(User) - assert hasattr(User, 'user_name') - assert hasattr(User, 'uc_user_name') + assert hasattr(User, 'name') + assert hasattr(User, 'uc_name') # test compile - assert not isinstance(User.uc_user_name == 'jack', bool) - u = q.filter(User.uc_user_name=='JACK').one() + assert not isinstance(User.uc_name == 'jack', bool) + u = q.filter(User.uc_name=='JACK').one() - assert u.uc_user_name == "JACK" + assert u.uc_name == "JACK" assert u not in sess.dirty - u.user_name = "some user name" - assert u.user_name == "some user name" + u.name = "some user name" + eq_(u.name, "some user name") assert u in sess.dirty - assert u.uc_user_name == "SOME USER NAME" + eq_(u.uc_name, "SOME USER NAME") sess.flush() sess.clear() q = sess.query(User) - u2 = q.filter(User.user_name=='some user name').one() - u3 = q.filter(User.uc_user_name=='SOME USER NAME').one() + u2 = q.filter(User.name=='some user name').one() + u3 = q.filter(User.uc_name=='SOME USER NAME').one() assert u2 is u3 - assert User.uc_user_name.attribute == 123 - assert User.uc_user_name['key'] == 'value' + eq_(User.uc_name.attribute, 123) + eq_(User.uc_name['key'], 'value') sess.rollback() -class OptionsTest(MapperSuperTest): +class OptionsTest(_fixtures.FixtureTest): + @testing.fails_on('maxdb') - def test_synonymoptions(self): - sess = create_session() - mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = True, order_by=addresses.c.address_id), - adlist = synonym('addresses', proxy=True) - ), order_by=users.c.user_id) + @testing.resolve_artifact_names + def test_synonym_options(self): + mapper(User, users, properties=dict( + addresses = relation(mapper(Address, addresses), lazy=True, + order_by=addresses.c.id), + adlist = synonym('addresses', proxy=True))) + def go(): - u = sess.query(User).options(eagerload('adlist')).filter_by(user_name='jack').one() - self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1])) + sess = create_session() + u = (sess.query(User). + order_by(User.id). + options(sa.orm.eagerload('adlist')). + filter_by(name='jack')).one() + eq_(u.adlist, + [self.static.user_address_result[0].addresses[0]]) self.assert_sql_count(testing.db, go, 1) + @testing.resolve_artifact_names + def test_eager_options(self): + """A lazy relation can be upgraded to an eager relation.""" + mapper(User, users, properties=dict( + addresses = relation(mapper(Address, addresses), + order_by=addresses.c.id))) - def test_eageroptions(self): - """tests that a lazy relation can be upgraded to an eager relation via the options method""" sess = create_session() - mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses)) - )) - l = sess.query(User).options(eagerload('addresses')).all() + l = (sess.query(User). + order_by(User.id). + options(sa.orm.eagerload('addresses'))).all() def go(): - self.assert_result(l, User, *user_address_result) - self.assert_sql_count(testing.db, go, 0) + eq_(l, self.static.user_address_result) + self.sql_count_(0, go) @testing.fails_on('maxdb') - def test_eageroptionswithlimit(self): + @testing.resolve_artifact_names + def test_eager_options_with_limit(self): + mapper(User, users, properties=dict( + addresses=relation(mapper(Address, addresses), lazy=True))) + sess = create_session() - mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = True) - )) - u = sess.query(User).options(eagerload('addresses')).filter_by(user_id=8).one() + u = (sess.query(User). + options(sa.orm.eagerload('addresses')). + filter_by(id=8)).one() def go(): - assert u.user_id == 8 - assert len(u.addresses) == 3 - self.assert_sql_count(testing.db, go, 0) + eq_(u.id, 8) + eq_(len(u.addresses), 3) + self.sql_count_(0, go) sess.clear() # test that eager loading doesnt modify parent mapper def go(): - u = sess.query(User).filter_by(user_id=8).one() - assert u.user_id == 8 - assert len(u.addresses) == 3 + u = sess.query(User).filter_by(id=8).one() + eq_(u.id, 8) + eq_(len(u.addresses), 3) assert "tbl_row_count" not in self.capture_sql(testing.db, go) @testing.fails_on('maxdb') - def test_lazyoptionswithlimit(self): + @testing.resolve_artifact_names + def test_lazy_options_with_limit(self): + mapper(User, users, properties=dict( + addresses = relation(mapper(Address, addresses), lazy=False))) + sess = create_session() - mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy=False) - )) - u = sess.query(User).options(lazyload('addresses')).filter_by(user_id=8).one() + u = (sess.query(User). + options(sa.orm.lazyload('addresses')). + filter_by(id=8)).one() def go(): - assert u.user_id == 8 - assert len(u.addresses) == 3 - self.assert_sql_count(testing.db, go, 1) + eq_(u.id, 8) + eq_(len(u.addresses), 3) + self.sql_count_(1, go) - def test_eagerdegrade(self): - """tests that an eager relation automatically degrades to a lazy relation if eager columns are not available""" - sess = create_session() - usermapper = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy=False) - )) + @testing.resolve_artifact_names + def test_eager_degrade(self): + """An eager relation automatically degrades to a lazy relation if eager columns are not available""" + mapper(User, users, properties=dict( + addresses = relation(mapper(Address, addresses), lazy=False))) + sess = create_session() # first test straight eager load, 1 statement def go(): - l = sess.query(usermapper).all() - self.assert_result(l, User, *user_address_result) - self.assert_sql_count(testing.db, go, 1) + l = sess.query(User).order_by(User.id).all() + eq_(l, self.static.user_address_result) + self.sql_count_(1, go) sess.clear() # then select just from users. run it into instances. # then assert the data, which will launch 3 more lazy loads - # (previous users in session fell out of scope and were removed from session's identity map) + # (previous users in session fell out of scope and were removed from + # session's identity map) + r = users.select().order_by(users.c.id).execute() def go(): - r = users.select().execute() - l = sess.query(usermapper).instances(r) - self.assert_result(l, User, *user_address_result) - self.assert_sql_count(testing.db, go, 4) - - clear_mappers() + l = sess.query(User).instances(r) + eq_(l, self.static.user_address_result) + self.sql_count_(4, go) - sess.clear() + @testing.resolve_artifact_names + def test_eager_degrade_deep(self): # test with a deeper set of eager loads. when we first load the three - # users, they will have no addresses or orders. the number of lazy loads when - # traversing the whole thing will be three for the addresses and three for the - # orders. - # (previous users in session fell out of scope and were removed from session's identity map) - usermapper = mapper(User, users, - properties = { - 'addresses':relation(mapper(Address, addresses), lazy=False, order_by=addresses.default_order_by()), - 'orders': relation(mapper(Order, orders, properties = { - 'items' : relation(mapper(Item, orderitems, properties = { - 'keywords' : relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=itemkeywords.default_order_by()) - }), order_by=orderitems.default_order_by(), lazy=False) - }), order_by=orders.default_order_by(), lazy=False) - }, order_by=users.default_order_by()) + # users, they will have no addresses or orders. the number of lazy + # loads when traversing the whole thing will be three for the + # addresses and three for the orders. + mapper(Address, addresses) - sess.clear() + mapper(Keyword, keywords) + + mapper(Item, items, properties=dict( + keywords=relation(Keyword, secondary=item_keywords, + lazy=False, + order_by=item_keywords.c.keyword_id))) + + mapper(Order, orders, properties=dict( + items=relation(Item, secondary=order_items, lazy=False, + order_by=order_items.c.item_id))) + + mapper(User, users, properties=dict( + addresses=relation(Address, lazy=False, + order_by=addresses.default_order_by()), + orders=relation(Order, lazy=False, + order_by=orders.default_order_by()))) + + sess = create_session() # first test straight eager load, 1 statement def go(): - l = sess.query(usermapper).all() - self.assert_result(l, User, *user_all_result) + l = sess.query(User).order_by(User.id).all() + eq_(l, self.static.user_all_result) self.assert_sql_count(testing.db, go, 1) sess.clear() # then select just from users. run it into instances. # then assert the data, which will launch 6 more lazy loads + r = users.select().execute() def go(): - r = users.select().execute() - l = sess.query(usermapper).instances(r) - self.assert_result(l, User, *user_all_result) - self.assert_sql_count(testing.db, go, 7) - - - def test_lazyoptions(self): - """tests that an eager relation can be upgraded to a lazy relation via the options method""" - sess = create_session() - mapper(User, users, properties = dict( + l = sess.query(User).instances(r) + eq_(l, self.static.user_all_result) + self.assert_sql_count(testing.db, go, 6) + + @testing.resolve_artifact_names + def test_lazy_options(self): + """An eager relation can be upgraded to a lazy relation.""" + mapper(User, users, properties=dict( addresses = relation(mapper(Address, addresses), lazy=False) )) - l = sess.query(User).options(lazyload('addresses')).all() + + sess = create_session() + l = (sess.query(User). + order_by(User.id). + options(sa.orm.lazyload('addresses'))).all() + def go(): - self.assert_result(l, User, *user_address_result) - self.assert_sql_count(testing.db, go, 3) + eq_(l, self.static.user_address_result) + self.sql_count_(4, go) - def test_deepoptions(self): - mapper(User, users, order_by=users.default_order_by(), - properties = { - 'orders': relation(mapper(Order, orders, properties = { - 'items' : relation(mapper(Item, orderitems, properties = { - 'keywords' : relation(mapper(Keyword, keywords), itemkeywords, order_by=itemkeywords.default_order_by()) - }), order_by=orderitems.default_order_by()) - }), order_by=orders.default_order_by()) - }) +class DeepOptionsTest(_fixtures.FixtureTest): + @testing.resolve_artifact_names + def setup_mappers(self): + mapper(Keyword, keywords) + + mapper(Item, items, properties=dict( + keywords=relation(Keyword, item_keywords, + order_by=item_keywords.default_order_by()))) + + mapper(Order, orders, properties=dict( + items=relation(Item, order_items, + order_by=order_items.default_order_by()))) + + mapper(User, users, order_by=users.default_order_by(), properties=dict( + orders=relation(Order, order_by=orders.default_order_by()))) + + @testing.resolve_artifact_names + def test_deep_options_1(self): sess = create_session() # eagerload nothing. u = sess.query(User).all() def go(): - print u[0].orders[1].items[0].keywords[1] + x = u[0].orders[1].items[0].keywords[1] self.assert_sql_count(testing.db, go, 3) - sess.clear() - # eagerload orders.items.keywords; eagerload_all() implies eager load of orders, orders.items - q2 = sess.query(User).options(eagerload_all('orders.items.keywords')) - u = q2.all() + @testing.resolve_artifact_names + def test_deep_options_2(self): + sess = create_session() + + # eagerload orders.items.keywords; eagerload_all() implies eager load + # of orders, orders.items + l = (sess.query(User). + options(sa.orm.eagerload_all('orders.items.keywords'))).all() def go(): - print u[0].orders[1].items[0].keywords[1] - self.assert_sql_count(testing.db, go, 0) + x = l[0].orders[1].items[0].keywords[1] + self.sql_count_(0, go) - sess.clear() + + @testing.resolve_artifact_names + def test_deep_options_3(self): + sess = create_session() # same thing, with separate options calls - q2 = sess.query(User).options(eagerload('orders')).options(eagerload('orders.items')).options(eagerload('orders.items.keywords')) + q2 = (sess.query(User). + options(sa.orm.eagerload('orders')). + options(sa.orm.eagerload('orders.items')). + options(sa.orm.eagerload('orders.items.keywords'))) u = q2.all() def go(): - print u[0].orders[1].items[0].keywords[1] - self.assert_sql_count(testing.db, go, 0) + x = u[0].orders[1].items[0].keywords[1] + self.sql_count_(0, go) - sess.clear() + @testing.resolve_artifact_names + def test_deep_options_4(self): + sess = create_session() - self.assertRaisesMessage(sa_exc.ArgumentError, - r"Can't find entity Mapper\|Order\|orders in Query. Current list: \['Mapper\|User\|users'\]", - sess.query(User).options, eagerload(Order.items) - ) + self.assertRaisesMessage( + sa.exc.ArgumentError, + r"Can't find entity Mapper\|Order\|orders in Query. " + r"Current list: \['Mapper\|User\|users'\]", + sess.query(User).options, sa.orm.eagerload(Order.items)) - # eagerload "keywords" on items. it will lazy load "orders", then lazy load - # the "items" on the order, but on "items" it will eager load the "keywords" - q3 = sess.query(User).options(eagerload('orders.items.keywords')) + # eagerload "keywords" on items. it will lazy load "orders", then + # lazy load the "items" on the order, but on "items" it will eager + # load the "keywords" + q3 = sess.query(User).options(sa.orm.eagerload('orders.items.keywords')) u = q3.all() - self.assert_sql_count(testing.db, go, 2) + def go(): + x = u[0].orders[1].items[0].keywords[1] + self.sql_count_(2, go) -class DeferredTest(MapperSuperTest): +class DeferredTest(_fixtures.FixtureTest): + @testing.resolve_artifact_names def test_basic(self): - """tests a basic "deferred" load""" + """A basic deferred load.""" - m = mapper(Order, orders, properties={ - 'description':deferred(orders.c.description) - }, order_by=orders.c.order_id) + mapper(Order, orders, order_by=orders.c.id, properties={ + 'description': deferred(orders.c.description)}) o = Order() self.assert_(o.description is None) - q = create_session().query(m) + q = create_session().query(Order) def go(): l = q.all() o2 = l[2] - print o2.description + x = o2.description + + self.sql_eq_(go, [ + ("SELECT orders.id AS orders_id, " + "orders.user_id AS orders_user_id, " + "orders.address_id AS orders_address_id, " + "orders.isopen AS orders_isopen " + "FROM orders ORDER BY orders.id", {}), + ("SELECT orders.description AS orders_description " + "FROM orders WHERE orders.id = :param_1", + {'param_1':3})]) + + @testing.resolve_artifact_names + def test_unsaved(self): + """Deferred loading does not kick in when just PK cols are set.""" - self.assert_sql(testing.db, go, [ - ("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.isopen AS orders_isopen FROM orders ORDER BY orders.order_id", {}), - ("SELECT orders.description AS orders_description FROM orders WHERE orders.order_id = :param_1", {'param_1':3}) - ]) + mapper(Order, orders, properties={ + 'description': deferred(orders.c.description)}) - def test_unsaved(self): - """test that deferred loading doesnt kick in when just PK cols are set""" - m = mapper(Order, orders, properties={ - 'description':deferred(orders.c.description) - }, order_by=orders.c.order_id) + sess = create_session() + o = Order() + sess.save(o) + o.id = 7 + def go(): + o.description = "some description" + self.sql_count_(0, go) + + @testing.resolve_artifact_names + def test_unsaved_2(self): + mapper(Order, orders, properties={ + 'description': deferred(orders.c.description)}) + + sess = create_session() + o = Order() + sess.add(o) + def go(): + o.description = "some description" + self.sql_count_(0, go) + + @testing.resolve_artifact_names + def test_unsaved_group(self): + """Deferred loading doesnt kick in when just PK cols are set""" + + mapper(Order, orders, order_by=orders.c.id, properties=dict( + description=deferred(orders.c.description, group='primary'), + opened=deferred(orders.c.isopen, group='primary'))) sess = create_session() o = Order() sess.save(o) - o.order_id = 7 + o.id = 7 def go(): o.description = "some description" - self.assert_sql_count(testing.db, go, 0) + self.sql_count_(0, go) - def test_unsavedgroup(self): - """test that deferred loading doesnt kick in when just PK cols are set""" - m = mapper(Order, orders, properties={ - 'description':deferred(orders.c.description, group='primary'), - 'opened':deferred(orders.c.isopen, group='primary') - }, order_by=orders.c.order_id) + @testing.resolve_artifact_names + def test_unsaved_group_2(self): + mapper(Order, orders, order_by=orders.c.id, properties=dict( + description=deferred(orders.c.description, group='primary'), + opened=deferred(orders.c.isopen, group='primary'))) sess = create_session() o = Order() sess.save(o) - o.order_id = 7 def go(): o.description = "some description" - self.assert_sql_count(testing.db, go, 0) + self.sql_count_(0, go) + @testing.resolve_artifact_names def test_save(self): m = mapper(Order, orders, properties={ - 'description':deferred(orders.c.description) - }, order_by=orders.c.order_id) + 'description': deferred(orders.c.description)}) sess = create_session() - q = sess.query(m) - l = q.all() - o2 = l[2] + o2 = sess.query(Order).get(2) o2.isopen = 1 sess.flush() + @testing.resolve_artifact_names def test_group(self): - """tests deferred load with a group""" - m = mapper(Order, orders, properties = { - 'userident':deferred(orders.c.user_id, group='primary'), - 'description':deferred(orders.c.description, group='primary'), - 'opened':deferred(orders.c.isopen, group='primary') - }, order_by=orders.c.order_id) + """Deferred load with a group""" + mapper(Order, orders, properties={ + 'userident': deferred(orders.c.user_id, group='primary'), + 'addrident': deferred(orders.c.address_id, group='primary'), + 'description': deferred(orders.c.description, group='primary'), + 'opened': deferred(orders.c.isopen, group='primary') + }) + sess = create_session() - q = sess.query(m) + q = sess.query(Order).order_by(Order.id) def go(): l = q.all() o2 = l[2] - print o2.opened, o2.description, o2.userident - assert o2.opened == 1 - assert o2.userident == 7 - assert o2.description == 'order 3' - - self.assert_sql(testing.db, go, [ - ("SELECT orders.order_id AS orders_order_id FROM orders ORDER BY orders.order_id", {}), - ("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders WHERE orders.order_id = :param_1", {'param_1':3}) - ]) + eq_(o2.opened, 1) + eq_(o2.userident, 7) + eq_(o2.description, 'order 3') + + self.sql_eq_(go, [ + ("SELECT orders.id AS orders_id " + "FROM orders ORDER BY orders.id", {}), + ("SELECT orders.user_id AS orders_user_id, " + "orders.address_id AS orders_address_id, " + "orders.description AS orders_description, " + "orders.isopen AS orders_isopen " + "FROM orders WHERE orders.id = :param_1", + {'param_1':3})]) o2 = q.all()[2] - assert o2.description == 'order 3' + eq_(o2.description, 'order 3') assert o2 not in sess.dirty o2.description = 'order 3' def go(): sess.flush() - self.assert_sql_count(testing.db, go, 0) + self.sql_count_(0, go) + @testing.resolve_artifact_names def test_preserve_changes(self): - """test that the deferred load operation doesn't revert modifications on attributes""" - + """A deferred load operation doesn't revert modifications on attributes""" mapper(Order, orders, properties = { - 'userident':deferred(orders.c.user_id, group='primary'), - 'description':deferred(orders.c.description, group='primary'), - 'opened':deferred(orders.c.isopen, group='primary') - }, order_by=orders.c.order_id) + 'userident': deferred(orders.c.user_id, group='primary'), + 'description': deferred(orders.c.description, group='primary'), + 'opened': deferred(orders.c.isopen, group='primary') + }) sess = create_session() o = sess.query(Order).get(3) assert 'userident' not in o.__dict__ o.description = 'somenewdescription' - assert o.description == 'somenewdescription' + eq_(o.description, 'somenewdescription') def go(): - assert o.opened == 1 + eq_(o.opened, 1) self.assert_sql_count(testing.db, go, 1) - assert o.description == 'somenewdescription' + eq_(o.description, 'somenewdescription') assert o in sess.dirty + @testing.resolve_artifact_names + def test_commits_state(self): + """ + When deferred elements are loaded via a group, they get the proper + CommittedState and don't result in changes being committed - def test_commitsstate(self): - """test that when deferred elements are loaded via a group, they get the proper CommittedState - and dont result in changes being committed""" - - m = mapper(Order, orders, properties = { + """ + mapper(Order, orders, properties = { 'userident':deferred(orders.c.user_id, group='primary'), 'description':deferred(orders.c.description, group='primary'), - 'opened':deferred(orders.c.isopen, group='primary') - }) + 'opened':deferred(orders.c.isopen, group='primary')}) + sess = create_session() - q = sess.query(m) - o2 = q.all()[2] + o2 = sess.query(Order).get(3) + # this will load the group of attributes - assert o2.description == 'order 3' + eq_(o2.description, 'order 3') assert o2 not in sess.dirty # this will mark it as 'dirty', but nothing actually changed o2.description = 'order 3' - def go(): - # therefore the flush() shouldnt actually issue any SQL - sess.flush() - self.assert_sql_count(testing.db, go, 0) + # therefore the flush() shouldnt actually issue any SQL + self.assert_sql_count(testing.db, sess.flush, 0) + @testing.resolve_artifact_names def test_options(self): - """tests using options on a mapper to create deferred and undeferred columns""" - m = mapper(Order, orders, order_by=orders.c.order_id) - sess = create_session() - q = sess.query(m) - q2 = q.options(defer('user_id')) - def go(): - l = q2.all() - print l[2].user_id + """Options on a mapper to create deferred and undeferred columns""" - self.assert_sql(testing.db, go, [ - ("SELECT orders.order_id AS orders_order_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY orders.order_id", {}), - ("SELECT orders.user_id AS orders_user_id FROM orders WHERE orders.order_id = :param_1", {'param_1':3}) - ]) - sess.clear() - q3 = q2.options(undefer('user_id')) - def go(): - l = q3.all() - print l[3].user_id - self.assert_sql(testing.db, go, [ - ("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY orders.order_id", {}), - ]) + mapper(Order, orders) - def test_undefergroup(self): - """tests undefer_group()""" + sess = create_session() + q = sess.query(Order).order_by(Order.id).options(defer('user_id')) + + self.sql_eq_(q.all, [ + ("SELECT orders.id AS orders_id, " + "orders.address_id AS orders_address_id, " + "orders.description AS orders_description, " + "orders.isopen AS orders_isopen " + "FROM orders ORDER BY orders.id", {}), + ("SELECT orders.user_id AS orders_user_id " + "FROM orders WHERE orders.id = :param_1", + {'param_1':3})]) + sess.clear() - m = mapper(Order, orders, properties = { + q2 = q.options(sa.orm.undefer('user_id')) + self.sql_eq_(q2.all, [ + ("SELECT orders.id AS orders_id, " + "orders.user_id AS orders_user_id, " + "orders.address_id AS orders_address_id, " + "orders.description AS orders_description, " + "orders.isopen AS orders_isopen " + "FROM orders ORDER BY orders.id", + {})]) + + @testing.resolve_artifact_names + def test_undefer_group(self): + mapper(Order, orders, properties={ 'userident':deferred(orders.c.user_id, group='primary'), 'description':deferred(orders.c.description, group='primary'), - 'opened':deferred(orders.c.isopen, group='primary') - }, order_by=orders.c.order_id) + 'opened':deferred(orders.c.isopen, group='primary')}) + sess = create_session() - q = sess.query(m) + q = sess.query(Order).order_by(Order.id) def go(): - l = q.options(undefer_group('primary')).all() + l = q.options(sa.orm.undefer_group('primary')).all() o2 = l[2] - print o2.opened, o2.description, o2.userident - assert o2.opened == 1 - assert o2.userident == 7 - assert o2.description == 'order 3' - - self.assert_sql(testing.db, go, [ - ("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen, orders.order_id AS orders_order_id FROM orders ORDER BY orders.order_id", {}), - ]) - + eq_(o2.opened, 1) + eq_(o2.userident, 7) + eq_(o2.description, 'order 3') + + self.sql_eq_(go, [ + ("SELECT orders.user_id AS orders_user_id, " + "orders.description AS orders_description, " + "orders.isopen AS orders_isopen, " + "orders.id AS orders_id, " + "orders.address_id AS orders_address_id " + "FROM orders ORDER BY orders.id", + {})]) + + @testing.resolve_artifact_names def test_locates_col(self): - """test that manually adding a col to the result undefers the column""" + """Manually adding a column to the result undefers the column.""" mapper(Order, orders, properties={ - 'description':deferred(orders.c.description) - }, order_by=orders.c.order_id) + 'description':deferred(orders.c.description)}) sess = create_session() - o1 = sess.query(Order).first() + o1 = sess.query(Order).order_by(Order.id).first() def go(): - assert o1.description == 'order 1' - self.assert_sql_count(testing.db, go, 1) + eq_(o1.description, 'order 1') + self.sql_count_(1, go) sess = create_session() - o1 = sess.query(Order).add_column(orders.c.description).first()[0] + o1 = (sess.query(Order). + order_by(Order.id). + add_column(orders.c.description).first())[0] def go(): - assert o1.description == 'order 1' - self.assert_sql_count(testing.db, go, 0) - - def test_deepoptions(self): - m = mapper(User, users, properties={ - 'orders':relation(mapper(Order, orders, properties={ - 'items':relation(mapper(Item, orderitems, properties={ - 'item_name':deferred(orderitems.c.item_name) - })) - })) - }) + eq_(o1.description, 'order 1') + self.sql_count_(0, go) + + @testing.resolve_artifact_names + def test_deep_options(self): + mapper(Item, items, properties=dict( + description=deferred(items.c.description))) + mapper(Order, orders, properties=dict( + items=relation(Item, secondary=order_items))) + mapper(User, users, properties=dict( + orders=relation(Order, order_by=orders.c.id))) + sess = create_session() - q = sess.query(m) + q = sess.query(User).order_by(User.id) l = q.all() item = l[0].orders[1].items[1] def go(): - print item.item_name - self.assert_sql_count(testing.db, go, 1) - self.assert_(item.item_name == 'item 4') + eq_(item.description, 'item 4') + self.sql_count_(1, go) + eq_(item.description, 'item 4') + sess.clear() - q2 = q.options(undefer('orders.items.item_name')) - l = q2.all() + l = q.options(sa.orm.undefer('orders.items.description')).all() item = l[0].orders[1].items[1] def go(): - print item.item_name - self.assert_sql_count(testing.db, go, 0) - self.assert_(item.item_name == 'item 4') + eq_(item.description, 'item 4') + self.sql_count_(0, go) + eq_(item.description, 'item 4') + + +class CompositeTypesTest(_base.MappedTest): -class CompositeTypesTest(ORMTest): def define_tables(self, metadata): - global graphs, edges - graphs = Table('graphs', metadata, + Table('graphs', metadata, Column('id', Integer, primary_key=True), Column('version_id', Integer, primary_key=True), Column('name', String(30))) - edges = Table('edges', metadata, - Column('id', Integer, Sequence('seq_edges_id'), primary_key=True), + Table('edges', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('graph_id', Integer, nullable=False), Column('graph_version_id', Integer, nullable=False), Column('x1', Integer), Column('y1', Integer), Column('x2', Integer), Column('y2', Integer), - ForeignKeyConstraint(['graph_id', 'graph_version_id'], ['graphs.id', 'graphs.version_id']) - ) + sa.ForeignKeyConstraint( + ['graph_id', 'graph_version_id'], + ['graphs.id', 'graphs.version_id'])) + @testing.resolve_artifact_names def test_basic(self): class Point(object): def __init__(self, x, y): @@ -1172,8 +1334,8 @@ class CompositeTypesTest(ORMTest): 'edges':relation(Edge) }) mapper(Edge, edges, properties={ - 'start':composite(Point, edges.c.x1, edges.c.y1), - 'end':composite(Point, edges.c.x2, edges.c.y2) + 'start':sa.orm.composite(Point, edges.c.x1, edges.c.y1), + 'end': sa.orm.composite(Point, edges.c.x2, edges.c.y2) }) sess = create_session() @@ -1188,29 +1350,30 @@ class CompositeTypesTest(ORMTest): sess.clear() g2 = sess.query(Graph).get([g.id, g.version_id]) for e1, e2 in zip(g.edges, g2.edges): - assert e1.start == e2.start - assert e1.end == e2.end + eq_(e1.start, e2.start) + eq_(e1.end, e2.end) g2.edges[1].end = Point(18, 4) sess.flush() sess.clear() e = sess.query(Edge).get(g2.edges[1].id) - assert e.end == Point(18, 4) + eq_(e.end, Point(18, 4)) e.end.x = 19 e.end.y = 5 sess.flush() sess.clear() - assert sess.query(Edge).get(g2.edges[1].id).end == Point(19, 5) + eq_(sess.query(Edge).get(g2.edges[1].id).end, Point(19, 5)) g.edges[1].end = Point(19, 5) sess.clear() def go(): - g2 = sess.query(Graph).options(eagerload('edges')).get([g.id, g.version_id]) + g2 = (sess.query(Graph). + options(sa.orm.eagerload('edges'))).get([g.id, g.version_id]) for e1, e2 in zip(g.edges, g2.edges): - assert e1.start == e2.start - assert e1.end == e2.end + eq_(e1.start, e2.start) + eq_(e1.end, e2.end) self.assert_sql_count(testing.db, go, 1) # test comparison of CompositeProperties to their object instances @@ -1219,11 +1382,11 @@ class CompositeTypesTest(ORMTest): assert sess.query(Edge).filter(Edge.start!=Point(3, 4)).first() is g.edges[1] - assert sess.query(Edge).filter(Edge.start==None).all() == [] - + eq_(sess.query(Edge).filter(Edge.start==None).all(), []) + @testing.resolve_artifact_names def test_pk(self): - """test using a composite type as a primary key""" + """Using a composite type as a primary key""" class Version(object): def __init__(self, id, version): @@ -1242,8 +1405,8 @@ class CompositeTypesTest(ORMTest): self.version = version mapper(Graph, graphs, properties={ - 'version':composite(Version, graphs.c.id, graphs.c.version_id) - }) + 'version':sa.orm.composite(Version, graphs.c.id, + graphs.c.version_id)}) sess = create_session() g = Graph(Version(1, 1)) @@ -1252,149 +1415,146 @@ class CompositeTypesTest(ORMTest): sess.clear() g2 = sess.query(Graph).get([1, 1]) - assert g.version == g2.version + eq_(g.version, g2.version) sess.clear() g2 = sess.query(Graph).get(Version(1, 1)) - assert g.version == g2.version + eq_(g.version, g2.version) +class NoLoadTest(_fixtures.FixtureTest): + run_inserts = 'once' + run_deletes = None -class NoLoadTest(MapperSuperTest): + @testing.resolve_artifact_names def test_basic(self): - """tests a basic one-to-many lazy load""" - m = mapper(User, users, properties = dict( + """A basic one-to-many lazy load""" + m = mapper(User, users, properties=dict( addresses = relation(mapper(Address, addresses), lazy=None) )) q = create_session().query(m) l = [None] def go(): - x = q.filter(users.c.user_id == 7).all() + x = q.filter(User.id == 7).all() x[0].addresses l[0] = x self.assert_sql_count(testing.db, go, 1) self.assert_result(l[0], User, - {'user_id' : 7, 'addresses' : (Address, [])}, + {'id' : 7, 'addresses' : (Address, [])}, ) + @testing.resolve_artifact_names def test_options(self): - m = mapper(User, users, properties = dict( + m = mapper(User, users, properties=dict( addresses = relation(mapper(Address, addresses), lazy=None) )) - q = create_session().query(m).options(lazyload('addresses')) + q = create_session().query(m).options(sa.orm.lazyload('addresses')) l = [None] def go(): - x = q.filter(users.c.user_id == 7).all() + x = q.filter(User.id == 7).all() x[0].addresses l[0] = x - self.assert_sql_count(testing.db, go, 2) + self.sql_count_(2, go) self.assert_result(l[0], User, - {'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])}, + {'id' : 7, 'addresses' : (Address, [{'id' : 1}])}, ) -class MapperExtensionTest(TestBase): - def setUpAll(self): - tables.create() - - def tearDown(self): - clear_mappers() - tables.delete() - def tearDownAll(self): - tables.drop() +class MapperExtensionTest(_fixtures.FixtureTest): def extension(self): methods = [] - class Ext(MapperExtension): + class Ext(sa.orm.MapperExtension): def instrument_class(self, mapper, cls): methods.append('instrument_class') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def init_instance(self, mapper, class_, oldinit, instance, args, kwargs): methods.append('init_instance') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def init_failed(self, mapper, class_, oldinit, instance, args, kwargs): methods.append('init_failed') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def load(self, query, *args, **kwargs): methods.append('load') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def get(self, query, *args, **kwargs): methods.append('get') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def translate_row(self, mapper, context, row): methods.append('translate_row') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def create_instance(self, mapper, selectcontext, row, class_): methods.append('create_instance') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def append_result(self, mapper, selectcontext, row, instance, result, **flags): methods.append('append_result') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def populate_instance(self, mapper, selectcontext, row, instance, **flags): methods.append('populate_instance') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def before_insert(self, mapper, connection, instance): methods.append('before_insert') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def after_insert(self, mapper, connection, instance): methods.append('after_insert') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def before_update(self, mapper, connection, instance): methods.append('before_update') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def after_update(self, mapper, connection, instance): methods.append('after_update') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def before_delete(self, mapper, connection, instance): methods.append('before_delete') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE def after_delete(self, mapper, connection, instance): methods.append('after_delete') - return EXT_CONTINUE + return sa.orm.EXT_CONTINUE return Ext, methods + @testing.resolve_artifact_names def test_basic(self): """test that common user-defined methods get called.""" Ext, methods = self.extension() mapper(User, users, extension=Ext()) sess = create_session() - u = User() + u = User(name='u1') sess.save(u) sess.flush() - u = sess.query(User).load(u.user_id) + u = sess.query(User).load(u.id) sess.clear() - u = sess.query(User).get(u.user_id) - u.user_name = 'foobar' + u = sess.query(User).get(u.id) + u.name = 'u1 changed' sess.flush() sess.delete(u) sess.flush() - self.assertEquals(methods, + eq_(methods, ['instrument_class', 'init_instance', 'before_insert', 'after_insert', 'load', 'translate_row', 'populate_instance', 'append_result', 'get', 'translate_row', 'create_instance', 'populate_instance', 'append_result', 'before_update', 'after_update', 'before_delete', 'after_delete']) - + @testing.resolve_artifact_names def test_inheritance(self): Ext, methods = self.extension() @@ -1405,40 +1565,40 @@ class MapperExtensionTest(TestBase): mapper(AdminUser, addresses, inherits=User) sess = create_session() - am = AdminUser() + am = AdminUser(name='au1', email_address='au1@e1') sess.save(am) sess.flush() - am = sess.query(AdminUser).load(am.user_id) + am = sess.query(AdminUser).load(am.id) sess.clear() - am = sess.query(AdminUser).get(am.user_id) - am.user_name = 'foobar' + am = sess.query(AdminUser).get(am.id) + am.name = 'au1 changed' sess.flush() sess.delete(am) sess.flush() - self.assertEquals(methods, + eq_(methods, ['instrument_class', 'instrument_class', 'init_instance', 'before_insert', 'after_insert', 'load', 'translate_row', 'populate_instance', 'append_result', 'get', 'translate_row', 'create_instance', 'populate_instance', 'append_result', 'before_update', 'after_update', 'before_delete', 'after_delete']) + @testing.resolve_artifact_names def test_after_with_no_changes(self): - # test that after_update is called even if no cols were updated + """after_update is called even if no columns were updated.""" Ext, methods = self.extension() - mapper(Item, orderitems, extension=Ext() , properties={ - 'keywords':relation(Keyword, secondary=itemkeywords) - }) - mapper(Keyword, keywords, extension=Ext() ) + mapper(Item, items, extension=Ext() , properties={ + 'keywords': relation(Keyword, secondary=item_keywords)}) + mapper(Keyword, keywords, extension=Ext()) sess = create_session() - i1 = Item() - k1 = Keyword() + i1 = Item(description="i1") + k1 = Keyword(name="k1") sess.save(i1) sess.save(k1) sess.flush() - self.assertEquals(methods, + eq_(methods, ['instrument_class', 'instrument_class', 'init_instance', 'init_instance', 'before_insert', 'after_insert', 'before_insert', 'after_insert']) @@ -1446,13 +1606,14 @@ class MapperExtensionTest(TestBase): del methods[:] i1.keywords.append(k1) sess.flush() - self.assertEquals(methods, ['before_update', 'after_update']) + eq_(methods, ['before_update', 'after_update']) + @testing.resolve_artifact_names def test_inheritance_with_dupes(self): + """Inheritance with the same extension instance on both mappers.""" Ext, methods = self.extension() - # test using inheritance, same extension on both mappers class AdminUser(User): pass @@ -1461,29 +1622,30 @@ class MapperExtensionTest(TestBase): mapper(AdminUser, addresses, inherits=User, extension=ext) sess = create_session() - am = AdminUser() + am = AdminUser(name="au1", email_address="au1@e1") sess.save(am) sess.flush() - am = sess.query(AdminUser).load(am.user_id) + am = sess.query(AdminUser).load(am.id) sess.clear() - am = sess.query(AdminUser).get(am.user_id) - am.user_name = 'foobar' + am = sess.query(AdminUser).get(am.id) + am.name = 'au1 changed' sess.flush() sess.delete(am) sess.flush() - self.assertEquals(methods, + eq_(methods, ['instrument_class', 'instrument_class', 'init_instance', 'before_insert', 'after_insert', 'load', 'translate_row', 'populate_instance', 'append_result', 'get', 'translate_row', 'create_instance', 'populate_instance', 'append_result', 'before_update', 'after_update', 'before_delete', 'after_delete']) + @testing.resolve_artifact_names def test_single_instrumentor(self): ext_None, methods_None = self.extension() ext_x, methods_x = self.extension() def reset(): - clear_mappers() + sa.orm.clear_mappers() del methods_None[:] del methods_x[:] @@ -1491,8 +1653,8 @@ class MapperExtensionTest(TestBase): mapper(User, users, extension=ext_x(), entity_name='x') User() - self.assertEquals(methods_None, ['instrument_class', 'init_instance']) - self.assertEquals(methods_x, []) + eq_(methods_None, ['instrument_class', 'init_instance']) + eq_(methods_x, []) reset() @@ -1500,8 +1662,8 @@ class MapperExtensionTest(TestBase): mapper(User, users, extension=ext_None()) User() - self.assertEquals(methods_x, ['instrument_class', 'init_instance']) - self.assertEquals(methods_None, []) + eq_(methods_x, ['instrument_class', 'init_instance']) + eq_(methods_None, []) reset() @@ -1511,54 +1673,56 @@ class MapperExtensionTest(TestBase): mapper(User, users, extension=ext_y(), entity_name='y') User() - self.assertEquals(methods_x, ['instrument_class', 'init_instance']) - self.assertEquals(methods_y, []) + eq_(methods_x, ['instrument_class', 'init_instance']) + eq_(methods_y, []) -class RequirementsTest(ORMTest): +class RequirementsTest(_base.MappedTest): """Tests the contract for user classes.""" def define_tables(self, metadata): - global t1, t2, t3, t4, t5, t6 - - t1 = Table('ht1', metadata, - Column('id', Integer, Sequence('seq_ht1_id'), primary_key=True), - Column('value', String(10))) - t2 = Table('ht2', metadata, - Column('id', Integer, Sequence('seq_ht2_id'), primary_key=True), - Column('ht1_id', Integer, ForeignKey('ht1.id')), - Column('value', String(10))) - t3 = Table('ht3', metadata, - Column('id', Integer, Sequence('seq_ht3_id'), primary_key=True), - Column('value', String(10))) - t4 = Table('ht4', metadata, - Column('ht1_id', Integer, ForeignKey('ht1.id'), - primary_key=True), - Column('ht3_id', Integer, ForeignKey('ht3.id'), - primary_key=True)) - t5 = Table('ht5', metadata, - Column('ht1_id', Integer, ForeignKey('ht1.id'), - primary_key=True), - ) - t6 = Table('ht6', metadata, - Column('ht1a_id', Integer, ForeignKey('ht1.id'), - primary_key=True), - Column('ht1b_id', Integer, ForeignKey('ht1.id'), - primary_key=True), - Column('value', String(10))) - + Table('ht1', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('value', String(10))) + Table('ht2', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('ht1_id', Integer, ForeignKey('ht1.id')), + Column('value', String(10))) + Table('ht3', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('value', String(10))) + Table('ht4', metadata, + Column('ht1_id', Integer, ForeignKey('ht1.id'), + primary_key=True), + Column('ht3_id', Integer, ForeignKey('ht3.id'), + primary_key=True)) + Table('ht5', metadata, + Column('ht1_id', Integer, ForeignKey('ht1.id'), + primary_key=True)) + Table('ht6', metadata, + Column('ht1a_id', Integer, ForeignKey('ht1.id'), + primary_key=True), + Column('ht1b_id', Integer, ForeignKey('ht1.id'), + primary_key=True), + Column('value', String(10))) + + @testing.resolve_artifact_names def test_baseclass(self): class OldStyle: pass - self.assertRaises(sa_exc.ArgumentError, mapper, OldStyle, t1) + self.assertRaises(sa.exc.ArgumentError, mapper, OldStyle, ht1) class NoWeakrefSupport(str): pass # TODO: is weakref support detectable without an instance? - #self.assertRaises(sa_exc.ArgumentError, mapper, NoWeakrefSupport, t2) + #self.assertRaises(sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2) + @testing.resolve_artifact_names def test_comparison_overrides(self): """Simple tests to ensure users can supply comparison __methods__. @@ -1571,40 +1735,7 @@ class RequirementsTest(ORMTest): # adding these methods directly to each class to avoid decoration # by the testlib decorators. - class H1(object): - def __init__(self, value='abc'): - self.value = value - def __nonzero__(self): - return False - def __hash__(self): - return hash(self.value) - def __eq__(self, other): - if isinstance(other, type(self)): - return self.value == other.value - return False - class H2(object): - def __init__(self, value='abc'): - self.value = value - def __nonzero__(self): - return False - def __hash__(self): - return hash(self.value) - def __eq__(self, other): - if isinstance(other, type(self)): - return self.value == other.value - return False - class H3(object): - def __init__(self, value='abc'): - self.value = value - def __nonzero__(self): - return False - def __hash__(self): - return hash(self.value) - def __eq__(self, other): - if isinstance(other, type(self)): - return self.value == other.value - return False - class H6(object): + class _Base(object): def __init__(self, value='abc'): self.value = value def __nonzero__(self): @@ -1616,18 +1747,27 @@ class RequirementsTest(ORMTest): return self.value == other.value return False - mapper(H1, t1, properties={ + class H1(_Base): + pass + class H2(_Base): + pass + class H3(_Base): + pass + class H6(_Base): + pass + + mapper(H1, ht1, properties={ 'h2s': relation(H2, backref='h1'), - 'h3s': relation(H3, secondary=t4, backref='h1s'), - 'h1s': relation(H1, secondary=t5, backref='parent_h1'), + 'h3s': relation(H3, secondary=ht4, backref='h1s'), + 'h1s': relation(H1, secondary=ht5, backref='parent_h1'), 't6a': relation(H6, backref='h1a', - primaryjoin=t1.c.id==t6.c.ht1a_id), + primaryjoin=ht1.c.id==ht6.c.ht1a_id), 't6b': relation(H6, backref='h1b', - primaryjoin=t1.c.id==t6.c.ht1b_id), + primaryjoin=ht1.c.id==ht6.c.ht1b_id), }) - mapper(H2, t2) - mapper(H3, t3) - mapper(H6, t6) + mapper(H2, ht2) + mapper(H3, ht3) + mapper(H6, ht6) s = create_session() for i in range(3): @@ -1639,7 +1779,7 @@ class RequirementsTest(ORMTest): h1.h1s.append(H1()) s.flush() - self.assertEquals(t1.count().scalar(), 4) + eq_(ht1.count().scalar(), 4) h6 = H6() h6.h1a = h1 @@ -1657,8 +1797,8 @@ class RequirementsTest(ORMTest): h1.h2s.extend([H2(), H2()]) s.flush() - h1s = s.query(H1).options(eagerload('h2s')).all() - self.assertEqual(len(h1s), 5) + h1s = s.query(H1).options(sa.orm.eagerload('h2s')).all() + eq_(len(h1s), 5) self.assert_unordered_result(h1s, H1, {'h2s': []}, @@ -1669,23 +1809,16 @@ class RequirementsTest(ORMTest): {'h2s': []}, {'h2s': (H2, [{'value': 'abc'}])}) - h1s = s.query(H1).options(eagerload('h3s')).all() + h1s = s.query(H1).options(sa.orm.eagerload('h3s')).all() - self.assertEqual(len(h1s), 5) - h1s = s.query(H1).options(eagerload_all('t6a.h1b'), - eagerload('h2s'), - eagerload_all('h3s.h1s')).all() - self.assertEqual(len(h1s), 5) + eq_(len(h1s), 5) + h1s = s.query(H1).options(sa.orm.eagerload_all('t6a.h1b'), + sa.orm.eagerload('h2s'), + sa.orm.eagerload_all('h3s.h1s')).all() + eq_(len(h1s), 5) -class NoEqFoo(object): - def __init__(self, data): - self.data = data - def __eq__(self, other): - raise NotImplementedError() - def __ne__(self, other): - raise NotImplementedError() -class MagicNamesTest(ORMTest): +class MagicNamesTest(_base.MappedTest): def define_tables(self, metadata): Table('cartographers', metadata, @@ -1698,29 +1831,20 @@ class MagicNamesTest(ORMTest): Column('cart_id', Integer, ForeignKey('cartographers.id')), Column('state', String(2)), - Column('data', Text)) + Column('data', sa.Text)) - def tables(self): - cat = testing._otest_metadata.tables - return cat['cartographers'], cat['maps'] - - def classes(self): - class Base(object): - def __init__(self, **kw): - for key, value in kw.iteritems(): - setattr(self, key, value) - class Cartographer(Base): pass - class Map(Base): pass + def setup_classes(self): + class Cartographer(_base.BasicEntity): + pass - return Cartographer, Map + class Map(_base.BasicEntity): + pass - @testing.future + @testing.resolve_artifact_names def test_mappish(self): - t1, t2 = self.tables() - Cartographer, Map = self.classes() - mapper(Cartographer, t1, properties=dict( - query=t1.c.quip)) - mapper(Map, t2, properties=dict( + mapper(Cartographer, cartographers, properties=dict( + query=cartographers.c.quip)) + mapper(Map, maps, properties=dict( mapper=relation(Cartographer, backref='maps'))) c = Cartographer(name='Lenny', alias='The Dude', @@ -1732,68 +1856,81 @@ class MagicNamesTest(ORMTest): sess.flush() sess.clear() - for C, M in ((Cartographer, Map), (aliased(Cartographer), aliased(Map))): - print C, M + for C, M in ((Cartographer, Map), + (sa.orm.aliased(Cartographer), sa.orm.aliased(Map))): c1 = (sess.query(C). filter(C.alias=='The Dude'). filter(C.query=='Where be dragons?')).one() m1 = sess.query(M).filter(M.mapper==c1).one() - @testing.future - def test_stateish(self): - from sqlalchemy.orm import attributes - if hasattr(attributes, 'ClassManager'): - syn1 = attributes.ClassManager.STATE_ATTR - syn2 = attributes.ClassManager.MANAGER_ATTR - else: - syn1 = '_state' - syn2 = '_class_state' - + @testing.resolve_artifact_names + def test_direct_stateish(self): + for reserved in (sa.orm.attributes.ClassManager.STATE_ATTR, + sa.orm.attributes.ClassManager.MANAGER_ATTR): + t = Table('t', sa.MetaData(), + Column('id', Integer, primary_key=True), + Column(reserved, Integer)) + class T(object): + pass - t1, t2 = self.tables() - Cartographer, Map = self.classes() - mapper(Map, t2, properties=dict( - syn1=t2.c.state, - syn2=t2.c.data)) + self.assertRaisesMessage( + KeyError, + ('%r: requested attribute name conflicts with ' + 'instrumentation attribute of the same name.' % reserved), + mapper, T, t) + + @testing.resolve_artifact_names + def test_indirect_stateish(self): + for reserved in (sa.orm.attributes.ClassManager.STATE_ATTR, + sa.orm.attributes.ClassManager.MANAGER_ATTR): + class M(object): + pass - m = Map() - setattr(m, syn1, 'AK') - setattr(m, syn2, '10x10') + self.assertRaisesMessage( + KeyError, + ('requested attribute name conflicts with ' + 'instrumentation attribute of the same name'), + mapper, M, maps, properties={ + reserved: maps.c.state}) - sess = create_session() - sess.save(m) - sess.flush() - sess.clear() - for M in (Map, aliased(Map)): - print M - sess.query(M).filter(getattr(M, syn1) == 'AK').one() - sess.query(M).filter(getattr(M, syn2) == '10x10').one() +class ScalarRequirementsTest(_base.MappedTest): + # TODO: is this needed here? + # what does this suite excercise that unitofwork doesn't? -class ScalarRequirementsTest(ORMTest): def define_tables(self, metadata): - import pickle - global t1 - t1 = Table('t1', metadata, - Column('id', Integer, Sequence('seq_t1_id'), primary_key=True), - Column('data', PickleType(pickler=pickle)) # dont use cPickle due to import weirdness - ) + Table('t1', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data', sa.PickleType())) + + def setup_classes(self): + class Foo(_base.ComparableEntity): + pass + @testing.resolve_artifact_names def test_correct_comparison(self): + mapper(Foo, t1) - class H1(fixtures.Base): - pass + f1 = Foo(data=pickleable.NotComparable('12345')) - mapper(H1, t1) + session = create_session() + session.add(f1) + session.flush() + session.clear() - h1 = H1(data=NoEqFoo('12345')) - s = create_session() - s.save(h1) - s.flush() - s.clear() - h1 = s.get(H1, h1.id) - assert h1.data.data == '12345' + f1 = session.get(Foo, f1.id) + eq_(f1.data.data, '12345') + + f2 = Foo(data=pickleable.BrokenComparable('abc')) + + session.add(f2) + session.flush() + session.clear() + + f2 = session.get(Foo, f2.id) + eq_(f2.data.data, 'abc') if __name__ == "__main__": diff --git a/test/pickleable.py b/test/pickleable.py index fc4f6910c3..f6331ca0be 100644 --- a/test/pickleable.py +++ b/test/pickleable.py @@ -20,10 +20,39 @@ class Bar(object): def __str__(self): return "Bar(%d, %d)" % (self.x, self.y) + class BarWithoutCompare(object): def __init__(self, x, y): self.x = x self.y = y def __str__(self): return "Bar(%d, %d)" % (self.x, self.y) - \ No newline at end of file + + +class NotComparable(object): + def __init__(self, data): + self.data = data + + def __hash__(self): + return id(self) + + def __eq__(self, other): + return NotImplemented + + def __ne__(self, other): + return NotImplemented + + +class BrokenComparable(object): + def __init__(self, data): + self.data = data + + def __hash__(self): + return id(self) + + def __eq__(self, other): + raise NotImplementedError + + def __ne__(self, other): + raise NotImplementedError + diff --git a/test/testlib/testing.py b/test/testlib/testing.py index e1f8e43502..da3e81d62b 100644 --- a/test/testlib/testing.py +++ b/test/testlib/testing.py @@ -629,9 +629,9 @@ class TestBase(unittest.TestCase): def assertRaisesMessage(self, except_cls, msg, callable_, *args, **kwargs): try: callable_(*args, **kwargs) - assert False, "Callable did not raise expected exception" + assert False, "Callable did not raise an exception" except except_cls, e: - assert re.search(msg, str(e)), "Exception message did not match: '%s'" % str(e) + assert re.search(msg, str(e)), "%r !~ %s" % (msg, e) if not hasattr(unittest.TestCase, 'assertTrue'): assertTrue = unittest.TestCase.failUnless