]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
ensure soft_close occurs for fetchmany with server side cursor
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 2 Nov 2021 14:58:01 +0000 (10:58 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 2 Nov 2021 20:31:22 +0000 (16:31 -0400)
Fixed regression where the :meth:`_engine.CursorResult.fetchmany` method
would fail to autoclose a server-side cursor (i.e. when ``stream_results``
or ``yield_per`` is in use, either Core or ORM oriented results) when the
results were fully exhausted.

All :class:`_result.Result` objects will now consistently raise
:class:`_exc.ResourceClosedError` if they are used after a hard close,
which includes the "hard close" that occurs after calling "single row or
value" methods like :meth:`_result.Result.first` and
:meth:`_result.Result.scalar`. This was already the behavior of the most
common class of result objects returned for Core statement executions, i.e.
those based on :class:`_engine.CursorResult`, so this behavior is not new.
However, the change has been extended to properly accommodate for the ORM
"filtering" result objects returned when using 2.0 style ORM queries,
which would previously behave in "soft closed" style of returning empty
results, or wouldn't actually "soft close" at all and would continue
yielding from the underlying cursor.

As part of this change, also added :meth:`_result.Result.close` to the base
:class:`_result.Result` class and implemented it for the filtered result
implementations that are used by the ORM, so that it is possible to call
the :meth:`_engine.CursorResult.close` method on the underlying
:class:`_engine.CursorResult` when the the ``yield_per`` execution option
is in use to close a server side cursor before remaining ORM results have
been fetched. This was again already available for Core result sets but the
change makes it available for 2.0 style ORM results as well.

Fixes: #7274
Change-Id: Id4acdfedbcab891582a7f8edd2e2e7d20d868e53
(cherry picked from commit 33824a9c06ca555ad208a9925bc7b40fe489fc72)

doc/build/changelog/unreleased_14/7274.rst [new file with mode: 0644]
lib/sqlalchemy/engine/cursor.py
lib/sqlalchemy/engine/result.py
test/base/test_result.py
test/orm/test_query.py
test/sql/test_resultset.py

diff --git a/doc/build/changelog/unreleased_14/7274.rst b/doc/build/changelog/unreleased_14/7274.rst
new file mode 100644 (file)
index 0000000..7364ae6
--- /dev/null
@@ -0,0 +1,37 @@
+.. change::
+    :tags: bug, core, regression
+    :tickets: 7274
+    :versions: 2.0.0b1
+
+    Fixed regression where the :meth:`_engine.CursorResult.fetchmany` method
+    would fail to autoclose a server-side cursor (i.e. when ``stream_results``
+    or ``yield_per`` is in use, either Core or ORM oriented results) when the
+    results were fully exhausted.
+
+.. change::
+    :tags: bug, orm
+    :tickets: 7274
+    :versions: 2.0.0b1
+
+    All :class:`_result.Result` objects will now consistently raise
+    :class:`_exc.ResourceClosedError` if they are used after a hard close,
+    which includes the "hard close" that occurs after calling "single row or
+    value" methods like :meth:`_result.Result.first` and
+    :meth:`_result.Result.scalar`. This was already the behavior of the most
+    common class of result objects returned for Core statement executions, i.e.
+    those based on :class:`_engine.CursorResult`, so this behavior is not new.
+    However, the change has been extended to properly accommodate for the ORM
+    "filtering" result objects returned when using 2.0 style ORM queries,
+    which would previously behave in "soft closed" style of returning empty
+    results, or wouldn't actually "soft close" at all and would continue
+    yielding from the underlying cursor.
+
+    As part of this change, also added :meth:`_result.Result.close` to the base
+    :class:`_result.Result` class and implemented it for the filtered result
+    implementations that are used by the ORM, so that it is possible to call
+    the :meth:`_engine.CursorResult.close` method on the underlying
+    :class:`_engine.CursorResult` when the the ``yield_per`` execution option
+    is in use to close a server side cursor before remaining ORM results have
+    been fetched. This was again already available for Core result sets but the
+    change makes it available for 2.0 style ORM results as well.
+
index 5e6078f8662cac6a644736e949ce4e3962528cfb..79f87fc0e040cf96147b080c1733181d563d0157 100644 (file)
@@ -1043,6 +1043,8 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
         )
 
     def _buffer_rows(self, result, dbapi_cursor):
