]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
repair yield_per for non-SS dialects and add new options
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 30 Jun 2022 23:10:06 +0000 (19:10 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 1 Jul 2022 16:14:58 +0000 (12:14 -0400)
Implemented new :paramref:`_engine.Connection.execution_options.yield_per`
execution option for :class:`_engine.Connection` in Core, to mirror that of
the same :ref:`yield_per <orm_queryguide_yield_per>` option available in
the ORM. The option sets both the
:paramref:`_engine.Connection.execution_options.stream_results` option at
the same time as invoking :meth:`_engine.Result.yield_per`, to provide the
most common streaming result configuration which also mirrors that of the
ORM use case in its usage pattern.

Fixed bug in :class:`_engine.Result` where the usage of a buffered result
strategy would not be used if the dialect in use did not support an
explicit "server side cursor" setting, when using
:paramref:`_engine.Connection.execution_options.stream_results`. This is in
error as DBAPIs such as that of SQLite and Oracle already use a
non-buffered result fetching scheme, which still benefits from usage of
partial result fetching.   The "buffered" strategy is now used in all
cases where :paramref:`_engine.Connection.execution_options.stream_results`
is set.

Added :meth:`.FilterResult.yield_per` so that result implementations
such as :class:`.MappingResult`, :class:`.ScalarResult` and
:class:`.AsyncResult` have access to this method.

Fixes: #8199
Change-Id: I6dde3cbe483a1bf81e945561b60f4b7d1c434750
(cherry picked from commit e5a0cdb2eaa1d7f381e93d0529a7f8e6d5888877)

15 files changed:
doc/build/changelog/unreleased_14/yp.rst [new file with mode: 0644]
doc/build/core/connections.rst
doc/build/orm/queryguide.rst
lib/sqlalchemy/engine/__init__.py
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/engine/cursor.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/engine/result.py
lib/sqlalchemy/ext/asyncio/result.py
lib/sqlalchemy/orm/context.py
lib/sqlalchemy/orm/query.py
lib/sqlalchemy/testing/fixtures.py
test/ext/asyncio/test_engine_py3k.py
test/orm/test_query.py
test/sql/test_resultset.py

diff --git a/doc/build/changelog/unreleased_14/yp.rst b/doc/build/changelog/unreleased_14/yp.rst
new file mode 100644 (file)
index 0000000..74e2c6a
--- /dev/null
@@ -0,0 +1,38 @@
+.. change::
+    :tags: usecase, engine
+
+    Implemented new :paramref:`_engine.Connection.execution_options.yield_per`
+    execution option for :class:`_engine.Connection` in Core, to mirror that of
+    the same :ref:`yield_per <orm_queryguide_yield_per>` option available in
+    the ORM. The option sets both the
+    :paramref:`_engine.Connection.execution_options.stream_results` option at
+    the same time as invoking :meth:`_engine.Result.yield_per`, to provide the
+    most common streaming result configuration which also mirrors that of the
+    ORM use case in its usage pattern.
+
+    .. seealso::
+
+        :ref:`engine_stream_results` - revised documentation
+
+
+.. change::
+    :tags: bug, engine
+
+    Fixed bug in :class:`_engine.Result` where the usage of a buffered result
+    strategy would not be used if the dialect in use did not support an
+    explicit "server side cursor" setting, when using
+    :paramref:`_engine.Connection.execution_options.stream_results`. This is in
+    error as DBAPIs such as that of SQLite and Oracle already use a
+    non-buffered result fetching scheme, which still benefits from usage of
+    partial result fetching.   The "buffered" strategy is now used in all
+    cases where :paramref:`_engine.Connection.execution_options.stream_results`
+    is set.
+
+
+.. change::
+    :tags: bug, engine
+    :tickets: 8199
+
+    Added :meth:`.FilterResult.yield_per` so that result implementations
+    such as :class:`.MappingResult`, :class:`.ScalarResult` and
+    :class:`.AsyncResult` have access to this method.
index b9ba6d9b3924660a1f86e98bdb1488c3e020352d..5228235e73fe108e5d711f2e352e0caf1c9d8498 100644 (file)
@@ -633,20 +633,33 @@ To sum up:
 Using Server Side Cursors (a.k.a. stream results)
 ==================================================
 
-A limited number of dialects have explicit support for the concept of "server
-side cursors" vs. "buffered cursors".    While a server side cursor implies a
-variety of different capabilities, within SQLAlchemy's engine and dialect
-implementation, it refers only to whether or not a particular set of results is
-fully buffered in memory before they are fetched from the cursor, using a
-method such as ``cursor.fetchall()``.   SQLAlchemy has no direct support
-for cursor behaviors such as scrolling; to make use of these features for
-a particular DBAPI, use the cursor directly as documented at
-:ref:`dbapi_connections`.
-
-Some DBAPIs, such as the cx_Oracle DBAPI, exclusively use server side cursors
-internally.  All result sets are essentially unbuffered across the total span
-of a result set, utilizing only a smaller buffer that is of a fixed size such
-as 100 rows at a time.
+Some backends feature explicit support for the concept of "server
+side cursors" versus "client side cursors".  A client side cursor here
+means that the database driver fully fetches all rows from a result set
+into memory before returning from a statement execution.  Drivers such as
+those of PostgreSQL and MySQL/MariaDB generally use client side cursors
+by default.   A server side cursor, by contrast, indicates that result rows
+remain pending within the database server's state as result rows are consumed
+by the client.  The drivers for Oracle generally use a "server side" model,
+for example, and the SQLite dialect, while not using a real "client / server"
+architecture, still uses an unbuffered result fetching approach that will
+leave result rows outside of process memory before they are consumed.
+
+.. topic:: What we really mean is "buffered" vs. "unbuffered" results
+
+  Server side cursors also imply a wider set of features with relational
+  databases, such as the ability to "scroll" a cursor forwards and backwards.
+  SQLAlchemy does not include any explicit support for these behaviors; within
+  SQLAlchemy itself, the general term "server side cursors" should be considered
+  to mean "unbuffered results" and "client side cursors" means "result rows
+  are buffered into memory before the first row is returned".   To work with
+  a richer "server side cursor" featureset specific to a certain DBAPI driver,
+  see the section :ref:`dbapi_connections_cursor`.
+
+From this basic architecture it follows that a "server side cursor" is more
+memory efficient when fetching very large result sets, while at the same time
+may introduce more complexity in the client/server communication process
+and be less efficient for small result sets (typically less than 10000 rows).
 
 For those dialects that have conditional support for buffered or unbuffered
 results, there are usually caveats to the use of the "unbuffered", or server
@@ -665,75 +678,119 @@ unbuffered cursors are not generally useful except in the uncommon case
 of an application fetching a very large number of rows in chunks, where
 the processing of these rows can be complete before more rows are fetched.
 
-To make use of a server side cursor for a particular execution, the
-:paramref:`_engine.Connection.execution_options.stream_results` option
-is used, which may be called on the :class:`_engine.Connection` object,
-on the statement object, or in the ORM-level contexts mentioned below.
-
-When using this option for a statement, it's usually appropriate to use
-a method like :meth:`_engine.Result.partitions` to work on small sections
-of the result set at a time, while also fetching enough rows for each
-pull so that the operation is efficient::
+For database drivers that provide client and server side cursor options,
+the :paramref:`_engine.Connection.execution_options.stream_results`
+and :paramref:`_engine.Connection.execution_options.yield_per` execution
+options provide access to "server side cursors" on a per-:class:`_engine.Connection`
+or per-statement basis.    Similar options exist when using an ORM
+:class:`_orm.Session` as well.
 
 
-    with engine.connect() as conn:
-        result = conn.execution_options(stream_results=True).execute(text("select * from table"))
+Streaming with a fixed buffer via yield_per
+--------------------------------------------
 
-        for partition in result.partitions(100):
-            _process_rows(partition)
+As individual row-fetch operations with fully unbuffered server side cursors
+are typically more expensive than fetching batches of rows at once, The
+:paramref:`_engine.Connection.execution_options.yield_per` execution option
+configures a :class:`_engine.Connection` or statement to make use of
+server-side cursors as are available, while at the same time configuring a
+fixed-size buffer of rows that will retrieve rows from the server in batches as
+they are consumed. This parameter may be to a positive integer value using the
+:meth:`_engine.Connection.execution_options` method on
+:class:`_engine.Connection` or on a statement using the
+:meth:`.Executable.execution_options` method.
+
+.. versionadded:: 1.4.40 :paramref:`_engine.Connection.execution_options.yield_per` as a
+   Core-only option is new as of SQLAlchemy 1.4.40; for prior 1.4 versions,
+   use :paramref:`_engine.Connection.execution_options.stream_results`
+   directly in combination with :meth:`_engine.Result.yield_per`.
+
+Using this option is equivalent to manually setting the
+:paramref:`_engine.Connection.execution_options.stream_results` option,
+described in the next section, and then invoking the
+:meth:`_engine.Result.yield_per` method on the :class:`_engine.Result`
+object with the given integer value.   In both cases, the effect this
+combination has includes:
+
+* server side cursors mode is selected for the given backend, if available
+  and not already the default behavior for that backend
+* as result rows are fetched, they will be buffered in batches, where the
+  size of each batch up until the last batch will be equal to the integer
+  argument passed to the
+  :paramref:`_engine.Connection.execution_options.yield_per` option or the
+  :meth:`_engine.Result.yield_per` method; the last batch is then sized against
+  the remaining rows fewer than this size
+* The default partition size used by the :meth:`_engine.Result.partitions`
+  method, if used, will be made equal to this integer size as well.
+
+These three behaviors are illustrated in the example below::
 
+    with engine.connect() as conn:
+        result = (
+          conn.
+          execution_options(yield_per=100).
+          execute(text("select * from table"))
+        )
 
-If the :class:`_engine.Result` is iterated directly, rows are fetched internally
+        for partition in result.partitions():
+            # partition is an iterable that will be at most 100 items
+            for row in partition:
+                print(f"{row}")
+
+The above example illustrates the combination of ``yield_per=100`` along
+with using the :meth:`_engine.Result.partitions` method to run processing
+on rows in batches that match the size fetched from the server.   The
+use of :meth:`_engine.Result.partitions` is optional, and if the
+:class:`_engine.Result` is iterated directly, a new batch of rows will be
+buffered for each 100 rows fetched.    Calling a method such as
+:meth:`_engine.Result.all` should **not** be used, as this will fully
+fetch all remaining rows at once and defeat the purpose of using ``yield_per``.
+
+The :paramref:`_engine.Connection.execution_options.yield_per` option
+is portable to the ORM as well, used by a :class:`_orm.Session` to fetch
+ORM objects, where it also limits the amount of ORM objects generated at once.
+See the section :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+for further background on using
+:paramref:`_engine.Connection.execution_options.yield_per` with the ORM.
+
+.. versionadded:: 1.4.40 Added
+   :paramref:`_engine.Connection.execution_options.yield_per`
+   as a Core level execution option to conveniently set streaming results,
+   buffer size, and partition size all at once in a manner that is transferrable
+   to that of the ORM's similar use case.
+
+.. _engine_stream_results_sr:
+
+Streaming with a dynamically growing buffer using stream_results
+-----------------------------------------------------------------
+
+To enable server side cursors without a specific partition size, the
+:paramref:`_engine.Connection.execution_options.stream_results` option may be
+used, which like :paramref:`_engine.Connection.execution_options.yield_per` may
+be called on the :class:`_engine.Connection` object or the statement object.
+
+When a :class:`_engine.Result` object delivered using the
+:paramref:`_engine.Connection.execution_options.stream_results` option
+is iterated directly, rows are fetched internally
 using a default buffering scheme that buffers first a small set of rows,
 then a larger and larger buffer on each fetch up to a pre-configured limit
-of 1000 rows.   This can be affected using the ``max_row_buffer`` execution
-option::
+of 1000 rows.   The maximum size of this buffer can be affected using the
+:paramref:`_engine.Connection.execution_options.max_row_buffer` execution option::
 
     with engine.connect() as conn:
         conn = conn.execution_options(stream_results=True, max_row_buffer=100)
         result = conn.execute(text("select * from table"))
 
         for row in result:
-            _process_row(row)
-
-The size of the buffer may also be set to a fixed size using the
-:meth:`_engine.Result.yield_per` method.  Calling this method with a number
-of rows will cause all result-fetching methods to work from
-buffers of the given size, only fetching new rows when the buffer is empty::
-
-    with engine.connect() as conn:
-        result = conn.execution_options(stream_results=True).execute(text("select * from table"))
+            print(f"{row}")
 
-        for row in result.yield_per(100):
-            _process_row(row)
-
-The ``stream_results`` option is also available with the ORM. When using the
-ORM, the :meth:`_engine.Result.yield_per` method should be used to set the
-number of ORM rows to be buffered each time while yielding
-(:meth:`_engine.Result.partitions` uses the "yield per" value by default for
-partition size)::
-
-    with orm.Session(engine) as session:
-        result = session.execute(
-            select(User).order_by(User_id).execution_options(stream_results=True),
-        )
-        for partition in result.yield_per(100).partitions():
-            _process_rows(partition)
-
-
-.. note:: ORM result sets currently must make use of :meth:`_engine.Result.yield_per`
-   in order to achieve streaming ORM results.
-   If the method is not used to set the number of rows to
-   fetch before yielding, the entire result is fetched before rows are yielded.
-   This may change in a future release so that the automatic buffer size used
-   by :class:`_engine.Connection` takes place for ORM results as well.
-
-When using a :term:`1.x style` ORM query with :class:`_orm.Query`, yield_per is
-available via :meth:`_orm.Query.yield_per` - this also sets the ``stream_results``
-execution option::
-
-    for row in session.query(User).yield_per(100):
-        # process row
+While the :paramref:`_engine.Connection.execution_options.stream_results`
+option may be combined with use of the :meth:`_engine.Result.partitions`
+method, a specific partition size should be passed to
+:meth:`_engine.Result.partitions` so that the entire result is not fetched.
+It is usually more straightforward to use the
+:paramref:`_engine.Connection.execution_options.yield_per` option when setting
+up to use the :meth:`_engine.Result.partitions` method.
 
 .. seealso::
 
@@ -741,6 +798,7 @@ execution option::
 
     :meth:`_engine.Result.partitions`
 
+    :meth:`_engine.Result.yield_per`
 
 .. _dbengine_implicit:
 
@@ -1973,6 +2031,8 @@ method may be used::
 
 .. versionadded:: 1.4  Added the :meth:`_engine.Connection.exec_driver_sql` method.
 
+.. _dbapi_connections_cursor:
+
 Working with the DBAPI cursor directly
 --------------------------------------
 
@@ -2178,6 +2238,9 @@ Result Set  API
 .. autoclass:: ChunkedIteratorResult
     :members:
 
+.. autoclass:: FilterResult
+    :members:
+
 .. autoclass:: FrozenResult
     :members:
 
index 261be27812ea73373e828ca17d77724023495286..06d3dace9031a46bab69725a45b10f563adff26c 100644 (file)
@@ -1003,32 +1003,62 @@ Yield Per
 ^^^^^^^^^
 
 The ``yield_per`` execution option is an integer value which will cause the
-:class:`_engine.Result` to yield only a fixed count of rows at a time.  It is
-often useful to use with a result partitioning method such as
-:meth:`_engine.Result.partitions`, e.g.::
+:class:`_engine.Result` to yield only a fixed count of rows at a time.
+When used as an execution option, ``yield_per`` is equivalent to making use
+of both the :paramref:`_engine.Connection.execution_options.stream_results`
+execution option, which selects for server side cursors to be used
+by the backend if supported, and the :meth:`_engine.Result.yield_per` method
+on the returned :class:`_engine.Result` object,
+which establishes a fixed size of rows to be fetched as well as a
+corresponding limit to how many ORM objects will be constructed at once.
+
+.. tip::
+
+    ``yield_per`` is now available as a Core execution option as well,
+    described in detail at :ref:`engine_stream_results`.  This section details
+    the use of ``yield_per`` as an execution option with an ORM
+    :class:`_orm.Session`.  The option behaves as similarly as possible
+    in both contexts.
+
+``yield_per`` when used with the ORM is typically established either
+via the :meth:`.Executable.execution_options` method on the given statement
+or by passing it to the :paramref:`_orm.Session.execute.execution_options`
+parameter of :meth:`_orm.Session.execute` or other similar :class:`_orm.Session`
+method.  In the example below its invoked upon a statement::
 
     >>> stmt = select(User).execution_options(yield_per=10)
-    {sql}>>> for partition in session.execute(stmt).partitions(10):
-    ...     for row in partition:
-    ...         print(row)
+    {sql}>>> for row in session.execute(stmt):
+    ...     print(row)
+    SELECT user_account.id, user_account.name, user_account.fullname
+    FROM user_account
+    [...] (){stop}
+    (User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
+    ...
+
+The above code is mostly equivalent as making use of the
+:paramref:`_engine.Connection.execution_options.stream_results` execution
+option, setting the :paramref:`_engine.Connection.execution_options.max_row_buffer`
+to the given integer size, and then using the :meth:`_engine.Result.yield_per`
+method on the :class:`_engine.Result` returned by the
+:class:`_orm.Session`, as in the following example::
+
+    # equivalent code
+    >>> stmt = select(User).execution_options(stream_results=True, max_row_buffer=10)
+    {sql}>>> for row in session.execute(stmt).yield_per(10):
+    ...     print(row)
     SELECT user_account.id, user_account.name, user_account.fullname
     FROM user_account
     [...] (){stop}
     (User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
     ...
 
-For expediency, the :meth:`_engine.Result.yield_per` method may also be used
-with an ORM-enabled result set, which will have the similar effect at result
-fetching time as if the ``yield_per`` execution option were used, with the
-exception that ``stream_results`` option, described below, is not set
-automatically. The :meth:`_engine.Result.partitions` method, if used,
-automatically uses the number sent to :meth:`_engine.Result.yield_per` as the
-number of rows in each partition::
-
-    >>> stmt = select(User)
-    {sql} >>> for partition in session.execute(
-    ...          stmt, execution_options={"stream_results": True}
-    ...       ).yield_per(10).partitions():
+``yield_per`` is also commonly used in combination with the
+:meth:`_engine.Result.partitions` method, that will iterate rows in grouped
+partitions. The size of each partition defaults to the integer value passed to
+``yield_per``, as in the below example::
+
+    >>> stmt = select(User).execution_options(yield_per=10)
+    {sql}>>> for partition in session.execute(stmt).partitions():
     ...     for row in partition:
     ...         print(row)
     SELECT user_account.id, user_account.name, user_account.fullname
@@ -1041,20 +1071,17 @@ The purpose of "yield per" is when fetching very large result sets
 (> 10K rows), to batch results in sub-collections and yield them
 out partially, so that the Python interpreter doesn't need to declare
 very large areas of memory which is both time consuming and leads
-to excessive memory use.   The performance from fetching hundreds of
-thousands of rows can often double when a suitable yield-per setting
-(e.g. approximately 1000) is used, even with DBAPIs that buffer
-rows (which are most).
+to excessive memory use.
 
 When ``yield_per`` is used, the
 :paramref:`_engine.Connection.execution_options.stream_results` option is also
 set for the Core execution, so that a streaming / server side cursor will be
-used if the backend supports it [1]_
+used if the backend supports it.
 
 The ``yield_per`` execution option **is not compatible with subqueryload eager
 loading or joinedload eager loading when using collections**.  It is
-potentially compatible with selectinload eager loading, **provided the database
-driver supports multiple, independent cursors** [2]_ .
+potentially compatible with selectinload eager loading, provided the database
+driver supports multiple, independent cursors.
 
 Additionally, the ``yield_per`` execution option is not compatible
 with the :meth:`_engine.Result.unique` method; as this method relies upon
@@ -1067,20 +1094,10 @@ large number of rows.
    :meth:`_engine.Result.unique` filter, at the same time as the ``yield_per``
    execution option is used.
 
-The ``yield_per`` execution option is equvialent to the
-:meth:`_orm.Query.yield_per` method in :term:`1.x style` ORM queries.
-
-.. [1] currently known are
-   :mod:`_postgresql.psycopg2`,
-   :mod:`_mysql.mysqldb` and
-   :mod:`_mysql.pymysql`.  Other backends will pre buffer
-   all rows.  The memory use of raw database rows is much less than that of an
-   ORM-mapped object, but should still be taken into consideration when
-   benchmarking.
+When using the legacy :class:`_orm.Query` object with
+:term:`1.x style` ORM use, the :meth:`_orm.Query.yield_per` method
+will have the same result as that of the ``yield_per`` execution option.
 
-.. [2] the :mod:`_postgresql.psycopg2`
-   and :mod:`_sqlite.pysqlite` drivers are
-   known to work, drivers for MySQL and SQL Server ODBC drivers do not.
 
 .. seealso::
 
index 488e41de33b0001e5cf45991d8c52a960d1e9e90..2437e170dfa8618582c79ceace5f7f18d955a8d2 100644 (file)
@@ -44,6 +44,7 @@ from .interfaces import TypeCompiler
 from .mock import create_mock_engine
 from .reflection import Inspector
 from .result import ChunkedIteratorResult
+from .result import FilterResult
 from .result import FrozenResult
 from .result import IteratorResult
 from .result import MappingResult
index 1507e159ed60d54027a0b25904550cda4e633718..8a8cab140b0f955aac4cc8b925b4f3b7ce4722cc 100644 (file)
@@ -364,15 +364,89 @@ class Connection(Connectable):
 
         :param stream_results: Available on: Connection, statement.
           Indicate to the dialect that results should be
-          "streamed" and not pre-buffered, if possible.  This is a limitation
-          of many DBAPIs.  The flag is currently understood within a subset
-          of dialects within the PostgreSQL and MySQL categories, and
-          may be supported by other third party dialects as well.
+          "streamed" and not pre-buffered, if possible.  For backends
+          such as PostgreSQL, MySQL and MariaDB, this indicates the use of
+          a "server side cursor" as opposed to a client side cursor.
+          Other backends such as that of Oracle may already use server
+          side cursors by default.
+
+          The usage of
+          :paramref:`_engine.Connection.execution_options.stream_results` is
+          usually combined with setting a fixed number of rows to to be fetched
+          in batches, to allow for efficient iteration of database rows while
+          at the same time not loading all result rows into memory at once;
+          this can be configured on a :class:`_engine.Result` object using the
+          :meth:`_engine.Result.yield_per` method, after execution has
+          returned a new :class:`_engine.Result`.   If
+          :meth:`_engine.Result.yield_per` is not used,
+          the :paramref:`_engine.Connection.execution_options.stream_results`
+          mode of operation will instead use a dynamically sized buffer
+          which buffers sets of rows at a time, growing on each batch
+          based on a fixed growth size up until a limit which may
+          be configured using the
+          :paramref:`_engine.Connection.execution_options.max_row_buffer`
+          parameter.
+
+          When using the ORM to fetch ORM mapped objects from a result,
+          :meth:`_engine.Result.yield_per` should always be used with
+          :paramref:`_engine.Connection.execution_options.stream_results`,
+          so that the ORM does not fetch all rows into new ORM objects at once.
+
+          For typical use, the
+          :paramref:`_engine.Connection.execution_options.yield_per` execution
+          option should be preferred, which sets up both
+          :paramref:`_engine.Connection.execution_options.stream_results` and
+          :meth:`_engine.Result.yield_per` at once. This option is supported
+          both at a core level by :class:`_engine.Connection` as well as by the
+          ORM :class:`_engine.Session`; the latter is described at
+          :ref:`orm_queryguide_yield_per`.
 
           .. seealso::
 
+            :ref:`engine_stream_results` - background on
+            :paramref:`_engine.Connection.execution_options.stream_results`
+
+            :paramref:`_engine.Connection.execution_options.max_row_buffer`
+
+            :paramref:`_engine.Connection.execution_options.yield_per`
+
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+            describing the ORM version of ``yield_per``
+
+        :param max_row_buffer: Available on: :class:`_engine.Connection`,
+          :class:`_sql.Executable`.  Sets a maximum
+          buffer size to use when the
+          :paramref:`_engine.Connection.execution_options.stream_results`
+          execution option is used on a backend that supports server side
+          cursors.  The default value if not specified is 1000.
+
+          .. seealso::
+
+            :paramref:`_engine.Connection.execution_options.stream_results`
+
             :ref:`engine_stream_results`
 
+
+        :param yield_per: Available on: :class:`_engine.Connection`,
+          :class:`_sql.Executable`.  Integer value applied which will
+          set the :paramref:`_engine.Connection.execution_options.stream_results`
+          execution option and invoke :meth:`_engine.Result.yield_per`
+          automatically at once.  Allows equivalent functionality as
+          is present when using this parameter with the ORM.
+
+          .. versionadded:: 1.4.40
+
+          .. seealso::
+
+            :ref:`engine_stream_results` - background and examples
+            on using server side cursors with Core.
+
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+            describing the ORM version of ``yield_per``
+
+        :param schema_translate_map: Available on: :class:`_engine.Connection`,
+          :class:`_engine.Engine`, :class:`_sql.Executable`.
+
         :param schema_translate_map: Available on: Connection, Engine.
           A dictionary mapping schema names to schema names, that will be
           applied to the :paramref:`_schema.Table.schema` element of each
@@ -1711,6 +1785,13 @@ class Connection(Connectable):
             # the only feature that branching provides
             self = self.__branch_from
 
+        if execution_options:
+            yp = execution_options.get("yield_per", None)
+            if yp:
+                execution_options = execution_options.union(
+                    {"stream_results": True, "max_row_buffer": yp}
+                )
+
         try:
             conn = self._dbapi_connection
             if conn is None:
index e17422e1c312eb2816327faf506e186cf50b972f..abe58e2fde1a98f9bbe42610801c1d7fa7a58496 100644 (file)
@@ -1021,7 +1021,6 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
         growth_factor=5,
         initial_buffer=None,
     ):
-
         self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
 
         if initial_buffer is not None:
index 028c4b0713ad8ebec850f41ca15b97360583c459..268a2d60930d460320f6585b5b749aedaddee927 100644 (file)
@@ -1445,11 +1445,16 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
         return self.dialect.supports_sane_multi_rowcount
 
     def _setup_result_proxy(self):
+        exec_opt = self.execution_options
+
         if self.is_crud or self.is_text:
             result = self._setup_dml_or_text_result()
+            yp = sr = False
         else:
+            yp = exec_opt.get("yield_per", None)
+            sr = self._is_server_side or exec_opt.get("stream_results", False)
             strategy = self.cursor_fetch_strategy
-            if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
+            if sr and strategy is _cursor._DEFAULT_FETCH:
                 strategy = _cursor.BufferedRowCursorFetchStrategy(
                     self.cursor, self.execution_options
                 )
@@ -1482,6 +1487,9 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
 
         self._soft_closed = result._soft_closed
 
+        if yp:
+            result = result.yield_per(yp)
+
         return result
 
     def _setup_out_parameters(self, result):
index 6ca8f8c9d9dbc6c7be32c1ad7cce7c5558e63075..912dccf4bf897efee0f6ae1a25c8ec3ffaf554ad 100644 (file)
@@ -773,7 +773,7 @@ class Result(_WithKeys, ResultInternal):
 
     @_generative
     def yield_per(self, num):
-        """Configure the row-fetching strategy to fetch num rows at a time.
+        """Configure the row-fetching strategy to fetch ``num`` rows at a time.
 
         This impacts the underlying behavior of the result when iterating over
         the result object, or otherwise making use of  methods such as
@@ -788,16 +788,24 @@ class Result(_WithKeys, ResultInternal):
         conjunction with the
         :paramref:`_engine.Connection.execution_options.stream_results`
         execution option, which will allow the database dialect in use to make
-        use of a server side cursor, if the DBAPI supports it.
+        use of a server side cursor, if the DBAPI supports a specific "server
+        side cursor" mode separate from its default mode of operation.
 
-        Most DBAPIs do not use server side cursors by default, which means  all
-        rows will be fetched upfront from the database regardless of  the
-        :meth:`_engine.Result.yield_per` setting.  However,
-        :meth:`_engine.Result.yield_per` may still be useful in that it batches
-        the SQLAlchemy-side processing of the raw data from the database, and
-        additionally when used for ORM scenarios will batch the conversion of
-        database rows into  ORM entity rows.
+        .. tip::
 
+            Consider using the
+            :paramref:`_engine.Connection.execution_options.yield_per`
+            execution option, which will simultaneously set
+            :paramref:`_engine.Connection.execution_options.stream_results`
+            to ensure the use of server side cursors, as well as automatically
+            invoke the :meth:`_engine.Result.yield_per` method to establish
+            a fixed row buffer size at once.
+
+            The :paramref:`_engine.Connection.execution_options.yield_per`
+            execution option is available for ORM operations, with
+            :class:`_orm.Session`-oriented use described at
+            :ref:`orm_queryguide_yield_per`. The Core-only version which works
+            with :class:`_engine.Connection` is new as of SQLAlchemy 1.4.40.
 
         .. versionadded:: 1.4
 
@@ -806,9 +814,10 @@ class Result(_WithKeys, ResultInternal):
 
         .. seealso::
 
-            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+            :ref:`engine_stream_results` - describes Core behavior for
+            :meth:`_engine.Result.yield_per`
 
-            :meth:`_engine.Result.partitions`
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
 
         """
         self._yield_per = num
@@ -1005,24 +1014,29 @@ class Result(_WithKeys, ResultInternal):
 
         When using the ORM, the :meth:`_engine.Result.partitions` method
         is typically more effective from a memory perspective when it is
-        combined with use of the :meth:`_engine.Result.yield_per` method,
-        which instructs the ORM loading internals to only build a certain
-        amount of ORM objects from a result at a time before yielding
-        them out.
+        combined with use of the
+        :ref:`yield_per execution option <orm_queryguide_yield_per>`,
+        which instructs both the DBAPI driver to use server side cursors,
+        if available, as well as instructs the ORM loading internals to only
+        build a certain amount of ORM objects from a result at a time before
+        yielding them out.
 
         .. versionadded:: 1.4
 
         :param size: indicate the maximum number of rows to be present
          in each list yielded.  If None, makes use of the value set by
-         :meth:`_engine.Result.yield_per`, if present, otherwise uses the
-         :meth:`_engine.Result.fetchmany` default which may be backend
-         specific.
+         the :meth:`_engine.Result.yield_per`, method, if it were called,
+         or the :paramref:`_engine.Connection.execution_options.yield_per`
+         execution option, which is equivalent in this regard.  If
+         yield_per weren't set, it makes use of the
+         :meth:`_engine.Result.fetchmany` default, which may be backend
+         specific and not well defined.
 
         :return: iterator of lists
 
         .. seealso::
 
-            :paramref:`.Connection.execution_options.stream_results`
+            :ref:`engine_stream_results`
 
             :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
 
@@ -1283,10 +1297,35 @@ class FilterResult(ResultInternal):
     """A wrapper for a :class:`_engine.Result` that returns objects other than
     :class:`_result.Row` objects, such as dictionaries or scalar objects.
 
+    :class:`.FilterResult` is the common base for additional result
+    APIs including :class:`.MappingResult`, :class:`.ScalarResult`
+    and :class:`.AsyncResult`.
+
     """
 
     _post_creational_filter = None
 
+    @_generative
+    def yield_per(self, num):
+        """Configure the row-fetching strategy to fetch ``num`` rows at a time.
+
+        The :meth:`_engine.FilterResult.yield_per` method is a pass through
+        to the :meth:`_engine.Result.yield_per` method.  See that method's
+        documentation for usage notes.
+
+        .. versionadded:: 1.4.40 - added :meth:`_engine.FilterResult.yield_per`
+           so that the method is available on all result set implementations
+
+        .. seealso::
+
+            :ref:`engine_stream_results` - describes Core behavior for
+            :meth:`_engine.Result.yield_per`
+
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+
+        """
+        self._real_result = self._real_result.yield_per(num)
+
     def _soft_close(self, hard=False):
         self._real_result._soft_close(hard=hard)
 
index 62e4a9a0e545b3464654fa77a5d2882955d92706..c69fe191bec01c73e77cb0b00e6c9f403c7494e0 100644 (file)
@@ -12,6 +12,7 @@ from ...engine.result import _NO_ROW
 from ...engine.result import FilterResult
 from ...engine.result import FrozenResult
 from ...engine.result import MergedResult
+from ...sql.base import _generative
 from ...util.concurrency import greenlet_spawn
 
 
@@ -63,6 +64,7 @@ class AsyncResult(AsyncCommon):
         """
         return self._metadata.keys
 
+    @_generative
     def unique(self, strategy=None):
         """Apply unique filtering to the objects returned by this
         :class:`_asyncio.AsyncResult`.
@@ -73,7 +75,6 @@ class AsyncResult(AsyncCommon):
 
         """
         self._unique_filter_state = (set(), strategy)
-        return self
 
     def columns(self, *col_expressions):
         r"""Establish the columns that should be returned in each row.
index ab1fc4045b8414651d5b1ab03dcf9d1707262cd1..7cedc2b43cb387df212bcb5d51db271ef4837a4b 100644 (file)
@@ -286,14 +286,9 @@ class ORMCompileState(CompileState):
         else:
             execution_options = execution_options.union(_orm_load_exec_options)
 
-        if "yield_per" in execution_options or load_options._yield_per:
+        if load_options._yield_per:
             execution_options = execution_options.union(
-                {
-                    "stream_results": True,
-                    "max_row_buffer": execution_options.get(
-                        "yield_per", load_options._yield_per
-                    ),
-                }
+                {"yield_per": load_options._yield_per}
             )
 
         bind_arguments["clause"] = statement
index 0ab3912065896b72b9c5c4303c5ea1f54443d5cd..99e4591431973503f0f452e1d8735020265535a3 100644 (file)
@@ -850,6 +850,10 @@ class Query(
         level. See the section :ref:`orm_queryguide_yield_per` for further
         background on this option.
 
+        .. seealso::
+
+            :ref:`orm_queryguide_yield_per`
+
         """
         self.load_options += {"_yield_per": count}
 
index ec7f2de4b45951a0b9fbebbf2a976c2b7fb9819e..0a2d63b5480d8707c40f82819bf0a2f46fa327f9 100644 (file)
@@ -85,6 +85,28 @@ class TestBase(object):
         # run a close all connections.
         conn.close()
 
+    @config.fixture()
+    def close_result_when_finished(self):
+        to_close = []
+        to_consume = []
+
+        def go(result, consume=False):
+            to_close.append(result)
+            if consume:
+                to_consume.append(result)
+
+        yield go
+        for r in to_consume:
+            try:
+                r.all()
+            except:
+                pass
+        for r in to_close:
+            try:
+                r.close()
+            except:
+                pass
+
     @config.fixture()
     def registry(self, metadata):
         reg = registry(metadata=metadata)
index d8d9e702113ea8198f1be8de27f2f227ea81bfc1..eddf4e52fc34432270b6c9dca509d928765dbaa7 100644 (file)
@@ -15,6 +15,7 @@ from sqlalchemy import Table
 from sqlalchemy import testing
 from sqlalchemy import text
 from sqlalchemy import union_all
+from sqlalchemy.engine import cursor as _cursor
 from sqlalchemy.ext.asyncio import async_engine_from_config
 from sqlalchemy.ext.asyncio import create_async_engine
 from sqlalchemy.ext.asyncio import engine as _async_engine
@@ -873,20 +874,53 @@ class AsyncResultTest(EngineFixture):
     @testing.combinations(
         (None,), ("scalars",), ("mappings",), argnames="filter_"
     )
+    @testing.combinations(None, 2, 5, 10, argnames="yield_per")
+    @testing.combinations("method", "opt", argnames="yield_per_type")
     @async_test
-    async def test_partitions(self, async_engine, filter_):
+    async def test_partitions(
+        self, async_engine, filter_, yield_per, yield_per_type
+    ):
         users = self.tables.users
         async with async_engine.connect() as conn:
-            result = await conn.stream(select(users))
+            stmt = select(users)
+            if yield_per and yield_per_type == "opt":
+                stmt = stmt.execution_options(yield_per=yield_per)
+            result = await conn.stream(stmt)
 
             if filter_ == "mappings":
                 result = result.mappings()
             elif filter_ == "scalars":
                 result = result.scalars(1)
 
+            if yield_per and yield_per_type == "method":
+                result = result.yield_per(yield_per)
+
             check_result = []
-            async for partition in result.partitions(5):
-                check_result.append(partition)
+
+            # stream() sets stream_results unconditionally
+            assert isinstance(
+                result._real_result.cursor_strategy,
+                _cursor.BufferedRowCursorFetchStrategy,
+            )
+
+            if yield_per:
+                partition_size = yield_per
+
+                eq_(result._real_result.cursor_strategy._bufsize, yield_per)
+
+                async for partition in result.partitions():
+                    check_result.append(partition)
+            else:
+                eq_(result._real_result.cursor_strategy._bufsize, 5)
+
+                partition_size = 5
+                async for partition in result.partitions(partition_size):
+                    check_result.append(partition)
+
+            ranges = [
+                (i, min(20, i + partition_size))
+                for i in range(1, 21, partition_size)
+            ]
 
             if filter_ == "mappings":
                 eq_(
@@ -896,23 +930,20 @@ class AsyncResultTest(EngineFixture):
                             {"user_id": i, "user_name": "name%d" % i}
                             for i in range(a, b)
                         ]
-                        for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)]
+                        for (a, b) in ranges
                     ],
                 )
             elif filter_ == "scalars":
                 eq_(
                     check_result,
-                    [
-                        ["name%d" % i for i in range(a, b)]
-                        for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)]
-                    ],
+                    [["name%d" % i for i in range(a, b)] for (a, b) in ranges],
                 )
             else:
                 eq_(
                     check_result,
                     [
                         [(i, "name%d" % i) for i in range(a, b)]
-                        for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)]
+                        for (a, b) in ranges
                     ],
                 )
 
