warning = "Call to deprecated function %s" % func.__name__
def func_with_warning(*args, **kwargs):
- if self.warn:
- warnings.warn(logging.SADeprecationWarning(warning),
- stacklevel=2)
+ warnings.warn(logging.SADeprecationWarning(warning),
+ stacklevel=2)
return func(*args, **kwargs)
- func_with_warning.warn = True
- self = func_with_warning
doc = func.__doc__ is not None and func.__doc__ or ''
including the deprecated versions of these arguments"""
import testbase
-import warnings
from sqlalchemy import *
from sqlalchemy import engine, exceptions
from testlib import *
except exceptions.InvalidRequestError, e:
assert str(e) == "This SchemaItem is not connected to any Engine or Connection."
+ @testing.uses_deprecated('//connect')
def test_create_drop_bound(self):
for meta in (MetaData,ThreadLocalMetaData):
table = Table('test_table', metadata,
Column('foo', Integer))
- testing.squelch_deprecation(metadata.connect)
metadata.connect(bind)
assert metadata.bind is table.bind is bind
import testbase
-import warnings
from sqlalchemy import *
from sqlalchemy import exceptions
from sqlalchemy.orm import create_session, clear_mappers, relation, class_mapper
from testlib import *
-testing.squelch_deprecation(assign_mapper)
-
class AssignMapperTest(PersistTest):
def setUpAll(self):
global metadata, table, table2
)
metadata.create_all()
+ @testing.uses_deprecated('SessionContext', 'assign_mapper')
def setUp(self):
global SomeObject, SomeOtherObject, ctx
class SomeObject(object):pass
class SomeOtherObject(object):pass
- deps = ('SessionContext is deprecated',
- 'SessionContextExt is deprecated')
- try:
- for dep in deps:
- warnings.filterwarnings('ignore', dep)
-
- ctx = SessionContext(create_session)
- assign_mapper(ctx, SomeObject, table, properties={
- 'options':relation(SomeOtherObject)
- })
- assign_mapper(ctx, SomeOtherObject, table2)
-
- s = SomeObject()
- s.id = 1
- s.data = 'hello'
- sso = SomeOtherObject()
- s.options.append(sso)
- ctx.current.flush()
- ctx.current.clear()
- finally:
- for dep in deps:
- warnings.filterwarnings('always', dep)
+ ctx = SessionContext(create_session)
+ assign_mapper(ctx, SomeObject, table, properties={
+ 'options':relation(SomeOtherObject)
+ })
+ assign_mapper(ctx, SomeOtherObject, table2)
+
+ s = SomeObject()
+ s.id = 1
+ s.data = 'hello'
+ sso = SomeOtherObject()
+ s.options.append(sso)
+ ctx.current.flush()
+ ctx.current.clear()
def tearDownAll(self):
metadata.drop_all()
+
def tearDown(self):
for table in metadata.table_iterator(reverse=True):
table.delete().execute()
clear_mappers()
+ @testing.uses_deprecated('assign_mapper')
def test_override_attributes(self):
sso = SomeOtherObject.query().first()
except exceptions.ArgumentError:
pass
+ @testing.uses_deprecated('assign_mapper')
def test_dont_clobber_methods(self):
class MyClass(object):
def expunge(self):
from testlib import *
class AssociationTest(PersistTest):
+ @testing.uses_deprecated('association option')
def setUpAll(self):
global items, item_keywords, keywords, metadata, Item, Keyword, KeywordAssociation
metadata = MetaData(testbase.db)
print loaded
self.assert_(saved == loaded)
+ @testing.uses_deprecated('association option')
def testdelete(self):
sess = create_session()
item1 = Item('item1')
'owner':relation(Owner,backref='tests'),
'category':relation(Category),
'owner_option': relation(Option,primaryjoin=and_(tests.c.id==options.c.test_id,tests.c.owner_id==options.c.owner_id),
- foreignkey=[options.c.test_id, options.c.owner_id],
+ foreign_keys=[options.c.test_id, options.c.owner_id],
uselist=False )
})
assert result == [(1, u'Some Category'), (3, u'Some Category')]
def test_withouteagerload(self):
+ s = create_session()
+ l = (s.query(Test).
+ select_from(tests.outerjoin(options,
+ and_(tests.c.id == options.c.test_id,
+ tests.c.owner_id ==
+ options.c.owner_id))).
+ filter(and_(tests.c.owner_id==1,
+ or_(options.c.someoption==None,
+ options.c.someoption==False))))
+
+ result = ["%d %s" % ( t.id,t.category.name ) for t in l]
+ print result
+ assert result == [u'1 Some Category', u'3 Some Category']
+
+ @testing.uses_deprecated('//select')
+ def test_withouteagerload_deprecated(self):
s = create_session()
l=s.query(Test).select ( and_(tests.c.owner_id==1,or_(options.c.someoption==None,options.c.someoption==False)),
from_obj=[tests.outerjoin(options,and_(tests.c.id==options.c.test_id,tests.c.owner_id==options.c.owner_id))])
which to attach to, when presented with a query that already has a complicated from clause."""
s = create_session()
q=s.query(Test).options(eagerload('category'))
+
+ l=(q.select_from(tests.outerjoin(options,
+ and_(tests.c.id ==
+ options.c.test_id,
+ tests.c.owner_id ==
+ options.c.owner_id))).
+ filter(and_(tests.c.owner_id==1,or_(options.c.someoption==None,
+ options.c.someoption==False))))
+
+ result = ["%d %s" % ( t.id,t.category.name ) for t in l]
+ print result
+ assert result == [u'1 Some Category', u'3 Some Category']
+
+ @testing.uses_deprecated('//select')
+ def test_witheagerload_deprecated(self):
+ """As test_witheagerload, but via select()."""
+ s = create_session()
+ q=s.query(Test).options(eagerload('category'))
l=q.select ( and_(tests.c.owner_id==1,or_(options.c.someoption==None,options.c.someoption==False)),
from_obj=[tests.outerjoin(options,and_(tests.c.id==options.c.test_id,tests.c.owner_id==options.c.owner_id))])
result = ["%d %s" % ( t.id,t.category.name ) for t in l]
@testing.unsupported('sybase')
def test_withoutouterjoin_literal(self):
+ s = create_session()
+ q = s.query(Test).options(eagerload('category'))
+ l = (q.filter(
+ (tests.c.owner_id==1) &
+ ('options.someoption is null or options.someoption=%s' % false)).
+ join('owner_option'))
+
+ result = ["%d %s" % ( t.id,t.category.name ) for t in l]
+ print result
+ assert result == [u'3 Some Category']
+
+ @testing.unsupported('sybase')
+ @testing.uses_deprecated('//select', '//join_to')
+ def test_withoutouterjoin_literal_deprecated(self):
s = create_session()
q=s.query(Test).options(eagerload('category'))
l=q.select( (tests.c.owner_id==1) & ('options.someoption is null or options.someoption=%s' % false) & q.join_to('owner_option') )
assert result == [u'3 Some Category']
def test_withoutouterjoin(self):
+ s = create_session()
+ q=s.query(Test).options(eagerload('category'))
+ l = q.filter( (tests.c.owner_id==1) & ((options.c.someoption==None) | (options.c.someoption==False)) ).join('owner_option')
+ result = ["%d %s" % ( t.id,t.category.name ) for t in l]
+ print result
+ assert result == [u'3 Some Category']
+
+ @testing.uses_deprecated('//select', '//join_to', '//join_via')
+ def test_withoutouterjoin_deprecated(self):
s = create_session()
q=s.query(Test).options(eagerload('category'))
l=q.select( (tests.c.owner_id==1) & ((options.c.someoption==None) | (options.c.someoption==False)) & q.join_to('owner_option') )
session.save(p)
session.flush()
session.clear()
- obj = session.query(Left).get_by(tag='tag1')
+ obj = session.query(Left).filter_by(tag='tag1').one()
print obj.middle.right[0]
class EagerTest3(ORMTest):
# now query for Data objects using that above select, adding the
# "order by max desc" separately
- q=s.query(Data).options(eagerload('foo')).select(
- from_obj=[datas.join(arb_data,arb_data.c.data_id==datas.c.id)],
- order_by=[desc(arb_data.c.max)],limit=10)
+ q=(s.query(Data).options(eagerload('foo')).
+ select_from(datas.join(arb_data,arb_data.c.data_id==datas.c.id)).
+ order_by(desc(arb_data.c.max)).
+ limit(10))
# extract "data_id" from the list of result objects
verify_result = [d.id for d in q]
))
mapper(Design, design, properties=dict(
- inheritedParts=relation(InheritedPart, private=True, backref="design"),
+ inheritedParts=relation(InheritedPart,
+ cascade="all, delete-orphan",
+ backref="design"),
))
mapper(DesignType, designType, properties=dict(
x.inheritedParts
class EagerTest7(ORMTest):
+ @testing.uses_deprecated('SessionContext')
def define_tables(self, metadata):
global companies_table, addresses_table, invoice_table, phones_table, items_table, ctx
global Company, Address, Phone, Item,Invoice
def __repr__(self):
return "Item: " + repr(getattr(self, 'item_id', None)) + " " + repr(getattr(self, 'invoice_id', None)) + " " + repr(self.code) + " " + repr(self.qty)
+ @testing.uses_deprecated('SessionContext')
def testone(self):
"""tests eager load of a many-to-one attached to a one-to-many. this testcase illustrated
the bug, which is that when the single Company is loaded, no further processing of the rows
session = create_session()
- for t in session.query(cls.mapper).limit(10).offset(0).list():
+ for t in session.query(cls.mapper).limit(10).offset(0).all():
print t.id, t.title, t.props_cnt
class EagerTest9(ORMTest):
"""test that unloaded collections are still included in a delete-cascade by default."""
sess = create_session()
- u = sess.query(tables.User).get_by(user_name='ed')
+ u = sess.query(tables.User).filter_by(user_name='ed').one()
# assert 'addresses' collection not loaded
assert 'addresses' not in u.__dict__
sess.delete(u)
clear_mappers()
metadata.drop_all()
+ @testing.uses_deprecated('SessionContext')
def setUpAll(self):
global ctx, data, metadata, User, Pref, Extra
ctx = SessionContext(create_session)
@testing.fails_on('maxdb')
def testorphan(self):
- jack = ctx.current.query(User).get_by(user_name='jack')
+ jack = ctx.current.query(User).filter_by(user_name='jack').one()
p = jack.pref
e = jack.pref.extra[0]
jack.pref = None
@testing.fails_on('maxdb')
def testorphan2(self):
- jack = ctx.current.query(User).get_by(user_name='jack')
+ jack = ctx.current.query(User).filter_by(user_name='jack').one()
p = jack.pref
e = jack.pref.extra[0]
ctx.current.clear()
def testorphan3(self):
"""test that double assignment doesn't accidentally reset the 'parent' flag."""
-
- jack = ctx.current.query(User).get_by(user_name='jack')
+
+ jack = ctx.current.query(User).filter_by(user_name='jack').one()
newpref = Pref("newpref")
jack.pref = newpref
jack.pref = newpref
ctx.current.flush()
-
+
class M2MCascadeTest(AssertMixin):
from testlib import *
from testlib.tables import *
-"""test cyclical mapper relationships. Many of the assertions are provided
-via running with postgres, which is strict about foreign keys.
+"""
+Tests cyclical mapper relationships.
-we might want to try an automated generate of much of this, all combos of T1<->T2, with
-o2m or m2o between them, and a third T3 with o2m/m2o to one/both T1/T2.
+We might want to try an automated generate of much of this, all combos of
+T1<->T2, with o2m or m2o between them, and a third T3 with o2m/m2o to one/both
+T1/T2.
"""
print repr(self) + " (%d)" % (id(self))
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, repr(self.data))
-
+
class SelfReferentialTest(AssertMixin):
"""tests a self-referential mapper, with an additional list of child objects."""
def setUpAll(self):
global t1, t2, metadata
metadata = MetaData(testbase.db)
- t1 = Table('t1', metadata,
+ t1 = Table('t1', metadata,
Column('c1', Integer, Sequence('t1c1_id_seq', optional=True), primary_key=True),
Column('parent_c1', Integer, ForeignKey('t1.c1')),
Column('data', String(20))
metadata.drop_all()
def setUp(self):
clear_mappers()
-
+
def testsingle(self):
class C1(Tester):
pass
sess.flush()
sess.delete(a)
sess.flush()
-
+
def testmanytooneonly(self):
"""test that the circular dependency sort can assemble a many-to-one dependency processor
when only the object on the "many" side is actually in the list of modified objects.
this requires that the circular sort add the other side of the relation into the UOWTransaction
so that the dependency operation can be tacked onto it.
-
+
This also affects inheritance relationships since they rely upon circular sort as well.
"""
class C1(Tester):
sess.save(c2)
sess.flush()
assert c2.parent_c1==c1.c1
-
+
def testcycle(self):
class C1(Tester):
pass
class C2(Tester):
pass
-
+
m1 = mapper(C1, t1, properties = {
'c1s' : relation(C1, cascade="all"),
- 'c2s' : relation(mapper(C2, t2), private=True)
+ 'c2s' : relation(mapper(C2, t2), cascade="all, delete-orphan")
})
a = C1('head c1')
sess = create_session( )
sess.save(a)
sess.flush()
-
+
sess.delete(a)
sess.flush()
s.save(t1)
s.flush()
s.clear()
- t = s.query(TT).get_by(id=t1.id)
+ t = s.query(TT).filter_by(id=t1.id).one()
assert t.children[0].parent_uuid == t1.uuid
def testlazyclause(self):
class TT(object):
s.flush()
s.clear()
- t = s.query(TT).get_by(id=t2.id)
+ t = s.query(TT).filter_by(id=t2.id).one()
assert t.uuid == t2.uuid
assert t.parent.uuid == t1.uuid
-
+
class InheritTestOne(AssertMixin):
def setUpAll(self):
global parent, child1, child2, meta
session.flush()
session.clear()
- c1 = session.query(Child1).get_by(child1_data="qwerty")
+ c1 = session.query(Child1).filter_by(child1_data="qwerty").one()
c2 = Child2()
c2.child1 = c1
c2.child2_data = "asdfgh"
session.save(c2)
- # the flush will fail if the UOW does not set up a many-to-one DP
+ # the flush will fail if the UOW does not set up a many-to-one DP
# attached to a task corresponding to c1, since "child1_id" is not nullable
session.flush()
create duplicate entries in the final sort"""
def define_tables(self, metadata):
global a, b, c
- a = Table('a', metadata,
+ a = Table('a', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)),
Column('cid', Integer, ForeignKey('c.id')),
)
- b = Table('b', metadata,
+ b = Table('b', metadata,
Column('id', Integer, ForeignKey("a.id"), primary_key=True),
Column('data', String(30)),
)
- c = Table('c', metadata,
+ c = Table('c', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)),
Column('aid', Integer, ForeignKey('a.id', use_alter=True, name="foo")),
cobj = C()
sess.save(cobj)
sess.flush()
-
+
class BiDirectionalManyToOneTest(ORMTest):
def define_tables(self, metadata):
Column('t1id', Integer, ForeignKey('t1.id'), nullable=False),
Column('t2id', Integer, ForeignKey('t2.id'), nullable=False),
)
-
+
def test_reflush(self):
class T1(object):pass
class T2(object):pass
class T3(object):pass
-
+
mapper(T1, t1, properties={
't2':relation(T2, primaryjoin=t1.c.t2id==t2.c.id)
})
't1':relation(T1),
't2':relation(T2)
})
-
+
o1 = T1()
o1.t2 = T2()
sess = create_session()
sess.save(o1)
sess.flush()
-
+
# the bug here is that the dependency sort comes up with T1/T2 in a cycle, but there
# are no T1/T2 objects to be saved. therefore no "cyclical subtree" gets generated,
# and one or the other of T1/T2 gets lost, and processors on T3 dont fire off.
o3.t2 = o1.t2
sess.save(o3)
sess.flush()
-
+
def test_reflush_2(self):
"""a variant on test_reflush()"""
o3b.t1 = o1a
o3b.t2 = o2a
sess.save(o3b)
-
+
o3 = T3()
o3.t1 = o1
o3.t2 = o1.t2
sess.save(o3)
sess.flush()
-
+
class BiDirectionalOneToManyTest(AssertMixin):
"""tests two mappers with a one-to-many relation to each other."""
def setUpAll(self):
global t1, t2, metadata
metadata = MetaData(testbase.db)
- t1 = Table('t1', metadata,
+ t1 = Table('t1', metadata,
Column('c1', Integer, Sequence('t1c1_id_seq', optional=True), primary_key=True),
Column('c2', Integer, ForeignKey('t2.c1'))
)
def testcycle(self):
class C1(object):pass
class C2(object):pass
-
+
m2 = mapper(C2, t2, properties={
'c1s': relation(C1, primaryjoin=t2.c.c1==t1.c.c2, uselist=True)
})
def setUpAll(self):
global t1, t2, t3, metadata
metadata = MetaData(testbase.db)
- t1 = Table('t1', metadata,
+ t1 = Table('t1', metadata,
Column('c1', Integer, Sequence('t1c1_id_seq', optional=True), primary_key=True),
Column('c2', Integer, ForeignKey('t2.c1')),
)
Column('c1', Integer, Sequence('t2c1_id_seq', optional=True), primary_key=True),
Column('c2', Integer, ForeignKey('t1.c1', use_alter=True, name='t1c1_fq')),
)
- t3 = Table('t1_data', metadata,
+ t3 = Table('t1_data', metadata,
Column('c1', Integer, Sequence('t1dc1_id_seq', optional=True), primary_key=True),
Column('t1id', Integer, ForeignKey('t1.c1')),
Column('data', String(20)))
metadata.create_all()
-
+
def tearDown(self):
clear_mappers()
def tearDownAll(self):
metadata.drop_all()
-
+
def testcycle(self):
class C1(object):pass
class C2(object):pass
class C1Data(object):
def __init__(self, data=None):
self.data = data
-
+
m2 = mapper(C2, t2, properties={
'c1s': relation(C1, primaryjoin=t2.c.c1==t1.c.c2, uselist=True)
})
'c2s' : relation(C2, primaryjoin=t1.c.c1==t2.c.c2, uselist=True),
'data' : relation(mapper(C1Data, t3))
})
-
+
a = C1()
b = C2()
c = C1()
def setUpAll(self):
global metadata
metadata = MetaData(testbase.db)
- global person
+ global person
global ball
ball = Table('ball', metadata,
Column('id', Integer, Sequence('ball_id_seq', optional=True), primary_key=True),
)
metadata.create_all()
-
+
def tearDownAll(self):
metadata.drop_all()
-
+
def tearDown(self):
clear_mappers()
def testcycle(self):
- """this test has a peculiar aspect in that it doesnt create as many dependent
+ """this test has a peculiar aspect in that it doesnt create as many dependent
relationships as the other tests, and revealed a small glitch in the circular dependency sorting."""
class Person(object):
pass
Ball.mapper = mapper(Ball, ball)
Person.mapper = mapper(Person, person, properties= dict(
- balls = relation(Ball.mapper, primaryjoin=ball.c.person_id==person.c.id, remote_side=ball.c.person_id, post_update=False, private=True),
+ balls = relation(Ball.mapper, primaryjoin=ball.c.person_id==person.c.id, remote_side=ball.c.person_id, post_update=False, cascade="all, delete-orphan"),
favorateBall = relation(Ball.mapper, primaryjoin=person.c.favorite_ball_id==ball.c.id, remote_side=person.c.favorite_ball_id, post_update=True),
)
)
sess = create_session()
sess.save(b)
sess.save(p)
-
+
self.assert_sql(testbase.db, lambda: sess.flush(), [
(
"INSERT INTO person (favorite_ball_id, data) VALUES (:favorite_ball_id, :data)",
"UPDATE person SET favorite_ball_id=:favorite_ball_id WHERE person.id = :person_id",
lambda ctx:{'favorite_ball_id':p.favorateBall.id,'person_id':p.id}
)
- ],
+ ],
with_sequences= [
(
"INSERT INTO person (id, favorite_ball_id, data) VALUES (:id, :favorite_ball_id, :data)",
"INSERT INTO ball (id, person_id, data) VALUES (:id, :person_id, :data)",
lambda ctx:{'id':ctx.last_inserted_ids()[0],'person_id':p.id, 'data':'some data'}
),
- # heres the post update
+ # heres the post update
(
"UPDATE person SET favorite_ball_id=:favorite_ball_id WHERE person.id = :person_id",
lambda ctx:{'favorite_ball_id':p.favorateBall.id,'person_id':p.id}
])
-
+
def testpostupdate_o2m(self):
"""tests a cycle between two rows, with a post_update on the one-to-many"""
class Person(object):
Ball.mapper = mapper(Ball, ball)
Person.mapper = mapper(Person, person, properties= dict(
- balls = relation(Ball.mapper, primaryjoin=ball.c.person_id==person.c.id, remote_side=ball.c.person_id, private=True, post_update=True, backref='person'),
+ balls = relation(Ball.mapper, primaryjoin=ball.c.person_id==person.c.id, remote_side=ball.c.person_id, cascade="all, delete-orphan", post_update=True, backref='person'),
favorateBall = relation(Ball.mapper, primaryjoin=person.c.favorite_ball_id==ball.c.id, remote_side=person.c.favorite_ball_id),
)
)
node_table.create()
def tearDownAll(self):
node_table.drop()
-
+
def testbasic(self):
"""test that post_update only fires off when needed.
-
+
this test case used to produce many superfluous update statements, particularly upon delete"""
class Node(object):
def __init__(self, path=''):
parent.children[-1].next_sibling = child
child.prev_sibling = parent.children[-1]
parent.children.append(child)
-
+
def remove_child(parent, child):
child.parent = None
node = child.next_sibling
def tearDownAll(self):
a_table.drop()
def testbasic(self):
- """test that post_update remembers to be involved in update operations as well,
+ """test that post_update remembers to be involved in update operations as well,
since it replaces the normal dependency processing completely [ticket:413]"""
- class a(object):
+ class a(object):
def __init__(self, fui):
self.fui = fui
# to fire off anyway
session.save(f2)
session.flush()
-
+
session.clear()
f1 = session.query(a).get(f1.id)
f2 = session.query(a).get(f2.id)
assert f2.foo is f1
-
-if __name__ == "__main__":
- testbase.main()
+if __name__ == "__main__":
+ testbase.main()
class EagerTest(FixtureTest):
keep_mappers = False
keep_data = True
-
+
def test_basic(self):
mapper(User, users, properties={
'addresses':relation(mapper(Address, addresses), lazy=False)
assert [
User(id=7, addresses=[
Address(id=1)
- ]),
+ ]),
User(id=8, addresses=[
Address(id=3, email_address='ed@bettyboop.com'),
Address(id=4, email_address='ed@lala.com'),
Address(id=2, email_address='ed@wood.com')
- ]),
+ ]),
User(id=9, addresses=[
Address(id=5)
- ]),
+ ]),
User(id=10, addresses=[])
] == q.all()
# assert that the eager loader added 'user_id' to the row
# and deferred loading of that col was disabled
self.assert_sql_count(testbase.db, go, 0)
-
+
# do the mapping in reverse
# (we would have just used an "addresses" backref but the test fixtures then require the whole
# backref to be set up, lazy loaders trigger, etc.)
# assert that the eager loader didn't have to affect 'user_id' here
# and that its still deferred
self.assert_sql_count(testbase.db, go, 1)
-
+
clear_mappers()
-
+
mapper(User, users, properties={'addresses':relation(Address, lazy=False)})
mapper(Address, addresses, properties={
'user_id':deferred(addresses.c.user_id),
u = sess.query(User).limit(1).get(8)
assert User(id=8, addresses=[Address(id=2, dingalings=[Dingaling(id=1)]), Address(id=3), Address(id=4)]) == u
self.assert_sql_count(testbase.db, go, 1)
-
+
def test_many_to_many(self):
mapper(Keyword, keywords)
'orders':relation(Order, primaryjoin=sel.c.id==orders.c.user_id, lazy=False)
})
mapper(Order, orders)
-
+
sess = create_session()
- self.assertEquals(sess.query(User).first(),
+ self.assertEquals(sess.query(User).first(),
User(name=u'jack',orders=[
- Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1),
- Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3),
+ Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1),
+ Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3),
Order(address_id=None,description=u'order 5',isopen=0,user_id=7,id=5)],
email_address=u'jack@bean.com',id=7)
)
-
+
def test_one_to_many_scalar(self):
mapper(User, users, properties = dict(
address = relation(mapper(Address, addresses), lazy=False, uselist=False)
orders = relation(Order, lazy = False),
))
q = create_session().query(User)
- l = q.select()
+ l = q.all()
assert fixtures.user_all_result == q.all()
def test_against_select(self):
def _assert_result(self):
return [
(
- User(id=7,
+ User(id=7,
addresses=[Address(id=1)]
),
- Order(id=1,
+ Order(id=1,
items=[Item(id=1), Item(id=2), Item(id=3)]
),
),
(
- User(id=7,
+ User(id=7,
addresses=[Address(id=1)]
),
- Order(id=3,
+ Order(id=3,
items=[Item(id=3), Item(id=4), Item(id=5)]
),
),
(
- User(id=7,
+ User(id=7,
addresses=[Address(id=1)]
),
- Order(id=5,
+ Order(id=5,
items=[Item(id=5)]
),
),
(
- User(id=9,
+ User(id=9,
addresses=[Address(id=5)]
),
- Order(id=2,
+ Order(id=2,
items=[Item(id=1), Item(id=2), Item(id=3)]
),
),
(
- User(id=9,
+ User(id=9,
addresses=[Address(id=5)]
),
- Order(id=4,
+ Order(id=4,
items=[Item(id=1), Item(id=5)]
),
)
]
-
+
def test_basic(self):
mapper(User, users, properties={
'addresses':relation(Address, lazy=False),
]) == d
self.assert_sql_count(testbase.db, go, 1)
-
+
def test_lazy_fallback_doesnt_affect_eager(self):
class Node(Base):
def append(self, node):
Node(data='n123')
] == list(n12.children)
self.assert_sql_count(testbase.db, go, 1)
-
+
def test_with_deferred(self):
class Node(Base):
def append(self, node):
def go():
assert Node(data='n1', children=[Node(data='n11'), Node(data='n12')]) == sess.query(Node).first()
self.assert_sql_count(testbase.db, go, 4)
-
+
sess.clear()
def go():
self.assert_sql_count(testbase.db, go, 3)
sess.clear()
-
+
def go():
assert Node(data='n1', children=[Node(data='n11'), Node(data='n12')]) == sess.query(Node).options(undefer('data'), undefer('children.data')).first()
self.assert_sql_count(testbase.db, go, 1)
-
-
-
+
+
+
def test_options(self):
class Node(Base):
def append(self, node):
{'nodes_data_1': 'n1'}
),
])
-
+
@testing.fails_on('maxdb')
def test_no_depth(self):
class Node(Base):
class EntityTest(AssertMixin):
"""tests mappers that are constructed based on "entity names", which allows the same class
to have multiple primary mappers """
+
+ @testing.uses_deprecated('SessionContext')
def setUpAll(self):
global user1, user2, address1, address2, metadata, ctx
metadata = MetaData(testbase.db)
ctx = SessionContext(create_session)
-
- user1 = Table('user1', metadata,
+
+ user1 = Table('user1', metadata,
Column('user_id', Integer, Sequence('user1_id_seq', optional=True),
primary_key=True),
Column('name', String(60), nullable=False)
)
- user2 = Table('user2', metadata,
+ user2 = Table('user2', metadata,
Column('user_id', Integer, Sequence('user2_id_seq', optional=True),
primary_key=True),
Column('name', String(60), nullable=False)
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
+ @testing.uses_deprecated('SessionContextExt')
def testbasic(self):
"""tests a pair of one-to-many mapper structures, establishing that both
parent and child objects honor the "entity_name" attribute attached to the object
instances."""
class User(object):pass
class Address(object):pass
-
+
a1mapper = mapper(Address, address1, entity_name='address1', extension=ctx.mapper_extension)
- a2mapper = mapper(Address, address2, entity_name='address2', extension=ctx.mapper_extension)
+ a2mapper = mapper(Address, address2, entity_name='address2', extension=ctx.mapper_extension)
u1mapper = mapper(User, user1, entity_name='user1', properties ={
'addresses':relation(a1mapper)
}, extension=ctx.mapper_extension)
u2mapper =mapper(User, user2, entity_name='user2', properties={
'addresses':relation(a2mapper)
}, extension=ctx.mapper_extension)
-
+
u1 = User(_sa_entity_name='user1')
u1.name = 'this is user 1'
a1 = Address(_sa_entity_name='address1')
a1.email='a1@foo.com'
u1.addresses.append(a1)
-
+
u2 = User(_sa_entity_name='user2')
u2.name='this is user 2'
a2 = Address(_sa_entity_name='address2')
a2.email='a2@foo.com'
u2.addresses.append(a2)
-
+
ctx.current.flush()
assert user1.select().execute().fetchall() == [(u1.user_id, u1.name)]
assert user2.select().execute().fetchall() == [(u2.user_id, u2.name)]
class Address(object):pass
a1mapper = mapper(Address, address1, entity_name='address1')
- a2mapper = mapper(Address, address2, entity_name='address2')
+ a2mapper = mapper(Address, address2, entity_name='address2')
u1mapper = mapper(User, user1, entity_name='user1', properties ={
'addresses':relation(a1mapper)
})
sess.save(u2, entity_name='user2')
print u2.__dict__
- sess.flush()
+ sess.flush()
assert user1.select().execute().fetchall() == [(u1.user_id, u1.name)]
assert user2.select().execute().fetchall() == [(u2.user_id, u2.name)]
assert address1.select().execute().fetchall() == [(a1.address_id, u1.user_id, 'a1@foo.com')]
assert address2.select().execute().fetchall() == [(a1.address_id, u2.user_id, 'a2@foo.com')]
sess.clear()
- u1list = sess.query(User, entity_name='user1').select()
- u2list = sess.query(User, entity_name='user2').select()
+ u1list = sess.query(User, entity_name='user1').all()
+ u2list = sess.query(User, entity_name='user2').all()
assert len(u1list) == len(u2list) == 1
assert u1list[0] is not u2list[0]
assert len(u1list[0].addresses) == len(u2list[0].addresses) == 1
class User(object):pass
class Address1(object):pass
class Address2(object):pass
-
+
a1mapper = mapper(Address1, address1, extension=ctx.mapper_extension)
- a2mapper = mapper(Address2, address2, extension=ctx.mapper_extension)
+ a2mapper = mapper(Address2, address2, extension=ctx.mapper_extension)
u1mapper = mapper(User, user1, entity_name='user1', properties ={
'addresses':relation(a1mapper)
}, extension=ctx.mapper_extension)
assert address2.select().execute().fetchall() == [(a1.address_id, u2.user_id, 'a2@foo.com')]
ctx.current.clear()
- u1list = ctx.current.query(User, entity_name='user1').select()
- u2list = ctx.current.query(User, entity_name='user2').select()
+ u1list = ctx.current.query(User, entity_name='user1').all()
+ u2list = ctx.current.query(User, entity_name='user2').all()
assert len(u1list) == len(u2list) == 1
assert u1list[0] is not u2list[0]
assert len(u1list[0].addresses) == len(u2list[0].addresses) == 1
# is setting up for each load
assert isinstance(u1list[0].addresses[0], Address1)
assert isinstance(u2list[0].addresses[0], Address2)
-
+
def testpolymorphic_deferred(self):
"""test that deferred columns load properly using entity names"""
class User(object):pass
assert user2.select().execute().fetchall() == [(u2.user_id, u2.name)]
ctx.current.clear()
- u1list = ctx.current.query(User, entity_name='user1').select()
- u2list = ctx.current.query(User, entity_name='user2').select()
+ u1list = ctx.current.query(User, entity_name='user1').all()
+ u2list = ctx.current.query(User, entity_name='user2').all()
assert len(u1list) == len(u2list) == 1
assert u1list[0] is not u2list[0]
# the deferred column load requires that setup_loader() check that the correct DeferredColumnLoader
assert u1list[0].name == 'this is user 1'
assert u2list[0].name == 'this is user 2'
-
-if __name__ == "__main__":
+
+if __name__ == "__main__":
testbase.main()
import testbase
+import warnings
from sqlalchemy import *
from sqlalchemy import exceptions, util
from sqlalchemy.orm import *
"""that cascades on polymorphic relations continue
cascading along the path of the instance's mapper, not
the base mapper."""
-
+
def define_tables(self, metadata):
global t1, t2, t3, t4
t1= Table('t1', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30))
)
-
+
t2 = Table('t2', metadata,
Column('id', Integer, primary_key=True),
Column('t1id', Integer, ForeignKey('t1.id')),
Column('type', String(30)),
Column('data', String(30))
)
- t3 = Table('t3', metadata,
+ t3 = Table('t3', metadata,
Column('id', Integer, ForeignKey('t2.id'), primary_key=True),
Column('moredata', String(30)))
-
+
t4 = Table('t4', metadata,
Column('id', Integer, primary_key=True),
Column('t3id', Integer, ForeignKey('t3.id')),
Column('data', String(30)))
-
+
def test_cascade(self):
class T1(fixtures.Base):
pass
pass
class T4(fixtures.Base):
pass
-
+
mapper(T1, t1, properties={
't2s':relation(T2, cascade="all")
})
- mapper(T2, t2, polymorphic_on=t2.c.type, polymorphic_identity='t2')
+ mapper(T2, t2, polymorphic_on=t2.c.type, polymorphic_identity='t2')
mapper(T3, t3, inherits=T2, polymorphic_identity='t3', properties={
't4s':relation(T4, cascade="all")
})
mapper(T4, t4)
-
+
sess = create_session()
t1_1 = T1(data='t1')
t1_1.t2s.append(t2_1)
t1_1.t2s.append(t3_1)
-
+
t4_1 = T4(data='t4')
t3_1.t4s.append(t4_1)
sess.save(t1_1)
-
+
assert t4_1 in sess.new
sess.flush()
-
+
sess.delete(t1_1)
assert t4_1 in sess.deleted
sess.flush()
-
-
-
+
+
+
class GetTest(ORMTest):
def define_tables(self, metadata):
global foo, bar, blub
assert sess.query(Bar).get(b.id) == b
assert sess.query(Bar).get(bl.id) == bl
assert sess.query(Blub).get(bl.id) == bl
-
+
self.assert_sql_count(testbase.db, go, 0)
else:
# this is testing the 'wrong' behavior of using get()
assert sess.query(Blub).get(bl.id) == bl
self.assert_sql_count(testbase.db, go, 3)
-
+
test_get.__name__ = name
return test_get
class VersioningTest(ORMTest):
def define_tables(self, metadata):
global base, subtable, stuff
- base = Table('base', metadata,
+ base = Table('base', metadata,
Column('id', Integer, Sequence('version_test_seq', optional=True), primary_key=True ),
Column('version_id', Integer, nullable=False),
Column('value', String(40)),
Column('discriminator', Integer, nullable=False)
)
- subtable = Table('subtable', metadata,
+ subtable = Table('subtable', metadata,
Column('id', None, ForeignKey('base.id'), primary_key=True),
Column('subdata', String(50))
)
- stuff = Table('stuff', metadata,
+ stuff = Table('stuff', metadata,
Column('id', Integer, primary_key=True),
Column('parent', Integer, ForeignKey('base.id'))
)
-
+
@engines.close_open_connections
def test_save_update(self):
class Base(fixtures.Base):
'stuff':relation(Stuff)
})
mapper(Sub, subtable, inherits=Base, polymorphic_identity=2)
-
+
sess = create_session()
-
+
b1 = Base(value='b1')
s1 = Sub(value='sub1', subdata='some subdata')
sess.save(b1)
sess.save(s1)
-
+
sess.flush()
-
+
sess2 = create_session()
s2 = sess2.query(Base).get(s1.id)
s2.subdata = 'sess2 subdata'
-
+
s1.subdata = 'sess1 subdata'
-
+
sess.flush()
-
+
try:
sess2.query(Base).with_lockmode('read').get(s1.id)
assert False
assert False
except exceptions.ConcurrentModificationError, e:
assert True
-
+
sess2.refresh(s2)
assert s2.subdata == 'sess1 subdata'
s2.subdata = 'sess2 subdata'
sess2.flush()
-
+
def test_delete(self):
class Base(fixtures.Base):
pass
class Sub(Base):
pass
-
+
mapper(Base, base, polymorphic_on=base.c.discriminator, version_id_col=base.c.version_id, polymorphic_identity=1)
mapper(Sub, subtable, inherits=Base, polymorphic_identity=2)
-
+
sess = create_session()
-
+
b1 = Base(value='b1')
s1 = Sub(value='sub1', subdata='some subdata')
s2 = Sub(value='sub2', subdata='some other subdata')
sess.save(b1)
sess.save(s1)
sess.save(s2)
-
+
sess.flush()
sess2 = create_session()
s3 = sess2.query(Base).get(s1.id)
sess2.delete(s3)
sess2.flush()
-
+
s2.subdata = 'some new subdata'
sess.flush()
-
+
try:
s1.subdata = 'some new subdata'
sess.flush()
assert False
except exceptions.ConcurrentModificationError, e:
assert True
-
-
+
+
class DistinctPKTest(ORMTest):
"""test the construction of mapper.primary_key when an inheriting relationship
joins on a column other than primary key column."""
class Employee(Person): pass
- import warnings
- warnings.filterwarnings("error", r".*On mapper.*distinct primary key")
-
def insert_data(self):
person_insert = person_table.insert()
person_insert.execute(id=1, name='alice')
self._do_test(True)
def test_explicit_composite_pk(self):
+ warnings.filterwarnings("error", r".*On mapper.*distinct primary key")
+
person_mapper = mapper(Person, person_table)
try:
mapper(Employee, employee_table, inherits=person_mapper, primary_key=[person_table.c.id, employee_table.c.id])
class LazyTest(FixtureTest):
keep_mappers = False
keep_data = True
-
+
def test_basic(self):
mapper(User, users, properties={
'addresses':relation(mapper(Address, addresses), lazy=True)
q = sess.query(User)
assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all()
+ @testing.uses_deprecated('SessionContext')
def test_bindstosession(self):
"""test that lazy loaders use the mapper's contextual session if the parent instance
is not in a session, and that an error is raised if no contextual session"""
-
+
from sqlalchemy.ext.sessioncontext import SessionContext
ctx = SessionContext(create_session)
m = mapper(User, users, properties = dict(
assert [
User(id=7, addresses=[
Address(id=1)
- ]),
+ ]),
User(id=8, addresses=[
Address(id=3, email_address='ed@bettyboop.com'),
Address(id=4, email_address='ed@lala.com'),
Address(id=2, email_address='ed@wood.com')
- ]),
+ ]),
User(id=9, addresses=[
Address(id=5)
- ]),
+ ]),
User(id=10, addresses=[])
] == q.all()
-
+
def test_orderby_secondary(self):
"""tests that a regular mapper select on a single table can order by a relation to a second table"""
-
+
mapper(Address, addresses)
mapper(User, users, properties = dict(
Address(id=2, email_address='ed@wood.com'),
Address(id=3, email_address='ed@bettyboop.com'),
Address(id=4, email_address='ed@lala.com'),
- ]),
+ ]),
User(id=9, addresses=[
Address(id=5)
- ]),
+ ]),
User(id=7, addresses=[
Address(id=1)
- ]),
+ ]),
] == l
def test_orderby_desc(self):
assert [
User(id=7, addresses=[
Address(id=1)
- ]),
+ ]),
User(id=8, addresses=[
Address(id=2, email_address='ed@wood.com'),
Address(id=4, email_address='ed@lala.com'),
Address(id=3, email_address='ed@bettyboop.com'),
- ]),
+ ]),
User(id=9, addresses=[
Address(id=5)
- ]),
+ ]),
User(id=10, addresses=[])
] == sess.query(User).all()
assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
assert not class_mapper(Address)._is_orphan(user.addresses[0])
-
+
def test_limit(self):
"""test limit operations combined with lazy-load relationships."""
-
+
mapper(Item, items)
mapper(Order, orders, properties={
'items':relation(Item, secondary=order_items, lazy=True)
if testing.against('maxdb', 'mssql'):
l = q.limit(2).all()
assert fixtures.user_all_result[:2] == l
- else:
+ else:
l = q.limit(2).offset(1).all()
assert fixtures.user_all_result[1:3] == l
closedorders = alias(orders, 'closedorders')
mapper(Address, addresses)
-
+
mapper(User, users, properties = dict(
addresses = relation(Address, lazy = True),
open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=True),
closed_orders = [Order(id=2)]
),
User(id=10)
-
+
] == q.all()
-
+
sess = create_session()
user = sess.query(User).get(7)
assert [Order(id=1), Order(id=5)] == create_session().query(Order, entity_name='closed').with_parent(user, property='closed_orders').all()
mapper(Item, items, properties = dict(
keywords = relation(Keyword, secondary=item_keywords, lazy=True),
))
-
+
q = create_session().query(Item)
assert fixtures.item_keyword_result == q.all()
mapper(Address, addresses, properties = dict(
user = relation(mapper(User, users), lazy=True, primaryjoin=pj)
))
-
+
sess = create_session()
-
+
# load address
a1 = sess.query(Address).filter_by(email_address="ed@wood.com").one()
-
+
# load user that is attached to the address
u1 = sess.query(User).get(8)
-
+
def go():
# lazy load of a1.user should get it from the session
assert a1.user is u1
self.assert_sql_count(testbase.db, go, 0)
clear_mappers()
-
+
def test_many_to_one(self):
mapper(Address, addresses, properties = dict(
user = relation(mapper(User, users), lazy=True)
a = q.filter(addresses.c.id==1).one()
assert a.user is not None
-
+
u1 = sess.query(User).get(7)
-
+
assert a.user is u1
-
+
def test_backrefs_dont_lazyload(self):
mapper(User, users, properties={
'addresses':relation(Address, backref='user')
ad2.user = u1
assert ad2.user is u1
self.assert_sql_count(testbase.db, go, 0)
-
+
def go():
assert ad2 in u1.addresses
self.assert_sql_count(testbase.db, go, 1)
-
+
class M2OGetTest(FixtureTest):
keep_mappers = False
keep_data = True
global info_table, data_table, rel_table, metadata
metadata = MetaData(testbase.db)
info_table = Table('infos', metadata,
- Column('pk', Integer, primary_key=True),
- Column('info', String(128)))
+ Column('pk', Integer, primary_key=True),
+ Column('info', String(128)))
data_table = Table('data', metadata,
- Column('data_pk', Integer, primary_key=True),
- Column('info_pk', Integer, ForeignKey(info_table.c.pk)),
- Column('timeval', Integer),
- Column('data_val', String(128)))
+ Column('data_pk', Integer, primary_key=True),
+ Column('info_pk', Integer,
+ ForeignKey(info_table.c.pk)),
+ Column('timeval', Integer),
+ Column('data_val', String(128)))
rel_table = Table('rels', metadata,
- Column('rel_pk', Integer, primary_key=True),
- Column('info_pk', Integer, ForeignKey(info_table.c.pk)),
- Column('start', Integer),
- Column('finish', Integer))
+ Column('rel_pk', Integer, primary_key=True),
+ Column('info_pk', Integer,
+ ForeignKey(info_table.c.pk)),
+ Column('start', Integer),
+ Column('finish', Integer))
metadata.create_all()
info_table.insert().execute(
- {'pk':1, 'info':'pk_1_info'},
- {'pk':2, 'info':'pk_2_info'},
- {'pk':3, 'info':'pk_3_info'},
- {'pk':4, 'info':'pk_4_info'},
- {'pk':5, 'info':'pk_5_info'})
+ {'pk':1, 'info':'pk_1_info'},
+ {'pk':2, 'info':'pk_2_info'},
+ {'pk':3, 'info':'pk_3_info'},
+ {'pk':4, 'info':'pk_4_info'},
+ {'pk':5, 'info':'pk_5_info'})
rel_table.insert().execute(
- {'rel_pk':1, 'info_pk':1, 'start':10, 'finish':19},
- {'rel_pk':2, 'info_pk':1, 'start':100, 'finish':199},
- {'rel_pk':3, 'info_pk':2, 'start':20, 'finish':29},
- {'rel_pk':4, 'info_pk':3, 'start':13, 'finish':23},
- {'rel_pk':5, 'info_pk':5, 'start':15, 'finish':25})
+ {'rel_pk':1, 'info_pk':1, 'start':10, 'finish':19},
+ {'rel_pk':2, 'info_pk':1, 'start':100, 'finish':199},
+ {'rel_pk':3, 'info_pk':2, 'start':20, 'finish':29},
+ {'rel_pk':4, 'info_pk':3, 'start':13, 'finish':23},
+ {'rel_pk':5, 'info_pk':5, 'start':15, 'finish':25})
data_table.insert().execute(
- {'data_pk':1, 'info_pk':1, 'timeval':11, 'data_val':'11_data'},
- {'data_pk':2, 'info_pk':1, 'timeval':9, 'data_val':'9_data'},
- {'data_pk':3, 'info_pk':1, 'timeval':13, 'data_val':'13_data'},
- {'data_pk':4, 'info_pk':2, 'timeval':23, 'data_val':'23_data'},
- {'data_pk':5, 'info_pk':2, 'timeval':13, 'data_val':'13_data'},
- {'data_pk':6, 'info_pk':1, 'timeval':15, 'data_val':'15_data'})
+ {'data_pk':1, 'info_pk':1, 'timeval':11, 'data_val':'11_data'},
+ {'data_pk':2, 'info_pk':1, 'timeval':9, 'data_val':'9_data'},
+ {'data_pk':3, 'info_pk':1, 'timeval':13, 'data_val':'13_data'},
+ {'data_pk':4, 'info_pk':2, 'timeval':23, 'data_val':'23_data'},
+ {'data_pk':5, 'info_pk':2, 'timeval':13, 'data_val':'13_data'},
+ {'data_pk':6, 'info_pk':1, 'timeval':15, 'data_val':'15_data'})
def tearDownAll(self):
metadata.drop_all()
-
+
def testone(self):
- """tests a lazy load which has multiple join conditions, including two that are against
- the same column in the child table"""
+ """Tests a lazy load which has multiple join conditions.
+
+ ...including two that are against the same column in the child table.
+ """
+
class Information(object):
- pass
+ pass
class Relation(object):
- pass
+ pass
class Data(object):
- pass
+ pass
session = create_session()
-
+
mapper(Data, data_table)
mapper(Relation, rel_table, properties={
-
'datas': relation(Data,
- primaryjoin=and_(rel_table.c.info_pk==data_table.c.info_pk,
- data_table.c.timeval >= rel_table.c.start,
- data_table.c.timeval <= rel_table.c.finish),
- foreignkey=data_table.c.info_pk)
- }
-
- )
+ primaryjoin=and_(
+ rel_table.c.info_pk ==
+ data_table.c.info_pk,
+ data_table.c.timeval >= rel_table.c.start,
+ data_table.c.timeval <= rel_table.c.finish),
+ foreign_keys=[data_table.c.info_pk])})
mapper(Information, info_table, properties={
'rels': relation(Relation)
})
assert len(info.rels) == 2
assert len(info.rels[0].datas) == 3
-if __name__ == "__main__":
+if __name__ == "__main__":
testbase.main()
-
-
'''represents a thingy attached to a Place'''
def __init__(self, name=None):
self.name = name
-
+
class Transition(object):
'''represents a transition'''
def __init__(self, name=None):
self.outputs = []
def __repr__(self):
return object.__repr__(self)+ " " + repr(self.inputs) + " " + repr(self.outputs)
-
+
class M2MTest(ORMTest):
def define_tables(self, metadata):
global place
Column('place_id', Integer, ForeignKey('place.place_id'), nullable=False),
Column('name', String(30), nullable=False)
)
-
+
# association table #1
global place_input
place_input = Table('place_input', metadata,
"Error creating backref 'transitions' on relation 'Transition.places (Place)': property of that name exists on mapper 'Mapper|Place|place'",
"Error creating backref 'places' on relation 'Place.transitions (Transition)': property of that name exists on mapper 'Mapper|Transition|transition'"
]
-
-
+
+
def testcircular(self):
"""tests a many-to-many relationship from a table to itself."""
sess.flush()
sess.clear()
- l = sess.query(Place).select(order_by=place.c.place_id)
+ l = sess.query(Place).order_by(place.c.place_id).all()
(p1, p2, p3, p4, p5, p6, p7) = l
assert p1.places == [p2,p3,p5]
assert p5.places == [p6]
Place.mapper = mapper(Place, place, properties = {
'thingies':relation(mapper(PlaceThingy, place_thingy), lazy=False)
})
-
+
Transition.mapper = mapper(Transition, transition, properties = dict(
inputs = relation(Place.mapper, place_output, lazy=False),
outputs = relation(Place.mapper, place_input, lazy=False),
sess.flush()
sess.clear()
- r = sess.query(Transition).select()
+ r = sess.query(Transition).all()
self.assert_unordered_result(r, Transition,
{'name': 'transition1',
'inputs': (Place, [{'name':'place1'}]),
sess = create_session()
[sess.save(x) for x in [t1,t2,t3,p1,p2,p3]]
sess.flush()
-
+
self.assert_result([t1], Transition, {'outputs': (Place, [{'name':'place3'}, {'name':'place1'}])})
self.assert_result([p2], Place, {'inputs': (Transition, [{'name':'transition1'},{'name':'transition2'}])})
Column('student_id', String(20), ForeignKey('student.name'),primary_key=True),
Column('course_id', String(20), ForeignKey('course.name'), primary_key=True))
- def testcircular(self):
+ def testcircular(self):
class Student(object):
def __init__(self, name=''):
self.name = name
sess.save(s1)
sess.flush()
sess.clear()
- s = sess.query(Student).get_by(name='Student1')
- c = sess.query(Course).get_by(name='Course3')
+ s = sess.query(Student).filter_by(name='Student1').one()
+ c = sess.query(Course).filter_by(name='Course3').one()
self.assert_(len(s.courses) == 3)
del s.courses[1]
self.assert_(len(s.courses) == 2)
-
+
def test_delete(self):
"""test that many-to-many table gets cleared out with deletion from the backref side"""
class Student(object):
sess.delete(s1)
sess.flush()
assert enrolTbl.count().scalar() == 0
-
+
class M2MTest3(ORMTest):
def define_tables(self, metadata):
global c, c2a1, c2a2, b, a
- c = Table('c', metadata,
+ c = Table('c', metadata,
Column('c1', Integer, primary_key = True),
Column('c2', String(20)),
)
- a = Table('a', metadata,
+ a = Table('a', metadata,
Column('a1', Integer, primary_key=True),
Column('a2', String(20)),
Column('c1', Integer, ForeignKey('c.c1'))
)
- c2a1 = Table('ctoaone', metadata,
+ c2a1 = Table('ctoaone', metadata,
Column('c1', Integer, ForeignKey('c.c1')),
Column('a1', Integer, ForeignKey('a.a1'))
)
- c2a2 = Table('ctoatwo', metadata,
+ c2a2 = Table('ctoatwo', metadata,
Column('c1', Integer, ForeignKey('c.c1')),
Column('a1', Integer, ForeignKey('a.a1'))
)
- b = Table('b', metadata,
+ b = Table('b', metadata,
Column('b1', Integer, primary_key=True),
Column('a1', Integer, ForeignKey('a.a1')),
Column('b2', Boolean)
)
-
+
def testbasic(self):
class C(object):pass
class A(object):pass
mapper(B, b)
- mapper(A, a,
+ mapper(A, a,
properties = {
'tbs' : relation(B, primaryjoin=and_(b.c.a1==a.c.a1, b.c.b2 == True), lazy=False),
}
)
- mapper(C, c,
+ mapper(C, c,
properties = {
'a1s' : relation(A, secondary=c2a1, lazy=False),
'a2s' : relation(A, secondary=c2a2, lazy=False)
o1 = create_session().query(C).get(1)
-if __name__ == "__main__":
+if __name__ == "__main__":
testbase.main()
-
s = create_session()
u = s.get(User, 7)
assert u._user_name=='jack'
- assert u._user_id ==7
+ assert u._user_id ==7
u2 = s.query(User).filter_by(user_name='jack').one()
assert u is u2
-
def test_no_pks(self):
s = select([users.c.user_name]).alias('foo')
try:
assert False
except exceptions.ArgumentError, e:
assert str(e) == "Could not assemble any primary key columns for mapped table 'foo'"
-
+
def test_compileonsession(self):
m = mapper(User, users)
session = create_session()
except TypeError, e:
pass
+ @testing.uses_deprecated('SessionContext', 'SessionContextExt')
def test_constructorexceptions(self):
"""test that exceptions raised in the mapped class are not masked by sa decorations"""
ex = AssertionError('oops')
assert e is ex
clear_mappers()
-
+
# test that TypeError is raised for illegal constructor args,
# whether or not explicit __init__ is present [ticket:908]
class Foo(object):
pass
class Bar(object):
pass
-
+
mapper(Foo, users)
mapper(Bar, addresses)
try:
def test_compileon_getprops(self):
m =mapper(User, users)
-
+
assert not m.compiled
assert list(m.iterate_properties)
assert m.compiled
clear_mappers()
-
+
m= mapper(User, users)
assert not m.compiled
assert m.get_property('user_name')
assert m.compiled
-
+
def test_add_property(self):
assert_col = []
class User(object):
assert_col.append(('set', name))
self._user_name = name
user_name = property(_get_user_name, _set_user_name)
-
+
m = mapper(User, users)
mapper(Address, addresses)
m.add_property('_user_name', deferred(users.c.user_name))
sess = create_session(transactional=True)
assert sess.query(User).get(7)
-
+
u = sess.query(User).filter_by(user_name='jack').one()
def go():
sess.save(u3)
sess.flush()
sess.rollback()
-
+
def test_replace_property(self):
m = mapper(User, users)
m.add_property('_user_name',users.c.user_name)
m.add_property('user_name', synonym('_user_name', proxy=True))
-
+
sess = create_session()
u = sess.query(User).filter_by(user_name='jack').one()
assert u._user_name == 'jack'
assert u.user_name == 'jack'
u.user_name = 'jacko'
assert m._columntoproperty[users.c.user_name] is m.get_property('_user_name')
-
+
clear_mappers()
m = mapper(User, users)
m.add_property('user_name', synonym('_user_name', map_column=True))
-
+
sess.clear()
u = sess.query(User).filter_by(user_name='jack').one()
assert u._user_name == 'jack'
assert u.user_name == 'jack'
u.user_name = 'jacko'
assert m._columntoproperty[users.c.user_name] is m.get_property('_user_name')
-
+
def test_synonym_replaces_backref(self):
assert_calls = []
class Address(object):
assert_calls.append("set")
self._user = user
user = property(_get_user, _set_user)
-
+
# synonym is created against nonexistent prop
mapper(Address, addresses, properties={
'user':synonym('_user')
})
compile_mappers()
-
+
# later, backref sets up the prop
mapper(User, users, properties={
'addresses':relation(Address, backref='_user')
a1.user = u2
assert a1.user is u2
self.assertEquals(assert_calls, ["set", "get"])
-
+
def test_self_ref_syn(self):
t = Table('nodes', MetaData(),
Column('id', Integer, primary_key=True),
Column('parent_id', Integer, ForeignKey('nodes.id')))
-
+
class Node(object):
pass
-
+
mapper(Node, t, properties={
'_children':relation(Node, backref=backref('_parent', remote_side=t.c.id)),
'children':synonym('_children'),
'parent':synonym('_parent')
})
-
+
n1 = Node()
n2 = Node()
n1.children.append(n2)
assert n2.parent is n2._parent is n1
assert n1.children[0] is n1._children[0] is n2
self.assertEquals(str(Node.parent == n2), ":param_1 = nodes.parent_id")
-
+
def test_illegal_non_primary(self):
mapper(User, users)
mapper(Address, addresses)
assert False
except exceptions.ArgumentError, e:
assert "Attempting to assign a new relation 'addresses' to a non-primary mapper on class 'User'" in str(e)
-
+
def test_propfilters(self):
t = Table('person', MetaData(),
Column('id', Integer, primary_key=True),
assert_props(Hoho, ['id', 'name', 'type'])
assert_props(Lala, ['p_employee_number', 'p_id', 'p_name', 'p_type'])
- def test_recursiveselectby(self):
+ @testing.uses_deprecated('//select_by', '//join_via', '//list')
+ def test_recursive_select_by_deprecated(self):
"""test that no endless loop occurs when traversing for select_by"""
m = mapper(User, users, properties={
'orders':relation(mapper(Order, orders), backref='user'),
usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id)
m = mapper(User, usersaddresses, primary_key=[users.c.user_id])
q = create_session().query(m)
- l = q.select()
+ l = q.all()
self.assert_result(l, User, *user_result[0:2])
def test_mappingtojoinnopk(self):
j = join(users, addresses, isouter=True)
m = mapper(User, j, allow_null_pks=True, primary_key=[users.c.user_id, addresses.c.address_id])
q = create_session().query(m)
- l = q.select()
+ l = q.all()
self.assert_result(l, User, *result)
def test_customjoin(self):
+ """Tests that select_from totally replace the FROM parameters."""
+
+ m = mapper(User, users, properties={
+ 'orders':relation(mapper(Order, orders, properties={
+ 'items':relation(mapper(Item, orderitems))
+ }))
+ })
+
+ q = create_session().query(m)
+ l = (q.select_from(users.join(orders).join(orderitems)).
+ filter(orderitems.c.item_name=='item 4'))
+
+ self.assert_result(l, User, user_result[0])
+
+ @testing.uses_deprecated('//select')
+ def test_customjoin_deprecated(self):
"""test that the from_obj parameter to query.select() can be used
to totally replace the FROM parameters of the generated query."""
def test_orderby(self):
"""test ordering at the mapper and query level"""
+
# TODO: make a unit test out of these various combinations
-# m = mapper(User, users, order_by=desc(users.c.user_name))
+ #m = mapper(User, users, order_by=desc(users.c.user_name))
mapper(User, users, order_by=None)
-# mapper(User, users)
+ #mapper(User, users)
-# l = create_session().query(User).select(order_by=[desc(users.c.user_name), asc(users.c.user_id)])
- l = create_session().query(User).select()
-# l = create_session().query(User).select(order_by=[])
-# l = create_session().query(User).select(order_by=None)
+ #l = create_session().query(User).select(order_by=[desc(users.c.user_name), asc(users.c.user_id)])
+ l = create_session().query(User).all()
+ #l = create_session().query(User).select(order_by=[])
+ #l = create_session().query(User).select(order_by=None)
@testing.unsupported('firebird')
def test_function(self):
- """test mapping to a SELECT statement that has functions in it."""
- s = select([users, (users.c.user_id * 2).label('concat'), func.count(addresses.c.address_id).label('count')],
- users.c.user_id==addresses.c.user_id, group_by=[c for c in users.c]).alias('myselect')
+ """Test mapping to a SELECT statement that has functions in it."""
+
+ s = select([users,
+ (users.c.user_id * 2).label('concat'),
+ func.count(addresses.c.address_id).label('count')],
+ users.c.user_id == addresses.c.user_id,
+ group_by=[c for c in users.c]).alias('myselect')
+
mapper(User, s)
sess = create_session()
- l = sess.query(User).select()
+ l = sess.query(User).all()
for u in l:
print "User", u.user_id, u.user_name, u.concat, u.count
assert l[0].concat == l[0].user_id * 2 == 14
q = create_session().query(User)
self.assert_(q.count()==3)
self.assert_(q.count(users.c.user_id.in_([8,9]))==2)
+
+ @testing.unsupported('firebird')
+ @testing.uses_deprecated('//count_by', '//join_by', '//join_via')
+ def test_count_by_deprecated(self):
+ mapper(User, users)
+ q = create_session().query(User)
self.assert_(q.count_by(user_name='fred')==1)
def test_manytomany_count(self):
u = sess.query(User).filter(User.uname=='jack').one()
self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1]))
-
- addr = sess.query(Address).get_by(address_id=user_address_result[0]['addresses'][1][0]['address_id'])
+
+ addr = sess.query(Address).filter_by(address_id=user_address_result[0]['addresses'][1][0]['address_id']).one()
u = sess.query(User).filter_by(adname=addr).one()
u2 = sess.query(User).filter_by(adlist=addr).one()
-
+
assert u is u2
assert u not in sess.dirty
assert u.user_name == "some user name"
assert u in sess.dirty
-
+
def test_column_synonyms(self):
"""test new-style synonyms which automatically instrument properties, set up aliased column, etc."""
sess = create_session()
-
+
assert_col = []
class User(object):
def _get_user_name(self):
assert False
except exceptions.ArgumentError, e:
assert str(e) == "Can't compile synonym '_user_name': no column on table 'users' named 'not_user_name'"
-
+
clear_mappers()
-
+
mapper(Address, addresses)
mapper(User, users, properties = {
'addresses':relation(Address, lazy=True),
'user_name':synonym('_user_name', map_column=True)
})
-
+
# test compile
assert not isinstance(User.user_name == 'jack', bool)
-
+
assert hasattr(User, 'user_name')
assert hasattr(User, '_user_name')
-
+
u = sess.query(User).filter(User.user_name == 'jack').one()
assert u.user_name == 'jack'
u.user_name = 'foo'
assert u.user_name == 'foo'
assert assert_col == [('get', 'jack'), ('set', 'foo'), ('get', 'foo')]
-
+
class OptionsTest(MapperSuperTest):
@testing.fails_on('maxdb')
def test_synonymoptions(self):
))
def go():
- u = sess.query(User).options(eagerload('adlist')).get_by(user_name='jack')
+ u = sess.query(User).options(eagerload('adlist')).filter_by(user_name='jack').one()
self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1]))
self.assert_sql_count(testbase.db, go, 1)
- def test_extensionoptions(self):
+ @testing.uses_deprecated('//select_by')
+ def test_extension_options(self):
sess = create_session()
class ext1(MapperExtension):
def populate_instance(self, mapper, selectcontext, row, instance, **flags):
# test that eager loading doesnt modify parent mapper
def go():
- u = sess.query(User).get_by(user_id=8)
+ u = sess.query(User).filter_by(user_id=8).one()
assert u.user_id == 8
assert len(u.addresses) == 3
assert "tbl_row_count" not in self.capture_sql(testbase.db, go)
# first test straight eager load, 1 statement
def go():
- l = sess.query(usermapper).select()
+ l = sess.query(usermapper).all()
self.assert_result(l, User, *user_address_result)
self.assert_sql_count(testbase.db, go, 1)
mapper(User, users, properties = dict(
addresses = relation(mapper(Address, addresses), lazy=False)
))
- l = sess.query(User).options(lazyload('addresses')).select()
+ l = sess.query(User).options(lazyload('addresses')).all()
def go():
self.assert_result(l, User, *user_address_result)
self.assert_sql_count(testbase.db, go, 3)
sess = create_session()
# eagerload nothing.
- u = sess.query(User).select()
+ u = sess.query(User).all()
def go():
print u[0].orders[1].items[0].keywords[1]
self.assert_sql_count(testbase.db, go, 3)
print "-------MARK----------"
# eagerload orders.items.keywords; eagerload_all() implies eager load of orders, orders.items
q2 = sess.query(User).options(eagerload_all('orders.items.keywords'))
- u = q2.select()
+ u = q2.all()
def go():
print u[0].orders[1].items[0].keywords[1]
print "-------MARK2----------"
# same thing, with separate options calls
q2 = sess.query(User).options(eagerload('orders')).options(eagerload('orders.items')).options(eagerload('orders.items.keywords'))
- u = q2.select()
+ u = q2.all()
def go():
print u[0].orders[1].items[0].keywords[1]
self.assert_sql_count(testbase.db, go, 0)
sess.clear()
-
+
try:
sess.query(User).options(eagerload('items', Order))
assert False
# the "items" on the order, but on "items" it will eager load the "keywords"
print "-------MARK5----------"
q3 = sess.query(User).options(eagerload('orders.items.keywords'))
- u = q3.select()
+ u = q3.all()
self.assert_sql_count(testbase.db, go, 2)
q = create_session().query(m)
def go():
- l = q.select()
+ l = q.all()
o2 = l[2]
print o2.description
sess = create_session()
q = sess.query(m)
- l = q.select()
+ l = q.all()
o2 = l[2]
o2.isopen = 1
sess.flush()
sess = create_session()
q = sess.query(m)
def go():
- l = q.select()
+ l = q.all()
o2 = l[2]
print o2.opened, o2.description, o2.userident
assert o2.opened == 1
("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders WHERE orders.order_id = :param_1", {'param_1':3})
])
- o2 = q.select()[2]
+ o2 = q.all()[2]
# assert o2.opened == 1
assert o2.description == 'order 3'
assert o2 not in sess.dirty
def go():
sess.flush()
self.assert_sql_count(testbase.db, go, 0)
-
+
def test_preserve_changes(self):
"""test that the deferred load operation doesn't revert modifications on attributes"""
-
+
mapper(Order, orders, properties = {
'userident':deferred(orders.c.user_id, group='primary'),
'description':deferred(orders.c.description, group='primary'),
self.assert_sql_count(testbase.db, go, 1)
assert o.description == 'somenewdescription'
assert o in sess.dirty
-
-
+
+
def test_commitsstate(self):
"""test that when deferred elements are loaded via a group, they get the proper CommittedState
and dont result in changes being committed"""
})
sess = create_session()
q = sess.query(m)
- o2 = q.select()[2]
+ o2 = q.all()[2]
# this will load the group of attributes
assert o2.description == 'order 3'
assert o2 not in sess.dirty
q = sess.query(m)
q2 = q.options(defer('user_id'))
def go():
- l = q2.select()
+ l = q2.all()
print l[2].user_id
orderby = str(orders.default_order_by()[0].compile(testbase.db))
sess.clear()
q3 = q2.options(undefer('user_id'))
def go():
- l = q3.select()
+ l = q3.all()
print l[3].user_id
self.assert_sql(testbase.db, go, [
("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY %s" % orderby, {}),
sess = create_session()
q = sess.query(m)
def go():
- l = q.options(undefer_group('primary')).select()
+ l = q.options(undefer_group('primary')).all()
o2 = l[2]
print o2.opened, o2.description, o2.userident
assert o2.opened == 1
self.assert_sql(testbase.db, go, [
("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen, orders.order_id AS orders_order_id FROM orders ORDER BY %s" % orderby, {}),
])
-
+
def test_locates_col(self):
"""test that manually adding a col to the result undefers the column"""
mapper(Order, orders, properties={
def go():
assert o1.description == 'order 1'
self.assert_sql_count(testbase.db, go, 1)
-
+
sess = create_session()
o1 = sess.query(Order).add_column(orders.c.description).first()[0]
def go():
assert o1.description == 'order 1'
self.assert_sql_count(testbase.db, go, 0)
-
+
def test_deepoptions(self):
m = mapper(User, users, properties={
'orders':relation(mapper(Order, orders, properties={
})
sess = create_session()
q = sess.query(m)
- l = q.select()
+ l = q.all()
item = l[0].orders[1].items[1]
def go():
print item.item_name
self.assert_(item.item_name == 'item 4')
sess.clear()
q2 = q.options(undefer('orders.items.item_name'))
- l = q2.select()
+ l = q2.all()
item = l[0].orders[1].items[1]
def go():
print item.item_name
q = create_session().query(m)
l = [None]
def go():
- x = q.select(users.c.user_id == 7)
+ x = q.filter(users.c.user_id == 7).all()
x[0].addresses
l[0] = x
self.assert_sql_count(testbase.db, go, 1)
self.assert_result(l[0], User,
{'user_id' : 7, 'addresses' : (Address, [])},
)
+
def test_options(self):
m = mapper(User, users, properties = dict(
addresses = relation(mapper(Address, addresses), lazy=None)
q = create_session().query(m).options(lazyload('addresses'))
l = [None]
def go():
- x = q.select(users.c.user_id == 7)
+ x = q.filter(users.c.user_id == 7).all()
x[0].addresses
l[0] = x
self.assert_sql_count(testbase.db, go, 2)
class MapperExtensionTest(PersistTest):
def setUpAll(self):
tables.create()
-
+
global methods, Ext
-
+
methods = []
-
+
class Ext(MapperExtension):
def load(self, query, *args, **kwargs):
methods.append('load')
clear_mappers()
methods[:] = []
tables.delete()
-
+
def tearDownAll(self):
tables.drop()
-
+
def test_basic(self):
"""test that common user-defined methods get called."""
mapper(User, users, extension=Ext())
sess.flush()
sess.delete(u)
sess.flush()
- self.assertEquals(methods, ['before_insert', 'after_insert', 'load', 'translate_row', 'populate_instance', 'get',
+ self.assertEquals(methods, ['before_insert', 'after_insert', 'load', 'translate_row', 'populate_instance', 'get',
'translate_row', 'create_instance', 'populate_instance', 'before_update', 'after_update', 'before_delete', 'after_delete'])
def test_inheritance(self):
# test using inheritance
class AdminUser(User):
pass
-
+
mapper(User, users, extension=Ext())
mapper(AdminUser, addresses, inherits=User)
-
+
sess = create_session()
am = AdminUser()
sess.save(am)
sess.flush()
sess.delete(am)
sess.flush()
- self.assertEquals(methods, ['before_insert', 'after_insert', 'load', 'translate_row', 'populate_instance', 'get',
+ self.assertEquals(methods, ['before_insert', 'after_insert', 'load', 'translate_row', 'populate_instance', 'get',
'translate_row', 'create_instance', 'populate_instance', 'before_update', 'after_update', 'before_delete', 'after_delete'])
def test_after_with_no_changes(self):
# test that after_update is called even if no cols were updated
-
+
mapper(Item, orderitems, extension=Ext() , properties={
'keywords':relation(Keyword, secondary=itemkeywords)
})
mapper(Keyword, keywords, extension=Ext() )
-
+
sess = create_session()
i1 = Item()
k1 = Keyword()
i1.keywords.append(k1)
sess.flush()
self.assertEquals(methods, ['before_update', 'after_update'])
-
-
+
+
def test_inheritance_with_dupes(self):
# test using inheritance, same extension on both mappers
class AdminUser(User):
sess.flush()
sess.delete(am)
sess.flush()
- self.assertEquals(methods, ['before_insert', 'after_insert', 'load', 'translate_row', 'populate_instance', 'get',
+ self.assertEquals(methods, ['before_insert', 'after_insert', 'load', 'translate_row', 'populate_instance', 'get',
'translate_row', 'create_instance', 'populate_instance', 'before_update', 'after_update', 'before_delete', 'after_delete'])
-
+
class RequirementsTest(AssertMixin):
"""Tests the contract for user classes."""
s.flush()
self.assertEquals(t1.count().scalar(), 4)
-
+
h6 = H6()
h6.h1a = h1
h6.h1b = h1
h6.h1a = h1
h6.h1b = x = H1()
assert x in s
-
+
h6.h1b.h2s.append(H2())
s.flush()
self.description = description
class O2OTest(AssertMixin):
+ @testing.uses_deprecated('SessionContext')
def setUpAll(self):
global jack, port, metadata, ctx
metadata = MetaData(testbase.db)
ctx = SessionContext(create_session)
- jack = Table('jack', metadata,
+ jack = Table('jack', metadata,
Column('id', Integer, primary_key=True),
#Column('room_id', Integer, ForeignKey("room.id")),
Column('number', String(50)),
)
- port = Table('port', metadata,
+ port = Table('port', metadata,
Column('id', Integer, primary_key=True),
#Column('device_id', Integer, ForeignKey("device.id")),
Column('name', String(30)),
clear_mappers()
def tearDownAll(self):
metadata.drop_all()
-
+
+ @testing.uses_deprecated('SessionContext')
def test1(self):
mapper(Port, port, extension=ctx.mapper_extension)
mapper(Jack, jack, order_by=[jack.c.number],properties = {
'port': relation(Port, backref='jack', uselist=False, lazy=True),
- }, extension=ctx.mapper_extension)
+ }, extension=ctx.mapper_extension)
j=Jack(number='101')
p=Port(name='fa0/1')
ctx.current.delete(j)
ctx.current.flush()
-if __name__ == "__main__":
+if __name__ == "__main__":
testbase.main()
D.mapper = mapper(D, tbl_d)
C.mapper = mapper(C, tbl_c, properties=dict(
- d_rows=relation(D, private=True, backref="c_row"),
+ d_rows=relation(D, cascade="all, delete-orphan", backref="c_row"),
))
B.mapper = mapper(B, tbl_b)
A.mapper = mapper(A, tbl_a, properties=dict(
- c_rows=relation(C, private=True, backref="a_row"),
+ c_rows=relation(C, cascade="all, delete-orphan", backref="a_row"),
))
D.mapper.add_property("b_row", relation(B))
def tearDownAll(self):
metadata.drop_all()
+ @testing.uses_deprecated('foreignkey option')
+ # TODO: fixme!
def testexplicit(self):
"""test with mappers that have fairly explicit join conditions"""
+
class Company(object):
pass
class Employee(object):
assert sess.query(Employee).get([c1.company_id, 3]).reports_to.name == 'emp1'
assert sess.query(Employee).get([c2.company_id, 3]).reports_to.name == 'emp5'
+ @testing.uses_deprecated('foreignkey option')
+ # TODO: fixme!
def testimplicit(self):
"""test with mappers that have the most minimal arguments"""
class Company(object):
mapper(Page, pages, properties={
'job': relation(Job, backref=backref('pages', cascade="all, delete-orphan", order_by=pages.c.pagename)),
'currentversion': relation(PageVersion,
- foreignkey=pages.c.current_version,
+ foreign_keys=[pages.c.current_version],
primaryjoin=and_(pages.c.jobno==pageversions.c.jobno,
pages.c.pagename==pageversions.c.pagename,
pages.c.current_version==pageversions.c.version),
s.flush()
s.clear()
- j = s.query(Job).get_by(jobno=u'somejob')
+ j = s.query(Job).filter_by(jobno=u'somejob').one()
oldp = list(j.pages)
j.pages = []
s.flush()
s.clear()
- j = s.query(Job).get_by(jobno=u'somejob2')
+ j = s.query(Job).filter_by(jobno=u'somejob2').one()
j.pages[1].current_version = 12
s.delete(j)
s.flush()
sess.flush()
sess.commit() # commit does nothing
trans.rollback() # rolls back
- assert len(sess.query(User).select()) == 0
+ assert len(sess.query(User).all()) == 0
sess.close()
@testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
sess.rollback()
trans.commit()
- assert len(sess.query(User).select()) == 1
+ assert len(sess.query(User).all()) == 1
except:
conn.close()
raise
sess.flush()
sess.commit() # commit does nothing
sess.rollback() # rolls back
- assert len(sess.query(User).select()) == 0
+ assert len(sess.query(User).all()) == 0
sess.close()
@testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
sess.rollback()
sess.commit()
- assert len(sess.query(User).select()) == 1
+ assert len(sess.query(User).all()) == 1
sess.close()
@testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
sess.rollback()
sess.commit()
- assert len(sess.query(User).select()) == 1
+ assert len(sess.query(User).all()) == 1
sess.close()
@engines.close_open_connections
assert str(e) == "Session already has a Connection associated for the given Engine"
transaction.rollback()
- assert len(sess.query(User).select()) == 0
+ assert len(sess.query(User).all()) == 0
sess.close()
def test_bound_connection_transactional(self):
class SessionContextTest(AssertMixin):
def setUp(self):
clear_mappers()
-
+
def do_test(self, class_, context):
"""test session assignment on object creation"""
obj = class_()
del context.current
assert context.current != new_session
assert old_session == object_session(obj)
-
+
obj2 = class_()
assert context.current == object_session(obj2)
-
+
+ @testing.uses_deprecated('SessionContext')
def test_mapper_extension(self):
context = SessionContext(Session)
class User(object): pass
if __name__ == "__main__":
- testbase.main()
+ testbase.main()
s.commit()
s.close()
- u = s.query(m).select()[0]
+ u = s.query(m).all()[0]
print u.addresses[0].user
class VersioningTest(ORMTest):
t1.t2s.append(Test2())
Session.commit()
Session.close()
- t1 = Session.query(Test).get_by(id=t1.id)
+ t1 = Session.query(Test).filter_by(id=t1.id).one()
assert len(t1.t2s) == 2
class UnicodeSchemaTest(ORMTest):
f1.data = pickleable.Bar(4,5)
Session.commit()
Session.close()
- f2 = Session.query(Foo).get_by(id=f1.id)
+ f2 = Session.query(Foo).filter_by(id=f1.id).one()
assert f2.data == f1.data
f2.data.y = 19
assert f2 in Session.dirty
Session.commit()
Session.close()
- f3 = Session.query(Foo).get_by(id=f1.id)
+ f3 = Session.query(Foo).filter_by(id=f1.id).one()
print f2.data, f3.data
assert f3.data != f1.data
assert f3.data == pickleable.Bar(4, 19)
Session.close()
- f2 = Session.query(Foo).get_by(id=f1.id)
+ f2 = Session.query(Foo).filter_by(id=f1.id).one()
def go():
Session.commit()
self.assert_sql_count(testbase.db, go, 1)
Session.close()
- f3 = Session.query(Foo).get_by(id=f1.id)
+ f3 = Session.query(Foo).filter_by(id=f1.id).one()
print f2.data, f3.data
assert (f3.data.x, f3.data.y) == (4,19)
u.addresses.append(Address())
Session.commit()
Session.close()
- ulist = Session.query(m1).select()
+ ulist = Session.query(m1).all()
u1 = ulist[0]
u1.user_name = 'newname'
Session.commit()
au = AddressUser()
Session.commit()
Session.close()
- l = Session.query(AddressUser).selectone()
+ l = Session.query(AddressUser).one()
self.assert_(l.user_id == au.user_id and l.address_id == au.address_id)
def test_deferred(self):
item.item_name = elem['item_name']
item.keywords = []
if elem['keywords'][1]:
- klist = Session.query(keywordmapper).select(keywords.c.name.in_([e['name'] for e in elem['keywords'][1]]))
+ klist = Session.query(keywordmapper).filter(keywords.c.name.in_([e['name'] for e in elem['keywords'][1]]))
else:
klist = []
khash = {}
Session.commit()
- l = Session.query(m).select(items.c.item_name.in_([e['item_name'] for e in data[1:]]), order_by=[items.c.item_name])
+ l = Session.query(m).filter(items.c.item_name.in_([e['item_name'] for e in data[1:]])).order_by(items.c.item_name).all()
self.assert_result(l, *data)
objects[4].item_name = 'item4updated'
)
sometable = Table( 'Manager', metadata,
Column('obj_id', Integer, Sequence('obj_id_seq'), ),
- Column('name', String, ),
+ Column('name', String(128)),
Column('id', Integer, Sequence('Manager_id_seq', optional=True),
primary_key=True),
)
Column('col1', Integer),
Column('col2', Float))
assert isinstance(t.c.col1.label('hi').type, Integer)
- assert isinstance(select([t.c.col2], scalar=True).label('lala').type, Float)
+ assert isinstance(select([t.c.col2]).as_scalar().label('lala').type, Float)
class LongLabelsTest(SQLCompileTest):
def setUpAll(self):
datetable.create()
try:
datetable.insert().execute(id=1, today=datetime.datetime(2006, 5, 12, 12, 0, 0))
- s = select([datetable.alias('x').c.today], scalar=True)
+ s = select([datetable.alias('x').c.today]).as_scalar()
s2 = select([datetable.c.id, s.label('somelabel')])
#print s2.c.somelabel.type
assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime)
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid) DESC"
)
-
+ @testing.uses_deprecated('scalar option')
def test_scalar_select(self):
try:
s = select([table1.c.myid, table1.c.name]).as_scalar()
dialect=postgres.dialect()
)
+ @testing.uses_deprecated('//get_params')
def testbindparam(self):
for (
stmt,
self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect())
nonpositional = stmt.compile()
positional = stmt.compile(dialect=sqlite.dialect())
- testing.squelch_deprecation(positional.get_params)
pp = positional.get_params()
assert [pp[k] for k in positional.positiontup] == expected_default_params_list
assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict)))
pp = positional.get_params(**test_param_dict)
assert [pp[k] for k in positional.positiontup] == expected_test_params_list
- testing.enable_deprecation(positional.get_params)
# check that params() doesnt modify original statement
s = select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myotherid')))
"SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable WHERE mytable.myid = "\
":mytable_myid_1) AS anon_1 FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid_1)")
positional = s2.compile(dialect=sqlite.dialect())
- testing.squelch_deprecation(positional.get_params)
+
pp = positional.get_params()
- testing.enable_deprecation(positional.get_params)
assert [pp[k] for k in positional.positiontup] == [12, 12]
# check that conflicts with "unique" params are caught
self.assert_compile(select([table1], table1.c.myid.in_([])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
+ @testing.uses_deprecated('passing in_')
def test_in_deprecated_api(self):
self.assert_compile(select([table1], table1.c.myid.in_('abc')),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)")
# monkeypatches unittest.TestLoader.suiteClass at import time
import testbase
-import itertools, unittest, re, sys, os, operator
+import itertools, unittest, re, sys, os, operator, warnings
from cStringIO import StringIO
import testlib.config as config
sql, MetaData, clear_mappers, Session, util = None, None, None, None, None
-
+salogging = None
__all__ = ('PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest')
bind = config.db
return bind.dialect.server_version_info(bind.contextual_connect())
+def uses_deprecated(*messages):
+ """Mark a test as immune from fatal deprecation warnings.
+
+ With no arguments, squelches all SADeprecationWarning failures.
+ Or pass one or more strings; these will be matched to the root
+ of the warning description by warnings.filterwarnings().
+
+ As a special case, you may pass a function name prefixed with //
+ and it will be re-written as needed to match the standard warning
+ verbiage emitted by the sqlalchemy.util.deprecated decorator.
+ """
+
+ def decorate(fn):
+ def safe(*args, **kw):
+ global salogging
+ if salogging is None:
+ from sqlalchemy import logging as salogging
+
+ if not messages:
+ filters = [dict(action='ignore',
+ category=salogging.SADeprecationWarning)]
+ else:
+ filters = [dict(action='ignore',
+ message=message,
+ category=salogging.SADeprecationWarning)
+ for message in
+ [ (m.startswith('//') and
+ ('Call to deprecated function ' + m[2:]) or m)
+ for m in messages] ]
+
+ for f in filters:
+ warnings.filterwarnings(**f)
+ try:
+ return fn(*args, **kw)
+ finally:
+ resetwarnings()
+ try:
+ safe.__name__ = fn.name
+ except:
+ pass
+ return safe
+ return decorate
+
+def resetwarnings():
+ """Reset warning behavior to testing defaults."""
+
+ global salogging
+ if salogging is None:
+ from sqlalchemy import logging as salogging
+ warnings.resetwarnings()
+ warnings.filterwarnings('error', category=salogging.SADeprecationWarning)
def against(*queries):
"""Boolean predicate, compares to testing database configuration.
return set([tuple(row) for row in results])
-def squelch_deprecation(callable_):
- _set_deprecation(callable_, False)
-
-def enable_deprecation(callable_):
- _set_deprecation(callable_, True)
-
-def _set_deprecation(callable_, state):
- if hasattr(callable_, 'im_func'):
- callable_ = callable_.im_func
- assert hasattr(callable_, 'warn'), 'Callable is not deprecated'
- setattr(callable_, 'warn', state)
-
class TestData(object):
"""Tracks SQL expressions as they are executed via an instrumented ExecutionContext."""
"""A TestSuite with once per TestCase setUpAll() and tearDownAll()"""
def __init__(self, tests=()):
- if len(tests) >0 and isinstance(tests[0], PersistTest):
+ if len(tests) > 0 and isinstance(tests[0], PersistTest):
self._initTest = tests[0]
else:
self._initTest = None
_server_version())
return True
try:
+ resetwarnings()
init.setUpAll()
except:
# skip tests if global setup fails
result.addError(test, ex)
return False
try:
+ resetwarnings()
return self.do_run(result)
finally:
try:
+ resetwarnings()
if init is not None:
init.tearDownAll()
except: