def setUpAll(self):
global dbmeta, owners, categories, tests, options, Owner, Category, Test, Option, false
dbmeta = MetaData(testbase.db)
-
+
# determine a literal value for "false" based on the dialect
# FIXME: this PassiveDefault setup is bogus.
bp = Boolean().dialect_impl(testbase.db.dialect).bind_processor(testbase.db.dialect)
false = text('FALSE')
else:
false = str(False)
-
+
owners = Table ( 'owners', dbmeta ,
Column ( 'id', Integer, primary_key=True, nullable=False ),
Column('data', String(30)) )
'category':relation(Category),
'owner_option': relation(Option,primaryjoin=and_(tests.c.id==options.c.test_id,tests.c.owner_id==options.c.owner_id),
foreignkey=[options.c.test_id, options.c.owner_id],
- uselist=False )
+ uselist=False )
})
s=create_session()
def test_noorm(self):
"""test the control case"""
- # I want to display a list of tests owned by owner 1
+ # I want to display a list of tests owned by owner 1
# if someoption is false or he hasn't specified it yet (null)
# but not if he set it to true (example someoption is for hiding)
from_obj=[tests.join(categories).outerjoin(options,and_(tests.c.id==options.c.test_id,tests.c.owner_id==options.c.owner_id))] ).execute().fetchall()
print result
assert result == [(1, u'Some Category'), (3, u'Some Category')]
-
+
def test_withouteagerload(self):
s = create_session()
l=s.query(Test).select ( and_(tests.c.owner_id==1,or_(options.c.someoption==None,options.c.someoption==False)),
assert result == [u'1 Some Category', u'3 Some Category']
def test_witheagerload(self):
- """test that an eagerload locates the correct "from" clause with
+ """test that an eagerload locates the correct "from" clause with
which to attach to, when presented with a query that already has a complicated from clause."""
s = create_session()
q=s.query(Test).options(eagerload('category'))
"""test the same as witheagerload except using generative"""
s = create_session()
q=s.query(Test).options(eagerload('category'))
- l=q.filter (
+ l=q.filter (
and_(tests.c.owner_id==1,or_(options.c.someoption==None,options.c.someoption==False))
).outerjoin('owner_option')
-
+
result = ["%d %s" % ( t.id,t.category.name ) for t in l]
print result
assert result == [u'1 Some Category', u'3 Some Category']
s = create_session()
q=s.query(Test).options(eagerload('category'))
l=q.select( (tests.c.owner_id==1) & ('options.someoption is null or options.someoption=%s' % false) & q.join_to('owner_option') )
- result = ["%d %s" % ( t.id,t.category.name ) for t in l]
+ result = ["%d %s" % ( t.id,t.category.name ) for t in l]
print result
assert result == [u'3 Some Category']
s = create_session()
q=s.query(Test).options(eagerload('category'))
l=q.select( (tests.c.owner_id==1) & ((options.c.someoption==None) | (options.c.someoption==False)) & q.join_to('owner_option') )
- result = ["%d %s" % ( t.id,t.category.name ) for t in l]
+ result = ["%d %s" % ( t.id,t.category.name ) for t in l]
print result
assert result == [u'3 Some Category']
def tearDown(self):
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
-
+
+ @testing.fails_on('maxdb')
def testeagerterminate(self):
"""test that eager query generation does not include the same mapper's table twice.
-
+
or, that bi-directional eager loads dont include each other in eager query generation."""
class Middle(object):
def __init__(self, data): self.data = data
Column ( 'id', Integer, primary_key=True, nullable=False ),
Column ( 'data_id', Integer, ForeignKey('datas.id')),
Column ( 'somedata', Integer, nullable=False ))
-
+
+ @testing.fails_on('maxdb')
def test_nesting_with_functions(self):
class Data(object): pass
class Foo(object):pass
d.a=x
s.save(d)
data.append(d)
-
+
for x in range(10):
rid=random.randint(0,len(data) - 1)
somedata=random.randint(1,50000)
[stats.c.data_id,func.max(stats.c.somedata).label('max')],
stats.c.data_id<=25,
group_by=[stats.c.data_id]).alias('arb')
-
+
arb_result = arb_data.execute().fetchall()
# order the result list descending based on 'max'
arb_result.sort(lambda a, b:cmp(b['max'],a['max']))
# extract just the "data_id" from it
arb_result = [row['data_id'] for row in arb_result]
-
- # now query for Data objects using that above select, adding the
+
+ # now query for Data objects using that above select, adding the
# "order by max desc" separately
q=s.query(Data).options(eagerload('foo')).select(
from_obj=[datas.join(arb_data,arb_data.c.data_id==datas.c.id)],
order_by=[desc(arb_data.c.max)],limit=10)
-
+
# extract "data_id" from the list of result objects
verify_result = [d.id for d in q]
-
+
# assert equality including ordering (may break if the DB "ORDER BY" and python's sort() used differing
# algorithms and there are repeated 'somedata' values in the list)
assert verify_result == arb_result
Column('department_id', Integer, primary_key=True),
Column('name', String(50)))
- employees = Table('employees', metadata,
+ employees = Table('employees', metadata,
Column('person_id', Integer, primary_key=True),
Column('name', String(50)),
Column('department_id', Integer,
ForeignKey('departments.department_id')))
+ @testing.fails_on('maxdb')
def test_basic(self):
class Department(object):
def __init__(self, **kwargs):
assert q[0] is d2
class EagerTest5(ORMTest):
- """test the construction of AliasedClauses for the same eager load property but different
+ """test the construction of AliasedClauses for the same eager load property but different
parent mappers, due to inheritance"""
def define_tables(self, metadata):
global base, derived, derivedII, comments
base = Table(
'base', metadata,
- Column('uid', String(30), primary_key=True),
+ Column('uid', String(30), primary_key=True),
Column('x', String(30))
)
class EagerTest6(ORMTest):
def define_tables(self, metadata):
global designType, design, part, inheritedPart
- designType = Table('design_types', metadata,
+ designType = Table('design_types', metadata,
Column('design_type_id', Integer, primary_key=True),
)
- design =Table('design', metadata,
+ design =Table('design', metadata,
Column('design_id', Integer, primary_key=True),
Column('design_type_id', Integer, ForeignKey('design_types.design_type_id')))
- part = Table('parts', metadata,
+ part = Table('parts', metadata,
Column('part_id', Integer, primary_key=True),
Column('design_id', Integer, ForeignKey('design.design_id')),
Column('design_type_id', Integer, ForeignKey('design_types.design_type_id')))
))
class_mapper(Design).add_property("type", relation(DesignType, lazy=False, backref="designs"))
-
+
class_mapper(Part).add_property("design", relation(Design, lazy=False, backref=backref("parts", cascade="all, delete-orphan")))
-
+
#Part.mapper.add_property("designType", relation(DesignType))
d = Design()
invoice_table = Table('invoices', metadata,
Column('invoice_id', Integer, Sequence('invoice_id_seq', optional=True), primary_key = True),
Column('company_id', Integer, ForeignKey("companies.company_id")),
- Column('date', DateTime),
+ Column('date', DateTime),
)
items_table = Table('items', metadata,
return "Item: " + repr(getattr(self, 'item_id', None)) + " " + repr(getattr(self, 'invoice_id', None)) + " " + repr(self.code) + " " + repr(self.qty)
def testone(self):
- """tests eager load of a many-to-one attached to a one-to-many. this testcase illustrated
+ """tests eager load of a many-to-one attached to a one-to-many. this testcase illustrated
the bug, which is that when the single Company is loaded, no further processing of the rows
occurred in order to load the Company's second Address object."""
testbase.db.execute(task_type_t.insert(), {'id':1})
testbase.db.execute(task_t.insert(), {'title':'task 1', 'task_type_id':1, 'status_id':1, 'prj_id':1})
+ @testing.fails_on('maxdb')
def test_nested_joins(self):
# this is testing some subtle column resolution stuff,
# concerning corresponding_column() being extremely accurate
tsk_cnt_join = outerjoin(project_t, task_t, task_t.c.prj_id==project_t.c.id)
- ss = select([project_t.c.id.label('prj_id'), func.count(task_t.c.id).label('tasks_number')],
+ ss = select([project_t.c.id.label('prj_id'), func.count(task_t.c.id).label('tasks_number')],
from_obj=[tsk_cnt_join], group_by=[project_t.c.id]).alias('prj_tsk_cnt_s')
j = join(project_t, ss, project_t.c.id == ss.c.prj_id)
mapper(Message_Type, message_type_t)
- mapper(Message, message_t,
+ mapper(Message, message_t,
properties=dict(type=relation(Message_Type, lazy=False, uselist=False),
))
tsk_cnt_join = outerjoin(project_t, task_t, task_t.c.prj_id==project_t.c.id)
- ss = select([project_t.c.id.label('prj_id'), func.count(task_t.c.id).label('tasks_number')],
+ ss = select([project_t.c.id.label('prj_id'), func.count(task_t.c.id).label('tasks_number')],
from_obj=[tsk_cnt_join], group_by=[project_t.c.id]).alias('prj_tsk_cnt_s')
j = join(project_t, ss, project_t.c.id == ss.c.prj_id)
session = create_session()
for t in session.query(cls.mapper).limit(10).offset(0).list():
- print t.id, t.title, t.props_cnt
+ print t.id, t.title, t.props_cnt
class EagerTest9(ORMTest):
- """test the usage of query options to eagerly load specific paths.
-
- this relies upon the 'path' construct used by PropertyOption to relate
- LoaderStrategies to specific paths, as well as the path state maintained
+ """test the usage of query options to eagerly load specific paths.
+
+ this relies upon the 'path' construct used by PropertyOption to relate
+ LoaderStrategies to specific paths, as well as the path state maintained
throughout the query setup/mapper instances process.
"""
-
+
def define_tables(self, metadata):
global accounts_table, transactions_table, entries_table
accounts_table = Table('accounts', metadata,
Column('transaction_id', Integer, ForeignKey(transactions_table.c.transaction_id)),
)
+ @testing.fails_on('maxdb')
def test_eagerload_on_path(self):
class Account(fixtures.Base):
pass
def go():
# load just the first Account. eager loading will actually load all objects saved thus far,
- # but will not eagerly load the "accounts" off the immediate "entries"; only the
+ # but will not eagerly load the "accounts" off the immediate "entries"; only the
# "accounts" off the entries->transaction->entries
acc = session.query(Account).options(eagerload_all('entries.transaction.entries.account')).first()
-
+
# no sql occurs
assert acc.name == 'acc1'
assert acc.entries[0].transaction.entries[0].account.name == 'acc1'
# lazyload triggers but no sql occurs because many-to-one uses cached query.get()
for e in acc.entries:
assert e.account is acc
-
+
self.assert_sql_count(testbase.db, go, 1)
-
-
-
-if __name__ == "__main__":
+
+
+
+if __name__ == "__main__":
testbase.main()
orders = relation(
mapper(tables.Order, tables.orders, properties = dict (
items = relation(mapper(tables.Item, tables.orderitems), lazy=True, uselist =True, cascade="all, delete-orphan")
- )),
+ )),
lazy = True, uselist = True, cascade="all, delete-orphan")
))
def setUp(self):
global data
data = [tables.User,
- {'user_name' : 'ed',
+ {'user_name' : 'ed',
'address' : (tables.Address, {'email_address' : 'foo@bar.com'}),
'orders' : (tables.Order, [
- {'description' : 'eds 1st order', 'items' : (tables.Item, [{'item_name' : 'eds o1 item'}, {'item_name' : 'eds other o1 item'}])},
+ {'description' : 'eds 1st order', 'items' : (tables.Item, [{'item_name' : 'eds o1 item'}, {'item_name' : 'eds other o1 item'}])},
{'description' : 'eds 2nd order', 'items' : (tables.Item, [{'item_name' : 'eds o2 item'}, {'item_name' : 'eds other o2 item'}])}
])
},
- {'user_name' : 'jack',
+ {'user_name' : 'jack',
'address' : (tables.Address, {'email_address' : 'jack@jack.com'}),
'orders' : (tables.Order, [
{'description' : 'jacks 1st order', 'items' : (tables.Item, [{'item_name' : 'im a lumberjack'}, {'item_name' : 'and im ok'}])}
])
},
- {'user_name' : 'foo',
+ {'user_name' : 'foo',
'address' : (tables.Address, {'email_address': 'hi@lala.com'}),
'orders' : (tables.Order, [
- {'description' : 'foo order', 'items' : (tables.Item, [])},
- {'description' : 'foo order 2', 'items' : (tables.Item, [{'item_name' : 'hi'}])},
+ {'description' : 'foo order', 'items' : (tables.Item, [])},
+ {'description' : 'foo order 2', 'items' : (tables.Item, [{'item_name' : 'hi'}])},
{'description' : 'foo order three', 'items' : (tables.Item, [{'item_name' : 'there'}])}
])
- }
+ }
]
sess = create_session()
assert o2 in sess
sess.flush()
sess.clear()
-
+
u = sess.query(tables.User).get(u.user_id)
o3 = tables.Order()
o3.description='order3'
assert o3 in sess
assert o4 in sess
sess.flush()
-
+
o5 = tables.Order()
o5.description='order5'
sess.save(o5)
assert False
except exceptions.FlushError, e:
assert "is an orphan" in str(e)
-
-
+
+
def testdelete(self):
sess = create_session()
l = sess.query(tables.User).all()
def testdelete2(self):
"""test that unloaded collections are still included in a delete-cascade by default."""
-
+
sess = create_session()
u = sess.query(tables.User).get_by(user_name='ed')
# assert 'addresses' collection not loaded
"""test that cascade only reaches instances that are still part of the collection,
not those that have been removed"""
sess = create_session()
-
+
u = tables.User()
u.user_name = 'newuser'
o = tables.Order()
u.orders.append(o)
sess.save(u)
sess.flush()
-
+
u.orders.remove(o)
sess.delete(u)
assert u in sess.deleted
assert o not in sess.deleted
-
+
def testorphan(self):
sess = create_session()
ctx.current.clear()
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
-
+
def tearDownAll(self):
clear_mappers()
metadata.drop_all()
-
+
def setUpAll(self):
global ctx, data, metadata, User, Pref, Extra
ctx = SessionContext(create_session)
metadata = MetaData(testbase.db)
- extra = Table("extra", metadata,
+ extra = Table("extra", metadata,
Column("extra_id", Integer, Sequence("extra_id_seq", optional=True), primary_key=True),
Column("prefs_id", Integer, ForeignKey("prefs.prefs_id"))
)
- prefs = Table('prefs', metadata,
+ prefs = Table('prefs', metadata,
Column('prefs_id', Integer, Sequence('prefs_id_seq', optional=True), primary_key=True),
Column('prefs_data', String(40)))
-
+
users = Table('users', metadata,
Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key = True),
Column('user_name', String(40)),
ctx.current.flush()
ctx.current.clear()
+ @testing.fails_on('maxdb')
def testorphan(self):
jack = ctx.current.query(User).get_by(user_name='jack')
p = jack.pref
assert p not in ctx.current
assert e not in ctx.current
+ @testing.fails_on('maxdb')
def testorphan2(self):
jack = ctx.current.query(User).get_by(user_name='jack')
p = jack.pref
ctx.current.flush()
assert p not in ctx.current
assert e not in ctx.current
-
-
+
+
class M2MCascadeTest(AssertMixin):
def setUpAll(self):
global metadata, a, b, atob
metadata = MetaData(testbase.db)
- a = Table('a', metadata,
+ a = Table('a', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30))
)
- b = Table('b', metadata,
+ b = Table('b', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30))
)
- atob = Table('atob', metadata,
+ atob = Table('atob', metadata,
Column('aid', Integer, ForeignKey('a.id')),
Column('bid', Integer, ForeignKey('b.id'))
-
+
)
metadata.create_all()
-
+
def tearDownAll(self):
metadata.drop_all()
class B(object):
def __init__(self, data):
self.data = data
-
+
mapper(A, a, properties={
# if no backref here, delete-orphan failed until [ticket:427] was fixed
'bs':relation(B, secondary=atob, cascade="all, delete-orphan")
})
mapper(B, b)
-
+
sess = create_session()
a1 = A('a1')
b1 = B('b1')
a1.bs.append(b1)
sess.save(a1)
sess.flush()
-
+
a1.bs.remove(b1)
sess.flush()
assert atob.count().scalar() ==0
assert b.count().scalar() == 0
assert a.count().scalar() == 1
-
+
def testcascadedelete(self):
class A(object):
def __init__(self, data):
class B(object):
def __init__(self, data):
self.data = data
-
+
mapper(A, a, properties={
'bs':relation(B, secondary=atob, cascade="all, delete-orphan")
})
mapper(B, b)
-
+
sess = create_session()
a1 = A('a1')
b1 = B('b1')
a1.bs.append(b1)
sess.save(a1)
sess.flush()
-
+
sess.delete(a1)
sess.flush()
assert atob.count().scalar() ==0
class UnsavedOrphansTest(ORMTest):
"""tests regarding pending entities that are orphans"""
-
+
def define_tables(self, metadata):
global users, addresses, User, Address
users = Table('users', metadata,
)
class User(object):pass
class Address(object):pass
-
+
def test_pending_orphan(self):
"""test that an entity that never had a parent on a delete-orphan cascade cant be saved."""
assert a.address_id is None, "Error: address should not be persistent"
def test_delete_new_object(self):
- """test that an entity which is attached then detached from its
+ """test that an entity which is attached then detached from its
parent with a delete-orphan cascade gets counted as an orphan"""
mapper(Address, addresses)
)
def testdeletechildwithchild(self):
- """test that an entity which is attached then detached from its
+ """test that an entity which is attached then detached from its
parent with a delete-orphan cascade gets counted as an orphan, as well
as its own child instances"""
class DoubleParentOrphanTest(AssertMixin):
"""test orphan detection for an entity with two parent relations"""
-
+
def setUpAll(self):
global metadata, address_table, businesses, homes
metadata = MetaData(testbase.db)
metadata.drop_all()
def test_non_orphan(self):
"""test that an entity can have two parent delete-orphan cascades, and persists normally."""
-
+
class Address(object):pass
class Home(object):pass
class Business(object):pass
b1.address = a2
[session.save(x) for x in [h1,b1]]
session.flush()
-
+
def test_orphan(self):
"""test that an entity can have two parent delete-orphan cascades, and is detected as an orphan
when saved without a parent."""
class DynamicTest(QueryTest):
keep_mappers = False
-
+
def setup_mappers(self):
pass
def go():
assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(User.id==7).all()
self.assert_sql_count(testbase.db, go, 2)
-
+
class FlushTest(FixtureTest):
def test_basic(self):
class Fixture(Base):
pass
-
+
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses))
})
sess.save(u1)
sess.save(u2)
sess.flush()
-
+
sess.clear()
# test the test fixture a little bit
assert User(name='jack', addresses=[Address(email_address='wrong')]) != sess.query(User).first()
assert User(name='jack', addresses=[Address(email_address='lala@hoho.com')]) == sess.query(User).first()
-
+
assert [
User(name='jack', addresses=[Address(email_address='lala@hoho.com')]),
User(name='ed', addresses=[Address(email_address='foo@bar.com')])
] == sess.query(User).all()
+ @testing.fails_on('maxdb')
def test_delete(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses), backref='user')
u.addresses.append(Address(email_address='e'))
u.addresses.append(Address(email_address='f'))
sess.save(u)
-
+
assert Address(email_address='c') == u.addresses[2]
sess.delete(u.addresses[2])
sess.delete(u.addresses[4])
sess.delete(u.addresses[3])
assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == list(u.addresses)
-
+
sess.delete(u)
sess.close()
+ @testing.fails_on('maxdb')
def test_remove_orphans(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses), cascade="all, delete-orphan", backref='user')
u.addresses.append(Address(email_address='f'))
sess.save(u)
- assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='c'),
+ assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='c'),
Address(email_address='d'), Address(email_address='e'), Address(email_address='f')] == sess.query(Address).all()
assert Address(email_address='c') == u.addresses[2]
-
+
try:
del u.addresses[3]
assert False
except TypeError, e:
assert "doesn't support item deletion" in str(e), str(e)
-
+
for a in u.addresses.filter(Address.email_address.in_(['c', 'e', 'f'])):
u.addresses.remove(a)
-
+
assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == list(u.addresses)
assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == sess.query(Address).all()
if not autoflush:
sess.flush()
-
+
assert u in sess
assert a in sess
-
+
self.assert_(list(u.addresses) == [a])
a.user = None
setattr(FlushTest, test_backref.__name__, test_backref)
for autoflush in (False, True):
- for saveuser in (False, True):
- create_backref_test(autoflush, saveuser)
+ for saveuser in (False, True):
+ create_backref_test(autoflush, saveuser)
if __name__ == '__main__':
testbase.main()
-
+
assert [
User(id=7, addresses=[
Address(id=1)
- ]),
+ ]),
User(id=8, addresses=[
Address(id=3, email_address='ed@bettyboop.com'),
Address(id=4, email_address='ed@lala.com'),
Address(id=2, email_address='ed@wood.com')
- ]),
+ ]),
User(id=9, addresses=[
Address(id=5)
- ]),
+ ]),
User(id=10, addresses=[])
] == q.all()
mapper(User, users, properties = dict(
addresses = relation(Address, lazy=False),
))
-
+
q = create_session().query(User)
l = q.filter(User.id==Address.user_id).order_by(Address.email_address).all()
-
+
assert [
User(id=8, addresses=[
Address(id=2, email_address='ed@wood.com'),
Address(id=3, email_address='ed@bettyboop.com'),
Address(id=4, email_address='ed@lala.com'),
- ]),
+ ]),
User(id=9, addresses=[
Address(id=5)
- ]),
+ ]),
User(id=7, addresses=[
Address(id=1)
- ]),
+ ]),
] == l
def test_orderby_desc(self):
assert [
User(id=7, addresses=[
Address(id=1)
- ]),
+ ]),
User(id=8, addresses=[
Address(id=2, email_address='ed@wood.com'),
Address(id=4, email_address='ed@lala.com'),
Address(id=3, email_address='ed@bettyboop.com'),
- ]),
+ ]),
User(id=9, addresses=[
Address(id=5)
- ]),
+ ]),
User(id=10, addresses=[])
] == sess.query(User).all()
def go():
assert fixtures.item_keyword_result == q.all()
self.assert_sql_count(testbase.db, go, 1)
-
+
def go():
assert fixtures.item_keyword_result[0:2] == q.join('keywords').filter(keywords.c.name == 'red').all()
self.assert_sql_count(testbase.db, go, 1)
def go():
assert fixtures.item_keyword_result[0:2] == q.options(eagerload('keywords')).join('keywords').filter(keywords.c.name == 'red').all()
-
+
self.assert_sql_count(testbase.db, go, 1)
def test_cyclical(self):
"""test that a circular eager relationship breaks the cycle with a lazy loader"""
-
+
mapper(Address, addresses)
mapper(User, users, properties = dict(
addresses = relation(Address, lazy=False, backref=backref('user', lazy=False))
))
assert class_mapper(User).get_property('addresses').lazy is False
assert class_mapper(Address).get_property('user').lazy is False
-
+
sess = create_session()
assert fixtures.user_address_result == sess.query(User).all()
-
+
def test_double(self):
"""tests eager loading with two relations simulatneously, from the same table, using aliases. """
openorders = alias(orders, 'openorders')
def test_no_false_hits(self):
"""test that eager loaders don't interpret main table columns as part of their eager load."""
-
+
mapper(User, users, properties={
'addresses':relation(Address, lazy=False),
'orders':relation(Order, lazy=False)
})
mapper(Address, addresses)
mapper(Order, orders)
-
+
allusers = create_session().query(User).all()
-
+
# using a textual select, the columns will be 'id' and 'name'.
- # the eager loaders have aliases which should not hit on those columns, they should
+ # the eager loaders have aliases which should not hit on those columns, they should
# be required to locate only their aliased/fully table qualified column name.
noeagers = create_session().query(User).from_statement("select * from users").all()
assert 'orders' not in noeagers[0].__dict__
assert 'addresses' not in noeagers[0].__dict__
+ @testing.fails_on('maxdb')
def test_limit(self):
"""test limit operations combined with lazy-load relationships."""
if testing.against('mysql'):
l = q.limit(2).all()
assert fixtures.user_all_result[:2] == l
- else:
+ else:
l = q.limit(2).offset(1).order_by(User.id).all()
print fixtures.user_all_result[1:3]
print l
assert fixtures.user_all_result[1:3] == l
-
+
def test_distinct(self):
# this is an involved 3x union of the users table to get a lot of rows.
# then see if the "distinct" works its way out. you actually get the same
l = q.filter(s.c.u2_id==User.c.id).distinct().all()
assert fixtures.user_address_result == l
self.assert_sql_count(testbase.db, go, 1)
-
+
+ @testing.fails_on('maxdb')
def test_limit_2(self):
mapper(Keyword, keywords)
mapper(Item, items, properties = dict(
keywords = relation(Keyword, secondary=item_keywords, lazy=False, order_by=[keywords.c.id]),
))
-
+
sess = create_session()
q = sess.query(Item)
l = q.filter((Item.c.description=='item 2') | (Item.c.description=='item 5') | (Item.c.description=='item 3')).\
order_by(Item.id).limit(2).all()
assert fixtures.item_keyword_result[1:3] == l
-
+
+ @testing.fails_on('maxdb')
def test_limit_3(self):
- """test that the ORDER BY is propigated from the inner select to the outer select, when using the
+ """test that the ORDER BY is propigated from the inner select to the outer select, when using the
'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses."""
-
+
mapper(Item, items)
mapper(Order, orders, properties = dict(
items = relation(Item, secondary=order_items, lazy=False)
orders = relation(Order, lazy=False),
))
sess = create_session()
-
+
q = sess.query(User)
if not testing.against('maxdb', 'mssql'):
l = q.join('orders').order_by(Order.user_id.desc()).limit(2).offset(1)
assert [
- User(id=9,
+ User(id=9,
orders=[Order(id=2), Order(id=4)],
addresses=[Address(id=5)]
),
- User(id=7,
+ User(id=7,
orders=[Order(id=1), Order(id=3), Order(id=5)],
addresses=[Address(id=1)]
)
l = q.join('addresses').order_by(Address.email_address.desc()).limit(1).offset(0)
assert [
- User(id=7,
+ User(id=7,
orders=[Order(id=1), Order(id=3), Order(id=5)],
addresses=[Address(id=1)]
)
address = relation(mapper(Address, addresses), lazy=False, uselist=False)
))
q = create_session().query(User)
-
+
def go():
l = q.filter(users.c.id == 7).all()
assert [User(id=7, address=Address(id=1))] == l
self.assert_sql_count(testbase.db, go, 1)
+ @testing.fails_on('maxdb')
def test_many_to_one(self):
mapper(Address, addresses, properties = dict(
user = relation(mapper(User, users), lazy=False)
))
sess = create_session()
q = sess.query(Address)
-
+
def go():
a = q.filter(addresses.c.id==1).one()
assert a.user is not None
u1 = sess.query(User).get(7)
assert a.user is u1
self.assert_sql_count(testbase.db, go, 1)
-
+
def test_one_and_many(self):
- """tests eager load for a parent object with a child object that
+ """tests eager load for a parent object with a child object that
contains a many-to-many relationship to a third object."""
-
+
mapper(User, users, properties={
'orders':relation(Order, lazy=False)
})
- mapper(Item, items)
+ mapper(Item, items)
mapper(Order, orders, properties = dict(
items = relation(Item, secondary=order_items, lazy=False, order_by=items.c.id)
))
-
+
q = create_session().query(User)
-
+
l = q.filter("users.id in (7, 8, 9)")
-
+
def go():
assert fixtures.user_order_result[0:3] == l.all()
self.assert_sql_count(testbase.db, go, 1)
def test_double_with_aggregate(self):
max_orders_by_user = select([func.max(orders.c.id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user')
-
+
max_orders = orders.select(orders.c.id==max_orders_by_user.c.order_id).alias('max_orders')
-
+
mapper(Order, orders)
mapper(User, users, properties={
'orders':relation(Order, backref='user', lazy=False),
'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False)
})
q = create_session().query(User)
-
+
def go():
assert [
User(id=7, orders=[
Order(id=1),
Order(id=3),
Order(id=5),
- ],
+ ],
max_order=Order(id=5)
),
User(id=8, orders=[]),
- User(id=9, orders=[Order(id=2),Order(id=4)],
+ User(id=9, orders=[Order(id=2),Order(id=4)],
max_order=Order(id=4)
),
User(id=10),
"""test eager loading of a mapper which is against a select"""
s = select([orders], orders.c.isopen==1).alias('openorders')
-
+
mapper(Order, s, properties={
'user':relation(User, lazy=False)
})
Order(id=3, user=User(id=7)),
Order(id=4, user=User(id=9))
] == q.all()
-
+
q = q.select_from(s.join(order_items).join(items)).filter(~Item.id.in_([1, 2, 5]))
assert [
Order(id=3, user=User(id=7)),
def test_aliasing(self):
"""test that eager loading uses aliases to insulate the eager load from regular criterion against those tables."""
-
+
mapper(User, users, properties = dict(
addresses = relation(mapper(Address, addresses), lazy=False)
))
Column('id', Integer, Sequence('node_id_seq', optional=True), primary_key=True),
Column('parent_id', Integer, ForeignKey('nodes.id')),
Column('data', String(30)))
-
+
+ @testing.fails_on('maxdb')
def test_basic(self):
class Node(Base):
def append(self, node):
self.children.append(node)
-
+
mapper(Node, nodes, properties={
'children':relation(Node, lazy=False, join_depth=3)
})
),
])
+ @testing.fails_on('maxdb')
def test_no_depth(self):
class Node(Base):
def append(self, node):
class SelfReferentialM2MEagerTest(ORMTest):
def define_tables(self, metadata):
global widget, widget_rel
-
+
widget = Table('widget', metadata,
Column('id', Integer, primary_key=True),
Column('name', Unicode(40), nullable=False, unique=True),
sess.save(w1)
sess.flush()
sess.clear()
-
+
# l = sess.query(Widget).filter(Widget.name=='w1').all()
# print l
assert [Widget(name='w1', children=[Widget(name='w2')])] == sess.query(Widget).filter(Widget.name=='w1').all()
-
+
class CyclicalInheritingEagerTest(ORMTest):
def define_tables(self, metadata):
global t1, t2
- t1 = Table('t1', metadata,
+ t1 = Table('t1', metadata,
Column('c1', Integer, primary_key=True),
Column('c2', String(30)),
Column('type', String(30))
)
- t2 = Table('t2', metadata,
+ t2 = Table('t2', metadata,
Column('c1', Integer, primary_key=True),
Column('c2', String(30)),
Column('type', String(30)),
Column('t1.id', Integer, ForeignKey('t1.c1')))
-
+
def test_basic(self):
class T(object):
pass
-
+
class SubT(T):
pass
-
+
class T2(object):
pass
class SubT2(T2):
pass
-
+
mapper(T, t1, polymorphic_on=t1.c.type, polymorphic_identity='t1')
mapper(SubT, None, inherits=T, polymorphic_identity='subt1', properties={
't2s':relation(SubT2, lazy=False, backref=backref('subt', lazy=False))
})
mapper(T2, t2, polymorphic_on=t2.c.type, polymorphic_identity='t2')
mapper(SubT2, None, inherits=T2, polymorphic_identity='subt2')
-
+
# testing a particular endless loop condition in eager join setup
create_session().query(SubT).all()
-
-
+
+
if __name__ == '__main__':
testbase.main()
Column('id', Integer, Sequence('foo_id_seq'), primary_key=True),
Column('bar', Integer),
Column('range', Integer))
-
+
mapper(Foo, foo)
metadata.create_all()
-
+
sess = create_session(bind=testbase.db)
for i in range(100):
sess.save(Foo(bar=i, range=i%10))
sess.flush()
-
+
def tearDownAll(self):
metadata.drop_all()
clear_mappers()
-
+
def test_selectby(self):
res = create_session(bind=testbase.db).query(Foo).filter_by(range=5)
assert res.order_by([Foo.c.bar])[0].bar == 5
assert res.order_by([desc(Foo.c.bar)])[0].bar == 95
-
+
@testing.unsupported('mssql')
+ @testing.fails_on('maxdb')
def test_slice(self):
sess = create_session(bind=testbase.db)
query = sess.query(Foo)
query = create_session(bind=testbase.db).query(Foo)
assert query.filter(foo.c.bar<30).apply_avg(foo.c.bar).first() == 14.5
assert query.filter(foo.c.bar<30).apply_avg(foo.c.bar).one() == 14.5
-
+
def test_filter(self):
query = create_session(bind=testbase.db).query(Foo)
assert query.count() == 100
assert query.filter(Foo.c.bar < 30).count() == 30
res2 = query.filter(Foo.c.bar < 30).filter(Foo.c.bar > 10)
assert res2.count() == 19
-
+
def test_options(self):
query = create_session(bind=testbase.db).query(Foo)
class ext1(MapperExtension):
instance.TEST = "hello world"
return EXT_CONTINUE
assert query.options(extension(ext1()))[0].TEST == "hello world"
-
+
def test_order_by(self):
query = create_session(bind=testbase.db).query(Foo)
assert query.order_by([Foo.c.bar])[0].bar == 0
def test_offset(self):
query = create_session(bind=testbase.db).query(Foo)
assert list(query.order_by([Foo.c.bar]).offset(10))[0].bar == 10
-
+
def test_offset(self):
query = create_session(bind=testbase.db).query(Foo)
assert len(list(query.limit(10))) == 10
filter(or_(tables.Order.c.order_id==None,tables.Item.c.item_id==2))
print x.compile()
self.assert_result(list(x), tables.User, *tables.user_result[1:3])
-
+
class CaseSensitiveTest(PersistTest):
def setUpAll(self):
def tearDownAll(self):
metadata.drop_all()
clear_mappers()
-
+
def test_distinctcount(self):
q = create_session(bind=testbase.db).query(Obj1)
assert q.count() == 4
class SelfRefTest(ORMTest):
def define_tables(self, metadata):
global t1
- t1 = Table('t1', metadata,
+ t1 = Table('t1', metadata,
Column('id', Integer, primary_key=True),
Column('parent_id', Integer, ForeignKey('t1.id'))
)
assert False
except exceptions.InvalidRequestError, e:
assert str(e) == "Self-referential query on 'T.children (T)' property requires create_aliases=True argument.", str(e)
-
-
-
+
+
+
if __name__ == "__main__":
- testbase.main()
+ testbase.main()
return "Bar id %d, data %s" % (self.id, self.data)
mapper(Bar, bar, inherits=Foo)
-
+
class Blub(Bar):
def __repr__(self):
return "Blub id %d, data %s" % (self.id, self.data)
self.assert_(l[0].parent_foo.data == 'foo #1' and l[1].parent_foo.data == 'foo #1')
-class GetTest(ORMTest):
+class GetTest(ORMTest):
def define_tables(self, metadata):
global foo, bar, blub
foo = Table('foo', metadata,
Column('foo_id', Integer, ForeignKey('foo.id')),
Column('bar_id', Integer, ForeignKey('bar.id')),
Column('data', String(20)))
-
+
def create_test(polymorphic):
def test_get(self):
class Foo(object):
class Bar(Foo):
pass
-
+
class Blub(Bar):
pass
mapper(Foo, foo)
mapper(Bar, bar, inherits=Foo)
mapper(Blub, blub, inherits=Bar)
-
+
sess = create_session()
f = Foo()
b = Bar()
sess.save(b)
sess.save(bl)
sess.flush()
-
+
if polymorphic:
def go():
assert sess.query(Foo).get(f.id) == f
assert sess.query(Bar).get(b.id) == b
assert sess.query(Bar).get(bl.id) == bl
assert sess.query(Blub).get(bl.id) == bl
-
+
self.assert_sql_count(testbase.db, go, 0)
else:
- # this is testing the 'wrong' behavior of using get()
+ # this is testing the 'wrong' behavior of using get()
# polymorphically with mappers that are not configured to be
# polymorphic. the important part being that get() always
# returns an instance of the query's type.
def go():
assert sess.query(Foo).get(f.id) == f
-
+
bb = sess.query(Foo).get(b.id)
assert isinstance(b, Foo) and bb.id==b.id
-
+
bll = sess.query(Foo).get(bl.id)
assert isinstance(bll, Foo) and bll.id==bl.id
-
+
assert sess.query(Bar).get(b.id) == b
-
+
bll = sess.query(Bar).get(bl.id)
assert isinstance(bll, Bar) and bll.id == bl.id
-
+
assert sess.query(Blub).get(bl.id) == bl
-
+
self.assert_sql_count(testbase.db, go, 3)
-
+
return test_get
-
+
test_get_polymorphic = create_test(True)
test_get_nonpolymorphic = create_test(False)
class ConstructionTest(ORMTest):
def define_tables(self, metadata):
global content_type, content, product
- content_type = Table('content_type', metadata,
+ content_type = Table('content_type', metadata,
Column('id', Integer, primary_key=True)
)
content = Table('content', metadata,
Column('content_type_id', Integer, ForeignKey('content_type.id')),
Column('type', String(30))
)
- product = Table('product', metadata,
+ product = Table('product', metadata,
Column('id', Integer, ForeignKey('content.id'), primary_key=True)
)
p = Product()
p.contenttype = ContentType()
# TODO: assertion ??
-
+
class EagerLazyTest(ORMTest):
"""tests eager load/lazy load of child items off inheritance mappers, tests that
LazyLoader constructs the right query condition."""
Column('foo_id', Integer, ForeignKey('foo.id'))
)
+ @testing.fails_on('maxdb')
def testbasic(self):
class Foo(object): pass
class Bar(Foo): pass
bar_foo.insert().execute(bar_id=1, foo_id=3)
bar_foo.insert().execute(bar_id=2, foo_id=4)
-
+
sess = create_session()
q = sess.query(Bar)
self.assert_(len(q.selectfirst().lazy) == 1)
roles = Table('role', metadata,
Column('id', Integer, primary_key=True),
- Column('description', String(32))
+ Column('description', String(32))
)
user_roles = Table('user_role', metadata,
Column('admin_id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('users.id'))
)
-
+
def testone(self):
class User(object):pass
class Role(object):pass
class Admin(User):pass
role_mapper = mapper(Role, roles)
user_mapper = mapper(User, users, properties = {
- 'roles' : relation(Role, secondary=user_roles, lazy=False, private=False)
+ 'roles' : relation(Role, secondary=user_roles, lazy=False, private=False)
}
)
admin_mapper = mapper(Admin, admins, inherits=user_mapper)
a.password = 'admin'
sess.save(a)
sess.flush()
-
+
assert user_roles.count().scalar() == 1
def testtwo(self):
}
)
- admin_mapper = mapper(Admin, admins, inherits=user_mapper)
+ admin_mapper = mapper(Admin, admins, inherits=user_mapper)
# create roles
adminrole = Role('admin')
"""test that syncrules compile properly on custom inherit conds"""
def define_tables(self, metadata):
global _a_table, _b_table, _c_table
-
+
_a_table = Table('a', metadata,
Column('id', Integer, primary_key=True),
Column('data1', String)
Column('b_a_id', Integer, ForeignKey('b.a_id'), primary_key=True),
Column('data3', String)
)
-
+
def test_joins(self):
for j1 in (None, _b_table.c.a_id==_a_table.c.id, _a_table.c.id==_b_table.c.a_id):
for j2 in (None, _b_table.c.a_id==_c_table.c.b_a_id, _c_table.c.b_a_id==_b_table.c.a_id):
self._do_test(j1, j2)
for t in _a_table.metadata.table_iterator(reverse=True):
t.delete().execute().close()
-
+
def _do_test(self, j1, j2):
class A(object):
def __init__(self, **kwargs):
class C(B):
pass
-
+
mapper(A, _a_table)
- mapper(B, _b_table, inherits=A,
+ mapper(B, _b_table, inherits=A,
inherit_condition=j1
)
- mapper(C, _c_table, inherits=B,
+ mapper(C, _c_table, inherits=B,
inherit_condition=j2
)
assert len(session.query(B).all()) == 2
assert len(session.query(C).all()) == 1
-
-if __name__ == "__main__":
+
+if __name__ == "__main__":
testbase.main()
primary_key=True),
Column('name', String(50), nullable=False))
- users = Table('prin_users', metadata,
+ users = Table('prin_users', metadata,
Column('principal_id', Integer,
ForeignKey('principals.principal_id'), primary_key=True),
Column('password', String(50), nullable=False),
mapper(Group, groups, inherits=Principal, properties={
'users': relation(User, secondary=user_group_map,
- lazy=True, backref="groups")
+ lazy=True, backref="groups")
})
g = Group(name="group1")
sess.save(g)
sess.flush()
# TODO: put an assertion
-
+
class InheritTest2(ORMTest):
"""deals with inheritance and many-to-many relationships"""
def define_tables(self, metadata):
def __init__(self, data=None):
self.data = data
class Bar(Foo):pass
-
+
mapper(Foo, foo)
mapper(Bar, bar, inherits=Foo)
print foo.join(bar).primary_key
sess.save(b)
sess.flush()
sess.clear()
-
+
# test that "bar.bid" does not need to be referenced in a get
# (ticket 185)
assert sess.query(Bar).get(b.id).id == b.id
-
+
def testbasic(self):
- class Foo(object):
+ class Foo(object):
def __init__(self, data=None):
self.data = data
mapper(Bar, bar, inherits=Foo, properties={
'foos': relation(Foo, secondary=foo_bar, lazy=False)
})
-
+
sess = create_session()
b = Bar('barfoo')
sess.save(b)
Column('id', Integer, ForeignKey('bar.id'), primary_key=True),
Column('data', String(20)))
- bar_foo = Table('bar_foo', metadata,
+ bar_foo = Table('bar_foo', metadata,
Column('bar_id', Integer, ForeignKey('bar.id')),
Column('foo_id', Integer, ForeignKey('foo.id')))
-
+
blub_bar = Table('bar_blub', metadata,
Column('blub_id', Integer, ForeignKey('blub.id')),
Column('bar_id', Integer, ForeignKey('bar.id')))
blub_foo = Table('blub_foo', metadata,
Column('blub_id', Integer, ForeignKey('blub.id')),
Column('foo_id', Integer, ForeignKey('foo.id')))
-
+
def testbasic(self):
class Foo(object):
def __init__(self, data=None):
class Bar(Foo):
def __repr__(self):
return "Bar id %d, data %s" % (self.id, self.data)
-
+
mapper(Bar, bar, inherits=Foo, properties={
'foos' :relation(Foo, secondary=bar_foo, lazy=True)
})
l = sess.query(Bar).select()
print repr(l[0]) + repr(l[0].foos)
self.assert_(repr(l[0]) + repr(l[0].foos) == compare)
-
- def testadvanced(self):
+
+ @testing.fails_on('maxdb')
+ def testadvanced(self):
class Foo(object):
def __init__(self, data=None):
self.data = data
class Blub(Bar):
def __repr__(self):
return "Blub id %d, data %s, bars %s, foos %s" % (self.id, self.data, repr([b for b in self.bars]), repr([f for f in self.foos]))
-
+
mapper(Blub, blub, inherits=Bar, properties={
'bars':relation(Bar, secondary=blub_bar, lazy=False),
'foos':relation(Foo, secondary=blub_foo, lazy=False),
x = sess.query(Blub).get_by(id=blubid)
print x
self.assert_(repr(x) == compare)
-
-
-if __name__ == "__main__":
+
+
+if __name__ == "__main__":
testbase.main()
Column('id', Integer, ForeignKey('table1.id'), primary_key=True),
)
- data = Table('data', metadata,
+ data = Table('data', metadata,
Column('id', Integer, primary_key=True),
Column('node_id', Integer, ForeignKey('table1.id')),
Column('data', String(30))
)
-
+
#join = polymorphic_union(
# {
# 'table3' : table1.join(table3),
# 'table2' : table1.join(table2),
# 'table1' : table1.select(table1.c.type.in_(['table1', 'table1b'])),
# }, None, 'pjoin')
-
+
join = table1.outerjoin(table2).outerjoin(table3).alias('pjoin')
#join = None
-
+
class Table1(object):
def __init__(self, name, data=None):
self.name = name
class Table1B(Table1):
pass
-
+
class Table2(Table1):
pass
class Table3(Table1):
pass
-
+
class Data(object):
def __init__(self, data):
self.data = data
def __repr__(self):
return "%s(%d, %s)" % (self.__class__.__name__, self.id, repr(str(self.data)))
-
+
try:
# this is how the mapping used to work. ensure that this raises an error now
table1_mapper = mapper(Table1, table1,
polymorphic_on=table1.c.type,
polymorphic_identity='table1',
properties={
- 'next': relation(Table1,
- backref=backref('prev', primaryjoin=join.c.id==join.c.related_id, foreignkey=join.c.id, uselist=False),
+ 'next': relation(Table1,
+ backref=backref('prev', primaryjoin=join.c.id==join.c.related_id, foreignkey=join.c.id, uselist=False),
uselist=False, primaryjoin=join.c.id==join.c.related_id),
'data':relation(mapper(Data, data))
}
except:
assert True
clear_mappers()
-
+
# currently, the "eager" relationships degrade to lazy relationships
# due to the polymorphic load.
- # the "next" relation used to have a "lazy=False" on it, but the EagerLoader raises the "self-referential"
+ # the "next" relation used to have a "lazy=False" on it, but the EagerLoader raises the "self-referential"
# exception now. since eager loading would never work for that relation anyway, its better that the user
# gets an exception instead of it silently not eager loading.
table1_mapper = mapper(Table1, table1,
polymorphic_on=table1.c.type,
polymorphic_identity='table1',
properties={
- 'next': relation(Table1,
- backref=backref('prev', primaryjoin=table1.c.id==table1.c.related_id, remote_side=table1.c.id, uselist=False),
+ 'next': relation(Table1,
+ backref=backref('prev', primaryjoin=table1.c.id==table1.c.related_id, remote_side=table1.c.id, uselist=False),
uselist=False, primaryjoin=table1.c.id==table1.c.related_id),
'data':relation(mapper(Data, data), lazy=False)
}
polymorphic_identity='table2')
table3_mapper = mapper(Table3, table3, inherits=table1_mapper, polymorphic_identity='table3')
-
+
table1_mapper.compile()
assert table1_mapper.primary_key == [table1.c.id], table1_mapper.primary_key
-
+
+ @testing.fails_on('maxdb')
def testone(self):
self.do_testlist([Table1, Table2, Table1, Table2])
+ @testing.fails_on('maxdb')
def testtwo(self):
self.do_testlist([Table3])
-
+
+ @testing.fails_on('maxdb')
def testthree(self):
self.do_testlist([Table2, Table1, Table1B, Table3, Table3, Table1B, Table1B, Table2, Table1])
+ @testing.fails_on('maxdb')
def testfour(self):
self.do_testlist([
- Table2('t2', [Data('data1'), Data('data2')]),
+ Table2('t2', [Data('data1'), Data('data2')]),
Table1('t1', []),
Table3('t3', [Data('data3')]),
Table1B('t1b', [Data('data4'), Data('data5')])
])
-
+
def do_testlist(self, classes):
sess = create_session( )
# save to DB
sess.save(t)
sess.flush()
-
+
# string version of the saved list
assertlist = []
node = t
assert n.next is node
node = n
backwards = repr(assertlist)
-
+
# everything should match !
print "ORIGNAL", original
print "BACKWARDS",backwards
clear_mappers()
def setUp(self):
pass
-
+
class MapperTest(MapperSuperTest):
def test_propconflict(self):
assert False
except exceptions.ArgumentError:
pass
-
+
def test_prop_accessor(self):
mapper(User, users)
try:
assert False
except NotImplementedError, uoe:
assert str(uoe) == "Public collection of MapperProperty objects is provided by the get_property() and iterate_properties accessors."
-
+
def test_badcascade(self):
mapper(Address, addresses)
try:
assert False
except exceptions.ArgumentError, e:
assert str(e) == "Invalid cascade option 'fake'"
-
+
def test_columnprefix(self):
- mapper(User, users, column_prefix='_', properties={
- 'user_name':synonym('_user_name')
+ mapper(User, users, column_prefix='_', properties={
+ 'user_name':synonym('_user_name')
})
s = create_session()
assert u._user_name=='jack'
assert u._user_id ==7
assert not hasattr(u, 'user_name')
- u2 = s.query(User).filter_by(user_name='jack').one()
+ u2 = s.query(User).filter_by(user_name='jack').one()
assert u is u2
-
+
def test_refresh(self):
mapper(User, users, properties={'addresses':relation(mapper(Address, addresses), backref='user')})
s = create_session()
self.assert_(a in u.addresses)
s.refresh(u)
-
+
# its refreshed, so not dirty
self.assert_(u not in s.dirty)
-
+
# username is back to the DB
self.assert_(u.user_name == 'jack')
-
+
self.assert_(a not in u.addresses)
-
+
u.user_name = 'foo'
u.addresses.append(a)
# now its dirty
def test_compileonsession(self):
m = mapper(User, users)
session = create_session()
- session.connection(m)
+ session.connection(m)
def test_expirecascade(self):
mapper(User, users, properties={'addresses':relation(mapper(Address, addresses), cascade="all, refresh-expire")})
u.addresses[0].email_address = 'someotheraddress'
s.expire(u)
assert u.addresses[0].email_address == 'ed@wood.com'
-
+
def test_refreshwitheager(self):
"""test that a refresh/expire operation loads rows properly and sends correct "isnew" state to eager loaders"""
mapper(User, users, properties={'addresses':relation(mapper(Address, addresses), lazy=False)})
assert len(u.addresses) == 3
s.expire(u)
assert len(u.addresses) == 3
-
+
def test_incompletecolumns(self):
"""test loading from a select which does not contain all columns"""
mapper(Address, addresses)
assert a.user_id == 7
assert a.address_id == 1
assert a.email_address is None
-
+
def test_badconstructor(self):
"""test that if the construction of a mapped class fails, the instnace does not get placed in the session"""
class Foo(object):
pass
def test_constructorexceptions(self):
- """test that exceptions raised in the mapped class are not masked by sa decorations"""
+ """test that exceptions raised in the mapped class are not masked by sa decorations"""
ex = AssertionError('oops')
sess = create_session()
assert False
except Exception, e:
assert e is ex
-
+
def test_refresh_lazy(self):
"""test that when a lazy loader is set as a trigger on an object's attribute (at the attribute level, not the class level), a refresh() operation doesnt fire the lazy loader or create any problems"""
s = create_session()
# test plain expire
self.assert_(u.user_name =='jack')
self.assert_(len(u.addresses) == 1)
-
+
# we're changing the database here, so if this test fails in the middle,
# it'll screw up the other tests which are hardcoded to 7/'jack'
u.user_name = 'foo'
s.query(User).select()
# test that it refreshed
self.assert_(u.__dict__['user_name'] == 'jack')
-
- # object should be back to normal now,
+
+ # object should be back to normal now,
# this should *not* produce a SELECT statement (not tested here though....)
self.assert_(u.user_name =='jack')
-
+
+ @testing.fails_on('maxdb')
def test_refresh2(self):
"""test a hang condition that was occuring on expire/refresh"""
-
+
s = create_session()
m1 = mapper(Address, addresses)
print u.user_name #this line runs
s.refresh(u) #hangs
-
+
def test_props(self):
m = mapper(User, users, properties = {
'addresses' : relation(mapper(Address, addresses))
}).compile()
self.assert_(User.addresses.property is m.get_property('addresses'))
-
+
def test_compileonprop(self):
mapper(User, users, properties = {
'addresses' : relation(mapper(Address, addresses))
mapper(User, users)
mapper(Foo, addresses, inherits=User)
assert getattr(Foo().__class__, 'user_name').impl is not None
-
+
def test_addproperty(self):
m = mapper(User, users)
mapper(Address, addresses)
m.add_property('user_name', deferred(users.c.user_name))
m.add_property('name', synonym('user_name'))
m.add_property('addresses', relation(Address))
-
+
sess = create_session(transactional=True)
assert sess.query(User).get(7)
u = sess.query(User).filter_by(name='jack').one()
-
+
def go():
self.assert_result([u], User, user_address_result[0])
assert u.user_name == 'jack'
-
+
self.assert_sql_count(testbase.db, go, 2)
-
+
u3 = User()
u3.user_name = 'some user'
sess.save(u3)
sess.flush()
sess.rollback()
-
+
def test_propfilters(self):
t = Table('person', MetaData(),
Column('id', Integer, primary_key=True),
p_m.compile()
#compile_mappers()
-
+
def assert_props(cls, want):
have = set([n for n in dir(cls) if not n.startswith('_')])
want = set(want)
q = create_session().query(m)
l = q.select()
self.assert_result(l, User, *user_result[0:2])
-
+
def test_mappingtojoinnopk(self):
metadata = MetaData()
account_ids_table = Table('account_ids', metadata,
assert testbase.db.execute(account_stuff_table.count()).scalar() == 0
finally:
metadata.drop_all(testbase.db)
-
+
def test_mappingtoouterjoin(self):
"""test mapping to an outer join, with a composite primary key that allows nulls"""
result = [
{'user_id' : 8, 'address_id' : 4},
{'user_id' : 9, 'address_id':None}
]
-
+
j = join(users, addresses, isouter=True)
m = mapper(User, j, allow_null_pks=True, primary_key=[users.c.user_id, addresses.c.address_id])
q = create_session().query(m)
l = q.select()
self.assert_result(l, User, *result)
-
+
def test_customjoin(self):
"""test that the from_obj parameter to query.select() can be used
to totally replace the FROM parameters of the generated query."""
q = create_session().query(m)
l = q.select((orderitems.c.item_name=='item 4'), from_obj=[users.join(orders).join(orderitems)])
self.assert_result(l, User, user_result[0])
-
+
def test_orderby(self):
"""test ordering at the mapper and query level"""
# TODO: make a unit test out of these various combinations
# m = mapper(User, users, order_by=desc(users.c.user_name))
mapper(User, users, order_by=None)
# mapper(User, users)
-
+
# l = create_session().query(User).select(order_by=[desc(users.c.user_name), asc(users.c.user_id)])
l = create_session().query(User).select()
# l = create_session().query(User).select(order_by=[])
# l = create_session().query(User).select(order_by=None)
-
-
- @testing.unsupported('firebird')
+
+
+ @testing.unsupported('firebird')
def test_function(self):
"""test mapping to a SELECT statement that has functions in it."""
s = select([users, (users.c.user_id * 2).label('concat'), func.count(addresses.c.address_id).label('count')],
assert l[0].concat == l[0].user_id * 2 == 14
assert l[1].concat == l[1].user_id * 2 == 16
- @testing.unsupported('firebird')
+ @testing.unsupported('firebird')
def test_count(self):
"""test the count function on Query.
-
+
(why doesnt this work on firebird?)"""
mapper(User, users)
q = create_session().query(User)
self.assert_(False, "should have raised ArgumentError")
except exceptions.ArgumentError, e:
self.assert_(True)
-
+
clear_mappers()
# assert that allow_column_override cancels the error
m = mapper(User, users, properties = {
'user_name' : relation(mapper(Address, addresses))
}, allow_column_override=True)
-
+
clear_mappers()
# assert that the column being named else where also cancels the error
m = mapper(User, users, properties = {
adlist = synonym('addresses', proxy=True),
adname = synonym('addresses')
))
-
+
assert hasattr(User, 'adlist')
assert not hasattr(User, 'adname')
-
+
u = sess.query(User).get_by(uname='jack')
self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1]))
assert hasattr(u, 'adlist')
assert not hasattr(u, 'adname')
-
+
addr = sess.query(Address).get_by(address_id=user_address_result[0]['addresses'][1][0]['address_id'])
u = sess.query(User).get_by(adname=addr)
u2 = sess.query(User).get_by(adlist=addr)
assert u is u2
-
+
assert u not in sess.dirty
u.uname = "some user name"
assert u.uname == "some user name"
assert u.user_name == "some user name"
assert u in sess.dirty
+ @testing.fails_on('maxdb')
def test_synonymoptions(self):
sess = create_session()
mapper(User, users, properties = dict(
addresses = relation(mapper(Address, addresses), lazy = True),
adlist = synonym('addresses', proxy=True)
))
-
+
def go():
u = sess.query(User).options(eagerload('adlist')).get_by(user_name='jack')
self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1]))
self.assert_sql_count(testbase.db, go, 1)
-
+
def test_extensionoptions(self):
sess = create_session()
class ext1(MapperExtension):
assert l.TEST_2 == "also hello world"
assert not hasattr(l.addresses[0], 'TEST')
assert not hasattr(l.addresses[0], 'TEST2')
-
+
def test_eageroptions(self):
"""tests that a lazy relation can be upgraded to an eager relation via the options method"""
sess = create_session()
self.assert_result(l, User, *user_address_result)
self.assert_sql_count(testbase.db, go, 0)
+ @testing.fails_on('maxdb')
def test_eageroptionswithlimit(self):
sess = create_session()
mapper(User, users, properties = dict(
self.assert_sql_count(testbase.db, go, 0)
sess.clear()
-
+
# test that eager loading doesnt modify parent mapper
def go():
u = sess.query(User).get_by(user_id=8)
assert u.user_id == 8
assert len(u.addresses) == 3
assert "tbl_row_count" not in self.capture_sql(testbase.db, go)
-
+
+ @testing.fails_on('maxdb')
def test_lazyoptionswithlimit(self):
sess = create_session()
mapper(User, users, properties = dict(
self.assert_sql_count(testbase.db, go, 1)
sess.clear()
-
+
# then select just from users. run it into instances.
# then assert the data, which will launch 3 more lazy loads
# (previous users in session fell out of scope and were removed from session's identity map)
l = usermapper.instances(r, sess)
self.assert_result(l, User, *user_address_result)
self.assert_sql_count(testbase.db, go, 4)
-
+
clear_mappers()
sess.clear()
-
+
# test with a deeper set of eager loads. when we first load the three
# users, they will have no addresses or orders. the number of lazy loads when
- # traversing the whole thing will be three for the addresses and three for the
+ # traversing the whole thing will be three for the addresses and three for the
# orders.
# (previous users in session fell out of scope and were removed from session's identity map)
usermapper = mapper(User, users,
self.assert_sql_count(testbase.db, go, 1)
sess.clear()
-
+
# then select just from users. run it into instances.
# then assert the data, which will launch 6 more lazy loads
def go():
l = usermapper.instances(r, sess)
self.assert_result(l, User, *user_all_result)
self.assert_sql_count(testbase.db, go, 7)
-
-
+
+
def test_lazyoptions(self):
"""tests that an eager relation can be upgraded to a lazy relation via the options method"""
sess = create_session()
def test_latecompile(self):
"""tests mappers compiling late in the game"""
-
+
mapper(User, users, properties = {'orders': relation(Order)})
mapper(Item, orderitems, properties={'keywords':relation(Keyword, secondary=itemkeywords)})
mapper(Keyword, keywords)
mapper(Order, orders, properties={'items':relation(Item)})
-
+
sess = create_session()
u = sess.query(User).select()
def go():
}))
}))
})
-
+
sess = create_session()
-
+
# eagerload nothing.
u = sess.query(User).select()
def go():
print u[0].orders[1].items[0].keywords[1]
self.assert_sql_count(testbase.db, go, 3)
sess.clear()
-
-
+
+
print "-------MARK----------"
# eagerload orders.items.keywords; eagerload_all() implies eager load of orders, orders.items
q2 = sess.query(User).options(eagerload_all('orders.items.keywords'))
print "-------MARK4----------"
sess.clear()
-
+
# eagerload "keywords" on items. it will lazy load "orders", then lazy load
# the "items" on the order, but on "items" it will eager load the "keywords"
print "-------MARK5----------"
q3 = sess.query(User).options(eagerload('orders.items.keywords'))
u = q3.select()
self.assert_sql_count(testbase.db, go, 2)
-
-
+
+
class DeferredTest(MapperSuperTest):
def test_basic(self):
"""tests a basic "deferred" load"""
-
+
m = mapper(Order, orders, properties={
'description':deferred(orders.c.description)
})
-
+
o = Order()
self.assert_(o.description is None)
m = mapper(Order, orders, properties={
'description':deferred(orders.c.description)
})
-
+
sess = create_session()
o = Order()
sess.save(o)
def go():
o.description = "some description"
self.assert_sql_count(testbase.db, go, 0)
-
+
def test_save(self):
m = mapper(Order, orders, properties={
'description':deferred(orders.c.description)
})
-
+
sess = create_session()
q = sess.query(m)
l = q.select()
o2 = l[2]
o2.isopen = 1
sess.flush()
-
+
def test_group(self):
"""tests deferred load with a group"""
m = mapper(Order, orders, properties = {
("SELECT orders.order_id AS orders_order_id FROM orders ORDER BY %s" % orderby, {}),
("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders WHERE orders.order_id = :param_1", {'param_1':3})
])
-
+
o2 = q.select()[2]
# assert o2.opened == 1
assert o2.description == 'order 3'
def go():
sess.flush()
self.assert_sql_count(testbase.db, go, 0)
-
+
def test_commitsstate(self):
"""test that when deferred elements are loaded via a group, they get the proper CommittedState
and dont result in changes being committed"""
-
+
m = mapper(Order, orders, properties = {
'userident':deferred(orders.c.user_id, group='primary'),
'description':deferred(orders.c.description, group='primary'),
# therefore the flush() shouldnt actually issue any SQL
sess.flush()
self.assert_sql_count(testbase.db, go, 0)
-
+
def test_options(self):
"""tests using options on a mapper to create deferred and undeferred columns"""
m = mapper(Order, orders)
def go():
l = q2.select()
print l[2].user_id
-
+
orderby = str(orders.default_order_by()[0].compile(testbase.db))
self.assert_sql(testbase.db, go, [
("SELECT orders.order_id AS orders_order_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY %s" % orderby, {}),
("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen, orders.order_id AS orders_order_id FROM orders ORDER BY %s" % orderby, {}),
])
-
+
def test_deepoptions(self):
m = mapper(User, users, properties={
'orders':relation(mapper(Order, orders, properties={
Column('id', Integer, primary_key=True),
Column('version_id', Integer, primary_key=True),
Column('name', String(30)))
-
- edges = Table('edges', metadata,
+
+ edges = Table('edges', metadata,
Column('id', Integer, primary_key=True),
Column('graph_id', Integer, nullable=False),
Column('graph_version_id', Integer, nullable=False),
self.x = x
self.y = y
def __composite_values__(self):
- return [self.x, self.y]
+ return [self.x, self.y]
def __eq__(self, other):
return other.x == self.x and other.y == self.y
def __ne__(self, other):
def __init__(self, start, end):
self.start = start
self.end = end
-
+
mapper(Graph, graphs, properties={
'edges':relation(Edge)
})
'start':composite(Point, edges.c.x1, edges.c.y1),
'end':composite(Point, edges.c.x2, edges.c.y2)
})
-
+
sess = create_session()
g = Graph()
g.id = 1
g.edges.append(Edge(Point(14, 5), Point(2, 7)))
sess.save(g)
sess.flush()
-
+
sess.clear()
g2 = sess.query(Graph).get([g.id, g.version_id])
for e1, e2 in zip(g.edges, g2.edges):
assert e1.start == e2.start
assert e1.end == e2.end
-
+
g2.edges[1].end = Point(18, 4)
sess.flush()
sess.clear()
assert sess.query(Edge).get(g2.edges[1].id).end == Point(19, 5)
g.edges[1].end = Point(19, 5)
-
+
sess.clear()
def go():
g2 = sess.query(Graph).options(eagerload('edges')).get([g.id, g.version_id])
assert e1.start == e2.start
assert e1.end == e2.end
self.assert_sql_count(testbase.db, go, 1)
-
+
# test comparison of CompositeProperties to their object instances
g = sess.query(Graph).get([1, 1])
assert sess.query(Edge).filter(Edge.start==Point(3, 4)).one() is g.edges[0]
-
+
assert sess.query(Edge).filter(Edge.start!=Point(3, 4)).first() is g.edges[1]
assert sess.query(Edge).filter(Edge.start==None).all() == []
-
-
+
+
def test_pk(self):
"""test using a composite type as a primary key"""
-
+
class Version(object):
def __init__(self, id, version):
self.id = id
return other.id == self.id and other.version == self.version
def __ne__(self, other):
return not self.__eq__(other)
-
+
class Graph(object):
def __init__(self, version):
self.version = version
-
+
mapper(Graph, graphs, properties={
'version':composite(Version, graphs.c.id, graphs.c.version_id)
})
-
+
sess = create_session()
g = Graph(Version(1, 1))
sess.save(g)
sess.flush()
-
+
sess.clear()
g2 = sess.query(Graph).get([1, 1])
assert g.version == g2.version
sess.clear()
-
+
g2 = sess.query(Graph).get(Version(1, 1))
assert g.version == g2.version
-
-
-
+
+
+
class NoLoadTest(MapperSuperTest):
def test_basic(self):
"""tests a basic one-to-many lazy load"""
x[0].addresses
l[0] = x
self.assert_sql_count(testbase.db, go, 1)
-
+
self.assert_result(l[0], User,
{'user_id' : 7, 'addresses' : (Address, [])},
)
x[0].addresses
l[0] = x
self.assert_sql_count(testbase.db, go, 2)
-
+
self.assert_result(l[0], User,
{'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])},
)
m = mapper(User, users, extension=Ext(), properties = dict(
addresses = relation(Address, lazy=True),
))
-
+
q = create_session().query(m)
l = q.select();
self.assert_result(l, User, *user_address_result)
-
+
def test_methods(self):
"""test that common user-defined methods get called."""
-
+
methods = set()
class Ext(MapperExtension):
def load(self, query, *args, **kwargs):
def after_delete(self, mapper, connection, instance):
methods.add('after_delete')
return EXT_CONTINUE
-
+
mapper(User, users, extension=Ext())
sess = create_session()
u = User()
sess.flush()
sess.delete(u)
sess.flush()
- assert methods == set(['load', 'append_result', 'before_delete', 'create_instance', 'translate_row', 'get',
+ assert methods == set(['load', 'append_result', 'before_delete', 'create_instance', 'translate_row', 'get',
'after_delete', 'after_insert', 'before_update', 'before_insert', 'after_update', 'populate_instance'])
-
+
class RequirementsTest(AssertMixin):
"""Tests the contract for user classes."""
class QueryTest(FixtureTest):
keep_mappers = True
keep_data = True
-
+
def setUpAll(self):
super(QueryTest, self).setUpAll()
install_fixture_data()
self.setup_mappers()
-
+
def tearDownAll(self):
clear_mappers()
super(QueryTest, self).tearDownAll()
-
+
def setup_mappers(self):
mapper(User, users, properties={
'addresses':relation(Address, backref='user'),
class UnicodeSchemaTest(QueryTest):
keep_mappers = False
-
+
def setup_mappers(self):
pass
-
+
def define_tables(self, metadata):
super(UnicodeSchemaTest, self).define_tables(metadata)
global uni_meta, uni_users
uni_users = Table(u'users', uni_meta,
Column(u'id', Integer, primary_key=True),
Column(u'name', String(30), nullable=False))
-
+
def test_get(self):
mapper(User, uni_users)
assert User(id=7) == create_session(bind=testbase.db).query(User).get(7)
-
+
class GetTest(QueryTest):
def test_get(self):
s = create_session()
print s.primary_key
print m.primary_key
assert s.primary_key == m.primary_key
-
+
row = s.select(use_labels=True).execute().fetchone()
print row[s.primary_key[0]]
-
+
sess = create_session()
assert sess.query(SomeUser).get(7).name == 'jack'
def test_load(self):
s = create_session()
-
+
try:
assert s.query(User).load(19) is None
assert False
except exceptions.InvalidRequestError:
assert True
-
+
u = s.query(User).load(7)
u2 = s.query(User).load(7)
assert u is u2
s.clear()
u2 = s.query(User).load(7)
assert u is not u2
-
+
u2.name = 'some name'
a = Address(email_address='some other name')
u2.addresses.append(a)
assert u2 in s.dirty
assert a in u2.addresses
-
+
s.query(User).load(7)
assert u2 not in s.dirty
assert u2.name =='jack'
assert a not in u2.addresses
-
+
@testing.exclude('mysql', '<', (4, 1))
def test_unicode(self):
- """test that Query.get properly sets up the type for the bind parameter. using unicode would normally fail
+ """test that Query.get properly sets up the type for the bind parameter. using unicode would normally fail
on postgres, mysql and oracle unless it is converted to an encoded string"""
-
+
metadata = MetaData(engines.utf8_engine())
table = Table('unicode_data', metadata,
Column('id', Unicode(40), primary_key=True),
s.query(User).populate_existing().all()
assert u.addresses[0].email_address == 'lala'
assert u.orders[1].items[2].description == 'item 12'
-
+
# eager load does
s.query(User).options(eagerload('addresses'), eagerload_all('orders.items')).populate_existing().all()
assert u.addresses[0].email_address == 'jack@bean.com'
assert u.orders[1].items[2].description == 'item 5'
-
+
class OperatorTest(QueryTest):
"""test sql.Comparator implementation for MapperProperties"""
-
+
def _test(self, clause, expected):
c = str(clause.compile(dialect = default.DefaultDialect()))
assert c == expected, "%s != %s" % (c, expected)
-
+
def test_arithmetic(self):
create_session().query(User)
for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
self.assert_(compiled == fwd_sql or compiled == rev_sql,
"\n'" + compiled + "'\n does not match\n'" +
fwd_sql + "'\n or\n'" + rev_sql + "'")
-
+
def test_in(self):
self._test(User.id.in_(['a', 'b']),
"users.id IN (:users_id, :users_id_1)")
):
c = expr.compile(dialect=default.DefaultDialect())
assert str(c) == compare, "%s != %s" % (str(c), compare)
-
-
+
+
class CompileTest(QueryTest):
def test_deferred(self):
session = create_session()
s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'), Address.user_id==User.id)).compile()
-
+
l = session.query(User).instances(s.execute(emailad = 'jack@bean.com'))
assert [User(id=7)] == l
-
+
class SliceTest(QueryTest):
def test_first(self):
assert User(id=7) == create_session().query(User).first()
-
+
assert create_session().query(User).filter(User.id==27).first() is None
-
+
# more slice tests are available in test/orm/generative.py
-
+
class TextTest(QueryTest):
def test_fulltext(self):
assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users").all()
def test_binds(self):
assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all()
-
+
class FilterTest(QueryTest):
def test_basic(self):
assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).all()
- @testing.unsupported('maxdb')
+ @testing.fails_on('maxdb')
def test_limit(self):
assert [User(id=8), User(id=9)] == create_session().query(User).limit(2).offset(1).all()
assert [User(id=8), User(id=9)] == list(create_session().query(User)[1:3])
assert User(id=8) == create_session().query(User)[1]
-
+
def test_onefilter(self):
assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.name.endswith('ed')).all()
def test_contains(self):
"""test comparing a collection to an object instance."""
-
+
sess = create_session()
address = sess.query(Address).get(3)
assert [User(id=8)] == sess.query(User).filter(User.addresses.contains(address)).all()
assert False
except exceptions.InvalidRequestError:
assert True
-
+
#assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all()
-
+
def test_any(self):
sess = create_session()
assert [User(id=9)] == sess.query(User).filter(User.addresses.any(email_address='fred@fred.com')).all()
- # THIS ONE
- @testing.unsupported('maxdb')
+ @testing.unsupported('maxdb') # can core
def test_has(self):
sess = create_session()
assert [Address(id=5)] == sess.query(Address).filter(Address.user.has(name='fred')).all()
-
+
assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] == sess.query(Address).filter(Address.user.has(User.name.like('%ed%'))).all()
-
+
assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user.has(User.name.like('%ed%'), id=8)).all()
-
+
def test_contains_m2m(self):
sess = create_session()
item = sess.query(Item).get(3)
def test_comparison(self):
"""test scalar comparison to an object instance"""
-
+
sess = create_session()
user = sess.query(User).get(8)
assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user==user).all()
def test_apply(self):
sess = create_session()
assert sess.query(Order).apply_sum(Order.user_id * Order.address_id).filter(Order.id.in_([2, 3, 4])).one() == 79
-
-
+
+
class CountTest(QueryTest):
def test_basic(self):
assert 4 == create_session().query(User).count()
def test_binds(self):
assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all()
-
-
+
+
class ParentTest(QueryTest):
def test_o2m(self):
sess = create_session()
q = sess.query(User)
-
+
u1 = q.filter_by(name='jack').one()
# test auto-lookup of property
def test_noparent(self):
sess = create_session()
q = sess.query(User)
-
+
u1 = q.filter_by(name='jack').one()
try:
i1 = sess.query(Item).filter_by(id=2).one()
k = sess.query(Keyword).with_parent(i1).all()
assert [Keyword(name='red'), Keyword(name='small'), Keyword(name='square')] == k
-
-
+
+
class JoinTest(QueryTest):
def test_overlapping_paths(self):
result = create_session().query(User).outerjoin(['orders', 'items'], aliased=aliased).filter_by(id=3).reset_joinpoint().outerjoin(['orders','address'], aliased=aliased).filter_by(id=1).all()
assert [User(id=7, name='jack')] == result
-
+
def test_overlap_with_aliases(self):
oalias = orders.alias('oalias')
result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join(['orders', 'items']).all()
assert [User(id=7, name='jack'), User(id=9, name='fred')] == result
-
+
result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join(['orders', 'items']).filter_by(id=4).all()
assert [User(id=7, name='jack')] == result
q = sess.query(User).join('orders').filter(Order.description=="order 3").join(['orders', 'items']).filter(Order.description=="item 1")
assert [] == q.all()
assert q.count() == 0
-
+
q = sess.query(User).join('orders', aliased=True).filter(Order.items.any(Item.description=='item 4'))
assert [User(id=7)] == q.all()
create_session().query(T1).join('t2s_1', aliased=True).filter(t2.c.id==5).reset_joinpoint().join('t2s_2').all()
create_session().query(T1).join('t2s_1').filter(t2.c.id==5).reset_joinpoint().join('t2s_2', aliased=True).all()
-
-
+
+
class SynonymTest(QueryTest):
keep_mappers = True
):
sess = create_session()
q = sess.query(User)
-
+
u1 = q.filter_by(**{nameprop:'jack'}).one()
o = sess.query(Order).with_parent(u1, property=orderprop).all()
assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
-
+
class InstancesTest(QueryTest):
def test_from_alias(self):
(user8, address4),
(user9, address5),
(user10, None)]
-
+
selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id])
q = sess.query(User)
l = q.instances(selectquery.execute(), Address)
q = sess.query(User).add_entity(Address)
l = q.join('addresses', aliased=aliased).filter_by(email_address='ed@bettyboop.com').all()
assert l == [(user8, address3)]
-
+
q = sess.query(User, Address).join('addresses', aliased=aliased).filter_by(email_address='ed@bettyboop.com')
assert q.all() == [(user8, address3)]
(user8, address4),
(user9, address5),
(user10, None)]
-
+
q = sess.query(User)
adalias = addresses.alias('adalias')
q = q.add_entity(Address, alias=adalias).select_from(users.outerjoin(adalias))
q = sess.query(User).add_entity(Address, alias=adalias)
l = q.select_from(users.outerjoin(adalias)).filter(adalias.c.email_address=='ed@bettyboop.com').all()
assert l == [(user8, address3)]
-
+
def test_multi_columns(self):
"""test aliased/nonalised joins with the usage of add_column()"""
sess = create_session()
(user9, 1),
(user10, 0)
]
-
+
for aliased in (False, True):
q = sess.query(User)
q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin('addresses', aliased=aliased).add_column(func.count(Address.id).label('count'))
q = create_session().query(User)
l = q.add_column("count").add_column("concat").from_statement(s).all()
assert l == expected
-
+
# test with select_from()
q = create_session().query(User).add_column(func.count(addresses.c.id))\
.add_column(("Name:" + users.c.name)).select_from(users.outerjoin(addresses))\
.group_by([c for c in users.c]).order_by(users.c.id)
-
+
assert q.all() == expected
# test with outerjoin() both aliased and non
q = create_session().query(User).add_column(func.count(addresses.c.id))\
.add_column(("Name:" + users.c.name)).outerjoin('addresses', aliased=aliased)\
.group_by([c for c in users.c]).order_by(users.c.id)
-
+
assert q.all() == expected
class CustomJoinTest(QueryTest):
self.children.append(node)
mapper(Node, nodes, properties={
- 'children':relation(Node, lazy=True, join_depth=3,
+ 'children':relation(Node, lazy=True, join_depth=3,
backref=backref('parent', remote_side=[nodes.c.id])
)
})
sess.save(n1)
sess.flush()
sess.clear()
-
+
# TODO: the aliasing of the join in query._join_to has to limit the aliasing
# among local_side / remote_side (add local_side as an attribute on PropertyLoader)
# also implement this idea in EagerLoader
node = sess.query(Node).join(['children', 'children'], aliased=True).filter_by(data='n122').first()
assert node.data=='n1'
-
+
node = sess.query(Node).filter_by(data='n122').join('parent', aliased=True).filter_by(data='n12').\
join('parent', aliased=True, from_joinpoint=True).filter_by(data='n1').first()
assert node.data == 'n122'
mapper(Address, addresses, properties={
'user':relation(User, lazy=True)
- })
+ })
sess = create_session()
l = sess.query(User).all()
Address(id=4, user=User(id=8, concat=16, count=3)),
Address(id=5, user=User(id=9, concat=18, count=1))
]
-
+
assert address_result == sess.query(Address).all()
-
+
# run the eager version twice to test caching of aliased clauses
for x in range(2):
sess.clear()
self.assert_sql_count(testbase.db, go, 1)
tuple_address_result = [(address, address.user) for address in address_result]
-
+
tuple_address_result == sess.query(Address).join('user').add_entity(User).all()
assert tuple_address_result == sess.query(Address).join('user', aliased=True, id='ualias').add_entity(User, id='ualias').all()
{'pk':6, 'info':'pk_6_data'})
def tearDownAll(self):
info_table.drop()
-
- @testing.unsupported('maxdb')
+
+ @testing.fails_on('maxdb')
def testcase(self):
inner = select([case([
- [info_table.c.pk < 3,
+ [info_table.c.pk < 3,
literal('lessthan3', type_=String)],
- [and_(info_table.c.pk >= 3, info_table.c.pk < 7),
+ [and_(info_table.c.pk >= 3, info_table.c.pk < 7),
literal('gt3', type_=String)]]).label('x'),
- info_table.c.pk, info_table.c.info],
+ info_table.c.pk, info_table.c.info],
from_obj=[info_table]).alias('q_inner')
inner_result = inner.execute().fetchall()
]
w_else = select([case([
- [info_table.c.pk < 3,
+ [info_table.c.pk < 3,
literal(3, type_=Integer)],
- [and_(info_table.c.pk >= 3, info_table.c.pk < 6),
+ [and_(info_table.c.pk >= 3, info_table.c.pk < 6),
literal(6, type_=Integer)]],
else_ = 0).label('x'),
- info_table.c.pk, info_table.c.info],
+ info_table.c.pk, info_table.c.info],
from_obj=[info_table]).alias('q_inner')
else_result = w_else.execute().fetchall()
class QueryTest(PersistTest):
-
+
def setUpAll(self):
global users, addresses, metadata
metadata = MetaData(testbase.db)
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
)
- addresses = Table('query_addresses', metadata,
+ addresses = Table('query_addresses', metadata,
Column('address_id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('query_users.user_id')),
Column('address', String(30)))
metadata.create_all()
-
+
def tearDown(self):
addresses.delete().execute()
users.delete().execute()
-
+
def tearDownAll(self):
metadata.drop_all()
-
+
def testinsert(self):
users.insert().execute(user_id = 7, user_name = 'jack')
assert users.count().scalar() == 1
-
+
def test_insert_heterogeneous_params(self):
users.insert().execute(
{'user_id':7, 'user_name':'jack'},
{'user_id':9}
)
assert users.select().execute().fetchall() == [(7, 'jack'), (8, 'ed'), (9, None)]
-
+
def testupdate(self):
users.insert().execute(user_id = 7, user_name = 'jack')
assert users.count().scalar() == 1
INSERTed including defaults that fired off on the DB side and
detects rows that had defaults and post-fetches.
"""
-
+
result = table.insert().execute(**values)
ret = values.copy()
for supported, table, values, assertvalues in [
(
{'unsupported':['sqlite']},
- Table("t1", metadata,
+ Table("t1", metadata,
Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True),
Column('foo', String(30), primary_key=True)),
{'foo':'hi'},
),
(
{'unsupported':['sqlite']},
- Table("t2", metadata,
+ Table("t2", metadata,
Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True),
Column('foo', String(30), primary_key=True),
Column('bar', String(30), PassiveDefault('hi'))
),
(
{'unsupported':[]},
- Table("t3", metadata,
+ Table("t3", metadata,
Column("id", String(40), primary_key=True),
Column('foo', String(30), primary_key=True),
Column("bar", String(30))
),
(
{'unsupported':[]},
- Table("t4", metadata,
+ Table("t4", metadata,
Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),
Column('foo', String(30), primary_key=True),
Column('bar', String(30), PassiveDefault('hi'))
),
(
{'unsupported':[]},
- Table("t5", metadata,
+ Table("t5", metadata,
Column('id', String(10), primary_key=True),
Column('bar', String(30), PassiveDefault('hi'))
),
self.assert_(len(l) == 3)
def test_fetchmany(self):
- users.insert().execute(user_id = 7, user_name = 'jack')
- users.insert().execute(user_id = 8, user_name = 'ed')
- users.insert().execute(user_id = 9, user_name = 'fred')
- r = users.select().execute()
- l = []
- for row in r.fetchmany(size=2):
- l.append(row)
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'ed')
+ users.insert().execute(user_id = 9, user_name = 'fred')
+ r = users.select().execute()
+ l = []
+ for row in r.fetchmany(size=2):
+ l.append(row)
self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l))
-
+
def test_compiled_execute(self):
- users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 7, user_name = 'jack')
s = select([users], users.c.user_id==bindparam('id')).compile()
c = testbase.db.connect()
assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
-
+
def test_compiled_insert_execute(self):
- users.insert().compile().execute(user_id = 7, user_name = 'jack')
+ users.insert().compile().execute(user_id = 7, user_name = 'jack')
s = select([users], users.c.user_id==bindparam('id')).compile()
c = testbase.db.connect()
assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
def test_repeated_bindparams(self):
"""Tests that a BindParam can be used more than once.
-
+
This should be run for DB-APIs with both positional and named
paramstyles.
"""
s = users.select(or_(users.c.user_name==u, users.c.user_name==u))
r = s.execute(userid='fred').fetchall()
assert len(r) == 1
-
+
def test_bindparam_shortname(self):
"""test the 'shortname' field on BindParamClause."""
users.insert().execute(user_id = 7, user_name = 'jack')
a_eq(prep(":this \:that"), "? :that")
a_eq(prep(r"(\:that$other)"), "(:that$other)")
a_eq(prep(r".\:that$ :other."), ".:that$ ?.")
-
+
def testdelete(self):
users.insert().execute(user_id = 7, user_name = 'jack')
users.insert().execute(user_id = 8, user_name = 'fred')
print repr(users.select().execute().fetchall())
users.delete(users.c.user_name == 'fred').execute()
-
+
print repr(users.select().execute().fetchall())
def testselectlimit(self):
users.insert().execute(user_id=7, user_name='fido')
r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
-
- @testing.unsupported('mssql', 'maxdb')
+
+ @testing.unsupported('mssql')
+ @testing.fails_on('maxdb')
def testselectlimitoffset(self):
users.insert().execute(user_id=1, user_name='john')
users.insert().execute(user_id=2, user_name='jack')
self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r==[(6, 'ralph'), (7, 'fido')])
-
- @testing.supported('mssql', 'maxdb')
+
+ @testing.supported('mssql')
+ @testing.fails_on('maxdb')
def test_select_limit_nooffset(self):
try:
r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
except exceptions.InvalidRequestError:
pass
- @testing.unsupported('mysql')
+ @testing.unsupported('mysql')
def test_scalar_select(self):
"""test that scalar subqueries with labels get their type propigated to the result set."""
# mysql and/or mysqldb has a bug here, type isnt propigated for scalar subquery.
- datetable = Table('datetable', metadata,
+ datetable = Table('datetable', metadata,
Column('id', Integer, primary_key=True),
Column('today', DateTime))
datetable.create()
Tests simple, compound, aliased and DESC clauses.
"""
-
+
users.insert().execute(user_id=1, user_name='c')
users.insert().execute(user_id=2, user_name='b')
users.insert().execute(user_id=3, user_name='a')
a_eq(users.select(order_by=[users.c.user_id],
use_labels=labels),
[(1, 'c'), (2, 'b'), (3, 'a')])
-
+
a_eq(users.select(order_by=[users.c.user_name, users.c.user_id],
use_labels=labels),
[(3, 'a'), (2, 'b'), (1, 'c')])
use_labels=labels,
order_by=[users.c.user_id]),
[(1,), (2,), (3,)])
-
+
a_eq(select([users.c.user_id.label('foo'), users.c.user_name],
use_labels=labels,
order_by=[users.c.user_name, users.c.user_id]),
users.insert().execute(user_id=1, user_name='john')
users.insert().execute(user_id=2, user_name='jack')
addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com')
-
+
r = users.select(users.c.user_id==2).execute().fetchone()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
r = text("select * from query_users where user_id=2", bind=testbase.db).execute().fetchone()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
+
# test slices
r = text("select * from query_addresses", bind=testbase.db).execute().fetchone()
self.assert_(r[0:1] == (1,))
self.assert_(r[1:] == (2, 'foo@bar.com'))
self.assert_(r[:-1] == (1, 2))
-
+
def test_ambiguous_column(self):
users.insert().execute(user_id=1, user_name='john')
r = users.outerjoin(addresses).select().execute().fetchone()
except exceptions.InvalidRequestError, e:
assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \
str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement."
-
+
def test_column_label_targeting(self):
users.insert().execute(user_id=7, user_name='ed')
row = s.select(use_labels=True).execute().fetchone()
assert row[s.c.user_id] == 7
assert row[s.c.user_name] == 'ed'
-
+
def test_keys(self):
users.insert().execute(user_id=1, user_name='foo')
r = users.select().execute().fetchone()
r = testbase.db.execute('select user_name from query_users', {}).fetchone()
self.assertEqual(len(r), 1)
r.close()
-
+
def test_cant_execute_join(self):
try:
users.join(addresses).execute()
except exceptions.ArgumentError, e:
assert str(e).startswith('Not an executable clause: ')
-
+
def test_functions(self):
x = testbase.db.func.current_date().execute().scalar()
y = testbase.db.func.current_date().select().execute().scalar()
z = testbase.db.func.current_date().scalar()
assert (x == y == z) is True
-
+
x = testbase.db.func.current_date(type_=Date)
assert isinstance(x.type, Date)
assert isinstance(x.execute().scalar(), datetime.date)
finally:
conn.close()
assert (x == y == z) is True
-
+
def test_update_functions(self):
"""
Tests sending functions and SQL expressions to the VALUES and SET
res = exec_sorted(select([t2.c.value, t2.c.stuff]))
self.assertEquals(res, [(-14, 'hi'), (3, None), (7, None)])
-
+
t2.update(values=dict(value=func.length("asdsafasd"))).execute(stuff="some stuff")
assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(9,"some stuff"), (9,"some stuff"), (9,"some stuff")]
-
+
t2.delete().execute()
-
+
t2.insert(values=dict(value=func.length("one") + 8)).execute()
assert t2.select().execute().fetchone()['value'] == 11
-
+
t2.update(values=dict(value=func.length("asfda"))).execute()
assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff")
t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
-
+
finally:
meta.drop_all()
-
+
@testing.supported('postgres')
def test_functions_with_cols(self):
# TODO: shouldnt this work on oracle too ?
y = testbase.db.func.current_date().select().execute().scalar()
z = testbase.db.func.current_date().scalar()
w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar()
-
+
# construct a column-based FROM object out of a function, like in [ticket:172]
s = select([sql.column('date', type_=DateTime)], from_obj=[testbase.db.func.current_date()])
q = s.execute().fetchone()[s.c.date]
r = s.alias('datequery').select().scalar()
-
+
assert x == y == z == w == q == r
-
+
def test_column_order_with_simple_query(self):
# should return values in column definition order
users.insert().execute(user_id=1, user_name='foo')
self.assertEqual(r[1], 'foo')
self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name'])
self.assertEqual(r.values(), [1, 'foo'])
-
+
def test_column_order_with_text_query(self):
# should return values in query order
users.insert().execute(user_id=1, user_name='foo')
self.assertEqual(r[1], 1)
self.assertEqual([x.lower() for x in r.keys()], ['user_name', 'user_id'])
self.assertEqual(r.values(), ['foo', 1])
-
+
@testing.unsupported('oracle', 'firebird', 'maxdb')
def test_column_accessor_shadow(self):
meta = MetaData(testbase.db)
r.close()
finally:
shadowed.drop(checkfirst=True)
-
+
@testing.supported('mssql')
def test_fetchid_trigger(self):
meta = MetaData(testbase.db)
tr.commit()
con.execute("""drop trigger paj""")
meta.drop_all()
-
+
@testing.supported('mssql')
def test_insertid_schema(self):
meta = MetaData(testbase.db)
con = testbase.db.connect()
con.execute('create schema paj')
tbl = Table('test', meta, Column('id', Integer, primary_key=True), schema='paj')
- tbl.create()
+ tbl.create()
try:
- tbl.insert().execute({'id':1})
+ tbl.insert().execute({'id':1})
finally:
tbl.drop()
con.execute('drop schema paj')
def test_insertid_reserved(self):
meta = MetaData(testbase.db)
table = Table(
- 'select', meta,
+ 'select', meta,
Column('col', Integer, primary_key=True)
)
table.create()
-
+
meta2 = MetaData(testbase.db)
try:
table.insert().execute(col=7)
finally:
table.drop()
- @testing.unsupported('maxdb')
+ @testing.fails_on('maxdb')
def test_in_filtering(self):
"""test the behavior of the in_() function."""
-
+
users.insert().execute(user_id = 7, user_name = 'jack')
users.insert().execute(user_id = 8, user_name = 'fred')
users.insert().execute(user_id = 9, user_name = None)
-
+
s = users.select(users.c.user_name.in_([]))
r = s.execute().fetchall()
# No username is in empty set
assert len(r) == 0
-
+
s = users.select(not_(users.c.user_name.in_([])))
r = s.execute().fetchall()
# All usernames with a value are outside an empty set
assert len(r) == 2
-
+
s = users.select(users.c.user_name.in_(['jack','fred']))
r = s.execute().fetchall()
assert len(r) == 2
-
+
s = users.select(not_(users.c.user_name.in_(['jack','fred'])))
r = s.execute().fetchall()
# Null values are not outside any set
assert len(r) == 0
-
+
u = bindparam('search_key')
-
+
s = users.select(u.in_([]))
r = s.execute(search_key='john').fetchall()
assert len(r) == 0
r = s.execute(search_key=None).fetchall()
assert len(r) == 0
-
+
s = users.select(not_(u.in_([])))
r = s.execute(search_key='john').fetchall()
assert len(r) == 3
r = s.execute(search_key=None).fetchall()
assert len(r) == 0
-
+
s = users.select(users.c.user_name.in_([]) == True)
r = s.execute().fetchall()
assert len(r) == 0
def setUpAll(self):
global metadata, t1, t2, t3
metadata = MetaData(testbase.db)
- t1 = Table('t1', metadata,
+ t1 = Table('t1', metadata,
Column('col1', Integer, Sequence('t1pkseq'), primary_key=True),
Column('col2', String(30)),
Column('col3', String(40)),
Column('col4', String(30))
)
- t2 = Table('t2', metadata,
+ t2 = Table('t2', metadata,
Column('col1', Integer, Sequence('t2pkseq'), primary_key=True),
Column('col2', String(30)),
Column('col3', String(40)),
Column('col4', String(30)))
- t3 = Table('t3', metadata,
+ t3 = Table('t3', metadata,
Column('col1', Integer, Sequence('t3pkseq'), primary_key=True),
Column('col2', String(30)),
Column('col3', String(40)),
Column('col4', String(30)))
metadata.create_all()
-
+
t1.insert().execute([
dict(col2="t1col2r1", col3="aaa", col4="aaa"),
dict(col2="t1col2r2", col3="bbb", col4="bbb"),
dict(col2="t3col2r2", col3="bbb", col4="aaa"),
dict(col2="t3col2r3", col3="ccc", col4="bbb"),
])
-
+
def tearDownAll(self):
metadata.drop_all()
t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
- )
+ )
u = union(s1, s2)
wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'),
found2 = self._fetchall_sorted(u.alias('bar').select().execute())
self.assertEquals(found2, wanted)
-
+
def test_union_ordered(self):
(s1, s2) = (
select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
- )
+ )
u = union(s1, s2, order_by=['col3', 'col4'])
wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'),
('ccc', 'aaa')]
self.assertEquals(u.execute().fetchall(), wanted)
- @testing.unsupported('maxdb')
+ @testing.fails_on('maxdb')
def test_union_ordered_alias(self):
(s1, s2) = (
select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
- )
+ )
u = union(s1, s2, order_by=['col3', 'col4'])
wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'),
)
wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
found = self._fetchall_sorted(u.execute())
-
+
self.assertEquals(found, wanted)
@testing.unsupported('mysql')
def assertRows(self, statement, expected):
"""Execute a statement and assert that rows returned equal expected."""
-
+
found = exec_sorted(statement)
self.assertEquals(found, sorted(expected))
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
print expr
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
-
+
def test_mixed_where(self):
"""Joins t1->t2, outer t2->t3, plus a where on each table in turn."""
def setUpAll(self):
global metadata, flds
metadata = MetaData(testbase.db)
- flds = Table('flds', metadata,
+ flds = Table('flds', metadata,
Column('idcol', Integer, Sequence('t1pkseq'), primary_key=True),
Column('intcol', Integer),
Column('strcol', String(50)),
)
metadata.create_all()
-
+
flds.insert().execute([
dict(intcol=5, strcol='foo'),
dict(intcol=13, strcol='bar')
def tearDownAll(self):
metadata.drop_all()
- @testing.unsupported('maxdb')
+ @testing.fails_on('maxdb')
def test_modulo(self):
self.assertEquals(
select([flds.c.intcol % 3],
if __name__ == "__main__":
- testbase.main()
+ testbase.main()