index 0539e6fe658b4377a50ecc94746e8f8a8f3c51b5..ddaa3c60dab963fd5f0bc567365bad50d1229a8b 100644 (file)
@@ -38,6 +38,7 @@ from sqlalchemy import type_coerce
 from sqlalchemy import Unicode
 from sqlalchemy import union
 from sqlalchemy import util
+from sqlalchemy.engine import cursor as _cursor
 from sqlalchemy.engine import default
 from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.orm import aliased
@@ -5406,8 +5407,7 @@ class YieldTest(_fixtures.FixtureTest):
                     if not k.startswith("_")
                 },
                 {
-                    "max_row_buffer": 15,
-                    "stream_results": True,
+                    "yield_per": 15,
                     "foo": "bar",
                     "future_result": True,
                 },
@@ -5435,8 +5435,6 @@ class YieldTest(_fixtures.FixtureTest):
                     if not k.startswith("_")
                 },
                 {
-                    "max_row_buffer": 15,
-                    "stream_results": True,
                     "yield_per": 15,
                     "future_result": True,
                 },
@@ -5444,6 +5442,12 @@ class YieldTest(_fixtures.FixtureTest):
 
         stmt = select(User).execution_options(yield_per=15)
         result = sess.execute(stmt)
+
+        assert isinstance(
+            result.raw.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+        )
+        eq_(result.raw.cursor_strategy._max_row_buffer, 15)
+
         eq_(len(result.all()), 4)
 
     def test_no_joinedload_opt(self):
