]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
repair yield_per for non-SS dialects and add new options
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 30 Jun 2022 23:10:06 +0000 (19:10 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 1 Jul 2022 16:14:02 +0000 (12:14 -0400)
Implemented new :paramref:`_engine.Connection.execution_options.yield_per`
execution option for :class:`_engine.Connection` in Core, to mirror that of
the same :ref:`yield_per <orm_queryguide_yield_per>` option available in
the ORM. The option sets both the
:paramref:`_engine.Connection.execution_options.stream_results` option at
the same time as invoking :meth:`_engine.Result.yield_per`, to provide the
most common streaming result configuration which also mirrors that of the
ORM use case in its usage pattern.

Fixed bug in :class:`_engine.Result` where the usage of a buffered result
strategy would not be used if the dialect in use did not support an
explicit "server side cursor" setting, when using
:paramref:`_engine.Connection.execution_options.stream_results`. This is in
error as DBAPIs such as that of SQLite and Oracle already use a
non-buffered result fetching scheme, which still benefits from usage of
partial result fetching.   The "buffered" strategy is now used in all
cases where :paramref:`_engine.Connection.execution_options.stream_results`
is set.

Added :meth:`.FilterResult.yield_per` so that result implementations
such as :class:`.MappingResult`, :class:`.ScalarResult` and
:class:`.AsyncResult` have access to this method.

Fixes: #8199
Change-Id: I6dde3cbe483a1bf81e945561b60f4b7d1c434750

16 files changed:
doc/build/changelog/unreleased_14/yp.rst [new file with mode: 0644]
doc/build/core/connections.rst
doc/build/orm/queryguide.rst
lib/sqlalchemy/engine/__init__.py
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/engine/cursor.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/engine/result.py
lib/sqlalchemy/ext/asyncio/result.py
lib/sqlalchemy/orm/context.py
lib/sqlalchemy/orm/query.py
lib/sqlalchemy/testing/fixtures.py
test/ext/asyncio/test_engine_py3k.py
test/orm/test_query.py
test/profiles.txt
test/sql/test_resultset.py

diff --git a/doc/build/changelog/unreleased_14/yp.rst b/doc/build/changelog/unreleased_14/yp.rst
new file mode 100644 (file)
index 0000000..74e2c6a
--- /dev/null
@@ -0,0 +1,38 @@
+.. change::
+    :tags: usecase, engine
+
+    Implemented new :paramref:`_engine.Connection.execution_options.yield_per`
+    execution option for :class:`_engine.Connection` in Core, to mirror that of
+    the same :ref:`yield_per <orm_queryguide_yield_per>` option available in
+    the ORM. The option sets both the
+    :paramref:`_engine.Connection.execution_options.stream_results` option at
+    the same time as invoking :meth:`_engine.Result.yield_per`, to provide the
+    most common streaming result configuration which also mirrors that of the
+    ORM use case in its usage pattern.
+
+    .. seealso::
+
+        :ref:`engine_stream_results` - revised documentation
+
+
+.. change::
+    :tags: bug, engine
+
+    Fixed bug in :class:`_engine.Result` where the usage of a buffered result
+    strategy would not be used if the dialect in use did not support an
+    explicit "server side cursor" setting, when using
+    :paramref:`_engine.Connection.execution_options.stream_results`. This is in
+    error as DBAPIs such as that of SQLite and Oracle already use a
+    non-buffered result fetching scheme, which still benefits from usage of
+    partial result fetching.   The "buffered" strategy is now used in all
+    cases where :paramref:`_engine.Connection.execution_options.stream_results`
+    is set.
+
+
+.. change::
+    :tags: bug, engine
+    :tickets: 8199
+
+    Added :meth:`.FilterResult.yield_per` so that result implementations
+    such as :class:`.MappingResult`, :class:`.ScalarResult` and
+    :class:`.AsyncResult` have access to this method.
index 3b6b041ee44b0a8cfb2d45abd54131877f17b7f1..2e3c661be0d093e7ee2297bad37503716c72091a 100644 (file)
@@ -582,20 +582,33 @@ To sum up:
 Using Server Side Cursors (a.k.a. stream results)
 ==================================================
 
-A limited number of dialects have explicit support for the concept of "server
-side cursors" vs. "buffered cursors".    While a server side cursor implies a
-variety of different capabilities, within SQLAlchemy's engine and dialect
-implementation, it refers only to whether or not a particular set of results is
-fully buffered in memory before they are fetched from the cursor, using a
-method such as ``cursor.fetchall()``.   SQLAlchemy has no direct support
-for cursor behaviors such as scrolling; to make use of these features for
-a particular DBAPI, use the cursor directly as documented at
-:ref:`dbapi_connections`.
-
-Some DBAPIs, such as the cx_Oracle DBAPI, exclusively use server side cursors
-internally.  All result sets are essentially unbuffered across the total span
-of a result set, utilizing only a smaller buffer that is of a fixed size such
-as 100 rows at a time.
+Some backends feature explicit support for the concept of "server
+side cursors" versus "client side cursors".  A client side cursor here
+means that the database driver fully fetches all rows from a result set
+into memory before returning from a statement execution.  Drivers such as
+those of PostgreSQL and MySQL/MariaDB generally use client side cursors
+by default.   A server side cursor, by contrast, indicates that result rows
+remain pending within the database server's state as result rows are consumed
+by the client.  The drivers for Oracle generally use a "server side" model,
+for example, and the SQLite dialect, while not using a real "client / server"
+architecture, still uses an unbuffered result fetching approach that will
+leave result rows outside of process memory before they are consumed.
+
+.. topic:: What we really mean is "buffered" vs. "unbuffered" results
+
+  Server side cursors also imply a wider set of features with relational
+  databases, such as the ability to "scroll" a cursor forwards and backwards.
+  SQLAlchemy does not include any explicit support for these behaviors; within
+  SQLAlchemy itself, the general term "server side cursors" should be considered
+  to mean "unbuffered results" and "client side cursors" means "result rows
+  are buffered into memory before the first row is returned".   To work with
+  a richer "server side cursor" featureset specific to a certain DBAPI driver,
+  see the section :ref:`dbapi_connections_cursor`.
+
+From this basic architecture it follows that a "server side cursor" is more
+memory efficient when fetching very large result sets, while at the same time
+may introduce more complexity in the client/server communication process
+and be less efficient for small result sets (typically less than 10000 rows).
 
 For those dialects that have conditional support for buffered or unbuffered
 results, there are usually caveats to the use of the "unbuffered", or server
@@ -614,76 +627,119 @@ unbuffered cursors are not generally useful except in the uncommon case
 of an application fetching a very large number of rows in chunks, where
 the processing of these rows can be complete before more rows are fetched.
 
-To make use of a server side cursor for a particular execution, the
-:paramref:`_engine.Connection.execution_options.stream_results` option
-is used, which may be called on the :class:`_engine.Connection` object,
-on the statement object, or in the ORM-level contexts mentioned below.
-
-When using this option for a statement, it's usually appropriate to use
-a method like :meth:`_engine.Result.partitions` to work on small sections
-of the result set at a time, while also fetching enough rows for each
-pull so that the operation is efficient::
+For database drivers that provide client and server side cursor options,
+the :paramref:`_engine.Connection.execution_options.stream_results`
+and :paramref:`_engine.Connection.execution_options.yield_per` execution
+options provide access to "server side cursors" on a per-:class:`_engine.Connection`
+or per-statement basis.    Similar options exist when using an ORM
+:class:`_orm.Session` as well.
 
 
-    with engine.connect() as conn:
-        result = conn.execution_options(stream_results=True).execute(text("select * from table"))
+Streaming with a fixed buffer via yield_per
+--------------------------------------------
 
-        for partition in result.partitions(100):
-            _process_rows(partition)
+As individual row-fetch operations with fully unbuffered server side cursors
+are typically more expensive than fetching batches of rows at once, The
+:paramref:`_engine.Connection.execution_options.yield_per` execution option
+configures a :class:`_engine.Connection` or statement to make use of
+server-side cursors as are available, while at the same time configuring a
+fixed-size buffer of rows that will retrieve rows from the server in batches as
+they are consumed. This parameter may be to a positive integer value using the
+:meth:`_engine.Connection.execution_options` method on
+:class:`_engine.Connection` or on a statement using the
+:meth:`.Executable.execution_options` method.
+
+.. versionadded:: 1.4.40 :paramref:`_engine.Connection.execution_options.yield_per` as a
+   Core-only option is new as of SQLAlchemy 1.4.40; for prior 1.4 versions,
+   use :paramref:`_engine.Connection.execution_options.stream_results`
+   directly in combination with :meth:`_engine.Result.yield_per`.
+
+Using this option is equivalent to manually setting the
+:paramref:`_engine.Connection.execution_options.stream_results` option,
+described in the next section, and then invoking the
+:meth:`_engine.Result.yield_per` method on the :class:`_engine.Result`
+object with the given integer value.   In both cases, the effect this
+combination has includes:
+
+* server side cursors mode is selected for the given backend, if available
+  and not already the default behavior for that backend
+* as result rows are fetched, they will be buffered in batches, where the
+  size of each batch up until the last batch will be equal to the integer
+  argument passed to the
+  :paramref:`_engine.Connection.execution_options.yield_per` option or the
+  :meth:`_engine.Result.yield_per` method; the last batch is then sized against
+  the remaining rows fewer than this size
+* The default partition size used by the :meth:`_engine.Result.partitions`
+  method, if used, will be made equal to this integer size as well.
+
+These three behaviors are illustrated in the example below::
 
+    with engine.connect() as conn:
+        result = (
+          conn.
+          execution_options(yield_per=100).
+          execute(text("select * from table"))
+        )
 
-If the :class:`_engine.Result` is iterated directly, rows are fetched internally
+        for partition in result.partitions():
+            # partition is an iterable that will be at most 100 items
+            for row in partition:
+                print(f"{row}")
+
+The above example illustrates the combination of ``yield_per=100`` along
+with using the :meth:`_engine.Result.partitions` method to run processing
+on rows in batches that match the size fetched from the server.   The
+use of :meth:`_engine.Result.partitions` is optional, and if the
+:class:`_engine.Result` is iterated directly, a new batch of rows will be
+buffered for each 100 rows fetched.    Calling a method such as
+:meth:`_engine.Result.all` should **not** be used, as this will fully
+fetch all remaining rows at once and defeat the purpose of using ``yield_per``.
+
+The :paramref:`_engine.Connection.execution_options.yield_per` option
+is portable to the ORM as well, used by a :class:`_orm.Session` to fetch
+ORM objects, where it also limits the amount of ORM objects generated at once.
+See the section :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+for further background on using
+:paramref:`_engine.Connection.execution_options.yield_per` with the ORM.
+
+.. versionadded:: 1.4.40 Added
+   :paramref:`_engine.Connection.execution_options.yield_per`
+   as a Core level execution option to conveniently set streaming results,
+   buffer size, and partition size all at once in a manner that is transferrable
+   to that of the ORM's similar use case.
+
+.. _engine_stream_results_sr:
+
+Streaming with a dynamically growing buffer using stream_results
+-----------------------------------------------------------------
+
+To enable server side cursors without a specific partition size, the
+:paramref:`_engine.Connection.execution_options.stream_results` option may be
+used, which like :paramref:`_engine.Connection.execution_options.yield_per` may
+be called on the :class:`_engine.Connection` object or the statement object.
+
+When a :class:`_engine.Result` object delivered using the
+:paramref:`_engine.Connection.execution_options.stream_results` option
+is iterated directly, rows are fetched internally
 using a default buffering scheme that buffers first a small set of rows,
 then a larger and larger buffer on each fetch up to a pre-configured limit
-of 1000 rows.   This can be affected using the ``max_row_buffer`` execution
-option::
+of 1000 rows.   The maximum size of this buffer can be affected using the
+:paramref:`_engine.Connection.execution_options.max_row_buffer` execution option::
 
     with engine.connect() as conn:
         conn = conn.execution_options(stream_results=True, max_row_buffer=100)
         result = conn.execute(text("select * from table"))
 
         for row in result:
-            _process_row(row)
-
-The size of the buffer may also be set to a fixed size using the
-:meth:`_engine.Result.yield_per` method.  Calling this method with a number
-of rows will cause all result-fetching methods to work from
-buffers of the given size, only fetching new rows when the buffer is empty::
-
-    with engine.connect() as conn:
-        result = conn.execution_options(stream_results=True).execute(text("select * from table"))
-
-        for row in result.yield_per(100):
-            _process_row(row)
-
-The ``stream_results`` option is also available with the ORM. When using the
-ORM, the :meth:`_engine.Result.yield_per` method should be used to set the
-number of ORM rows to be buffered each time while yielding
-(:meth:`_engine.Result.partitions` uses the "yield per" value by default for
-partition size)::
-
-    with orm.Session(engine) as session:
-        result = session.execute(
-            select(User).order_by(User_id).execution_options(stream_results=True),
-        )
-        for partition in result.yield_per(100).partitions():
-            _process_rows(partition)
-
-
-.. note:: ORM result sets currently must make use of :meth:`_engine.Result.yield_per`
-   in order to achieve streaming ORM results.
-   If the method is not used to set the number of rows to
-   fetch before yielding, the entire result is fetched before rows are yielded.
-   This may change in a future release so that the automatic buffer size used
-   by :class:`_engine.Connection` takes place for ORM results as well.
-
-When using a :term:`1.x style` ORM query with :class:`_orm.Query`, yield_per is
-available via :meth:`_orm.Query.yield_per` - this also sets the ``stream_results``
-execution option::
-
-    for row in session.query(User).yield_per(100):
-        # process row
+            print(f"{row}")
 
+While the :paramref:`_engine.Connection.execution_options.stream_results`
+option may be combined with use of the :meth:`_engine.Result.partitions`
+method, a specific partition size should be passed to
+:meth:`_engine.Result.partitions` so that the entire result is not fetched.
+It is usually more straightforward to use the
+:paramref:`_engine.Connection.execution_options.yield_per` option when setting
+up to use the :meth:`_engine.Result.partitions` method.
 
 .. seealso::
 
@@ -691,6 +747,7 @@ execution option::
 
     :meth:`_engine.Result.partitions`
 
+    :meth:`_engine.Result.yield_per`
 
 
 .. _schema_translating:
@@ -1998,6 +2055,9 @@ Result Set  API
     :members:
     :inherited-members:
 
+.. autoclass:: FilterResult
+    :members:
+
 .. autoclass:: FrozenResult
     :members:
 
index 19285c1d3cfefb78aa9267dc344af705c5d87a01..e0e85750ac297163fa991f70711952e6f7a4c83e 100644 (file)
@@ -1017,33 +1017,63 @@ Yield Per
 ^^^^^^^^^
 
 The ``yield_per`` execution option is an integer value which will cause the
-:class:`_engine.Result` to yield only a fixed count of rows at a time.  It is
-often useful to use with a result partitioning method such as
-:meth:`_engine.Result.partitions`, e.g.::
+:class:`_engine.Result` to yield only a fixed count of rows at a time.
+When used as an execution option, ``yield_per`` is equivalent to making use
+of both the :paramref:`_engine.Connection.execution_options.stream_results`
+execution option, which selects for server side cursors to be used
+by the backend if supported, and the :meth:`_engine.Result.yield_per` method
+on the returned :class:`_engine.Result` object,
+which establishes a fixed size of rows to be fetched as well as a
+corresponding limit to how many ORM objects will be constructed at once.
+
+.. tip::
+
+    ``yield_per`` is now available as a Core execution option as well,
+    described in detail at :ref:`engine_stream_results`.  This section details
+    the use of ``yield_per`` as an execution option with an ORM
+    :class:`_orm.Session`.  The option behaves as similarly as possible
+    in both contexts.
+
+``yield_per`` when used with the ORM is typically established either
+via the :meth:`.Executable.execution_options` method on the given statement
+or by passing it to the :paramref:`_orm.Session.execute.execution_options`
+parameter of :meth:`_orm.Session.execute` or other similar :class:`_orm.Session`
+method.  In the example below its invoked upon a statement::
 
     >>> stmt = select(User).execution_options(yield_per=10)
-    {sql}>>> for partition in session.execute(stmt).partitions(10):
-    ...     for row in partition:
-    ...         print(row)
+    {sql}>>> for row in session.execute(stmt):
+    ...     print(row)
     SELECT user_account.id, user_account.name, user_account.fullname
     FROM user_account
     [...] (){stop}
     (User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
     ...
 
-For expediency, the :meth:`_engine.Result.yield_per` method may also be used
-with an ORM-enabled result set, which will have the similar effect at result
-fetching time as if the ``yield_per`` execution option were used, with the
-exception that ``stream_results`` option, described below, is not set
-automatically. The :meth:`_engine.Result.partitions` method, if used,
-automatically uses the number sent to :meth:`_engine.Result.yield_per` as the
-number of rows in each partition::
-
-    >>> stmt = select(User)
-    {sql}>>> for partition in session.execute(
-    ...          stmt, execution_options={"stream_results": True}
-    ...      ).yield_per(10).partitions():
-    ...      for row in partition:
+The above code is mostly equivalent as making use of the
+:paramref:`_engine.Connection.execution_options.stream_results` execution
+option, setting the :paramref:`_engine.Connection.execution_options.max_row_buffer`
+to the given integer size, and then using the :meth:`_engine.Result.yield_per`
+method on the :class:`_engine.Result` returned by the
+:class:`_orm.Session`, as in the following example::
+
+    # equivalent code
+    >>> stmt = select(User).execution_options(stream_results=True, max_row_buffer=10)
+    {sql}>>> for row in session.execute(stmt).yield_per(10):
+    ...     print(row)
+    SELECT user_account.id, user_account.name, user_account.fullname
+    FROM user_account
+    [...] (){stop}
+    (User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
+    ...
+
+``yield_per`` is also commonly used in combination with the
+:meth:`_engine.Result.partitions` method, that will iterate rows in grouped
+partitions. The size of each partition defaults to the integer value passed to
+``yield_per``, as in the below example::
+
+    >>> stmt = select(User).execution_options(yield_per=10)
+    {sql}>>> for partition in session.execute(stmt).partitions():
+    ...     for row in partition:
     ...         print(row)
     SELECT user_account.id, user_account.name, user_account.fullname
     FROM user_account
@@ -1055,20 +1085,17 @@ The purpose of "yield per" is when fetching very large result sets
 (> 10K rows), to batch results in sub-collections and yield them
 out partially, so that the Python interpreter doesn't need to declare
 very large areas of memory which is both time consuming and leads
-to excessive memory use.   The performance from fetching hundreds of
-thousands of rows can often double when a suitable yield-per setting
-(e.g. approximately 1000) is used, even with DBAPIs that buffer
-rows (which are most).
+to excessive memory use.
 
 When ``yield_per`` is used, the
 :paramref:`_engine.Connection.execution_options.stream_results` option is also
 set for the Core execution, so that a streaming / server side cursor will be
-used if the backend supports it [1]_
+used if the backend supports it.
 
 The ``yield_per`` execution option **is not compatible with subqueryload eager
 loading or joinedload eager loading when using collections**.  It is
-potentially compatible with selectinload eager loading, **provided the database
-driver supports multiple, independent cursors** [2]_ .
+potentially compatible with selectinload eager loading, provided the database
+driver supports multiple, independent cursors.
 
 Additionally, the ``yield_per`` execution option is not compatible
 with the :meth:`_engine.Result.unique` method; as this method relies upon
@@ -1081,20 +1108,10 @@ large number of rows.
    :meth:`_engine.Result.unique` filter, at the same time as the ``yield_per``
    execution option is used.
 
-The ``yield_per`` execution option is equvialent to the
-:meth:`_orm.Query.yield_per` method in :term:`1.x style` ORM queries.
-
-.. [1] currently known are
-   :mod:`_postgresql.psycopg2`,
-   :mod:`_mysql.mysqldb` and
-   :mod:`_mysql.pymysql`.  Other backends will pre buffer
-   all rows.  The memory use of raw database rows is much less than that of an
-   ORM-mapped object, but should still be taken into consideration when
-   benchmarking.
+When using the legacy :class:`_orm.Query` object with
+:term:`1.x style` ORM use, the :meth:`_orm.Query.yield_per` method
+will have the same result as that of the ``yield_per`` execution option.
 
-.. [2] the :mod:`_postgresql.psycopg2`
-   and :mod:`_sqlite.pysqlite` drivers are
-   known to work, drivers for MySQL and SQL Server ODBC drivers do not.
 
 .. seealso::
 
index 77c2fea40837af30b2c04df87268095321070b34..7bbeb1e73bd7f4f1d1ff871122285a37d9638135 100644 (file)
@@ -41,6 +41,7 @@ from .reflection import Inspector as Inspector
 from .reflection import ObjectKind as ObjectKind
 from .reflection import ObjectScope as ObjectScope
 from .result import ChunkedIteratorResult as ChunkedIteratorResult
+from .result import FilterResult as FilterResult
 from .result import FrozenResult as FrozenResult
 from .result import IteratorResult as IteratorResult
 from .result import MappingResult as MappingResult
index fdccf076df149b0a5cb1046033026ba6b71876e8..aafa94047c8c94499f70f115b6efacf8de83320a 100644 (file)
@@ -358,15 +358,86 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
           :class:`_sql.Executable`.
 
           Indicate to the dialect that results should be
-          "streamed" and not pre-buffered, if possible.  This is a limitation
-          of many DBAPIs.  The flag is currently understood within a subset
-          of dialects within the PostgreSQL and MySQL categories, and
-          may be supported by other third party dialects as well.
+          "streamed" and not pre-buffered, if possible.  For backends
+          such as PostgreSQL, MySQL and MariaDB, this indicates the use of
+          a "server side cursor" as opposed to a client side cursor.
+          Other backends such as that of Oracle may already use server
+          side cursors by default.
+
+          The usage of
+          :paramref:`_engine.Connection.execution_options.stream_results` is
+          usually combined with setting a fixed number of rows to to be fetched
+          in batches, to allow for efficient iteration of database rows while
+          at the same time not loading all result rows into memory at once;
+          this can be configured on a :class:`_engine.Result` object using the
+          :meth:`_engine.Result.yield_per` method, after execution has
+          returned a new :class:`_engine.Result`.   If
+          :meth:`_engine.Result.yield_per` is not used,
+          the :paramref:`_engine.Connection.execution_options.stream_results`
+          mode of operation will instead use a dynamically sized buffer
+          which buffers sets of rows at a time, growing on each batch
+          based on a fixed growth size up until a limit which may
+          be configured using the
+          :paramref:`_engine.Connection.execution_options.max_row_buffer`
+          parameter.
+
+          When using the ORM to fetch ORM mapped objects from a result,
+          :meth:`_engine.Result.yield_per` should always be used with
+          :paramref:`_engine.Connection.execution_options.stream_results`,
+          so that the ORM does not fetch all rows into new ORM objects at once.
+
+          For typical use, the
+          :paramref:`_engine.Connection.execution_options.yield_per` execution
+          option should be preferred, which sets up both
+          :paramref:`_engine.Connection.execution_options.stream_results` and
+          :meth:`_engine.Result.yield_per` at once. This option is supported
+          both at a core level by :class:`_engine.Connection` as well as by the
+          ORM :class:`_engine.Session`; the latter is described at
+          :ref:`orm_queryguide_yield_per`.
 
           .. seealso::
 
+            :ref:`engine_stream_results` - background on
+            :paramref:`_engine.Connection.execution_options.stream_results`
+
+            :paramref:`_engine.Connection.execution_options.max_row_buffer`
+
+            :paramref:`_engine.Connection.execution_options.yield_per`
+
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+            describing the ORM version of ``yield_per``
+
+        :param max_row_buffer: Available on: :class:`_engine.Connection`,
+          :class:`_sql.Executable`.  Sets a maximum
+          buffer size to use when the
+          :paramref:`_engine.Connection.execution_options.stream_results`
+          execution option is used on a backend that supports server side
+          cursors.  The default value if not specified is 1000.
+
+          .. seealso::
+
+            :paramref:`_engine.Connection.execution_options.stream_results`
+
             :ref:`engine_stream_results`
 
+
+        :param yield_per: Available on: :class:`_engine.Connection`,
+          :class:`_sql.Executable`.  Integer value applied which will
+          set the :paramref:`_engine.Connection.execution_options.stream_results`
+          execution option and invoke :meth:`_engine.Result.yield_per`
+          automatically at once.  Allows equivalent functionality as
+          is present when using this parameter with the ORM.
+
+          .. versionadded:: 1.4.40
+
+          .. seealso::
+
+            :ref:`engine_stream_results` - background and examples
+            on using server side cursors with Core.
+
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+            describing the ORM version of ``yield_per``
+
         :param schema_translate_map: Available on: :class:`_engine.Connection`,
           :class:`_engine.Engine`, :class:`_sql.Executable`.
 
@@ -1683,6 +1754,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
         """Create an :class:`.ExecutionContext` and execute, returning
         a :class:`_engine.CursorResult`."""
 
+        if execution_options:
+            yp = execution_options.get("yield_per", None)
+            if yp:
+                execution_options = execution_options.union(
+                    {"stream_results": True, "max_row_buffer": yp}
+                )
         try:
             conn = self._dbapi_connection
             if conn is None:
index 4b0047e3420fb165b0416ea36d193787124cffb3..3ff815f706bad45b13d5778e5f09ee07b99e91bf 100644 (file)
@@ -1034,7 +1034,6 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
         growth_factor=5,
         initial_buffer=None,
     ):
-
         self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
 
         if initial_buffer is not None:
index 5d3ff8bb7ecc7dd0e1ee294c1ba89e999a71944e..cab96eac115b564c6176b0cc9355fb175c71f0fb 100644 (file)
@@ -1473,11 +1473,16 @@ class DefaultExecutionContext(ExecutionContext):
         return self.dialect.supports_sane_multi_rowcount
 
     def _setup_result_proxy(self):
+        exec_opt = self.execution_options
+
         if self.is_crud or self.is_text:
             result = self._setup_dml_or_text_result()
+            yp = sr = False
         else:
+            yp = exec_opt.get("yield_per", None)
+            sr = self._is_server_side or exec_opt.get("stream_results", False)
             strategy = self.cursor_fetch_strategy
-            if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
+            if sr and strategy is _cursor._DEFAULT_FETCH:
                 strategy = _cursor.BufferedRowCursorFetchStrategy(
                     self.cursor, self.execution_options
                 )
@@ -1501,6 +1506,9 @@ class DefaultExecutionContext(ExecutionContext):
 
         self._soft_closed = result._soft_closed
 
+        if yp:
+            result = result.yield_per(yp)
+
         return result
 
     def _setup_out_parameters(self, result):
index 52be2603c2053299d35c20312d9d52b77e4ca6c4..a4e373ec311e79b05a3d234b33dae6a78320e0b2 100644 (file)
@@ -934,7 +934,7 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
 
     @_generative
     def yield_per(self: SelfResult, num: int) -> SelfResult:
-        """Configure the row-fetching strategy to fetch num rows at a time.
+        """Configure the row-fetching strategy to fetch ``num`` rows at a time.
 
         This impacts the underlying behavior of the result when iterating over
         the result object, or otherwise making use of  methods such as
@@ -949,16 +949,24 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
         conjunction with the
         :paramref:`_engine.Connection.execution_options.stream_results`
         execution option, which will allow the database dialect in use to make
-        use of a server side cursor, if the DBAPI supports it.
+        use of a server side cursor, if the DBAPI supports a specific "server
+        side cursor" mode separate from its default mode of operation.
 
-        Most DBAPIs do not use server side cursors by default, which means  all
-        rows will be fetched upfront from the database regardless of  the
-        :meth:`_engine.Result.yield_per` setting.  However,
-        :meth:`_engine.Result.yield_per` may still be useful in that it batches
-        the SQLAlchemy-side processing of the raw data from the database, and
-        additionally when used for ORM scenarios will batch the conversion of
-        database rows into  ORM entity rows.
+        .. tip::
 
+            Consider using the
+            :paramref:`_engine.Connection.execution_options.yield_per`
+            execution option, which will simultaneously set
+            :paramref:`_engine.Connection.execution_options.stream_results`
+            to ensure the use of server side cursors, as well as automatically
+            invoke the :meth:`_engine.Result.yield_per` method to establish
+            a fixed row buffer size at once.
+
+            The :paramref:`_engine.Connection.execution_options.yield_per`
+            execution option is available for ORM operations, with
+            :class:`_orm.Session`-oriented use described at
+            :ref:`orm_queryguide_yield_per`. The Core-only version which works
+            with :class:`_engine.Connection` is new as of SQLAlchemy 1.4.40.
 
         .. versionadded:: 1.4
 
@@ -967,9 +975,10 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
 
         .. seealso::
 
-            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+            :ref:`engine_stream_results` - describes Core behavior for
+            :meth:`_engine.Result.yield_per`
 
-            :meth:`_engine.Result.partitions`
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
 
         """
         self._yield_per = num
@@ -1219,24 +1228,29 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
 
         When using the ORM, the :meth:`_engine.Result.partitions` method
         is typically more effective from a memory perspective when it is
-        combined with use of the :meth:`_engine.Result.yield_per` method,
-        which instructs the ORM loading internals to only build a certain
-        amount of ORM objects from a result at a time before yielding
-        them out.
+        combined with use of the
+        :ref:`yield_per execution option <orm_queryguide_yield_per>`,
+        which instructs both the DBAPI driver to use server side cursors,
+        if available, as well as instructs the ORM loading internals to only
+        build a certain amount of ORM objects from a result at a time before
+        yielding them out.
 
         .. versionadded:: 1.4
 
         :param size: indicate the maximum number of rows to be present
          in each list yielded.  If None, makes use of the value set by
-         :meth:`_engine.Result.yield_per`, if present, otherwise uses the
-         :meth:`_engine.Result.fetchmany` default which may be backend
-         specific.
+         the :meth:`_engine.Result.yield_per`, method, if it were called,
+         or the :paramref:`_engine.Connection.execution_options.yield_per`
+         execution option, which is equivalent in this regard.  If
+         yield_per weren't set, it makes use of the
+         :meth:`_engine.Result.fetchmany` default, which may be backend
+         specific and not well defined.
 
         :return: iterator of lists
 
         .. seealso::
 
-            :paramref:`.Connection.execution_options.stream_results`
+            :ref:`engine_stream_results`
 
             :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
 
@@ -1517,10 +1531,17 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
         return MergedResult(self._metadata, (self,) + others)
 
 
+SelfFilterResult = TypeVar("SelfFilterResult", bound="FilterResult[Any]")
+
+
 class FilterResult(ResultInternal[_R]):
     """A wrapper for a :class:`_engine.Result` that returns objects other than
     :class:`_result.Row` objects, such as dictionaries or scalar objects.
 
+    :class:`.FilterResult` is the common base for additional result
+    APIs including :class:`.MappingResult`, :class:`.ScalarResult`
+    and :class:`.AsyncResult`.
+
     """
 
     __slots__ = (
@@ -1535,6 +1556,28 @@ class FilterResult(ResultInternal[_R]):
 
     _real_result: Result[Any]
 
+    @_generative
+    def yield_per(self: SelfFilterResult, num: int) -> SelfFilterResult:
+        """Configure the row-fetching strategy to fetch ``num`` rows at a time.
+
+        The :meth:`_engine.FilterResult.yield_per` method is a pass through
+        to the :meth:`_engine.Result.yield_per` method.  See that method's
+        documentation for usage notes.
+
+        .. versionadded:: 1.4.40 - added :meth:`_engine.FilterResult.yield_per`
+           so that the method is available on all result set implementations
+
+        .. seealso::
+
+            :ref:`engine_stream_results` - describes Core behavior for
+            :meth:`_engine.Result.yield_per`
+
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+
+        """
+        self._real_result = self._real_result.yield_per(num)
+        return self
+
     def _soft_close(self, hard: bool = False) -> None:
         self._real_result._soft_close(hard=hard)
 
index 8a1b1be32e4650092e4c8787ea2aac7ca73846a2..43f5fae819c480ee574911503bfd99a1d679ddf6 100644 (file)
@@ -27,6 +27,7 @@ from ...engine.result import MergedResult
 from ...engine.result import ResultMetaData
 from ...engine.row import Row
 from ...engine.row import RowMapping
+from ...sql.base import _generative
 from ...util.concurrency import greenlet_spawn
 from ...util.typing import Literal
 
@@ -138,6 +139,7 @@ class AsyncResult(AsyncCommon[Row[_TP]]):
         """
         return self._metadata.keys
 
+    @_generative
     def unique(
         self: SelfAsyncResult, strategy: Optional[_UniqueFilterType] = None
     ) -> SelfAsyncResult:
index a468244e9aa794971f6b4cc608100b0e00562a80..e743eeed5025ea480208dcdc9993742ca7501253 100644 (file)
@@ -357,14 +357,9 @@ class ORMCompileState(CompileState):
         else:
             execution_options = execution_options.union(_orm_load_exec_options)
 
-        if "yield_per" in execution_options or load_options._yield_per:
+        if load_options._yield_per:
             execution_options = execution_options.union(
-                {
-                    "stream_results": True,
-                    "max_row_buffer": execution_options.get(
-                        "yield_per", load_options._yield_per
-                    ),
-                }
+                {"yield_per": load_options._yield_per}
             )
 
         if (
index ead67be7fb3fc2b1e466a69a2b43ff398526ac14..a29e368b6a0e86a7ce99b81167286f70746eeb91 100644 (file)
@@ -1005,6 +1005,10 @@ class Query(
         level. See the section :ref:`orm_queryguide_yield_per` for further
         background on this option.
 
+        .. seealso::
+
+            :ref:`orm_queryguide_yield_per`
+
         """
         self.load_options += {"_yield_per": count}
         return self
index d0e7d8f3cfa3df2f8c0ef82de28d40205589d9cf..20dee5273bb96150517a7b0e0b3e09acc59f4de1 100644 (file)
@@ -92,11 +92,19 @@ class TestBase:
     @config.fixture()
     def close_result_when_finished(self):
         to_close = []
+        to_consume = []
 
-        def go(result):
+        def go(result, consume=False):
             to_close.append(result)
+            if consume:
+                to_consume.append(result)
 
         yield go
+        for r in to_consume:
+            try:
+                r.all()
+            except:
+                pass
         for r in to_close:
             try:
                 r.close()
index 23c422bcf406fe9b57ff9e54280d51937ef6fc99..a54c399811d5858124539c4e76dfa51ff0f34bbc 100644 (file)
@@ -15,6 +15,7 @@ from sqlalchemy import Table
 from sqlalchemy import testing
 from sqlalchemy import text
 from sqlalchemy import union_all
+from sqlalchemy.engine import cursor as _cursor
 from sqlalchemy.ext.asyncio import async_engine_from_config
 from sqlalchemy.ext.asyncio import create_async_engine
 from sqlalchemy.ext.asyncio import engine as _async_engine
@@ -900,20 +901,53 @@ class AsyncResultTest(EngineFixture):
     @testing.combinations(
         (None,), ("scalars",), ("mappings",), argnames="filter_"
     )
+    @testing.combinations(None, 2, 5, 10, argnames="yield_per")
+    @testing.combinations("method", "opt", argnames="yield_per_type")
     @async_test
-    async def test_partitions(self, async_engine, filter_):
+    async def test_partitions(
+        self, async_engine, filter_, yield_per, yield_per_type
+    ):
         users = self.tables.users
         async with async_engine.connect() as conn:
-            result = await conn.stream(select(users))
+            stmt = select(users)
+            if yield_per and yield_per_type == "opt":
+                stmt = stmt.execution_options(yield_per=yield_per)
+            result = await conn.stream(stmt)
 
             if filter_ == "mappings":
                 result = result.mappings()
             elif filter_ == "scalars":
                 result = result.scalars(1)
 
+            if yield_per and yield_per_type == "method":
+                result = result.yield_per(yield_per)
+
             check_result = []
-            async for partition in result.partitions(5):
-                check_result.append(partition)
+
+            # stream() sets stream_results unconditionally
+            assert isinstance(
+                result._real_result.cursor_strategy,
+                _cursor.BufferedRowCursorFetchStrategy,
+            )
+
+            if yield_per:
+                partition_size = yield_per
+
+                eq_(result._real_result.cursor_strategy._bufsize, yield_per)
+
+                async for partition in result.partitions():
+                    check_result.append(partition)
+            else:
+                eq_(result._real_result.cursor_strategy._bufsize, 5)
+
+                partition_size = 5
+                async for partition in result.partitions(partition_size):
+                    check_result.append(partition)
+
+            ranges = [
+                (i, min(20, i + partition_size))
+                for i in range(1, 21, partition_size)
+            ]
 
             if filter_ == "mappings":
                 eq_(
@@ -923,23 +957,20 @@ class AsyncResultTest(EngineFixture):
                             {"user_id": i, "user_name": "name%d" % i}
                             for i in range(a, b)
                         ]
-                        for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)]
+                        for (a, b) in ranges
                     ],
                 )
             elif filter_ == "scalars":
                 eq_(
                     check_result,
-                    [
-                        ["name%d" % i for i in range(a, b)]
-                        for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)]
-                    ],
+                    [["name%d" % i for i in range(a, b)] for (a, b) in ranges],
                 )
             else:
                 eq_(
                     check_result,
                     [
                         [(i, "name%d" % i) for i in range(a, b)]
-                        for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)]
+                        for (a, b) in ranges
                     ],
                 )
 
index f621f264d86187f3ec5cbc6b355bd670ae015a71..2e3d176c95f4f8906aed8bbed0e08a36fc29ff57 100644 (file)
@@ -39,6 +39,7 @@ from sqlalchemy import type_coerce
 from sqlalchemy import Unicode
 from sqlalchemy import union
 from sqlalchemy import util
+from sqlalchemy.engine import cursor as _cursor
 from sqlalchemy.engine import default
 from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.orm import aliased
@@ -5430,8 +5431,7 @@ class YieldTest(_fixtures.FixtureTest):
                     if not k.startswith("_")
                 },
                 {
-                    "max_row_buffer": 15,
-                    "stream_results": True,
+                    "yield_per": 15,
                     "foo": "bar",
                     "future_result": True,
                 },
@@ -5459,8 +5459,6 @@ class YieldTest(_fixtures.FixtureTest):
                     if not k.startswith("_")
                 },
                 {
-                    "max_row_buffer": 15,
-                    "stream_results": True,
                     "yield_per": 15,
                     "future_result": True,
                 },
@@ -5468,6 +5466,12 @@ class YieldTest(_fixtures.FixtureTest):
 
         stmt = select(User).execution_options(yield_per=15)
         result = sess.execute(stmt)
+
+        assert isinstance(
+            result.raw.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+        )
+        eq_(result.raw.cursor_strategy._max_row_buffer, 15)
+
         eq_(len(result.all()), 4)
 
     def test_no_joinedload_opt(self):
index f037b4b1078e22d958d98ac33289334b3aa44980..949ec4b26b349a1c5d0eed31141397127f446b6c 100644 (file)
@@ -1,15 +1,15 @@
 # /home/classic/dev/sqlalchemy/test/profiles.txt
 # This file is written out on a per-environment basis.
-# For each test in aaa_profiling, the corresponding function and 
+# For each test in aaa_profiling, the corresponding function and
 # environment is located within this file.  If it doesn't exist,
 # the test is skipped.
-# If a callcount does exist, it is compared to what we received. 
+# If a callcount does exist, it is compared to what we received.
 # assertions are raised if the counts do not match.
-# 
-# To add a new callcount test, apply the function_call_count 
-# decorator and re-run the tests using the --write-profiles 
+#
+# To add a new callcount test, apply the function_call_count
+# decorator and re-run the tests using the --write-profiles
 # option - this file will be rewritten including the new count.
-# 
+#
 
 # TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert
 
@@ -258,18 +258,18 @@ test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect x86_64_linux_cpyt
 
 # TEST: test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute
 
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mariadb_mysqldb_dbapiunicode_cextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mariadb_mysqldb_dbapiunicode_nocextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mariadb_pymysql_dbapiunicode_cextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mariadb_pymysql_dbapiunicode_nocextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mssql_pyodbc_dbapiunicode_cextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mssql_pyodbc_dbapiunicode_nocextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_oracle_cx_oracle_dbapiunicode_cextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_oracle_cx_oracle_dbapiunicode_nocextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_postgresql_psycopg2_dbapiunicode_cextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_postgresql_psycopg2_dbapiunicode_nocextensions 47
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_sqlite_pysqlite_dbapiunicode_cextensions 46
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_sqlite_pysqlite_dbapiunicode_nocextensions 46
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mariadb_mysqldb_dbapiunicode_cextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mariadb_mysqldb_dbapiunicode_nocextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mariadb_pymysql_dbapiunicode_cextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mariadb_pymysql_dbapiunicode_nocextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mssql_pyodbc_dbapiunicode_cextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_mssql_pyodbc_dbapiunicode_nocextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_oracle_cx_oracle_dbapiunicode_cextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_oracle_cx_oracle_dbapiunicode_nocextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_postgresql_psycopg2_dbapiunicode_cextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_postgresql_psycopg2_dbapiunicode_nocextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_sqlite_pysqlite_dbapiunicode_cextensions 50
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute x86_64_linux_cpython_3.10_sqlite_pysqlite_dbapiunicode_nocextensions 50
 
 # TEST: test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute
 
index cb9f930180e795ef2e131688a58cd3172b113044..5df5c8a736863e02e1434fdcc30d41a52f671ab9 100644 (file)
@@ -97,6 +97,12 @@ class CursorResultTest(fixtures.TablesTest):
             Column("user_name", VARCHAR(20)),
             test_needs_acid=True,
         )
+        Table(
+            "test",
+            metadata,
+            Column("x", Integer, primary_key=True),
+            Column("y", String(50)),
+        )
 
     def test_keys_no_rows(self, connection):
 
@@ -1809,6 +1815,135 @@ class CursorResultTest(fixtures.TablesTest):
         with expect_raises_message(Exception, "canary"):
             r.lastrowid
 
+    @testing.combinations("plain", "mapping", "scalar", argnames="result_type")
+    @testing.combinations(
+        "stream_results", "yield_per", "yield_per_meth", argnames="optname"
+    )
+    @testing.combinations(10, 50, argnames="value")
+    @testing.combinations(
+        "meth", "passed_in", "stmt", argnames="send_opts_how"
+    )
+    def test_stream_options(
+        self,
+        connection,
+        optname,
+        value,
+        send_opts_how,
+        result_type,
+        close_result_when_finished,
+    ):
+        table = self.tables.test
+
+        connection.execute(
+            table.insert(),
+            [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)],
+        )
+
+        if optname == "stream_results":
+            opts = {"stream_results": True, "max_row_buffer": value}
+        elif optname == "yield_per":
+            opts = {"yield_per": value}
+        elif optname == "yield_per_meth":
+            opts = {"stream_results": True}
+        else:
+            assert False
+
+        if send_opts_how == "meth":
+            result = connection.execution_options(**opts).execute(
+                table.select()
+            )
+        elif send_opts_how == "passed_in":
+            result = connection.execute(table.select(), execution_options=opts)
+        elif send_opts_how == "stmt":
+            result = connection.execute(
+                table.select().execution_options(**opts)
+            )
+        else:
+            assert False
+
+        if result_type == "mapping":
+            result = result.mappings()
+            real_result = result._real_result
+        elif result_type == "scalar":
+            result = result.scalars()
+            real_result = result._real_result
+        else:
+            real_result = result
+
+        if optname == "yield_per_meth":
+            result = result.yield_per(value)
+
+        if result_type == "mapping" or result_type == "scalar":
+            real_result = result._real_result
+        else:
+            real_result = result
+
+        close_result_when_finished(result, consume=True)
+
+        if optname == "yield_per" and value is not None:
+            expected_opt = {
+                "stream_results": True,
+                "max_row_buffer": value,
+                "yield_per": value,
+            }
+        elif optname == "stream_results" and value is not None:
+            expected_opt = {
+                "stream_results": True,
+                "max_row_buffer": value,
+            }
+        else:
+            expected_opt = None
+
+        if expected_opt is not None:
+            eq_(real_result.context.execution_options, expected_opt)
+
+        if value is None:
+            assert isinstance(
+                real_result.cursor_strategy, _cursor.CursorFetchStrategy
+            )
+            return
+
+        assert isinstance(
+            real_result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+        )
+        eq_(real_result.cursor_strategy._max_row_buffer, value)
+
+        if optname == "yield_per" or optname == "yield_per_meth":
+            eq_(real_result.cursor_strategy._bufsize, value)
+        else:
+            eq_(real_result.cursor_strategy._bufsize, min(value, 5))
+        eq_(len(real_result.cursor_strategy._rowbuffer), 1)
+
+        next(result)
+        next(result)
+
+        if optname == "yield_per" or optname == "yield_per_meth":
+            eq_(len(real_result.cursor_strategy._rowbuffer), value - 1)
+        else:
+            # based on default growth of 5
+            eq_(len(real_result.cursor_strategy._rowbuffer), 4)
+
+        for i, row in enumerate(result):
+            if i == 186:
+                break
+
+        if optname == "yield_per" or optname == "yield_per_meth":
+            eq_(
+                len(real_result.cursor_strategy._rowbuffer),
+                value - (188 % value),
+            )
+        else:
+            # based on default growth of 5
+            eq_(
+                len(real_result.cursor_strategy._rowbuffer),
+                7 if value == 10 else 42,
+            )
+
+        if optname == "yield_per" or optname == "yield_per_meth":
+            # ensure partition is set up to same size
+            partition = next(result.partitions())
+            eq_(len(partition), value)
+
 
 class KeyTargetingTest(fixtures.TablesTest):
     run_inserts = "once"