--- /dev/null
+.. change::
+ :tags: bug, core, regression
+ :tickets: 7274
+ :versions: 2.0.0b1
+
+ Fixed regression where the :meth:`_engine.CursorResult.fetchmany` method
+ would fail to autoclose a server-side cursor (i.e. when ``stream_results``
+ or ``yield_per`` is in use, either Core or ORM oriented results) when the
+ results were fully exhausted.
+
+.. change::
+ :tags: bug, orm
+ :tickets: 7274
+ :versions: 2.0.0b1
+
+ All :class:`_result.Result` objects will now consistently raise
+ :class:`_exc.ResourceClosedError` if they are used after a hard close,
+ which includes the "hard close" that occurs after calling "single row or
+ value" methods like :meth:`_result.Result.first` and
+ :meth:`_result.Result.scalar`. This was already the behavior of the most
+ common class of result objects returned for Core statement executions, i.e.
+ those based on :class:`_engine.CursorResult`, so this behavior is not new.
+ However, the change has been extended to properly accommodate for the ORM
+ "filtering" result objects returned when using 2.0 style ORM queries,
+ which would previously behave in "soft closed" style of returning empty
+ results, or wouldn't actually "soft close" at all and would continue
+ yielding from the underlying cursor.
+
+ As part of this change, also added :meth:`_result.Result.close` to the base
+ :class:`_result.Result` class and implemented it for the filtered result
+ implementations that are used by the ORM, so that it is possible to call
+ the :meth:`_engine.CursorResult.close` method on the underlying
+ :class:`_engine.CursorResult` when the the ``yield_per`` execution option
+ is in use to close a server side cursor before remaining ORM results have
+ been fetched. This was again already available for Core result sets but the
+ change makes it available for 2.0 style ORM results as well.
+
)
def _buffer_rows(self, result, dbapi_cursor):
+ """this is currently used only by fetchone()."""
+
size = self._bufsize
try:
if size < 1:
lb = len(buf)
if size > lb:
try:
- buf.extend(dbapi_cursor.fetchmany(size - lb))
+ new = dbapi_cursor.fetchmany(size - lb)
except BaseException as e:
self.handle_exception(result, dbapi_cursor, e)
+ else:
+ if not new:
+ result._soft_close()
+ else:
+ buf.extend(new)
result = buf[0:size]
self._rowbuffer = collections.deque(buf[size:])
"""
-
if (not hard and self._soft_closed) or (hard and self.closed):
return
def _soft_close(self, hard=False):
raise NotImplementedError()
+ def close(self):
+ """close this :class:`_result.Result`.
+
+ The behavior of this method is implementation specific, and is
+ not implemented by default. The method should generally end
+ the resources in use by the result object and also cause any
+ subsequent iteration or row fetching to raise
+ :class:`.ResourceClosedError`.
+
+ .. versionadded:: 1.4.27 - ``.close()`` was previously not generally
+ available for all :class:`_result.Result` classes, instead only
+ being available on the :class:`_engine.CursorResult` returned for
+ Core statement executions. As most other result objects, namely the
+ ones used by the ORM, are proxying a :class:`_engine.CursorResult`
+ in any case, this allows the underlying cursor result to be closed
+ from the outside facade for the case when the ORM query is using
+ the ``yield_per`` execution option where it does not immediately
+ exhaust and autoclose the database cursor.
+
+ """
+ self._soft_close(hard=True)
+
@_generative
def yield_per(self, num):
"""Configure the row-fetching strategy to fetch num rows at a time.
"""
+ _hard_closed = False
+
def __init__(
self,
cursor_metadata,
self.raw = raw
self._source_supports_scalars = _source_supports_scalars
- def _soft_close(self, **kw):
+ def _soft_close(self, hard=False, **kw):
+ if hard:
+ self._hard_closed = True
+ if self.raw is not None:
+ self.raw._soft_close(hard=hard, **kw)
self.iterator = iter([])
+ self._reset_memoizations()
+
+ def _raise_hard_closed(self):
+ raise exc.ResourceClosedError("This result object is closed.")
def _raw_row_iterator(self):
return self.iterator
def _fetchiter_impl(self):
+ if self._hard_closed:
+ self._raise_hard_closed()
return self.iterator
def _fetchone_impl(self, hard_close=False):
+ if self._hard_closed:
+ self._raise_hard_closed()
+
row = next(self.iterator, _NO_ROW)
if row is _NO_ROW:
self._soft_close(hard=hard_close)
return row
def _fetchall_impl(self):
+ if self._hard_closed:
+ self._raise_hard_closed()
+
try:
return list(self.iterator)
finally:
self._soft_close()
def _fetchmany_impl(self, size=None):
+ if self._hard_closed:
+ self._raise_hard_closed()
+
return list(itertools.islice(self.iterator, 0, size))
self._yield_per = num
self.iterator = itertools.chain.from_iterable(self.chunks(num))
+ def _soft_close(self, **kw):
+ super(ChunkedIteratorResult, self)._soft_close(**kw)
+ self.chunks = lambda size: []
+
def _fetchmany_impl(self, size=None):
if self.dynamic_yield_per:
self.iterator = itertools.chain.from_iterable(self.chunks(size))
*[r._attributes for r in results]
)
- def close(self):
- self._soft_close(hard=True)
-
- def _soft_close(self, hard=False):
+ def _soft_close(self, hard=False, **kw):
for r in self._results:
- r._soft_close(hard=hard)
+ r._soft_close(hard=hard, **kw)
if hard:
self.closed = True
row = result.first()
eq_(row, (1, 1, 1))
- eq_(result.all(), [])
+ # note this is a behavior change in 1.4.27 due to
+ # adding a real result.close() to Result, previously this would
+ # return an empty list. this is already the
+ # behavior with CursorResult, but was mis-implemented for
+ # other non-cursor result sets.
+ assert_raises(exc.ResourceClosedError, result.all)
def test_one_unique(self):
# assert that one() counts rows after uniqueness has been applied.
eq_(result.scalar(), 1)
- eq_(result.all(), [])
+ # note this is a behavior change in 1.4.27 due to
+ # adding a real result.close() to Result, previously this would
+ # return an empty list. this is already the
+ # behavior with CursorResult, but was mis-implemented for
+ # other non-cursor result sets.
+ assert_raises(exc.ResourceClosedError, result.all)
def test_partition(self):
result = self._fixture()
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_raises_message
from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertions import expect_raises
from sqlalchemy.testing.assertions import expect_warnings
from sqlalchemy.testing.assertions import is_not_none
from sqlalchemy.testing.assertsql import CompiledSQL
run_setup_mappers = "each"
run_inserts = "each"
+ __backend__ = True
+
def _eagerload_mappings(self, addresses_lazy=True, user_lazy=True):
User, Address = self.classes("User", "Address")
users, addresses = self.tables("users", "addresses")
except StopIteration:
pass
+ def test_we_can_close_cursor(self):
+ """test new usecase close() added along with #7274"""
+ self._eagerload_mappings()
+
+ User = self.classes.User
+
+ sess = fixture_session()
+
+ stmt = select(User).execution_options(yield_per=15)
+ result = sess.execute(stmt)
+
+ with mock.patch.object(result.raw, "_soft_close") as mock_close:
+ two_results = result.fetchmany(2)
+ eq_(len(two_results), 2)
+
+ eq_(mock_close.mock_calls, [])
+
+ result.close()
+
+ eq_(mock_close.mock_calls, [mock.call(hard=True)])
+
+ with expect_raises(sa.exc.ResourceClosedError):
+ result.fetchmany(10)
+
+ with expect_raises(sa.exc.ResourceClosedError):
+ result.fetchone()
+
+ with expect_raises(sa.exc.ResourceClosedError):
+ result.all()
+
+ result.close()
+
+ @testing.combinations("fetchmany", "fetchone", "fetchall")
+ def test_cursor_is_closed_on_exhausted(self, fetch_method):
+ """test #7274"""
+ self._eagerload_mappings()
+
+ User = self.classes.User
+
+ sess = fixture_session()
+
+ stmt = select(User).execution_options(yield_per=15)
+ result = sess.execute(stmt)
+
+ with mock.patch.object(result.raw, "_soft_close") as mock_close:
+ # call assertions are implementation specific.
+ # test needs that _soft_close called at least once and without
+ # the hard=True flag
+ if fetch_method == "fetchmany":
+ while True:
+ buf = result.fetchmany(2)
+ if not buf:
+ break
+ eq_(mock_close.mock_calls, [mock.call()])
+ elif fetch_method == "fetchall":
+ eq_(len(result.all()), 4)
+ eq_(
+ mock_close.mock_calls, [mock.call(), mock.call(hard=False)]
+ )
+ elif fetch_method == "fetchone":
+ while True:
+ row = result.fetchone()
+ if row is None:
+ break
+ eq_(
+ mock_close.mock_calls, [mock.call(), mock.call(hard=False)]
+ )
+ else:
+ assert False
+
+ # soft closed, we can still get an empty result
+ eq_(result.all(), [])
+
+ # real closed
+ result.close()
+ assert_raises(sa.exc.ResourceClosedError, result.all)
+
def test_yield_per_and_execution_options_legacy(self):
self._eagerload_mappings()
# buffer of 98, plus buffer of 99 - 89, 10 rows
eq_(len(result.cursor_strategy._rowbuffer), 10)
+ @testing.combinations(True, False, argnames="close_on_init")
+ @testing.combinations(
+ "fetchone", "fetchmany", "fetchall", argnames="fetch_style"
+ )
+ def test_buffered_fetch_auto_soft_close(
+ self, connection, close_on_init, fetch_style
+ ):
+ """test #7274"""
+
+ table = self.tables.test
+
+ connection.execute(
+ table.insert(),
+ [{"x": i, "y": "t_%d" % i} for i in range(15, 30)],
+ )
+
+ result = connection.execute(table.select().limit(15))
+ assert isinstance(result.cursor_strategy, _cursor.CursorFetchStrategy)
+
+ if close_on_init:
+ # close_on_init - the initial buffering will exhaust the cursor,
+ # should soft close immediately
+ result = result.yield_per(30)
+ else:
+ # not close_on_init - soft close will occur after fetching an
+ # empty buffer
+ result = result.yield_per(5)
+ assert isinstance(
+ result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+ )
+
+ with mock.patch.object(result, "_soft_close") as soft_close:
+ if fetch_style == "fetchone":
+ while True:
+ row = result.fetchone()
+
+ if row:
+ eq_(soft_close.mock_calls, [])
+ else:
+ # fetchone() is also used by first(), scalar()
+ # and one() which want to embed a hard close in one
+ # step
+ eq_(soft_close.mock_calls, [mock.call(hard=False)])
+ break
+ elif fetch_style == "fetchmany":
+ while True:
+ rows = result.fetchmany(5)
+
+ if rows:
+ eq_(soft_close.mock_calls, [])
+ else:
+ eq_(soft_close.mock_calls, [mock.call()])
+ break
+ elif fetch_style == "fetchall":
+ rows = result.fetchall()
+
+ eq_(soft_close.mock_calls, [mock.call()])
+ else:
+ assert False
+
+ result.close()
+
def test_buffered_fetchmany_yield_per_all(self, connection):
table = self.tables.test