index 088f580747449af1771d3d2fb8dcbb5e8cc1cee7..13ffc5eebdfbd087465b83ea204c53c922969fde 100644 (file)
@@ -97,6 +97,12 @@ class CursorResultTest(fixtures.TablesTest):
             Column("user_name", VARCHAR(20)),
             test_needs_acid=True,
         )
+        Table(
+            "test",
+            metadata,
+            Column("x", Integer, primary_key=True),
+            Column("y", String(50)),
+        )
 
     def test_row_iteration(self, connection):
         users = self.tables.users
@@ -1766,6 +1772,131 @@ class CursorResultTest(fixtures.TablesTest):
         with expect_raises_message(Exception, "canary"):
             r.lastrowid
 
+    @testing.combinations("plain", "mapping", "scalar", argnames="result_type")
+    @testing.combinations(
+        "stream_results", "yield_per", "yield_per_meth", argnames="optname"
+    )
+    @testing.combinations(10, 50, argnames="value")
+    @testing.combinations("meth", "stmt", argnames="send_opts_how")
+    def test_stream_options(
+        self,
+        connection,
+        optname,
+        value,
+        send_opts_how,
+        result_type,
+        close_result_when_finished,
+    ):
+        table = self.tables.test
+
+        connection.execute(
+            table.insert(),
+            [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)],
+        )
+
+        if optname == "stream_results":
+            opts = {"stream_results": True, "max_row_buffer": value}
+        elif optname == "yield_per":
+            opts = {"yield_per": value}
+        elif optname == "yield_per_meth":
+            opts = {"stream_results": True}
+        else:
+            assert False
+
+        if send_opts_how == "meth":
+            result = connection.execution_options(**opts).execute(
+                table.select()
+            )
+        elif send_opts_how == "stmt":
+            result = connection.execute(
+                table.select().execution_options(**opts)
+            )
+        else:
+            assert False
+
+        if result_type == "mapping":
+            result = result.mappings()
+            real_result = result._real_result
+        elif result_type == "scalar":
+            result = result.scalars()
+            real_result = result._real_result
+        else:
+            real_result = result
+
+        if optname == "yield_per_meth":
+            result = result.yield_per(value)
+
+        if result_type == "mapping" or result_type == "scalar":
+            real_result = result._real_result
+        else:
+            real_result = result
+
+        close_result_when_finished(result, consume=True)
+
+        if optname == "yield_per" and value is not None:
+            expected_opt = {
+                "stream_results": True,
+                "max_row_buffer": value,
+                "yield_per": value,
+            }
+        elif optname == "stream_results" and value is not None:
+            expected_opt = {
+                "stream_results": True,
+                "max_row_buffer": value,
+            }
+        else:
+            expected_opt = None
+
+        if expected_opt is not None:
+            eq_(real_result.context.execution_options, expected_opt)
+
+        if value is None:
+            assert isinstance(
+                real_result.cursor_strategy, _cursor.CursorFetchStrategy
+            )
+            return
+
+        assert isinstance(
+            real_result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+        )
+        eq_(real_result.cursor_strategy._max_row_buffer, value)
+
+        if optname == "yield_per" or optname == "yield_per_meth":
+            eq_(real_result.cursor_strategy._bufsize, value)
+        else:
+            eq_(real_result.cursor_strategy._bufsize, min(value, 5))
+        eq_(len(real_result.cursor_strategy._rowbuffer), 1)
+
+        next(result)
+        next(result)
+
+        if optname == "yield_per" or optname == "yield_per_meth":
+            eq_(len(real_result.cursor_strategy._rowbuffer), value - 1)
+        else:
+            # based on default growth of 5
+            eq_(len(real_result.cursor_strategy._rowbuffer), 4)
+
+        for i, row in enumerate(result):
+            if i == 186:
+                break
+
+        if optname == "yield_per" or optname == "yield_per_meth":
+            eq_(
+                len(real_result.cursor_strategy._rowbuffer),
+                value - (188 % value),
+            )
+        else:
+            # based on default growth of 5
+            eq_(
+                len(real_result.cursor_strategy._rowbuffer),
+                7 if value == 10 else 42,
+            )
+
+        if optname == "yield_per" or optname == "yield_per_meth":
+            # ensure partition is set up to same size
+            partition = next(result.partitions())
+            eq_(len(partition), value)
+
 
 class KeyTargetingTest(fixtures.TablesTest):
     run_inserts = "once"