    custom_rows = single_entity and \
        query._entities[0].custom_rows
-    (process, labels) = \
-        list(zip(*[
-            query_entity.row_processor(query,
-                                       context, custom_rows)
-            for query_entity in query._entities
-        ]))
-
-    while True:
-        context.progress = {}
-        context.partials = {}
-
-        if query._yield_per:
-            fetch = cursor.fetchmany(query._yield_per)
-            if not fetch:
-                break
-        else:
-            fetch = cursor.fetchall()
-
-        if custom_rows:
-            rows = []
-            for row in fetch:
-                process[0](row, rows)
-        elif single_entity:
-            rows = [process[0](row, None) for row in fetch]
-        else:
-            rows = [util.KeyedTuple([proc(row, None) for proc in process],
-                                    labels) for row in fetch]
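+    # run row-processor construction and the fetch loop under a guard so
+    # that any failure closes the cursor (see the except clause at the end)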
+    try:
+        (process, labels) = \
+            list(zip(*[
+                query_entity.row_processor(query,
+                                           context, custom_rows)
+                for query_entity in query._entities
+            ]))
+
+        while True:
+            context.progress = {}
+            context.partials = {}
+
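+            # with yield_per set, rows are fetched in batches of that size;
+            # otherwise the full result is pulled up front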
+            if query._yield_per:
+                fetch = cursor.fetchmany(query._yield_per)
+                if not fetch:
+                    break
+            else:
+                fetch = cursor.fetchall()
+
+            if custom_rows:
+                rows = []
+                for row in fetch:
+                    process[0](row, rows)
+            elif single_entity:
+                rows = [process[0](row, None) for row in fetch]
+            else:
+                rows = [util.KeyedTuple([proc(row, None) for proc in process],
+                                        labels) for row in fetch]
-        if filtered:
-            rows = util.unique_list(rows, filter_fn)
+            if filtered:
+                rows = util.unique_list(rows, filter_fn)
-        if context.refresh_state and query._only_load_props \
-                and context.refresh_state in context.progress:
-            context.refresh_state._commit(
-                context.refresh_state.dict, query._only_load_props)
-            context.progress.pop(context.refresh_state)
+            if context.refresh_state and query._only_load_props \
+                    and context.refresh_state in context.progress:
+                context.refresh_state._commit(
+                    context.refresh_state.dict, query._only_load_props)
+                context.progress.pop(context.refresh_state)
-        statelib.InstanceState._commit_all_states(
-            list(context.progress.items()),
-            session.identity_map
-        )
+            statelib.InstanceState._commit_all_states(
+                list(context.progress.items()),
+                session.identity_map
+            )
-        for state, (dict_, attrs) in context.partials.items():
-            state._commit(dict_, attrs)
+            for state, (dict_, attrs) in context.partials.items():
+                state._commit(dict_, attrs)
-        for row in rows:
-            yield row
+            for row in rows:
+                yield row
-        if not query._yield_per:
-            break
+            if not query._yield_per:
+                break
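+    # a failure in row-processor setup or anywhere in the fetch loop above
+    # lands here; close the DBAPI cursor before re-raising the error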
+    except Exception as err:
+        cursor.close()
+        util.raise_from_cause(err)
@util.dependencies("sqlalchemy.orm.query")
from . import _fixtures
from sqlalchemy.orm import loading, Session, aliased
-from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertions import eq_, assert_raises
from sqlalchemy.util import KeyedTuple
-
-# class InstancesTest(_fixtures.FixtureTest):
+from sqlalchemy.testing import mock
# class GetFromIdentityTest(_fixtures.FixtureTest):
# class LoadOnIdentTest(_fixtures.FixtureTest):
# class InstanceProcessorTest(_fixture.FixtureTest):
+
+class InstancesTest(_fixtures.FixtureTest):
+    run_setup_mappers = 'once'
+    run_inserts = 'once'
+    run_deletes = None
+
+    @classmethod
+    def setup_mappers(cls):
+        cls._setup_stock_mapping()
+
+    def test_cursor_close_w_failed_rowproc(self):
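+        # when a row processor raises, loading.instances() should close the
+        # cursor it was handed and re-raise the original error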
+        User = self.classes.User
+        s = Session()
+
+        q = s.query(User)
+
+        ctx = q._compile_context()
+        cursor = mock.Mock()
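+        # swap in an entity whose row_processor blows up as soon as
+        # instances() invokes it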
+        q._entities = [
+            mock.Mock(row_processor=mock.Mock(side_effect=Exception("boom")))
+        ]
+        assert_raises(
+            Exception,
+            list, loading.instances(q, cursor, ctx)
+        )
+        assert cursor.close.called, "Cursor wasn't closed"
+
+
class MergeResultTest(_fixtures.FixtureTest):
    run_setup_mappers = 'once'
    run_inserts = 'once'