--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 8710
+
+ Fixed issue where the underlying DBAPI cursor would not be closed when
+ using :class:`_orm.Query` and direct iteration, if a user-defined exception
+ case were raised within the iteration process, interrupting the iterator
+ which otherwise is not possible to re-use in this context. When using
+ :meth:`_orm.Query.yield_per` to create server-side cursors, this would lead
+ to the usual MySQL-related issues with server side cursors out of sync.
+
+ To resolve, a catch for ``GeneratorExit`` is applied within the default
+ iterator, which applies only in those cases where the interpreter is
+ calling ``.close()`` on the iterator in any case.
+
+ A similar scenario can occur when using :term:`2.x` executions with direct
+ use of :class:`.Result`, in that case the end-user code has access to the
+ :class:`.Result` itself and should call :meth:`.Result.close` directly.
+ Version 2.0 will feature context-manager calling patterns to address this
+ use case. However within the 1.4 scope, ensured that ``.close()`` methods
+ are available on all :class:`.Result` implementations including
+ :class:`.ScalarResult`, :class:`.MappingResult`.
+
+.. change::
+ :tags: bug, engine
+ :tickets: 8710
+
+ Ensured all :class:`.Result` objects include a :meth:`.Result.close` method
+ as well as a :attr:`.Result.closed` attribute, including on
+ :class:`.ScalarResult` and :class:`.MappingResult`.
"""
self._yield_per = num
+ @property
+ def _soft_closed(self):
+ raise NotImplementedError()
+
+ @property
+ def closed(self):
+ """return True if this :class:`.Result` reports .closed
+
+ .. versionadded:: 1.4.43
+
+ """
+ raise NotImplementedError()
+
@_generative
def unique(self, strategy=None):
"""Apply unique filtering to the objects returned by this
def _soft_close(self, hard=False):
self._real_result._soft_close(hard=hard)
+ @property
+ def _soft_closed(self):
+ return self._real_result._soft_closed
+
+ @property
+ def closed(self):
+ """return True if the underlying result reports .closed
+
+ .. versionadded:: 1.4.43
+
+ """
+ return self._real_result.closed # type: ignore
+
+ def close(self):
+ """Close this :class:`.FilterResult`.
+
+ .. versionadded:: 1.4.43
+
+ """
+ self._real_result.close()
+
@property
def _attributes(self):
return self._real_result._attributes
"""
_hard_closed = False
+ _soft_closed = False
def __init__(
self,
self.raw._soft_close(hard=hard, **kw)
self.iterator = iter([])
self._reset_memoizations()
+ self._soft_closed = True
+
+ @property
+ def closed(self):
+ """return True if this :class:`.IteratorResult` has been closed
+
+ .. versionadded:: 1.4.43
+
+ """
+ return self._hard_closed
def _raise_hard_closed(self):
raise exc.ResourceClosedError("This result object is closed.")
return None
def __iter__(self):
- return self._iter().__iter__()
+ result = self._iter()
+ try:
+ for row in result:
+ yield row
+ except GeneratorExit:
+ # issue #8710 - direct iteration is not re-usable after
+ # an iterable block is broken, so close the result
+ result._soft_close()
+ raise
def _iter(self):
# new style execution.
return res
+ def test_close_attributes(self):
+ """test #8710"""
+ r1 = self._fixture()
+
+ is_false(r1.closed)
+ is_false(r1._soft_closed)
+
+ r1._soft_close()
+ is_false(r1.closed)
+ is_true(r1._soft_closed)
+
+ r1.close()
+ is_true(r1.closed)
+ is_true(r1._soft_closed)
+
def test_class_presented(self):
"""To support different kinds of objects returned vs. rows,
there are two wrapper classes for Result.
from sqlalchemy import text
from sqlalchemy.orm import loading
from sqlalchemy.orm import relationship
+from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_raises_message
def setup_mappers(cls):
cls._setup_stock_mapping()
+ def test_cursor_close_exception_raised_in_iteration(self):
+ """test #8710"""
+
+ User = self.classes.User
+ s = fixture_session()
+
+ stmt = select(User).execution_options(yield_per=1)
+
+ result = s.execute(stmt)
+ raw_cursor = result.raw
+
+ for row in result:
+ with expect_raises_message(Exception, "whoops"):
+ for row in result:
+ raise Exception("whoops")
+
+ is_true(raw_cursor._soft_closed)
+
def test_cursor_close_w_failed_rowproc(self):
User = self.classes.User
s = fixture_session()
result.close()
assert_raises(sa.exc.ResourceClosedError, result.all)
+ def test_yield_per_close_on_interrupted_iteration_legacy(self):
+ """test #8710"""
+
+ self._eagerload_mappings()
+
+ User = self.classes.User
+
+ asserted_result = [None]
+
+ class _Query(Query):
+ def _iter(self):
+ asserted_result[0] = super(_Query, self)._iter()
+ return asserted_result[0]
+
+ sess = fixture_session(query_cls=_Query)
+
+ with expect_raises_message(Exception, "hi"):
+ for i, row in enumerate(sess.query(User).yield_per(1)):
+ assert not asserted_result[0]._soft_closed
+ assert not asserted_result[0].closed
+
+ if i > 1:
+ raise Exception("hi")
+
+ assert asserted_result[0]._soft_closed
+ assert not asserted_result[0].closed
+
+ def test_yield_per_close_on_interrupted_iteration(self):
+ """test #8710"""
+
+ self._eagerload_mappings()
+
+ User = self.classes.User
+
+ sess = fixture_session()
+
+ with expect_raises_message(Exception, "hi"):
+ result = sess.execute(select(User).execution_options(yield_per=1))
+ for i, row in enumerate(result):
+ assert not result._soft_closed
+ assert not result.closed
+
+ if i > 1:
+ raise Exception("hi")
+
+ assert not result._soft_closed
+ assert not result.closed
+ result.close()
+ assert result._soft_closed
+ assert result.closed
+
def test_yield_per_and_execution_options_legacy(self):
self._eagerload_mappings()
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import is_
+from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import le_
from sqlalchemy.testing import mock
partition = next(result.partitions())
eq_(len(partition), value)
+ @testing.fixture
+ def autoclose_row_fixture(self, connection):
+ users = self.tables.users
+ connection.execute(
+ users.insert(),
+ [
+ {"user_id": 1, "name": "u1"},
+ {"user_id": 2, "name": "u2"},
+ {"user_id": 3, "name": "u3"},
+ {"user_id": 4, "name": "u4"},
+ {"user_id": 5, "name": "u5"},
+ ],
+ )
+
+ @testing.fixture(params=["plain", "scalars", "mapping"])
+ def result_fixture(self, request, connection):
+ users = self.tables.users
+
+ result_type = request.param
+
+ if result_type == "plain":
+ result = connection.execute(select(users))
+ elif result_type == "scalars":
+ result = connection.scalars(select(users))
+ elif result_type == "mapping":
+ result = connection.execute(select(users)).mappings()
+ else:
+ assert False
+
+ return result
+
+ def test_results_can_close(self, autoclose_row_fixture, result_fixture):
+ """test #8710"""
+
+ r1 = result_fixture
+
+ is_false(r1.closed)
+ is_false(r1._soft_closed)
+
+ r1._soft_close()
+ is_false(r1.closed)
+ is_true(r1._soft_closed)
+
+ r1.close()
+ is_true(r1.closed)
+ is_true(r1._soft_closed)
+
+ def test_autoclose_rows_exhausted_plain(
+ self, connection, autoclose_row_fixture, result_fixture
+ ):
+ result = result_fixture
+
+ assert not result._soft_closed
+ assert not result.closed
+
+ read_iterator = list(result)
+ eq_(len(read_iterator), 5)
+
+ assert result._soft_closed
+ assert not result.closed
+
+ result.close()
+ assert result.closed
+
class KeyTargetingTest(fixtures.TablesTest):
run_inserts = "once"
# buffer of 98, plus buffer of 99 - 89, 10 rows
eq_(len(result.cursor_strategy._rowbuffer), 10)
+ for i, row in enumerate(result):
+ if i == 206:
+ break
+
+ eq_(i, 206)
+
+ def test_iterator_remains_unbroken(self, connection):
+ """test related to #8710.
+
+ demonstrate that we can't close the cursor by catching
+ GeneratorExit inside of our iteration. Leaving the iterable
+ block using break, then picking up again, would be directly
+ impacted by this. So this provides a clear rationale for
+ providing context manager support for result objects.
+
+ """
+ table = self.tables.test
+
+ connection.execute(
+ table.insert(),
+ [{"x": i, "y": "t_%d" % i} for i in range(15, 250)],
+ )
+
+ result = connection.execute(table.select())
+ result = result.yield_per(100)
+ for i, row in enumerate(result):
+ if i == 188:
+ # this will raise GeneratorExit inside the iterator.
+ # so we can't close the DBAPI cursor here, we have plenty
+ # more rows to yield
+ break
+
+ eq_(i, 188)
+
+ # demonstrate getting more rows
+ for i, row in enumerate(result, 188):
+ if i == 206:
+ break
+
+ eq_(i, 206)
+
@testing.combinations(True, False, argnames="close_on_init")
@testing.combinations(
"fetchone", "fetchmany", "fetchall", argnames="fetch_style"