=======
CHANGES
=======
+0.4.3
+-----
+- orm
+ - added very rudimentary yielding iterator behavior to Query. Call
+ query.yield_per(<number of rows>) and evaluate the Query in an
+ iterative context; every collection of N rows will be packaged up
+ and yielded. Use this method with extreme caution since it does
+ not attempt to reconcile eagerly loaded collections across
+ result batch boundaries, nor will it behave nicely if the same
+ instance occurs in more than one batch. This means that an eagerly
+ loaded collection will get cleared out if it's referenced in more than
+ one batch, and in all cases attributes will be overwritten on instances
+ that occur in more than one batch.
+
0.4.2
-----
- sql
self._limit = None
self._statement = None
self._params = {}
+ self._yield_per = None
self._criterion = None
self._having = None
self._column_aggregate = None
q = self._clone()
q._current_path = path
return q
+
+ def yield_per(self, count):
+ """yield only ``count`` rows at a time.
+
+ WARNING: use this method with caution; if the same instance
+ is present in more than one batch of rows, end-user changes
+ to attributes will be overwritten.
+ In particular, it's usually impossible to use this setting with
+ eagerly loaded collections (i.e. any lazy=False) since those
+ collections will be cleared for a new load when encountered
+ in a subsequent result batch.
+ """
+ q = self._clone()
+ q._yield_per = count
+ return q
def get(self, ident, **kwargs):
"""Return an instance of the object based on the given
def _execute_and_instances(self, querycontext):
result = self.session.execute(querycontext.statement, params=self._params, mapper=self.mapper, instance=self._refresh_instance)
- try:
- return iter(self.instances(result, querycontext=querycontext))
- finally:
- result.close()
+ return self.iterate_instances(result, querycontext=querycontext)
def instances(self, cursor, *mappers_or_columns, **kwargs):
+ return list(self.iterate_instances(cursor, *mappers_or_columns, **kwargs))
+
+ def iterate_instances(self, cursor, *mappers_or_columns, **kwargs):
session = self.session
context = kwargs.pop('querycontext', None)
else:
raise exceptions.InvalidRequestError("Invalid column expression '%r'" % m)
- context.progress = util.Set()
- if tuples:
- rows = util.OrderedSet()
- for row in cursor.fetchall():
- rows.add(tuple(proc(context, row) for proc in process))
- else:
- rows = util.UniqueAppender([])
- for row in cursor.fetchall():
- rows.append(main(context, row))
-
- if context.refresh_instance and context.only_load_props and context.refresh_instance in context.progress:
- context.refresh_instance.commit(context.only_load_props)
- context.progress.remove(context.refresh_instance)
-
- for ii in context.progress:
- context.attributes.get(('populating_mapper', ii), _state_mapper(ii))._post_instance(context, ii)
- ii.commit_all()
+ while True:
+ context.progress = util.Set()
+
+ if self._yield_per:
+ fetch = cursor.fetchmany(self._yield_per)
+ if not fetch:
+ return
+ else:
+ fetch = cursor.fetchall()
+
+ if tuples:
+ rows = util.OrderedSet()
+ for row in fetch:
+ rows.add(tuple(proc(context, row) for proc in process))
+ else:
+ rows = util.UniqueAppender([])
+ for row in fetch:
+ rows.append(main(context, row))
- return list(rows)
+ if context.refresh_instance and context.only_load_props and context.refresh_instance in context.progress:
+ context.refresh_instance.commit(context.only_load_props)
+ context.progress.remove(context.refresh_instance)
+ for ii in context.progress:
+ context.attributes.get(('populating_mapper', ii), _state_mapper(ii))._post_instance(context, ii)
+ ii.commit_all()
+
+ for row in rows:
+ yield row
+
+ if not self._yield_per:
+ break
+
def _get(self, key=None, ident=None, refresh_instance=None, lockmode=None, only_load_props=None):
lockmode = lockmode or self._lockmode
if not self._populate_existing and not refresh_instance and not self.mapper.always_refresh and lockmode is None:
] == q.all()
self.assert_sql_count(testbase.db, go, 1)
-
+
+class YieldTest(QueryTest):
+ def test_basic(self):
+ import gc
+ sess = create_session()
+ q = iter(sess.query(User).yield_per(1).from_statement("select * from users"))
+
+ ret = []
+ self.assertEquals(len(sess.identity_map), 0)
+ ret.append(q.next())
+ ret.append(q.next())
+ self.assertEquals(len(sess.identity_map), 2)
+ ret.append(q.next())
+ ret.append(q.next())
+ self.assertEquals(len(sess.identity_map), 4)
+ try:
+ q.next()
+ assert False
+ except StopIteration:
+ pass
+
class TextTest(QueryTest):
def test_fulltext(self):
assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users").all()