+        """this is currently used only by fetchone()."""
+
         size = self._bufsize
         try:
             if size < 1:
@@ -1095,9 +1097,14 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
         lb = len(buf)
         if size > lb:
             try:
-                buf.extend(dbapi_cursor.fetchmany(size - lb))
+                new = dbapi_cursor.fetchmany(size - lb)
             except BaseException as e:
                 self.handle_exception(result, dbapi_cursor, e)
+            else:
+                if not new:
+                    result._soft_close()
+                else:
+                    buf.extend(new)
 
         result = buf[0:size]
         self._rowbuffer = collections.deque(buf[size:])
@@ -1348,7 +1355,6 @@ class BaseCursorResult(object):
 
 
         """
-
         if (not hard and self._soft_closed) or (hard and self.closed):
             return
 
index 3c2e682be6540a40c360721a6e98f039a78f19c7..7cf0bd3f9669c970d965a9713c5f36aa75bb47f7 100644 (file)
@@ -739,6 +739,28 @@ class Result(_WithKeys, ResultInternal):
     def _soft_close(self, hard=False):
         raise NotImplementedError()
 
+    def close(self):
+        """close this :class:`_result.Result`.
+
+        The behavior of this method is implementation specific, and is
+        not implemented by default.    The method should generally end
+        the resources in use by the result object and also cause any
+        subsequent iteration or row fetching to raise
+        :class:`.ResourceClosedError`.
+
+        .. versionadded:: 1.4.27 - ``.close()`` was previously not generally
+           available for all :class:`_result.Result` classes, instead only
+           being available on the :class:`_engine.CursorResult` returned for
+           Core statement executions. As most other result objects, namely the
+           ones used by the ORM, are proxying a :class:`_engine.CursorResult`
+           in any case, this allows the underlying cursor result to be closed
+           from the outside facade for the case when the ORM query is using
+           the ``yield_per`` execution option where it does not immediately
+           exhaust and autoclose the database cursor.
+
+        """
+        self._soft_close(hard=True)
+
     @_generative
     def yield_per(self, num):
         """Configure the row-fetching strategy to fetch num rows at a time.
