.. changelog::
:version: 0.9.9
+ .. change::
+ :tags: bug, orm, pypy
+ :versions: 1.0.0
+ :tickets: 3285
+
+ Fixed bug where if an exception were thrown at the start of a
+ :class:`.Query` before it fetched results, particularly when
+ row processors can't be formed, the cursor would stay open with
+ results pending and not actually be closed. This is typically only
+ an issue on an interpreter like Pypy where the cursor isn't
+ immediately GC'ed, and can in some circumstances lead to transactions/
+ locks being open longer than is desirable.
+
.. change::
:tags: change, mysql
:versions: 1.0.0
def filter_fn(row):
return tuple(fn(x) for x, fn in zip(row, filter_fns))
- (process, labels) = \
- list(zip(*[
- query_entity.row_processor(query,
- context, cursor)
- for query_entity in query._entities
- ]))
-
- if not single_entity:
- keyed_tuple = util.lightweight_named_tuple('result', labels)
-
- while True:
- context.partials = {}
-
- if query._yield_per:
- fetch = cursor.fetchmany(query._yield_per)
- if not fetch:
- break
- else:
- fetch = cursor.fetchall()
+ try:
+ (process, labels) = \
+ list(zip(*[
+ query_entity.row_processor(query,
+ context, cursor)
+ for query_entity in query._entities
+ ]))
+
+ if not single_entity:
+ keyed_tuple = util.lightweight_named_tuple('result', labels)
+
+ while True:
+ context.partials = {}
+
+ if query._yield_per:
+ fetch = cursor.fetchmany(query._yield_per)
+ if not fetch:
+ break
+ else:
+ fetch = cursor.fetchall()
- if single_entity:
- proc = process[0]
- rows = [proc(row) for row in fetch]
- else:
- rows = [keyed_tuple([proc(row) for proc in process])
- for row in fetch]
+ if single_entity:
+ proc = process[0]
+ rows = [proc(row) for row in fetch]
+ else:
+ rows = [keyed_tuple([proc(row) for proc in process])
+ for row in fetch]
- if filtered:
- rows = util.unique_list(rows, filter_fn)
+ if filtered:
+ rows = util.unique_list(rows, filter_fn)
- for row in rows:
- yield row
+ for row in rows:
+ yield row
- if not query._yield_per:
- break
+ if not query._yield_per:
+ break
+ except Exception as err:
+ cursor.close()
+ util.raise_from_cause(err)
@util.dependencies("sqlalchemy.orm.query")
from . import _fixtures
from sqlalchemy.orm import loading, Session, aliased
-from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertions import eq_, assert_raises
from sqlalchemy.util import KeyedTuple
-
-# class InstancesTest(_fixtures.FixtureTest):
+from sqlalchemy.testing import mock
# class GetFromIdentityTest(_fixtures.FixtureTest):
# class LoadOnIdentTest(_fixtures.FixtureTest):
# class InstanceProcessorTest(_fixture.FixtureTest):
+
+class InstancesTest(_fixtures.FixtureTest):
+ run_setup_mappers = 'once'
+ run_inserts = 'once'
+ run_deletes = None
+
+ @classmethod
+ def setup_mappers(cls):
+ cls._setup_stock_mapping()
+
+ def test_cursor_close_w_failed_rowproc(self):
+ User = self.classes.User
+ s = Session()
+
+ q = s.query(User)
+
+ ctx = q._compile_context()
+ cursor = mock.Mock()
+ q._entities = [
+ mock.Mock(row_processor=mock.Mock(side_effect=Exception("boom")))
+ ]
+ assert_raises(
+ Exception,
+ list, loading.instances(q, cursor, ctx)
+ )
+ assert cursor.close.called, "Cursor wasn't closed"
+
+
class MergeResultTest(_fixtures.FixtureTest):
run_setup_mappers = 'once'
run_inserts = 'once'