- Query.__getitem__ now returns list/scalar in all cases, not generative (#1035)
- added Query.slice_() which provides the simple "limit/offset from a positive range" operation,
we can rename this to range_()/section()/_something_private_because_users_shouldnt_do_this() as needed
return equivs
def __no_criterion_condition(self, meth):
- if self._criterion or self._statement or self._from_obj or self._limit or self._offset or self._group_by or self._order_by:
+ if self._criterion or self._statement or self._from_obj or self._limit is not None or self._offset is not None or self._group_by or self._order_by:
raise sa_exc.InvalidRequestError("Query.%s() being called on a Query with existing criterion. " % meth)
self._statement = self._criterion = self._from_obj = None
"statement - can't apply criterion.") % meth)
def __no_limit_offset(self, meth):
- if self._limit or self._offset:
+ if self._limit is not None or self._offset is not None:
# TODO: do we want from_self() to be implicit here ? i vote explicit for the time being
raise sa_exc.InvalidRequestError("Query.%s() being called on a Query which already has LIMIT or OFFSET applied. "
"To filter/join to the row-limited results of the query, call from_self() first."
def __getitem__(self, item):
if isinstance(item, slice):
- start = item.start
- stop = item.stop
+ start, stop, step = util.decode_slice(item)
# if we slice from the end we need to execute the query
- if (isinstance(start, int) and start < 0) or \
- (isinstance(stop, int) and stop < 0):
+ if start < 0 or stop < 0:
return list(self)[item]
else:
- res = self._clone()
- if start is not None and stop is not None:
- res._offset = (self._offset or 0) + start
- res._limit = stop - start
- elif start is None and stop is not None:
- res._limit = stop
- elif start is not None and stop is None:
- res._offset = (self._offset or 0) + start
- if item.step is not None:
+ res = self.slice_(start, stop)
+ if step is not None:
return list(res)[None:None:item.step]
else:
- return res
+ return list(res)
else:
return list(self[item:item+1])[0]
-
+
+ def slice_(self, start, stop):
+ """apply LIMIT/OFFSET to the ``Query`` based on a range and return the newly resulting ``Query``."""
+
+ if start is not None and stop is not None:
+ self._offset = (self._offset or 0) + start
+ self._limit = stop - start
+ elif start is None and stop is not None:
+ self._limit = stop
+ elif start is not None and stop is None:
+ self._offset = (self._offset or 0) + start
+ slice_ = _generative(__no_statement_condition)(slice_)
+
def limit(self, limit):
"""Apply a ``LIMIT`` to the query and return the newly resulting
``Query``.
"""
- return self[:limit]
-
+
+ self._limit = limit
+ limit = _generative(__no_statement_condition)(limit)
+
def offset(self, offset):
"""Apply an ``OFFSET`` to the query and return the newly resulting
``Query``.
"""
- return self[offset:]
-
+
+ self._offset = offset
+ offset = _generative(__no_statement_condition)(offset)
+
def distinct(self):
"""Apply a ``DISTINCT`` to the query and return the newly resulting
``Query``.
This results in an execution of the underlying query.
"""
- ret = list(self[0:1])
- if len(ret) > 0:
- return ret[0]
+ if self._statement:
+ return list(self)[0]
else:
- return None
+ ret = list(self[0:1])
+ if len(ret) > 0:
+ return ret[0]
+ else:
+ return None
def one(self):
"""Return the first result, raising an exception unless exactly one row exists.
This results in an execution of the underlying query.
"""
+ if self._statement:
+ raise exceptions.InvalidRequestError("one() not available when from_statement() is used; use `first()` instead.")
+
ret = list(self[0:2])
if len(ret) == 1:
text += " GROUP BY " + group_by
text += self.order_by_clause(cs)
- text += (cs._limit or cs._offset) and self.limit_clause(cs) or ""
+ text += (cs._limit is not None or cs._offset is not None) and self.limit_clause(cs) or ""
self.stack.pop(-1)
text += " \nHAVING " + t
text += self.order_by_clause(select)
- text += (select._limit or select._offset) and self.limit_clause(select) or ""
+ text += (select._limit is not None or select._offset is not None) and self.limit_clause(select) or ""
text += self.for_update_clause(select)
self.stack.pop(-1)
else:
raise TypeError
+if sys.version_info >= (2, 5):
+ def decode_slice(slc):
+ """decode a slice object as sent to __getitem__.
+
+ takes into account the 2.5 __index__() method, basically.
+
+ """
+ ret = []
+ for x in slc.start, slc.stop, slc.step:
+ if hasattr(x, '__index__'):
+ x = x.__index__()
+ ret.append(x)
+ return tuple(ret)
+else:
+ def decode_slice(slc):
+ return (slc.start, slc.stop, slc.step)
+
+
def flatten_iterator(x):
"""Given an iterator of which further sub-elements may also be
iterators, flatten the sub-elements into a single iterator.
sess = create_session()
def go():
- self.assertEquals(sess.query(Person).options(eagerload(Engineer.machines))[1:3].all(), all_employees[1:3])
+ self.assertEquals(sess.query(Person).options(eagerload(Engineer.machines))[1:3], all_employees[1:3])
self.assert_sql_count(testing.db, go, {'':6, 'Polymorphic':3}.get(select_type, 4))
sess = create_session()
assert sess.query(Person).with_polymorphic('*').options(eagerload(Engineer.machines)).limit(2).offset(1).with_labels().subquery().count().scalar() == 2
def go():
- self.assertEquals(sess.query(Person).with_polymorphic('*').options(eagerload(Engineer.machines))[1:3].all(), all_employees[1:3])
+ self.assertEquals(sess.query(Person).with_polymorphic('*').options(eagerload(Engineer.machines))[1:3], all_employees[1:3])
self.assert_sql_count(testing.db, go, 3)
assert [User(id=8), User(id=9)] == list(create_session().query(User)[1:3])
assert User(id=8) == create_session().query(User)[1]
-
+
+ assert [] == create_session().query(User)[3:3]
+ assert [] == create_session().query(User)[0:0]
+
+
def test_onefilter(self):
assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.name.endswith('ed')).all()
assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.id.in_([8,9]))._from_self().all()
- assert [User(id=8), User(id=9)] == create_session().query(User)[1:3]._from_self().all()
+ assert [User(id=8), User(id=9)] == create_session().query(User).slice_(1,3)._from_self().all()
assert [User(id=8)] == list(create_session().query(User).filter(User.id.in_([8,9]))._from_self()[0:1])
def test_join(self):
q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(User.id, Address.id).values(User.name, Address.email_address)
self.assertEquals(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), (u'fred', u'fred@fred.com')])
- q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(desc(Address.email_address))[1:3].values(User.name, Address.email_address)
+ q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(desc(Address.email_address)).slice_(1, 3).values(User.name, Address.email_address)
self.assertEquals(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@lala.com')])
adalias = aliased(Address)
sess.query(User, adalias.email_address, adalias.id).outerjoin((User.addresses, adalias)).from_self(User, adalias.email_address).options(eagerload(User.addresses)).order_by(User.id, adalias.id).limit(10),
]:
self.assertEquals(
+
q.all(),
[(User(addresses=[Address(user_id=7,email_address=u'jack@bean.com',id=1)],name=u'jack',id=7), u'jack@bean.com'),
(User(addresses=[