-from sqlalchemy.sql import operators
-from sqlalchemy import MetaData, null, exists, text, union, literal, \
- literal_column, func, between, Unicode, desc, and_, bindparam, \
- select, distinct, or_, collate, insert, Integer, String, Boolean
-from sqlalchemy import inspect
-from sqlalchemy import exc as sa_exc, util, cast
-from sqlalchemy.sql import compiler, table, column
-from sqlalchemy.sql import expression
+from sqlalchemy import (
+ testing, null, exists, text, union, literal, literal_column, func, between,
+ Unicode, desc, and_, bindparam, select, distinct, or_, collate, insert,
+ Integer, String, Boolean, exc as sa_exc, util, cast)
+from sqlalchemy.sql import operators, column, expression
from sqlalchemy.engine import default
-from sqlalchemy.orm import attributes, mapper, relationship, backref, \
- configure_mappers, create_session, synonym, Session, class_mapper, \
- aliased, column_property, joinedload_all, joinedload, Query,\
- util as orm_util
+from sqlalchemy.orm import (
+ attributes, mapper, relationship, create_session, synonym, Session,
+ aliased, column_property, joinedload_all, joinedload, Query, Bundle)
from sqlalchemy.testing.assertsql import CompiledSQL
from sqlalchemy.testing.schema import Table, Column
import sqlalchemy as sa
-from sqlalchemy import testing
-from sqlalchemy.testing.assertions import eq_, assert_raises, assert_raises_message
-from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing.assertions import (
+ eq_, assert_raises, assert_raises_message)
+from sqlalchemy.testing import fixtures, AssertsCompiledSQL
from test.orm import _fixtures
-from sqlalchemy.testing import fixtures, engines
-from sqlalchemy.orm import Bundle
-from sqlalchemy.orm.util import join, outerjoin, with_parent
+from sqlalchemy.orm.util import join, with_parent
+
class QueryTest(_fixtures.FixtureTest):
run_setup_mappers = 'once'
run_inserts = 'once'
run_deletes = None
-
@classmethod
def setup_mappers(cls):
cls._setup_stock_mapping()
+
class MiscTest(QueryTest):
run_create_tables = None
run_inserts = None
assert q2.session is s2
assert q1.session is s1
+
class RowTupleTest(QueryTest):
run_setup_mappers = None
def test_custom_names(self):
User, users = self.classes.User, self.tables.users
- mapper(User, users, properties={
- 'uname': users.c.name
- })
+ mapper(User, users, properties={'uname': users.c.name})
- row = create_session().\
- query(User.id, User.uname).\
- filter(User.id == 7).first()
+ row = create_session().query(User.id, User.uname).\
+ filter(User.id == 7).first()
assert row.id == 7
assert row.uname == 'jack'
for q, asserted in [
(
sess.query(User),
- [{'name':'User', 'type':User, 'aliased':False, 'expr':User}]
+ [
+ {
+ 'name': 'User', 'type': User, 'aliased': False,
+ 'expr': User}]
),
(
sess.query(User.id, User),
[
- {'name':'id', 'type':users.c.id.type, 'aliased':False,
- 'expr':User.id},
- {'name':'User', 'type':User, 'aliased':False, 'expr':User}
+ {
+ 'name': 'id', 'type': users.c.id.type,
+ 'aliased': False, 'expr': User.id},
+ {
+ 'name': 'User', 'type': User, 'aliased': False,
+ 'expr': User}
]
),
(
sess.query(User.id, user_alias),
[
- {'name':'id', 'type':users.c.id.type, 'aliased':False,
- 'expr':User.id},
- {'name':None, 'type':User, 'aliased':True,
- 'expr':user_alias}
+ {
+ 'name': 'id', 'type': users.c.id.type,
+ 'aliased': False, 'expr': User.id},
+ {
+ 'name': None, 'type': User, 'aliased': True,
+ 'expr': user_alias}
]
),
(
sess.query(address_alias),
[
- {'name':'aalias', 'type':Address, 'aliased':True,
- 'expr':address_alias}
+ {
+ 'name': 'aalias', 'type': Address, 'aliased': True,
+ 'expr': address_alias}
]
),
(
sess.query(name_label, fn),
[
- {'name':'uname', 'type':users.c.name.type,
- 'aliased':False, 'expr':name_label},
- {'name':None, 'type':fn.type, 'aliased':False,
- 'expr':fn
- },
+ {
+ 'name': 'uname', 'type': users.c.name.type,
+ 'aliased': False, 'expr': name_label},
+ {
+ 'name': None, 'type': fn.type, 'aliased': False,
+ 'expr': fn},
]
),
(
asserted
)
-
def test_unhashable_type(self):
from sqlalchemy.types import TypeDecorator, Integer
from sqlalchemy.sql import type_coerce
class MyType(TypeDecorator):
impl = Integer
hashable = False
+
def process_result_value(self, value, dialect):
return [value]
mapper(User, users)
s = Session()
- q = s.\
- query(User, type_coerce(users.c.id, MyType).label('foo')).\
- filter(User.id == 7)
+ q = s.query(User, type_coerce(users.c.id, MyType).label('foo')).\
+ filter(User.id == 7)
row = q.first()
eq_(
row, (User(id=7), [7])
)
+
class RawSelectTest(QueryTest, AssertsCompiledSQL):
__dialect__ = 'default'
Address = self.classes.Address
self.assert_compile(
- select([User.name, Address.id,
- select([func.count(Address.id)]).\
- where(User.id == Address.user_id).\
- correlate(User).as_scalar()
- ]),
+ select(
+ [
+ User.name, Address.id,
+ select([func.count(Address.id)]).
+ where(User.id == Address.user_id).
+ correlate(User).as_scalar()]),
"SELECT users.name, addresses.id, "
"(SELECT count(addresses.id) AS count_1 "
- "FROM addresses WHERE users.id = addresses.user_id) AS anon_1 "
+ "FROM addresses WHERE users.id = addresses.user_id) AS anon_1 "
"FROM users, addresses"
)
uu = aliased(User, name="uu")
self.assert_compile(
- select([uu.name, Address.id,
- select([func.count(Address.id)]).\
- where(uu.id == Address.user_id).\
- correlate(uu).as_scalar()
- ]),
+ select(
+ [
+ uu.name, Address.id,
+ select([func.count(Address.id)]).
+ where(uu.id == Address.user_id).
+ correlate(uu).as_scalar()]),
# for a long time, "uu.id = address.user_id" was reversed;
# this was resolved as of #2872 and had to do with
# InstrumentedAttribute.__eq__() taking precedence over
# QueryableAttribute.__eq__()
"SELECT uu.name, addresses.id, "
"(SELECT count(addresses.id) AS count_1 "
- "FROM addresses WHERE uu.id = addresses.user_id) AS anon_1 "
+ "FROM addresses WHERE uu.id = addresses.user_id) AS anon_1 "
"FROM users AS uu, addresses"
)
q = s.query(User.id, User.name).filter_by(name='ed')
self.assert_compile(
insert(Address).from_select(
- (Address.id, Address.email_address), q),
+ (Address.id, Address.email_address), q),
"INSERT INTO addresses (id, email_address) "
"SELECT users.id AS users_id, users.name AS users_name "
"FROM users WHERE users.name = :name_1"
class Foo(object):
pass
- mapper(Foo, self.tables.users, properties={
- 'foob': column_property(func.coalesce(self.tables.users.c.name))
+ mapper(
+ Foo, self.tables.users, properties={
+ 'foob': column_property(
+ func.coalesce(self.tables.users.c.name))
})
self.assert_compile(
select([Foo]).where(Foo.foob == 'somename').order_by(Foo.foob),
"SELECT users.id, users.name FROM users "
- "WHERE coalesce(users.name) = :coalesce_1 ORDER BY coalesce(users.name)"
+ "WHERE coalesce(users.name) = :coalesce_1 "
+ "ORDER BY coalesce(users.name)"
)
+
class GetTest(QueryTest):
def test_get(self):
User = self.classes.User
CompositePk = self.classes.CompositePk
s = Session()
- assert s.query(CompositePk).get((100,100)) is None
+ assert s.query(CompositePk).get((100, 100)) is None
def test_get_composite_pk_result(self):
CompositePk = self.classes.CompositePk
s = Session()
- one_two = s.query(CompositePk).get((1,2))
+ one_two = s.query(CompositePk).get((1, 2))
assert one_two.i == 1
assert one_two.j == 2
assert one_two.k == 3
users, addresses = self.tables.users, self.tables.addresses
-
s = users.outerjoin(addresses)
class UserThing(fixtures.ComparableEntity):
pass
- mapper(UserThing, s, properties={
- 'id':(users.c.id, addresses.c.user_id),
- 'address_id':addresses.c.id,
- })
+ mapper(
+ UserThing, s, properties={
+ 'id': (users.c.id, addresses.c.user_id),
+ 'address_id': addresses.c.id,
+ })
sess = create_session()
u10 = sess.query(UserThing).get((10, None))
- eq_(u10,
- UserThing(id=10)
- )
+ eq_(u10, UserThing(id=10))
def test_no_criterion(self):
- """test that get()/load() does not use preexisting filter/etc. criterion"""
+ """test that get()/load() does not use preexisting filter/etc.
+ criterion"""
User, Address = self.classes.User, self.classes.Address
-
s = create_session()
- q = s.query(User).join('addresses').filter(Address.user_id==8)
+ q = s.query(User).join('addresses').filter(Address.user_id == 8)
assert_raises(sa_exc.InvalidRequestError, q.get, 7)
- assert_raises(sa_exc.InvalidRequestError, s.query(User).filter(User.id==7).get, 19)
+ assert_raises(
+ sa_exc.InvalidRequestError,
+ s.query(User).filter(User.id == 7).get, 19)
# order_by()/get() doesn't raise
s.query(User).order_by(User.id).get(8)
def test_no_criterion_when_already_loaded(self):
- """test that get()/load() does not use preexisting filter/etc. criterion,
- even when we're only using the identity map."""
+ """test that get()/load() does not use preexisting filter/etc.
+ criterion, even when we're only using the identity map."""
User, Address = self.classes.User, self.classes.Address
s = create_session()
- u1 = s.query(User).get(7)
+ s.query(User).get(7)
- q = s.query(User).join('addresses').filter(Address.user_id==8)
+ q = s.query(User).join('addresses').filter(Address.user_id == 8)
assert_raises(sa_exc.InvalidRequestError, q.get, 7)
def test_unique_param_names(self):
class SomeUser(object):
pass
- s = users.select(users.c.id!=12).alias('users')
+ s = users.select(users.c.id != 12).alias('users')
m = mapper(SomeUser, s)
assert s.primary_key == m.primary_key
s.query(User).populate_existing().get(7)
assert u2 not in s.dirty
- assert u2.name =='jack'
+ assert u2.name == 'jack'
assert a not in u2.addresses
@testing.provide_metadata
oracle unless it is converted to an encoded string"""
metadata = self.metadata
- table = Table('unicode_data', metadata,
- Column('id', Unicode(40), primary_key=True, test_needs_autoincrement=True),
+ table = Table(
+ 'unicode_data', metadata,
+ Column(
+ 'id', Unicode(40), primary_key=True,
+ test_needs_autoincrement=True),
Column('data', Unicode(40)))
metadata.create_all()
ustring = util.b('petit voix m\xe2\x80\x99a').decode('utf-8')
table.insert().execute(id=ustring, data=ustring)
+
class LocalFoo(self.classes.Base):
pass
mapper(LocalFoo, table)
- eq_(create_session().query(LocalFoo).get(ustring),
- LocalFoo(id=ustring, data=ustring))
+ eq_(
+ create_session().query(LocalFoo).get(ustring),
+ LocalFoo(id=ustring, data=ustring))
def test_populate_existing(self):
User, Address = self.classes.User, self.classes.Address
assert u.orders[1].items[2].description == 'item 12'
# eager load does
- s.query(User).options(joinedload('addresses'), joinedload_all('orders.items')).populate_existing().all()
+ s.query(User). \
+ options(joinedload('addresses'), joinedload_all('orders.items')). \
+ populate_existing().all()
assert u.addresses[0].email_address == 'jack@bean.com'
assert u.orders[1].items[2].description == 'item 5'
):
assert_raises(sa_exc.InvalidRequestError, q.join, "addresses")
- assert_raises(sa_exc.InvalidRequestError, q.filter, User.name=='ed')
+ assert_raises(
+ sa_exc.InvalidRequestError, q.filter, User.name == 'ed')
assert_raises(sa_exc.InvalidRequestError, q.filter_by, name='ed')
assert_raises(sa_exc.InvalidRequestError, q.having, 'foo')
q.enable_assertions(False).join("addresses")
- q.enable_assertions(False).filter(User.name=='ed')
+ q.enable_assertions(False).filter(User.name == 'ed')
q.enable_assertions(False).order_by('foo')
q.enable_assertions(False).group_by('foo')
s = create_session()
q = s.query(User)
- assert_raises(sa_exc.ArgumentError, q.select_from, User.id==5)
+ assert_raises(sa_exc.ArgumentError, q.select_from, User.id == 5)
assert_raises(sa_exc.ArgumentError, q.select_from, User.id)
def test_invalid_from_statement(self):
s = create_session()
q = s.query(User)
- assert_raises(sa_exc.ArgumentError, q.from_statement, User.id==5)
- assert_raises(sa_exc.ArgumentError, q.from_statement, users.join(addresses))
+ assert_raises(sa_exc.ArgumentError, q.from_statement, User.id == 5)
+ assert_raises(
+ sa_exc.ArgumentError, q.from_statement, users.join(addresses))
def test_invalid_column(self):
User = self.classes.User
assert_raises(sa_exc.InvalidRequestError, q.add_column, (1, 1))
def test_distinct(self):
- """test that a distinct() call is not valid before 'clauseelement' conditions."""
+ """test that a distinct() call is not valid before 'clauseelement'
+ conditions."""
User = self.classes.User
-
s = create_session()
q = s.query(User).distinct()
assert_raises(sa_exc.InvalidRequestError, q.select_from, User)
- assert_raises(sa_exc.InvalidRequestError, q.from_statement, text("select * from table"))
+ assert_raises(
+ sa_exc.InvalidRequestError, q.from_statement,
+ text("select * from table"))
assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User)
def test_order_by(self):
- """test that an order_by() call is not valid before 'clauseelement' conditions."""
+ """test that an order_by() call is not valid before 'clauseelement'
+ conditions."""
User = self.classes.User
-
s = create_session()
q = s.query(User).order_by(User.id)
assert_raises(sa_exc.InvalidRequestError, q.select_from, User)
- assert_raises(sa_exc.InvalidRequestError, q.from_statement, text("select * from table"))
+ assert_raises(
+ sa_exc.InvalidRequestError, q.from_statement,
+ text("select * from table"))
assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User)
def test_cancel_order_by(self):
s = create_session()
q = s.query(User).order_by(User.id)
- self.assert_compile(q,
- "SELECT users.id AS users_id, users.name AS users_name FROM users ORDER BY users.id",
+ self.assert_compile(
+ q,
+ "SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users ORDER BY users.id",
use_default_dialect=True)
- assert_raises(sa_exc.InvalidRequestError, q._no_select_modifiers, "foo")
+ assert_raises(
+ sa_exc.InvalidRequestError, q._no_select_modifiers, "foo")
q = q.order_by(None)
- self.assert_compile(q,
- "SELECT users.id AS users_id, users.name AS users_name FROM users",
- use_default_dialect=True)
+ self.assert_compile(
+ q,
+ "SELECT users.id AS users_id, users.name AS users_name FROM users",
+ use_default_dialect=True)
- assert_raises(sa_exc.InvalidRequestError, q._no_select_modifiers, "foo")
+ assert_raises(
+ sa_exc.InvalidRequestError, q._no_select_modifiers, "foo")
q = q.order_by(False)
- self.assert_compile(q,
- "SELECT users.id AS users_id, users.name AS users_name FROM users",
- use_default_dialect=True)
+ self.assert_compile(
+ q,
+ "SELECT users.id AS users_id, users.name AS users_name FROM users",
+ use_default_dialect=True)
# after False was set, this should pass
q._no_select_modifiers("foo")
s = create_session()
for meth, arg, kw in [
- (Query.filter, (User.id==5,), {}),
- (Query.filter_by, (), {'id':5}),
+ (Query.filter, (User.id == 5,), {}),
+ (Query.filter_by, (), {'id': 5}),
(Query.limit, (5, ), {}),
(Query.group_by, (User.name,), {}),
(Query.order_by, (User.name,), {})
(operators.le, '<=', '>='),
(operators.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
- ('a', User.id, ':id_1', 'users.id'),
- ('a', literal('b'), ':param_2', ':param_1'), # note swap!
- (User.id, 'b', 'users.id', ':id_1'),
- (User.id, literal('b'), 'users.id', ':param_1'),
- (User.id, User.id, 'users.id', 'users.id'),
- (literal('a'), 'b', ':param_1', ':param_2'),
- (literal('a'), User.id, ':param_1', 'users.id'),
- (literal('a'), literal('b'), ':param_1', ':param_2'),
- (ualias.id, literal('b'), 'users_1.id', ':param_1'),
- (User.id, ualias.name, 'users.id', 'users_1.name'),
- (User.name, ualias.name, 'users.name', 'users_1.name'),
- (ualias.name, User.name, 'users_1.name', 'users.name'),
- ):
+ ('a', User.id, ':id_1', 'users.id'),
+ ('a', literal('b'), ':param_2', ':param_1'), # note swap!
+ (User.id, 'b', 'users.id', ':id_1'),
+ (User.id, literal('b'), 'users.id', ':param_1'),
+ (User.id, User.id, 'users.id', 'users.id'),
+ (literal('a'), 'b', ':param_1', ':param_2'),
+ (literal('a'), User.id, ':param_1', 'users.id'),
+ (literal('a'), literal('b'), ':param_1', ':param_2'),
+ (ualias.id, literal('b'), 'users_1.id', ':param_1'),
+ (User.id, ualias.name, 'users.id', 'users_1.name'),
+ (User.name, ualias.name, 'users.name', 'users_1.name'),
+ (ualias.name, User.name, 'users_1.name', 'users.name'),
+ ):
# the compiled clause should match either (e.g.):
# 'a' < 'b' -or- 'b' > 'a'.
- compiled = str(py_op(lhs, rhs).compile(dialect=default.DefaultDialect()))
+ compiled = str(py_op(lhs, rhs).compile(
+ dialect=default.DefaultDialect()))
fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql)
rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql)
User, Address = self.classes.User, self.classes.Address
self._test(User.id == None, "users.id IS NULL")
- self._test(~(User.id==None), "users.id IS NOT NULL")
+ self._test(~(User.id == None), "users.id IS NOT NULL")
self._test(None == User.id, "users.id IS NULL")
self._test(~(None == User.id), "users.id IS NOT NULL")
self._test(Address.user == None, "addresses.user_id IS NULL")
- self._test(~(Address.user==None), "addresses.user_id IS NOT NULL")
+ self._test(~(Address.user == None), "addresses.user_id IS NOT NULL")
self._test(None == Address.user, "addresses.user_id IS NULL")
self._test(~(None == Address.user), "addresses.user_id IS NOT NULL")
def test_relationship_unimplemented(self):
- User, Address = self.classes.User, self.classes.Address
+ User = self.classes.User
for op in [
User.addresses.like,
User.addresses.ilike,
def test_relationship(self):
User, Address = self.classes.User, self.classes.Address
- self._test(User.addresses.any(Address.id==17),
- "EXISTS (SELECT 1 "
- "FROM addresses "
- "WHERE users.id = addresses.user_id AND addresses.id = :id_1)",
- entity=User
- )
+ self._test(
+ User.addresses.any(Address.id == 17),
+ "EXISTS (SELECT 1 FROM addresses "
+ "WHERE users.id = addresses.user_id AND addresses.id = :id_1)",
+ entity=User
+ )
u7 = User(id=7)
attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
self._test(Address.user != None, "addresses.user_id IS NOT NULL")
-
def test_selfref_relationship(self):
Node = self.classes.Node
# auto self-referential aliasing
self._test(
- Node.children.any(Node.data=='n1'),
- "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
- "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)",
+ Node.children.any(Node.data == 'n1'),
+ "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
+ "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)",
entity=Node
)
self._test(
Node.children == None,
"NOT (EXISTS (SELECT 1 FROM nodes AS nodes_1 "
- "WHERE nodes.id = nodes_1.parent_id))",
+ "WHERE nodes.id = nodes_1.parent_id))",
entity=Node
)
self._test(
nalias.children == None,
- "NOT (EXISTS (SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))",
+ "NOT (EXISTS ("
+ "SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))",
entity=nalias
)
self._test(
- nalias.children.any(Node.data=='some data'),
- "EXISTS (SELECT 1 FROM nodes WHERE "
- "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)",
- entity=nalias)
+ nalias.children.any(Node.data == 'some data'),
+ "EXISTS (SELECT 1 FROM nodes WHERE "
+ "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)",
+ entity=nalias)
# this fails because self-referential any() is auto-aliasing;
# the fact that we use "nalias" here means we get two aliases.
self._test(
nalias.parent.has(Node.data == 'some data'),
- "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id "
+ "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id "
"AND nodes.data = :data_1)",
entity=nalias
)
-
self._test(
Node.parent.has(Node.data == 'some data'),
- "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
+ "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
"nodes_1.id = nodes.parent_id AND nodes_1.data = :data_1)",
entity=Node
)
sess = Session()
q = sess.query(User).filter(
- User.addresses.any(
- and_(Address.id == Dingaling.address_id,
- Dingaling.data == 'x')))
+ User.addresses.any(
+ and_(Address.id == Dingaling.address_id,
+ Dingaling.data == 'x')))
# new since #2746 - correlate_except() now takes context into account
# so its usage in any() is not as disrupting.
- self.assert_compile(q,
+ self.assert_compile(
+ q,
"SELECT users.id AS users_id, users.name AS users_name "
"FROM users "
"WHERE EXISTS (SELECT 1 "
def test_in(self):
User = self.classes.User
- self._test(User.id.in_(['a', 'b']),
- "users.id IN (:id_1, :id_2)")
+ self._test(User.id.in_(['a', 'b']), "users.id IN (:id_1, :id_2)")
def test_in_on_relationship_not_supported(self):
User, Address = self.classes.User, self.classes.Address
def test_between(self):
User = self.classes.User
- self._test(User.id.between('a', 'b'),
- "users.id BETWEEN :id_1 AND :id_2")
+ self._test(
+ User.id.between('a', 'b'), "users.id BETWEEN :id_1 AND :id_2")
def test_collate(self):
User = self.classes.User
- self._test(collate(User.id, 'binary'),
- "users.id COLLATE binary")
+ self._test(collate(User.id, 'binary'), "users.id COLLATE binary")
- self._test(User.id.collate('binary'),
- "users.id COLLATE binary")
+ self._test(User.id.collate('binary'), "users.id COLLATE binary")
def test_selfref_between(self):
User = self.classes.User
ualias = aliased(User)
- self._test(User.id.between(ualias.id, ualias.id), "users.id BETWEEN users_1.id AND users_1.id")
- self._test(ualias.id.between(User.id, User.id), "users_1.id BETWEEN users.id AND users.id")
+ self._test(
+ User.id.between(ualias.id, ualias.id),
+ "users.id BETWEEN users_1.id AND users_1.id")
+ self._test(
+ ualias.id.between(User.id, User.id),
+ "users_1.id BETWEEN users.id AND users.id")
def test_clauses(self):
User, Address = self.classes.User, self.classes.Address
for (expr, compare) in (
(func.max(User.id), "max(users.id)"),
(User.id.desc(), "users.id DESC"),
- (between(5, User.id, Address.id), ":param_1 BETWEEN users.id AND addresses.id"),
- # this one would require adding compile() to InstrumentedScalarAttribute. do we want this ?
- #(User.id, "users.id")
+ (between(5, User.id, Address.id),
+ ":param_1 BETWEEN users.id AND addresses.id"),
+ # this one would require adding compile() to
+ # InstrumentedScalarAttribute. do we want this ?
+ # (User.id, "users.id")
):
c = expr.compile(dialect=default.DefaultDialect())
assert str(c) == compare, "%s != %s" % (str(c), compare)
self.classes.Address)
session = create_session()
- s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'),
- Address.user_id==User.id)).statement
+ s = session.query(User).filter(
+ and_(addresses.c.email_address == bindparam('emailad'),
+ Address.user_id == User.id)).statement
- l = list(session.query(User).instances(s.execute(emailad = 'jack@bean.com')))
+ l = list(
+ session.query(User).instances(s.execute(emailad='jack@bean.com')))
eq_([User(id=7)], l)
def test_aliased_sql_construct(self):
session = create_session()
- q = session.query(User.id).filter(User.id==7)
+ q = session.query(User.id).filter(User.id == 7)
- q = session.query(Address).filter(Address.user_id==q)
+ q = session.query(Address).filter(Address.user_id == q)
assert isinstance(q._criterion.right, expression.ColumnElement)
self.assert_compile(
q,
User = self.classes.User
session = create_session()
- a1 = session.query(User.id).filter(User.id==7).subquery('foo1')
- a2 = session.query(User.id).filter(User.id==7).subquery(name='foo2')
- a3 = session.query(User.id).filter(User.id==7).subquery()
+ a1 = session.query(User.id).filter(User.id == 7).subquery('foo1')
+ a2 = session.query(User.id).filter(User.id == 7).subquery(name='foo2')
+ a3 = session.query(User.id).filter(User.id == 7).subquery()
eq_(a1.name, 'foo1')
eq_(a2.name, 'foo2')
User = self.classes.User
session = create_session()
- a1 = session.query(User.id).filter(User.id == 7).subquery(with_labels=True)
+ a1 = session.query(User.id).filter(User.id == 7). \
+ subquery(with_labels=True)
assert a1.c.users_id is not None
def test_reduced_subquery(self):
session = create_session()
a1 = session.query(User.id, ua.id, ua.name).\
- filter(User.id == ua.id).subquery(reduce_columns=True)
+ filter(User.id == ua.id).subquery(reduce_columns=True)
self.assert_compile(a1,
"SELECT users.id, users_1.name FROM "
"users, users AS users_1 WHERE users.id = users_1.id")
session = create_session()
- q = session.query(User.id).filter(User.id==7).label('foo')
+ q = session.query(User.id).filter(User.id == 7).label('foo')
self.assert_compile(
session.query(q),
"SELECT (SELECT users.id FROM users WHERE users.id = :id_1) AS foo"
session = create_session()
- q = session.query(User.id).filter(User.id==7).as_scalar()
+ q = session.query(User.id).filter(User.id == 7).as_scalar()
self.assert_compile(session.query(User).filter(User.id.in_(q)),
'SELECT users.id AS users_id, users.name '
session = create_session()
q = session.query(User.id).filter(User.id == bindparam('foo')).\
- params(foo=7).subquery()
+ params(foo=7).subquery()
q = session.query(User).filter(User.id.in_(q))
session = create_session()
s = session.query(User.id).join(User.addresses).group_by(User.id).\
- having(func.count(Address.id) > 2)
- eq_(
- session.query(User).filter(User.id.in_(s)).all(),
- [User(id=8)]
- )
+ having(func.count(Address.id) > 2)
+ eq_(session.query(User).filter(User.id.in_(s)).all(), [User(id=8)])
def test_union(self):
User = self.classes.User
s = create_session()
- q1 = s.query(User).filter(User.name=='ed').with_labels()
- q2 = s.query(User).filter(User.name=='fred').with_labels()
+ q1 = s.query(User).filter(User.name == 'ed').with_labels()
+ q2 = s.query(User).filter(User.name == 'fred').with_labels()
eq_(
- s.query(User).from_statement(union(q1, q2).order_by('users_name')).all(),
- [User(name='ed'), User(name='fred')]
+ s.query(User).from_statement(union(q1, q2).
+ order_by('users_name')).all(), [User(name='ed'), User(name='fred')]
)
def test_select(self):
s = create_session()
- # this is actually not legal on most DBs since the subquery has no alias
- q1 = s.query(User).filter(User.name=='ed')
-
+ # this is actually not legal on most DBs since the subquery has no
+ # alias
+ q1 = s.query(User).filter(User.name == 'ed')
self.assert_compile(
select([q1]),
s = create_session()
- # TODO: do we want aliased() to detect a query and convert to subquery()
- # automatically ?
- q1 = s.query(Address).filter(Address.email_address=='jack@bean.com')
+ # TODO: do we want aliased() to detect a query and convert to
+ # subquery() automatically ?
+ q1 = s.query(Address).filter(Address.email_address == 'jack@bean.com')
adalias = aliased(Address, q1.subquery())
eq_(
- s.query(User, adalias).join(adalias, User.id==adalias.user_id).all(),
- [(User(id=7,name='jack'), Address(email_address='jack@bean.com',user_id=7,id=1))]
- )
+ s.query(User, adalias).join(adalias, User.id == adalias.user_id).
+ all(),
+ [
+ (
+ User(id=7, name='jack'),
+ Address(email_address='jack@bean.com', user_id=7, id=1))])
+
# more slice tests are available in test/orm/generative.py
class SliceTest(QueryTest):
def test_first(self):
User = self.classes.User
- assert User(id=7) == create_session().query(User).first()
+ assert User(id=7) == create_session().query(User).first()
- assert create_session().query(User).filter(User.id==27).first() is None
+ assert create_session().query(User).filter(User.id == 27). \
+ first() is None
@testing.only_on('sqlite', 'testing execution but db-specific syntax')
def test_limit_offset_applies(self):
sess = create_session()
q = sess.query(User)
- self.assert_sql(testing.db, lambda: q[10:20], [
- ("SELECT users.id AS users_id, users.name AS users_name FROM users LIMIT :param_1 OFFSET :param_2", {'param_1':10, 'param_2':10})
- ])
-
- self.assert_sql(testing.db, lambda: q[:20], [
- ("SELECT users.id AS users_id, users.name AS users_name FROM users LIMIT :param_1 OFFSET :param_2", {'param_1':20, 'param_2':0})
- ])
-
- self.assert_sql(testing.db, lambda: q[5:], [
- ("SELECT users.id AS users_id, users.name AS users_name FROM users LIMIT :param_1 OFFSET :param_2", {'param_1':-1, 'param_2':5})
- ])
+ self.assert_sql(
+ testing.db, lambda: q[10:20], [
+ (
+ "SELECT users.id AS users_id, users.name "
+ "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
+ {'param_1': 10, 'param_2': 10})])
+
+ self.assert_sql(
+ testing.db, lambda: q[:20], [
+ (
+ "SELECT users.id AS users_id, users.name "
+ "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
+ {'param_1': 20, 'param_2': 0})])
+
+ self.assert_sql(
+ testing.db, lambda: q[5:], [
+ (
+ "SELECT users.id AS users_id, users.name "
+ "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
+ {'param_1': -1, 'param_2': 5})])
self.assert_sql(testing.db, lambda: q[2:2], [])
self.assert_sql(testing.db, lambda: q[-2:-5], [])
- self.assert_sql(testing.db, lambda: q[-5:-2], [
- ("SELECT users.id AS users_id, users.name AS users_name FROM users", {})
- ])
+ self.assert_sql(
+ testing.db, lambda: q[-5:-2], [
+ (
+ "SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users", {})])
- self.assert_sql(testing.db, lambda: q[-5:], [
- ("SELECT users.id AS users_id, users.name AS users_name FROM users", {})
- ])
-
- self.assert_sql(testing.db, lambda: q[:], [
- ("SELECT users.id AS users_id, users.name AS users_name FROM users", {})
- ])
+ self.assert_sql(
+ testing.db, lambda: q[-5:], [
+ (
+ "SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users", {})])
+ self.assert_sql(
+ testing.db, lambda: q[:], [
+ (
+ "SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users", {})])
class FilterTest(QueryTest, AssertsCompiledSQL):
User = self.classes.User
users = create_session().query(User).all()
- eq_(
- [User(id=7), User(id=8), User(id=9),User(id=10)],
- users
- )
+ eq_([User(id=7), User(id=8), User(id=9), User(id=10)], users)
@testing.requires.offset
def test_limit_offset(self):
sess = create_session()
- assert [User(id=8), User(id=9)] == sess.query(User).order_by(User.id).limit(2).offset(1).all()
+ assert [User(id=8), User(id=9)] == \
+ sess.query(User).order_by(User.id).limit(2).offset(1).all()
- assert [User(id=8), User(id=9)] == list(sess.query(User).order_by(User.id)[1:3])
+ assert [User(id=8), User(id=9)] == \
+ list(sess.query(User).order_by(User.id)[1:3])
assert User(id=8) == sess.query(User).order_by(User.id)[1]
User = self.classes.User
sess = create_session()
q1 = sess.query(self.classes.User).\
- order_by(self.classes.User.id).limit(bindparam('n'))
+ order_by(self.classes.User.id).limit(bindparam('n'))
for n in range(1, 4):
result = q1.params(n=n).all()
eq_(len(result), n)
eq_(
- sess.query(User).order_by(User.id).
- limit(bindparam('limit')).
- offset(bindparam('offset')).
- params(limit=2, offset=1).all(),
+ sess.query(User).order_by(User.id).limit(bindparam('limit')).
+ offset(bindparam('offset')).params(limit=2, offset=1).all(),
[User(id=8), User(id=9)]
)
eq_(
list(
- sess.query(User).params(a=1, b=3).
- order_by(User.id)
+ sess.query(User).params(a=1, b=3).order_by(User.id)
[cast(bindparam('a'), Integer):cast(bindparam('b'), Integer)]),
[User(id=8), User(id=9)]
)
sess = create_session(testing.db)
- assert sess.query(exists().where(User.id==9)).scalar()
- assert not sess.query(exists().where(User.id==29)).scalar()
+ assert sess.query(exists().where(User.id == 9)).scalar()
+ assert not sess.query(exists().where(User.id == 29)).scalar()
def test_one_filter(self):
User = self.classes.User
- assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.name.endswith('ed')).all()
+ assert [User(id=8), User(id=9)] == \
+ create_session().query(User).filter(User.name.endswith('ed')).all()
def test_contains(self):
"""test comparing a collection to an object instance."""
User, Address = self.classes.User, self.classes.Address
-
sess = create_session()
address = sess.query(Address).get(3)
- assert [User(id=8)] == sess.query(User).filter(User.addresses.contains(address)).all()
+ assert [User(id=8)] == \
+ sess.query(User).filter(User.addresses.contains(address)).all()
try:
sess.query(User).filter(User.addresses == address)
except sa_exc.InvalidRequestError:
assert True
- assert [User(id=10)] == sess.query(User).filter(User.addresses==None).all()
+ assert [User(id=10)] == \
+ sess.query(User).filter(User.addresses == None).all()
try:
- assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all()
+ assert [User(id=7), User(id=9), User(id=10)] == \
+ sess.query(User).filter(User.addresses != address).all()
assert False
except sa_exc.InvalidRequestError:
assert True
- #assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all()
+ # assert [User(id=7), User(id=9), User(id=10)] ==
+ # sess.query(User).filter(User.addresses!=address).all()
def test_clause_element_ok(self):
User = self.classes.User
)
def test_unique_binds_join_cond(self):
- """test that binds used when the lazyclause is used in criterion are unique"""
+ """test that binds used when the lazyclause is used in criterion are
+ unique"""
User, Address = self.classes.User, self.classes.Address
sess = Session()
"users.name AS users_name FROM users WHERE users.id = :param_1 "
"UNION SELECT users.id AS users_id, users.name AS users_name "
"FROM users WHERE users.id = :param_2) AS anon_1",
- checkparams = {'param_1': 7, 'param_2': 8}
+ checkparams={'param_1': 7, 'param_2': 8}
)
def test_any(self):
sess = create_session()
- assert [User(id=8), User(id=9)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'))).all()
+ assert [User(id=8), User(id=9)] == \
+ sess.query(User). \
+ filter(
+ User.addresses.any(Address.email_address.like('%ed%'))).all()
- assert [User(id=8)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'), id=4)).all()
+ assert [User(id=8)] == \
+ sess.query(User). \
+ filter(
+ User.addresses.any(
+ Address.email_address.like('%ed%'), id=4)).all()
- assert [User(id=8)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'))).\
+ assert [User(id=8)] == \
+ sess.query(User). \
+ filter(User.addresses.any(Address.email_address.like('%ed%'))).\
filter(User.addresses.any(id=4)).all()
- assert [User(id=9)] == sess.query(User).filter(User.addresses.any(email_address='fred@fred.com')).all()
+ assert [User(id=9)] == \
+ sess.query(User). \
+ filter(User.addresses.any(email_address='fred@fred.com')).all()
# test that any() doesn't overcorrelate
- assert [User(id=7), User(id=8)] == sess.query(User).join("addresses").filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all()
+ assert [User(id=7), User(id=8)] == \
+ sess.query(User).join("addresses"). \
+ filter(
+ ~User.addresses.any(
+ Address.email_address == 'fred@fred.com')).all()
# test that the contents are not adapted by the aliased join
- assert [User(id=7), User(id=8)] == sess.query(User).join("addresses", aliased=True).filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all()
+ assert [User(id=7), User(id=8)] == \
+ sess.query(User).join("addresses", aliased=True). \
+ filter(
+ ~User.addresses.any(
+ Address.email_address == 'fred@fred.com')).all()
- assert [User(id=10)] == sess.query(User).outerjoin("addresses", aliased=True).filter(~User.addresses.any()).all()
+ assert [User(id=10)] == \
+ sess.query(User).outerjoin("addresses", aliased=True). \
+ filter(~User.addresses.any()).all()
def test_has(self):
- Dingaling, User, Address = (self.classes.Dingaling,
- self.classes.User,
- self.classes.Address)
+ Dingaling, User, Address = (
+ self.classes.Dingaling, self.classes.User, self.classes.Address)
sess = create_session()
- assert [Address(id=5)] == sess.query(Address).filter(Address.user.has(name='fred')).all()
+ assert [Address(id=5)] == \
+ sess.query(Address).filter(Address.user.has(name='fred')).all()
- assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] == \
- sess.query(Address).filter(Address.user.has(User.name.like('%ed%'))).order_by(Address.id).all()
+ assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] \
+ == sess.query(Address). \
+ filter(Address.user.has(User.name.like('%ed%'))). \
+ order_by(Address.id).all()
assert [Address(id=2), Address(id=3), Address(id=4)] == \
- sess.query(Address).filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all()
+ sess.query(Address). \
+ filter(Address.user.has(User.name.like('%ed%'), id=8)). \
+ order_by(Address.id).all()
# test has() doesn't overcorrelate
assert [Address(id=2), Address(id=3), Address(id=4)] == \
- sess.query(Address).join("user").filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all()
+ sess.query(Address).join("user"). \
+ filter(Address.user.has(User.name.like('%ed%'), id=8)). \
+ order_by(Address.id).all()
# test has() doesn't get subquery contents adapted by aliased join
assert [Address(id=2), Address(id=3), Address(id=4)] == \
- sess.query(Address).join("user", aliased=True).filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all()
+ sess.query(Address).join("user", aliased=True). \
+ filter(Address.user.has(User.name.like('%ed%'), id=8)). \
+ order_by(Address.id).all()
dingaling = sess.query(Dingaling).get(2)
- assert [User(id=9)] == sess.query(User).filter(User.addresses.any(Address.dingaling==dingaling)).all()
+ assert [User(id=9)] == \
+ sess.query(User). \
+ filter(User.addresses.any(Address.dingaling == dingaling)).all()
def test_contains_m2m(self):
Item, Order = self.classes.Item, self.classes.Order
[Order(id=3)]
)
-
def test_comparison(self):
"""test scalar comparison to an object instance"""
- Item, Order, Dingaling, User, Address = (self.classes.Item,
- self.classes.Order,
- self.classes.Dingaling,
- self.classes.User,
- self.classes.Address)
-
+ Item, Order, Dingaling, User, Address = (
+ self.classes.Item, self.classes.Order, self.classes.Dingaling,
+ self.classes.User, self.classes.Address)
sess = create_session()
user = sess.query(User).get(8)
- assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user==user).all()
+ assert [Address(id=2), Address(id=3), Address(id=4)] == \
+ sess.query(Address).filter(Address.user == user).all()
- assert [Address(id=1), Address(id=5)] == sess.query(Address).filter(Address.user!=user).all()
+ assert [Address(id=1), Address(id=5)] == \
+ sess.query(Address).filter(Address.user != user).all()
# generates an IS NULL
assert [] == sess.query(Address).filter(Address.user == None).all()
assert [] == sess.query(Address).filter(Address.user == null()).all()
- assert [Order(id=5)] == sess.query(Order).filter(Order.address == None).all()
+ assert [Order(id=5)] == \
+ sess.query(Order).filter(Order.address == None).all()
# o2o
dingaling = sess.query(Dingaling).get(2)
- assert [Address(id=5)] == sess.query(Address).filter(Address.dingaling==dingaling).all()
+ assert [Address(id=5)] == \
+ sess.query(Address).filter(Address.dingaling == dingaling).all()
# m2m
- eq_(sess.query(Item).filter(Item.keywords==None).order_by(Item.id).all(), [Item(id=4), Item(id=5)])
- eq_(sess.query(Item).filter(Item.keywords!=None).order_by(Item.id).all(), [Item(id=1),Item(id=2), Item(id=3)])
-
+ eq_(
+ sess.query(Item).filter(Item.keywords == None).
+ order_by(Item.id).all(), [Item(id=4), Item(id=5)])
+ eq_(
+ sess.query(Item).filter(Item.keywords != None).
+ order_by(Item.id).all(), [Item(id=1), Item(id=2), Item(id=3)])
def test_filter_by(self):
User, Address = self.classes.User, self.classes.Address
sess = create_session()
user = sess.query(User).get(8)
- assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter_by(user=user).all()
+ assert [Address(id=2), Address(id=3), Address(id=4)] == \
+ sess.query(Address).filter_by(user=user).all()
# many to one generates IS NULL
- assert [] == sess.query(Address).filter_by(user = None).all()
- assert [] == sess.query(Address).filter_by(user = null()).all()
+ assert [] == sess.query(Address).filter_by(user=None).all()
+ assert [] == sess.query(Address).filter_by(user=null()).all()
# one to many generates WHERE NOT EXISTS
- assert [User(name='chuck')] == sess.query(User).filter_by(addresses = None).all()
- assert [User(name='chuck')] == sess.query(User).filter_by(addresses = null()).all()
+ assert [User(name='chuck')] == \
+ sess.query(User).filter_by(addresses=None).all()
+ assert [User(name='chuck')] == \
+ sess.query(User).filter_by(addresses=null()).all()
def test_filter_by_tables(self):
addresses = self.tables.addresses
sess = create_session()
self.assert_compile(
- sess.query(users).\
- filter_by(name='ed').\
- join(addresses, users.c.id==addresses.c.user_id).\
- filter_by(email_address='ed@ed.com'),
+ sess.query(users).filter_by(name='ed').
+ join(addresses, users.c.id == addresses.c.user_id).
+ filter_by(email_address='ed@ed.com'),
"SELECT users.id AS users_id, users.name AS users_name "
"FROM users JOIN addresses ON users.id = addresses.user_id "
"WHERE users.name = :name_1 AND "
assert_raises_message(
sa.exc.InvalidRequestError,
"Entity 'addresses' has no property 'name'",
- sess.query(addresses).\
- filter_by, name='ed'
+ sess.query(addresses).filter_by, name='ed'
)
def test_none_comparison(self):
- Order, User, Address = (self.classes.Order,
- self.classes.User,
- self.classes.Address)
+ Order, User, Address = (
+ self.classes.Order, self.classes.User, self.classes.Address)
sess = create_session()
# scalar
eq_(
[Order(description="order 5")],
- sess.query(Order).filter(Order.address_id==None).all()
+ sess.query(Order).filter(Order.address_id == None).all()
)
eq_(
[Order(description="order 5")],
- sess.query(Order).filter(Order.address_id==null()).all()
+ sess.query(Order).filter(Order.address_id == null()).all()
)
# o2o
- eq_([Address(id=1), Address(id=3), Address(id=4)],
- sess.query(Address).filter(Address.dingaling==None).order_by(Address.id).all())
- eq_([Address(id=1), Address(id=3), Address(id=4)],
- sess.query(Address).filter(Address.dingaling==null()).order_by(Address.id).all())
- eq_([Address(id=2), Address(id=5)], sess.query(Address).filter(Address.dingaling != None).order_by(Address.id).all())
- eq_([Address(id=2), Address(id=5)], sess.query(Address).filter(Address.dingaling != null()).order_by(Address.id).all())
+ eq_(
+ [Address(id=1), Address(id=3), Address(id=4)],
+ sess.query(Address).filter(Address.dingaling == None).
+ order_by(Address.id).all())
+ eq_(
+ [Address(id=1), Address(id=3), Address(id=4)],
+ sess.query(Address).filter(Address.dingaling == null()).
+ order_by(Address.id).all())
+ eq_(
+ [Address(id=2), Address(id=5)],
+ sess.query(Address).filter(Address.dingaling != None).
+ order_by(Address.id).all())
+ eq_(
+ [Address(id=2), Address(id=5)],
+ sess.query(Address).filter(Address.dingaling != null()).
+ order_by(Address.id).all())
# m2o
- eq_([Order(id=5)], sess.query(Order).filter(Order.address==None).all())
- eq_([Order(id=1), Order(id=2), Order(id=3), Order(id=4)], sess.query(Order).order_by(Order.id).filter(Order.address!=None).all())
+ eq_(
+ [Order(id=5)],
+ sess.query(Order).filter(Order.address == None).all())
+ eq_(
+ [Order(id=1), Order(id=2), Order(id=3), Order(id=4)],
+ sess.query(Order).order_by(Order.id).
+ filter(Order.address != None).all())
# o2m
- eq_([User(id=10)], sess.query(User).filter(User.addresses==None).all())
- eq_([User(id=7),User(id=8),User(id=9)], sess.query(User).filter(User.addresses!=None).order_by(User.id).all())
+ eq_(
+ [User(id=10)],
+ sess.query(User).filter(User.addresses == None).all())
+ eq_(
+ [User(id=7), User(id=8), User(id=9)],
+ sess.query(User).filter(User.addresses != None).
+ order_by(User.id).all())
def test_blank_filter_by(self):
User = self.classes.User
)
eq_(
[(7,), (8,), (9,), (10,)],
- create_session().query(User.id).filter_by(**{}).order_by(User.id).all()
+ create_session().query(User.id).filter_by(**{}).
+ order_by(User.id).all()
)
def test_text_coerce(self):
"AS users_name FROM users WHERE name='ed'"
)
+
class SetOpsTest(QueryTest, AssertsCompiledSQL):
__dialect__ = 'default'
s = create_session()
- fred = s.query(User).filter(User.name=='fred')
- ed = s.query(User).filter(User.name=='ed')
- jack = s.query(User).filter(User.name=='jack')
+ fred = s.query(User).filter(User.name == 'fred')
+ ed = s.query(User).filter(User.name == 'ed')
+ jack = s.query(User).filter(User.name == 'jack')
- eq_(fred.union(ed).order_by(User.name).all(),
+ eq_(
+ fred.union(ed).order_by(User.name).all(),
[User(name='ed'), User(name='fred')]
)
- eq_(fred.union(ed, jack).order_by(User.name).all(),
+ eq_(
+ fred.union(ed, jack).order_by(User.name).all(),
[User(name='ed'), User(name='fred'), User(name='jack')]
)
User, Address = self.classes.User, self.classes.Address
-
s = create_session()
q1 = s.query(User, Address).join(User.addresses).\
- filter(Address.email_address=="ed@wood.com")
+ filter(Address.email_address == "ed@wood.com")
q2 = s.query(User, Address).join(User.addresses).\
- filter(Address.email_address=="jack@bean.com")
+ filter(Address.email_address == "jack@bean.com")
q3 = q1.union(q2).order_by(User.name)
eq_(
User = self.classes.User
-
s = Session()
q1 = s.query(User, literal("x"))
q2 = s.query(User, literal_column("'y'"))
self.assert_compile(
q3,
- "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name,"
- " anon_1.param_1 AS anon_1_param_1 FROM (SELECT users.id AS users_id, users.name AS"
- " users_name, :param_1 AS param_1 FROM users UNION SELECT users.id AS users_id, "
+ "SELECT anon_1.users_id AS anon_1_users_id, "
+ "anon_1.users_name AS anon_1_users_name, "
+ "anon_1.param_1 AS anon_1_param_1 "
+ "FROM (SELECT users.id AS users_id, users.name AS "
+ "users_name, :param_1 AS param_1 "
+ "FROM users UNION SELECT users.id AS users_id, "
"users.name AS users_name, 'y' FROM users) AS anon_1"
)
['User', 'foo']
)
- for q in (q3.order_by(User.id, "anon_1_param_1"), q6.order_by(User.id, "foo")):
- eq_(q.all(),
+ for q in (
+ q3.order_by(User.id, "anon_1_param_1"),
+ q6.order_by(User.id, "foo")):
+ eq_(
+ q.all(),
[
(User(id=7, name='jack'), 'x'),
(User(id=7, name='jack'), 'y'),
"SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
"anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
"anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
- "users_name, c1 AS foo, c2 AS bar FROM users UNION SELECT users.id "
+ "users_name, c1 AS foo, c2 AS bar "
+ "FROM users UNION SELECT users.id "
"AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
"FROM users) AS anon_1 ORDER BY anon_1.foo"
)
"SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
"anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
"anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
- "users_name, c1 AS foo, c2 AS bar FROM users UNION SELECT users.id "
+ "users_name, c1 AS foo, c2 AS bar "
+ "FROM users UNION SELECT users.id "
"AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
"FROM users) AS anon_1 ORDER BY anon_1.foo"
)
['name']
)
-
@testing.requires.intersect
def test_intersect(self):
User = self.classes.User
s = create_session()
- fred = s.query(User).filter(User.name=='fred')
- ed = s.query(User).filter(User.name=='ed')
- jack = s.query(User).filter(User.name=='jack')
- eq_(fred.intersect(ed, jack).all(),
- []
- )
+ fred = s.query(User).filter(User.name == 'fred')
+ ed = s.query(User).filter(User.name == 'ed')
+ jack = s.query(User).filter(User.name == 'jack')
+ eq_(fred.intersect(ed, jack).all(), [])
- eq_(fred.union(ed).intersect(ed.union(jack)).all(),
- [User(name='ed')]
- )
+ eq_(fred.union(ed).intersect(ed.union(jack)).all(), [User(name='ed')])
def test_eager_load(self):
User, Address = self.classes.User, self.classes.Address
s = create_session()
- fred = s.query(User).filter(User.name=='fred')
- ed = s.query(User).filter(User.name=='ed')
- jack = s.query(User).filter(User.name=='jack')
+ fred = s.query(User).filter(User.name == 'fred')
+ ed = s.query(User).filter(User.name == 'ed')
def go():
eq_(
- fred.union(ed).order_by(User.name).options(joinedload(User.addresses)).all(),
- [
- User(name='ed', addresses=[Address(), Address(), Address()]),
- User(name='fred', addresses=[Address()])
- ]
+ fred.union(ed).order_by(User.name).
+ options(joinedload(User.addresses)).all(), [
+ User(
+ name='ed', addresses=[Address(), Address(),
+ Address()]),
+ User(name='fred', addresses=[Address()])]
)
self.assert_sql_count(testing.db, go, 1)
sess = create_session()
orders = sess.query(Order).filter(Order.id.in_([2, 3, 4]))
- eq_(next(orders.values(func.sum(Order.user_id * Order.address_id))), (79,))
+ eq_(
+ next(orders.values(func.sum(Order.user_id * Order.address_id))),
+ (79,))
eq_(orders.value(func.sum(Order.user_id * Order.address_id)), 79)
def test_apply(self):
Order = self.classes.Order
sess = create_session()
- assert sess.query(func.sum(Order.user_id * Order.address_id)).filter(Order.id.in_([2, 3, 4])).one() == (79,)
+ assert sess.query(func.sum(Order.user_id * Order.address_id)). \
+ filter(Order.id.in_([2, 3, 4])).one() == (79,)
def test_having(self):
User, Address = self.classes.User, self.classes.Address
sess = create_session()
- assert [User(name='ed',id=8)] == sess.query(User).order_by(User.id).group_by(User).join('addresses').having(func.count(Address.id)> 2).all()
+ assert [User(name='ed', id=8)] == \
+ sess.query(User).order_by(User.id).group_by(User). \
+ join('addresses').having(func.count(Address.id) > 2).all()
- assert [User(name='jack',id=7), User(name='fred',id=9)] == sess.query(User).order_by(User.id).group_by(User).join('addresses').having(func.count(Address.id)< 2).all()
+ assert [User(name='jack', id=7), User(name='fred', id=9)] == \
+ sess.query(User).order_by(User.id).group_by(User). \
+ join('addresses').having(func.count(Address.id) < 2).all()
class ExistsTest(QueryTest, AssertsCompiledSQL):
sess = create_session()
q1 = sess.query(User)
- self.assert_compile(sess.query(q1.exists()),
+ self.assert_compile(
+ sess.query(q1.exists()),
'SELECT EXISTS ('
- 'SELECT 1 FROM users'
+ 'SELECT 1 FROM users'
') AS anon_1'
)
q2 = sess.query(User).filter(User.name == 'fred')
- self.assert_compile(sess.query(q2.exists()),
+ self.assert_compile(
+ sess.query(q2.exists()),
'SELECT EXISTS ('
- 'SELECT 1 FROM users WHERE users.name = :name_1'
+ 'SELECT 1 FROM users WHERE users.name = :name_1'
') AS anon_1'
)
sess = create_session()
q1 = sess.query(User, Address).filter(User.id == Address.user_id)
- self.assert_compile(sess.query(q1.exists()),
+ self.assert_compile(
+ sess.query(q1.exists()),
'SELECT EXISTS ('
- 'SELECT 1 FROM users, addresses '
- 'WHERE users.id = addresses.user_id'
+ 'SELECT 1 FROM users, addresses '
+ 'WHERE users.id = addresses.user_id'
') AS anon_1'
)
sess = create_session()
q1 = sess.query().select_from(User).exists()
- self.assert_compile(sess.query(q1),
- 'SELECT EXISTS ('
- 'SELECT 1 FROM users'
- ') AS anon_1'
+ self.assert_compile(
+ sess.query(q1),
+ 'SELECT EXISTS (SELECT 1 FROM users) AS anon_1'
)
# rumors about Oracle preferring count(1) don't appear
# to be well founded.
self.assert_sql_execution(
- testing.db,
- s.query(User).count,
- CompiledSQL(
- "SELECT count(*) AS count_1 FROM "
- "(SELECT users.id AS users_id, users.name "
- "AS users_name FROM users) AS anon_1",
- {}
- )
+ testing.db, s.query(User).count, CompiledSQL(
+ "SELECT count(*) AS count_1 FROM "
+ "(SELECT users.id AS users_id, users.name "
+ "AS users_name FROM users) AS anon_1", {}
+ )
)
def test_multiple_entity(self):
User, Address = self.classes.User, self.classes.Address
-
s = create_session()
q = s.query(func.count(distinct(User.name)))
User = self.classes.User
eq_(
- [User(id=7), User(id=8), User(id=9),User(id=10)],
+ [User(id=7), User(id=8), User(id=9), User(id=10)],
create_session().query(User).order_by(User.id).distinct().all()
)
eq_(
- [User(id=7), User(id=9), User(id=8),User(id=10)],
- create_session().query(User).distinct().order_by(desc(User.name)).all()
+ [User(id=7), User(id=9), User(id=8), User(id=10)],
+ create_session().query(User).distinct().
+ order_by(desc(User.name)).all()
)
def test_joined(self):
- """test that orderbys from a joined table get placed into the columns clause when DISTINCT is used"""
+ """test that orderbys from a joined table get placed into the columns
+ clause when DISTINCT is used"""
User, Address = self.classes.User, self.classes.Address
-
sess = create_session()
- q = sess.query(User).join('addresses').distinct().order_by(desc(Address.email_address))
+ q = sess.query(User).join('addresses').distinct(). \
+ order_by(desc(Address.email_address))
assert [User(id=7), User(id=9), User(id=8)] == q.all()
sess.expunge_all()
# test that it works on embedded joinedload/LIMIT subquery
- q = sess.query(User).join('addresses').distinct().options(joinedload('addresses')).order_by(desc(Address.email_address)).limit(2)
+ q = sess.query(User).join('addresses').distinct(). \
+ options(joinedload('addresses')).\
+ order_by(desc(Address.email_address)).limit(2)
def go():
assert [
.prefix_with('PREFIX_1')
expected = "SELECT PREFIX_1 "\
"users.name AS users_name FROM users"
- self.assert_compile(query, expected,
- dialect=default.DefaultDialect()
- )
+ self.assert_compile(query, expected, dialect=default.DefaultDialect())
def test_many_prefixes(self):
User = self.classes.User
sess = create_session()
- query = sess.query(User.name)\
- .prefix_with('PREFIX_1', 'PREFIX_2')
+ query = sess.query(User.name).prefix_with('PREFIX_1', 'PREFIX_2')
expected = "SELECT PREFIX_1 PREFIX_2 "\
"users.name AS users_name FROM users"
- self.assert_compile(query, expected,
- dialect=default.DefaultDialect()
- )
+ self.assert_compile(query, expected, dialect=default.DefaultDialect())
def test_chained_prefixes(self):
User = self.classes.User
.prefix_with('PREFIX_2', 'PREFIX_3')
expected = "SELECT PREFIX_1 PREFIX_2 PREFIX_3 "\
"users.name AS users_name FROM users"
- self.assert_compile(query, expected,
- dialect=default.DefaultDialect()
- )
+ self.assert_compile(query, expected, dialect=default.DefaultDialect())
class YieldTest(QueryTest):
User = self.classes.User
sess = create_session()
- q = iter(sess.query(User).yield_per(1).from_statement("select * from users"))
+ q = iter(
+ sess.query(User).yield_per(1).from_statement(
+ "select * from users"))
ret = []
eq_(len(sess.identity_map), 0)
eq_(q._execution_options, {"stream_results": True, "foo": "bar"})
-
class HintsTest(QueryTest, AssertsCompiledSQL):
def test_hints(self):
User = self.classes.User
sess = create_session()
self.assert_compile(
- sess.query(User).with_hint(User, 'USE INDEX (col1_index,col2_index)'),
+ sess.query(User).with_hint(
+ User, 'USE INDEX (col1_index,col2_index)'),
"SELECT users.id AS users_id, users.name AS users_name "
"FROM users USE INDEX (col1_index,col2_index)",
dialect=dialect
)
self.assert_compile(
- sess.query(User).with_hint(User, 'WITH INDEX col1_index', 'sybase'),
+ sess.query(User).with_hint(
+ User, 'WITH INDEX col1_index', 'sybase'),
"SELECT users.id AS users_id, users.name AS users_name "
- "FROM users",
- dialect=dialect
+ "FROM users", dialect=dialect
)
ualias = aliased(User)
self.assert_compile(
- sess.query(User, ualias).with_hint(ualias, 'USE INDEX (col1_index,col2_index)').
- join(ualias, ualias.id > User.id),
+ sess.query(User, ualias).with_hint(
+ ualias, 'USE INDEX (col1_index,col2_index)').
+ join(ualias, ualias.id > User.id),
"SELECT users.id AS users_id, users.name AS users_name, "
"users_1.id AS users_1_id, users_1.name AS users_1_name "
- "FROM users INNER JOIN users AS users_1 USE INDEX (col1_index,col2_index) "
- "ON users_1.id > users.id",
- dialect=dialect
+ "FROM users INNER JOIN users AS users_1 "
+ "USE INDEX (col1_index,col2_index) "
+ "ON users_1.id > users.id", dialect=dialect
)
)
eq_(
- create_session().query(User).
- from_statement("select * from users order by id").first(),
- User(id=7)
+ create_session().query(User).from_statement(
+ "select * from users order by id").first(), User(id=7)
)
eq_(
- create_session().query(User).
- from_statement(
- "select * from users where name='nonexistent'").first(),
- None
- )
+ create_session().query(User).from_statement(
+ "select * from users where name='nonexistent'").first(), None)
def test_fragment(self):
User = self.classes.User
eq_(
create_session().query(User).filter("name='fred'").
- filter("id=9").all(),
- [User(id=9)]
+ filter("id=9").all(), [User(id=9)]
)
eq_(
create_session().query(User).filter("name='fred'").
- filter(User.id == 9).all(),
- [User(id=9)]
+ filter(User.id == 9).all(), [User(id=9)]
)
def test_binds(self):
User = self.classes.User
eq_(
- create_session().query(User).filter("id in (:id1, :id2)").\
- params(id1=8, id2=9).all(),
- [User(id=8), User(id=9)]
+ create_session().query(User).filter("id in (:id1, :id2)").
+ params(id1=8, id2=9).all(), [User(id=8), User(id=9)]
)
def test_as_column(self):
assert_raises(sa_exc.InvalidRequestError, s.query,
User.id, text("users.name"))
- eq_(s.query(User.id, "name").order_by(User.id).all(),
- [(7, 'jack'), (8, 'ed'), (9, 'fred'), (10, 'chuck')])
+ eq_(
+ s.query(User.id, "name").order_by(User.id).all(),
+ [(7, 'jack'), (8, 'ed'), (9, 'fred'), (10, 'chuck')])
def test_via_select(self):
User = self.classes.User
eq_(
s.query(User).from_statement(
- text("select * from users order by id").\
- columns(id=Integer, name=String)
- ).all(),
+ text("select * from users order by id").
+ columns(id=Integer, name=String)).all(),
[User(id=7), User(id=8), User(id=9), User(id=10)]
)
eq_(
s.query(User).from_statement(
- text("select * from users order by id").\
- columns(User.id, User.name)
- ).all(),
+ text("select * from users order by id").
+ columns(User.id, User.name)).all(),
[User(id=7), User(id=8), User(id=9), User(id=10)]
)
eq_(
s.query(User).select_from(
- text("select * from users").\
- columns(id=Integer, name=String)
+ text("select * from users").columns(id=Integer, name=String)
).order_by(User.id).all(),
[User(id=7), User(id=8), User(id=9), User(id=10)]
)
+
class ParentTest(QueryTest, AssertsCompiledSQL):
__dialect__ = 'default'
def test_o2m(self):
- User, orders, Order = (self.classes.User,
- self.tables.orders,
- self.classes.Order)
+ User, orders, Order = (
+ self.classes.User, self.tables.orders, self.classes.Order)
sess = create_session()
q = sess.query(User)
# test auto-lookup of property
o = sess.query(Order).with_parent(u1).all()
- assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+ assert [Order(description="order 1"), Order(description="order 3"),
+ Order(description="order 5")] == o
# test with explicit property
o = sess.query(Order).with_parent(u1, property='orders').all()
- assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+ assert [Order(description="order 1"), Order(description="order 3"),
+ Order(description="order 5")] == o
o = sess.query(Order).with_parent(u1, property=User.orders).all()
- assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+ assert [Order(description="order 1"), Order(description="order 3"),
+ Order(description="order 5")] == o
o = sess.query(Order).filter(with_parent(u1, User.orders)).all()
- assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+ assert [
+ Order(description="order 1"), Order(description="order 3"),
+ Order(description="order 5")] == o
# test generative criterion
- o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all()
- assert [Order(description="order 3"), Order(description="order 5")] == o
+ o = sess.query(Order).with_parent(u1).filter(orders.c.id > 2).all()
+ assert [
+ Order(description="order 3"), Order(description="order 5")] == o
- # test against None for parent? this can't be done with the current API since we don't know
- # what mapper to use
- #assert sess.query(Order).with_parent(None, property='addresses').all() == [Order(description="order 5")]
+ # test against None for parent? this can't be done with the current
+ # API since we don't know what mapper to use
+ # assert
+ # sess.query(Order).with_parent(None, property='addresses').all()
+ # == [Order(description="order 5")]
def test_noparent(self):
Item, User = self.classes.Item, self.classes.User
sess = create_session()
i1 = sess.query(Item).filter_by(id=2).one()
k = sess.query(Keyword).with_parent(i1).all()
- assert [Keyword(name='red'), Keyword(name='small'), Keyword(name='square')] == k
+ assert [
+ Keyword(name='red'), Keyword(name='small'),
+ Keyword(name='square')] == k
def test_with_transient(self):
User, Order = self.classes.User, self.classes.Order
utrans = User(id=u1.id)
o = sess.query(Order).with_parent(utrans, 'orders')
eq_(
- [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")],
+ [
+ Order(description="order 1"), Order(description="order 3"),
+ Order(description="order 5")],
o.all()
)
o = sess.query(Order).filter(with_parent(utrans, 'orders'))
eq_(
- [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")],
+ [
+ Order(description="order 1"), Order(description="order 3"),
+ Order(description="order 5")],
o.all()
)
"addresses.email_address AS addresses_email_address FROM "
"addresses WHERE :param_1 = addresses.user_id UNION SELECT "
"addresses.id AS addresses_id, addresses.user_id AS "
- "addresses_user_id, addresses.email_address AS addresses_email_address "
+ "addresses_user_id, addresses.email_address "
+ "AS addresses_email_address "
"FROM addresses WHERE :param_2 = addresses.user_id) AS anon_1",
checkparams={'param_1': 7, 'param_2': 8},
)
checkparams={'param_1': 7, 'param_2': 8},
)
+
class SynonymTest(QueryTest):
@classmethod
cls.tables.item_keywords, cls.tables.addresses
mapper(User, users, properties={
- 'name_syn':synonym('name'),
- 'addresses':relationship(Address),
- 'orders':relationship(Order, backref='user', order_by=orders.c.id), # o2m, m2o
- 'orders_syn':synonym('orders'),
- 'orders_syn_2':synonym('orders_syn')
+ 'name_syn': synonym('name'),
+ 'addresses': relationship(Address),
+ 'orders': relationship(
+ Order, backref='user', order_by=orders.c.id), # o2m, m2o
+ 'orders_syn': synonym('orders'),
+ 'orders_syn_2': synonym('orders_syn')
})
mapper(Address, addresses)
mapper(Order, orders, properties={
- 'items':relationship(Item, secondary=order_items), #m2m
- 'address':relationship(Address), # m2o
- 'items_syn':synonym('items')
+ 'items': relationship(Item, secondary=order_items), # m2m
+ 'address': relationship(Address), # m2o
+ 'items_syn': synonym('items')
})
mapper(Item, items, properties={
- 'keywords':relationship(Keyword, secondary=item_keywords) #m2m
+ 'keywords': relationship(Keyword, secondary=item_keywords) # m2m
})
mapper(Keyword, keywords)
User, Order = self.classes.User, self.classes.Order
s = create_session()
+
def go():
result = s.query(User).filter_by(name='jack').\
- options(joinedload(User.orders_syn)).all()
+ options(joinedload(User.orders_syn)).all()
eq_(result, [
User(id=7, name='jack', orders=[
Order(description='order 1'),
User, Order = self.classes.User, self.classes.Order
s = create_session()
+
def go():
result = s.query(User).filter_by(name='jack').\
- options(joinedload(User.orders_syn_2)).all()
+ options(joinedload(User.orders_syn_2)).all()
eq_(result, [
User(id=7, name='jack', orders=[
Order(description='order 1'),
User, Order = self.classes.User, self.classes.Order
s = create_session()
+
def go():
result = s.query(User).filter_by(name='jack').\
- options(joinedload('orders_syn_2')).all()
+ options(joinedload('orders_syn_2')).all()
eq_(result, [
User(id=7, name='jack', orders=[
Order(description='order 1'),
['orders_syn', 'items_syn'],
['orders_syn_2', 'items_syn'],
):
- result = create_session().query(User).join(*j).filter_by(id=3).all()
+ result = create_session().query(User).join(*j).filter_by(id=3). \
+ all()
assert [User(id=7, name='jack'), User(id=9, name='fred')] == result
def test_with_parent(self):
sess = create_session()
q = sess.query(User)
- u1 = q.filter_by(**{nameprop:'jack'}).one()
+ u1 = q.filter_by(**{nameprop: 'jack'}).one()
o = sess.query(Order).with_parent(u1, property=orderprop).all()
- assert [Order(description="order 1"),
- Order(description="order 3"), Order(description="order 5")] == o
+ assert [
+ Order(description="order 1"), Order(description="order 3"),
+ Order(description="order 5")] == o
class ImmediateTest(_fixtures.FixtureTest):
sess = create_session()
- assert_raises(sa.orm.exc.NoResultFound,
- sess.query(User).filter(User.id == 99).one)
+ assert_raises(
+ sa.orm.exc.NoResultFound,
+ sess.query(User).filter(User.id == 99).one)
eq_(sess.query(User).filter(User.id == 7).one().id, 7)
- assert_raises(sa.orm.exc.MultipleResultsFound,
- sess.query(User).one)
+ assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).one)
assert_raises(
sa.orm.exc.NoResultFound,
eq_(sess.query(User.id, User.name).filter(User.id == 7).one(),
(7, 'jack'))
- assert_raises(sa.orm.exc.MultipleResultsFound,
- sess.query(User.id, User.name).one)
+ assert_raises(
+ sa.orm.exc.MultipleResultsFound,
+ sess.query(User.id, User.name).one)
- assert_raises(sa.orm.exc.NoResultFound,
- (sess.query(User, Address).
- join(User.addresses).
- filter(Address.id == 99)).one)
+ assert_raises(
+ sa.orm.exc.NoResultFound,
+ (sess.query(User, Address).join(User.addresses).
+ filter(Address.id == 99)).one)
eq_((sess.query(User, Address).
join(User.addresses).
filter(Address.id == 4)).one(),
(User(id=8), Address(id=4)))
- assert_raises(sa.orm.exc.MultipleResultsFound,
- sess.query(User, Address).join(User.addresses).one)
+ assert_raises(
+ sa.orm.exc.MultipleResultsFound,
+ sess.query(User, Address).join(User.addresses).one)
# this result returns multiple rows, the first
# two rows being the same. but uniquing is
# not applied for a column based result.
- assert_raises(sa.orm.exc.MultipleResultsFound,
- sess.query(User.id).
- join(User.addresses).
- filter(User.id.in_([8, 9])).
- order_by(User.id).
- one)
+ assert_raises(
+ sa.orm.exc.MultipleResultsFound,
+ sess.query(User.id).join(User.addresses).
+ filter(User.id.in_([8, 9])).order_by(User.id).one)
# test that a join which ultimately returns
# multiple identities across many rows still
# raises, even though the first two rows are of
# the same identity and unique filtering
# is applied ([ticket:1688])
- assert_raises(sa.orm.exc.MultipleResultsFound,
- sess.query(User).
- join(User.addresses).
- filter(User.id.in_([8, 9])).
- order_by(User.id).
- one)
-
+ assert_raises(
+ sa.orm.exc.MultipleResultsFound,
+ sess.query(User).join(User.addresses).filter(User.id.in_([8, 9])).
+ order_by(User.id).one)
@testing.future
def test_getslice(self):
sess.query(User).filter_by(id=7).one())
assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).scalar)
- assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User.id, User.name).scalar)
+ assert_raises(
+ sa.orm.exc.MultipleResultsFound,
+ sess.query(User.id, User.name).scalar)
def test_value(self):
User = self.classes.User
sess.bind = testing.db
eq_(sess.query().value(sa.literal_column('1').label('x')), 1)
+
class ExecutionOptionsTest(QueryTest):
def test_option_building(self):
q3_options = dict(foo='not bar', stream_results=True, answer=42)
assert q3._execution_options == q3_options
-
def test_options_in_connection(self):
User = self.classes.User
execution_options = dict(foo='bar', stream_results=True)
+
class TQuery(Query):
def instances(self, result, ctx):
try:
eq_(
- result.connection._execution_options,
- execution_options
- )
+ result.connection._execution_options,
+ execution_options)
finally:
result.close()
return iter([])
- sess = create_session(bind=testing.db, autocommit=False, query_cls=TQuery)
+ sess = create_session(
+ bind=testing.db, autocommit=False, query_cls=TQuery)
q1 = sess.query(User).execution_options(**execution_options)
q1.all()
"SELECT x AS x HAVING x = 1",
dialect=self._dialect(False)
)
-