]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Fixed bug where if an exception were thrown at the start of a
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 6 Jan 2015 02:38:19 +0000 (21:38 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 6 Jan 2015 02:42:10 +0000 (21:42 -0500)
:class:`.Query` before it fetched results, particularly when
row processors can't be formed, the cursor would stay open with
results pending and not actually be closed.  This is typically only
an issue on an interpreter like Pypy where the cursor isn't
immediately GC'ed, and can in some circumstances lead to transactions/
locks being open longer than is desirable.
fixes #3285

Conflicts:
lib/sqlalchemy/orm/loading.py

doc/build/changelog/changelog_09.rst
lib/sqlalchemy/orm/loading.py
test/orm/test_loading.py

index e0f46eb66fb05662912679597fcb7392a335ea01..81a26d1872193c1c3c982c4d6e9aadc12885a6ee 100644 (file)
 .. changelog::
     :version: 0.9.9
 
+    .. change::
+        :tags: bug, orm, pypy
+        :versions: 1.0.0
+        :tickets: 3285
+
+        Fixed bug where if an exception were thrown at the start of a
+        :class:`.Query` before it fetched results, particularly when
+        row processors can't be formed, the cursor would stay open with
+        results pending and not actually be closed.  This is typically only
+        an issue on an interpreter like Pypy where the cursor isn't
+        immediately GC'ed, and can in some circumstances lead to transactions/
+        locks being open longer than is desirable.
+
     .. change::
         :tags: change, mysql
         :versions: 1.0.0
index 923ab22ed26ef50fa25b3d7515cda23c05d9f35f..9e6c5bc8c230cda94fb47a38316ac9575f9038dd 100644 (file)
@@ -47,56 +47,60 @@ def instances(query, cursor, context):
     custom_rows = single_entity and \
         query._entities[0].custom_rows
 
-    (process, labels) = \
-        list(zip(*[
-            query_entity.row_processor(query,
-                                       context, custom_rows)
-            for query_entity in query._entities
-        ]))
-
-    while True:
-        context.progress = {}
-        context.partials = {}
-
-        if query._yield_per:
-            fetch = cursor.fetchmany(query._yield_per)
-            if not fetch:
-                break
-        else:
-            fetch = cursor.fetchall()
-
-        if custom_rows:
-            rows = []
-            for row in fetch:
-                process[0](row, rows)
-        elif single_entity:
-            rows = [process[0](row, None) for row in fetch]
-        else:
-            rows = [util.KeyedTuple([proc(row, None) for proc in process],
-                                    labels) for row in fetch]
+    try:
+        (process, labels) = \
+            list(zip(*[
+                query_entity.row_processor(query,
+                                           context, custom_rows)
+                for query_entity in query._entities
+            ]))
+
+        while True:
+            context.progress = {}
+            context.partials = {}
+
+            if query._yield_per:
+                fetch = cursor.fetchmany(query._yield_per)
+                if not fetch:
+                    break
+            else:
+                fetch = cursor.fetchall()
+
+            if custom_rows:
+                rows = []
+                for row in fetch:
+                    process[0](row, rows)
+            elif single_entity:
+                rows = [process[0](row, None) for row in fetch]
+            else:
+                rows = [util.KeyedTuple([proc(row, None) for proc in process],
+                                        labels) for row in fetch]
 
-        if filtered:
-            rows = util.unique_list(rows, filter_fn)
+            if filtered:
+                rows = util.unique_list(rows, filter_fn)
 
-        if context.refresh_state and query._only_load_props \
-                and context.refresh_state in context.progress:
-            context.refresh_state._commit(
-                context.refresh_state.dict, query._only_load_props)
-            context.progress.pop(context.refresh_state)
+            if context.refresh_state and query._only_load_props \
+                    and context.refresh_state in context.progress:
+                context.refresh_state._commit(
+                    context.refresh_state.dict, query._only_load_props)
+                context.progress.pop(context.refresh_state)
 
-        statelib.InstanceState._commit_all_states(
-            list(context.progress.items()),
-            session.identity_map
-        )
+            statelib.InstanceState._commit_all_states(
+                list(context.progress.items()),
+                session.identity_map
+            )
 
-        for state, (dict_, attrs) in context.partials.items():
-            state._commit(dict_, attrs)
+            for state, (dict_, attrs) in context.partials.items():
+                state._commit(dict_, attrs)
 
-        for row in rows:
-            yield row
+            for row in rows:
+                yield row
 
-        if not query._yield_per:
-            break
+            if not query._yield_per:
+                break
+    except Exception as err:
+        cursor.close()
+        util.raise_from_cause(err)
 
 
 @util.dependencies("sqlalchemy.orm.query")
index 97c08ea290539a2edfc19ac754ae52dbfe40b100..f86477ec293d28a057a459472a0ef9ff7787b097 100644 (file)
@@ -1,13 +1,40 @@
 from . import _fixtures
 from sqlalchemy.orm import loading, Session, aliased
-from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertions import eq_, assert_raises
 from sqlalchemy.util import KeyedTuple
-
-# class InstancesTest(_fixtures.FixtureTest):
+from sqlalchemy.testing import mock
 # class GetFromIdentityTest(_fixtures.FixtureTest):
 # class LoadOnIdentTest(_fixtures.FixtureTest):
 # class InstanceProcessorTest(_fixture.FixtureTest):
 
+
+class InstancesTest(_fixtures.FixtureTest):
+    run_setup_mappers = 'once'
+    run_inserts = 'once'
+    run_deletes = None
+
+    @classmethod
+    def setup_mappers(cls):
+        cls._setup_stock_mapping()
+
+    def test_cursor_close_w_failed_rowproc(self):
+        User = self.classes.User
+        s = Session()
+
+        q = s.query(User)
+
+        ctx = q._compile_context()
+        cursor = mock.Mock()
+        q._entities = [
+            mock.Mock(row_processor=mock.Mock(side_effect=Exception("boom")))
+        ]
+        assert_raises(
+            Exception,
+            list, loading.instances(q, cursor, ctx)
+        )
+        assert cursor.close.called, "Cursor wasn't closed"
+
+
 class MergeResultTest(_fixtures.FixtureTest):
     run_setup_mappers = 'once'
     run_inserts = 'once'