@@ -1612,6 +1634,8 @@ class IteratorResult(Result):
 
     """
 
+    _hard_closed = False
+
     def __init__(
         self,
         cursor_metadata,
@@ -1624,16 +1648,29 @@ class IteratorResult(Result):
         self.raw = raw
         self._source_supports_scalars = _source_supports_scalars
 
-    def _soft_close(self, **kw):
+    def _soft_close(self, hard=False, **kw):
+        if hard:
+            self._hard_closed = True
+        if self.raw is not None:
+            self.raw._soft_close(hard=hard, **kw)
         self.iterator = iter([])
+        self._reset_memoizations()
+
+    def _raise_hard_closed(self):
+        raise exc.ResourceClosedError("This result object is closed.")
 
     def _raw_row_iterator(self):
         return self.iterator
 
     def _fetchiter_impl(self):
+        if self._hard_closed:
+            self._raise_hard_closed()
         return self.iterator
 
     def _fetchone_impl(self, hard_close=False):
+        if self._hard_closed:
+            self._raise_hard_closed()
+
         row = next(self.iterator, _NO_ROW)
         if row is _NO_ROW:
             self._soft_close(hard=hard_close)
@@ -1642,12 +1679,18 @@ class IteratorResult(Result):
             return row
 
     def _fetchall_impl(self):
+        if self._hard_closed:
+            self._raise_hard_closed()
+
         try:
             return list(self.iterator)
         finally:
             self._soft_close()
 
     def _fetchmany_impl(self, size=None):
+        if self._hard_closed:
+            self._raise_hard_closed()
+
         return list(itertools.islice(self.iterator, 0, size))
 
 
@@ -1696,6 +1739,10 @@ class ChunkedIteratorResult(IteratorResult):
         self._yield_per = num
         self.iterator = itertools.chain.from_iterable(self.chunks(num))
 
+    def _soft_close(self, **kw):
+        super(ChunkedIteratorResult, self)._soft_close(**kw)
+        self.chunks = lambda size: []
+
     def _fetchmany_impl(self, size=None):
         if self.dynamic_yield_per:
             self.iterator = itertools.chain.from_iterable(self.chunks(size))
@@ -1733,11 +1780,8 @@ class MergedResult(IteratorResult):
             *[r._attributes for r in results]
         )
 
-    def close(self):
-        self._soft_close(hard=True)
-
-    def _soft_close(self, hard=False):
+    def _soft_close(self, hard=False, **kw):
         for r in self._results:
-            r._soft_close(hard=hard)
+            r._soft_close(hard=hard, **kw)
         if hard:
             self.closed = True
index d94602203ce1a199453374767f92cbc06bb1b799..8c9eb398e1540fccf354001274c3ff70cf1a739b 100644 (file)
@@ -484,7 +484,12 @@ class ResultTest(fixtures.TestBase):
         row = result.first()
         eq_(row, (1, 1, 1))
 
-        eq_(result.all(), [])
+        # note this is a behavior change in 1.4.27 due to
+        # adding a real result.close() to Result, previously this would
+        # return an empty list.  this is already the
+        # behavior with CursorResult, but was mis-implemented for
+        # other non-cursor result sets.
+        assert_raises(exc.ResourceClosedError, result.all)
 
     def test_one_unique(self):
         # assert that one() counts rows after uniqueness has been applied.
@@ -597,7 +602,12 @@ class ResultTest(fixtures.TestBase):
 
         eq_(result.scalar(), 1)
 
-        eq_(result.all(), [])
+        # note this is a behavior change in 1.4.27 due to
+        # adding a real result.close() to Result, previously this would
+        # return an empty list.  this is already the
+        # behavior with CursorResult, but was mis-implemented for
+        # other non-cursor result sets.
+        assert_raises(exc.ResourceClosedError, result.all)
 
     def test_partition(self):
         result = self._fixture()
index c567cf1d16b9e4b3915016155ea9517895ac9511..a4e2ab3fa07ef331aee15faae5b2fb60d2d5c113 100644 (file)
@@ -73,6 +73,7 @@ from sqlalchemy.testing import mock
 from sqlalchemy.testing.assertions import assert_raises
 from sqlalchemy.testing.assertions import assert_raises_message
 from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertions import expect_raises
 from sqlalchemy.testing.assertions import expect_warnings
 from sqlalchemy.testing.assertions import is_not_none
 from sqlalchemy.testing.assertsql import CompiledSQL
@@ -5271,6 +5272,8 @@ class YieldTest(_fixtures.FixtureTest):
     run_setup_mappers = "each"
     run_inserts = "each"
 
+    __backend__ = True
+
     def _eagerload_mappings(self, addresses_lazy=True, user_lazy=True):
         User, Address = self.classes("User", "Address")
         users, addresses = self.tables("users", "addresses")
@@ -5313,6 +5316,83 @@ class YieldTest(_fixtures.FixtureTest):
         except StopIteration:
             pass
 
+    def test_we_can_close_cursor(self):
+        """test new usecase close() added along with #7274"""
+        self._eagerload_mappings()
+
+        User = self.classes.User
+
+        sess = fixture_session()
+
+        stmt = select(User).execution_options(yield_per=15)
+        result = sess.execute(stmt)
+
+        with mock.patch.object(result.raw, "_soft_close") as mock_close:
+            two_results = result.fetchmany(2)
+            eq_(len(two_results), 2)
+
+            eq_(mock_close.mock_calls, [])
+
+            result.close()
+
+            eq_(mock_close.mock_calls, [mock.call(hard=True)])
+
+            with expect_raises(sa.exc.ResourceClosedError):
+                result.fetchmany(10)
+
+            with expect_raises(sa.exc.ResourceClosedError):
+                result.fetchone()
+
+            with expect_raises(sa.exc.ResourceClosedError):
+                result.all()
+
+        result.close()
+
+    @testing.combinations("fetchmany", "fetchone", "fetchall")
+    def test_cursor_is_closed_on_exhausted(self, fetch_method):
+        """test #7274"""
+        self._eagerload_mappings()
+
+        User = self.classes.User
+
+        sess = fixture_session()
+
+        stmt = select(User).execution_options(yield_per=15)
+        result = sess.execute(stmt)
+
+        with mock.patch.object(result.raw, "_soft_close") as mock_close:
+            # call assertions are implementation specific.
+            # test needs that _soft_close called at least once and without
+            # the hard=True flag
+            if fetch_method == "fetchmany":
+                while True:
+                    buf = result.fetchmany(2)
+                    if not buf:
+                        break
+                eq_(mock_close.mock_calls, [mock.call()])
+            elif fetch_method == "fetchall":
+                eq_(len(result.all()), 4)
+                eq_(
+                    mock_close.mock_calls, [mock.call(), mock.call(hard=False)]
+                )
+            elif fetch_method == "fetchone":
+                while True:
+                    row = result.fetchone()
+                    if row is None:
+                        break
+                eq_(
+                    mock_close.mock_calls, [mock.call(), mock.call(hard=False)]
+                )
+            else:
+                assert False
+
+        # soft closed, we can still get an empty result
+        eq_(result.all(), [])
+
+        # real closed
+        result.close()
+        assert_raises(sa.exc.ResourceClosedError, result.all)
+
     def test_yield_per_and_execution_options_legacy(self):
         self._eagerload_mappings()
 
index bf912bd2553e3a0b65ba97fa966ee4822ed0d299..c02b3cbc1b6987cd161680e8fc7e3575f19f73ce 100644 (file)
@@ -2682,6 +2682,68 @@ class AlternateCursorResultTest(fixtures.TablesTest):
         # buffer of 98, plus buffer of 99 - 89, 10 rows
         eq_(len(result.cursor_strategy._rowbuffer), 10)
 
+    @testing.combinations(True, False, argnames="close_on_init")
+    @testing.combinations(
+        "fetchone", "fetchmany", "fetchall", argnames="fetch_style"
+    )
+    def test_buffered_fetch_auto_soft_close(
+        self, connection, close_on_init, fetch_style
+    ):
+        """test #7274"""
+
+        table = self.tables.test
+
+        connection.execute(
+            table.insert(),
+            [{"x": i, "y": "t_%d" % i} for i in range(15, 30)],
+        )
+
+        result = connection.execute(table.select().limit(15))
+        assert isinstance(result.cursor_strategy, _cursor.CursorFetchStrategy)
+
+        if close_on_init:
+            # close_on_init - the initial buffering will exhaust the cursor,
+            # should soft close immediately
+            result = result.yield_per(30)
+        else:
+            # not close_on_init - soft close will occur after fetching an
+            # empty buffer
+            result = result.yield_per(5)
+        assert isinstance(
+            result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+        )
+
+        with mock.patch.object(result, "_soft_close") as soft_close:
+            if fetch_style == "fetchone":
+                while True:
+                    row = result.fetchone()
+
+                    if row:
+                        eq_(soft_close.mock_calls, [])
+                    else:
+                        # fetchone() is also used by first(), scalar()
+                        # and one() which want to embed a hard close in one
+                        # step
+                        eq_(soft_close.mock_calls, [mock.call(hard=False)])
+                        break
+            elif fetch_style == "fetchmany":
+                while True:
+                    rows = result.fetchmany(5)
+
+                    if rows:
+                        eq_(soft_close.mock_calls, [])
+                    else:
+                        eq_(soft_close.mock_calls, [mock.call()])
+                        break
+            elif fetch_style == "fetchall":
+                rows = result.fetchall()
+
+                eq_(soft_close.mock_calls, [mock.call()])
+            else:
+                assert False
+
+        result.close()
+
     def test_buffered_fetchmany_yield_per_all(self, connection):
         table = self.tables.test