--- /dev/null
+.. change::
+ :tags: usecase, engine
+
+    Implemented new :paramref:`_engine.Connection.execution_options.yield_per`
+    execution option for :class:`_engine.Connection` in Core, to mirror the
+    same :ref:`yield_per <orm_queryguide_yield_per>` option available in
+    the ORM. The option sets the
+    :paramref:`_engine.Connection.execution_options.stream_results` option
+    and invokes :meth:`_engine.Result.yield_per` at the same time, providing
+    the most common streaming result configuration in a usage pattern that
+    mirrors that of the ORM.
+
+ .. seealso::
+
+ :ref:`engine_stream_results` - revised documentation
+
+
+.. change::
+ :tags: bug, engine
+
+    Fixed bug in :class:`_engine.Result` where a buffered result strategy
+    would not be used if the dialect in use did not support an explicit
+    "server side cursor" setting, when using
+    :paramref:`_engine.Connection.execution_options.stream_results`. This was
+    an error, as DBAPIs such as those of SQLite and Oracle already use a
+    non-buffered result fetching scheme which still benefits from the use of
+    partial result fetching. The "buffered" strategy is now used in all cases
+    where :paramref:`_engine.Connection.execution_options.stream_results` is
+    set.
+
+
+.. change::
+ :tags: bug, engine
+ :tickets: 8199
+
+ Added :meth:`.FilterResult.yield_per` so that result implementations
+ such as :class:`.MappingResult`, :class:`.ScalarResult` and
+ :class:`.AsyncResult` have access to this method.
Using Server Side Cursors (a.k.a. stream results)
==================================================
-A limited number of dialects have explicit support for the concept of "server
-side cursors" vs. "buffered cursors". While a server side cursor implies a
-variety of different capabilities, within SQLAlchemy's engine and dialect
-implementation, it refers only to whether or not a particular set of results is
-fully buffered in memory before they are fetched from the cursor, using a
-method such as ``cursor.fetchall()``. SQLAlchemy has no direct support
-for cursor behaviors such as scrolling; to make use of these features for
-a particular DBAPI, use the cursor directly as documented at
-:ref:`dbapi_connections`.
-
-Some DBAPIs, such as the cx_Oracle DBAPI, exclusively use server side cursors
-internally. All result sets are essentially unbuffered across the total span
-of a result set, utilizing only a smaller buffer that is of a fixed size such
-as 100 rows at a time.
+Some backends feature explicit support for the concept of "server
+side cursors" versus "client side cursors". A client side cursor here
+means that the database driver fully fetches all rows from a result set
+into memory before returning from a statement execution. Drivers such as
+those of PostgreSQL and MySQL/MariaDB generally use client side cursors
+by default. A server side cursor, by contrast, indicates that result rows
+remain pending within the database server's state as result rows are consumed
+by the client. The drivers for Oracle generally use a "server side" model,
+for example, and the SQLite dialect, while not using a real "client / server"
+architecture, still uses an unbuffered result fetching approach that will
+leave result rows outside of process memory before they are consumed.
+
+.. topic:: What we really mean is "buffered" vs. "unbuffered" results
+
+ Server side cursors also imply a wider set of features with relational
+ databases, such as the ability to "scroll" a cursor forwards and backwards.
+ SQLAlchemy does not include any explicit support for these behaviors; within
+ SQLAlchemy itself, the general term "server side cursors" should be considered
+ to mean "unbuffered results" and "client side cursors" means "result rows
+ are buffered into memory before the first row is returned". To work with
+ a richer "server side cursor" featureset specific to a certain DBAPI driver,
+ see the section :ref:`dbapi_connections_cursor`.
+
+From this basic architecture it follows that a "server side cursor" is more
+memory efficient when fetching very large result sets, while at the same time
+may introduce more complexity in the client/server communication process
+and be less efficient for small result sets (typically less than 10000 rows).
For those dialects that have conditional support for buffered or unbuffered
results, there are usually caveats to the use of the "unbuffered", or server
side cursor mode. Server side cursors are generally most useful in the case
of an application fetching a very large number of rows in chunks, where
the processing of these rows can be complete before more rows are fetched.
-To make use of a server side cursor for a particular execution, the
-:paramref:`_engine.Connection.execution_options.stream_results` option
-is used, which may be called on the :class:`_engine.Connection` object,
-on the statement object, or in the ORM-level contexts mentioned below.
-
-When using this option for a statement, it's usually appropriate to use
-a method like :meth:`_engine.Result.partitions` to work on small sections
-of the result set at a time, while also fetching enough rows for each
-pull so that the operation is efficient::
+For database drivers that provide client and server side cursor options,
+the :paramref:`_engine.Connection.execution_options.stream_results`
+and :paramref:`_engine.Connection.execution_options.yield_per` execution
+options provide access to "server side cursors" on a per-:class:`_engine.Connection`
+or per-statement basis. Similar options exist when using an ORM
+:class:`_orm.Session` as well.
- with engine.connect() as conn:
- result = conn.execution_options(stream_results=True).execute(text("select * from table"))
+Streaming with a fixed buffer via yield_per
+--------------------------------------------
- for partition in result.partitions(100):
- _process_rows(partition)
+As individual row-fetch operations with fully unbuffered server side cursors
+are typically more expensive than fetching batches of rows at once, the
+:paramref:`_engine.Connection.execution_options.yield_per` execution option
+configures a :class:`_engine.Connection` or statement to make use of
+server side cursors, if available, while at the same time configuring a
+fixed-size buffer of rows that will retrieve rows from the server in batches
+as they are consumed. This parameter may be set to a positive integer value
+using the :meth:`_engine.Connection.execution_options` method on
+:class:`_engine.Connection` or on a statement using the
+:meth:`.Executable.execution_options` method.
+
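+For example, a short sketch of the statement-level form, applying the option
+to a ``text()`` construct::
+
+    from sqlalchemy import text
+
+    # the option travels with the statement when passed to Connection.execute()
+    stmt = text("select * from table").execution_options(yield_per=100)
+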
+.. versionadded:: 1.4.40 :paramref:`_engine.Connection.execution_options.yield_per` as a
+ Core-only option is new as of SQLAlchemy 1.4.40; for prior 1.4 versions,
+ use :paramref:`_engine.Connection.execution_options.stream_results`
+ directly in combination with :meth:`_engine.Result.yield_per`.
+
+Using this option is equivalent to manually setting the
+:paramref:`_engine.Connection.execution_options.stream_results` option,
+described in the next section, and then invoking the
+:meth:`_engine.Result.yield_per` method on the :class:`_engine.Result`
+object with the given integer value. In both cases, this combination has
+the following effects:
+
+* server side cursor mode is selected for the given backend, if available
+  and not already the default behavior for that backend
+* as result rows are fetched, they will be buffered in batches, where the
+  size of each batch up until the last batch will be equal to the integer
+  argument passed to the
+  :paramref:`_engine.Connection.execution_options.yield_per` option or the
+  :meth:`_engine.Result.yield_per` method; the last batch is then sized to
+  hold the remaining rows, which may be fewer than this number
+* The default partition size used by the :meth:`_engine.Result.partitions`
+ method, if used, will be made equal to this integer size as well.
+
+These three behaviors are illustrated in the example below::
+
+    with engine.connect() as conn:
+        result = conn.execution_options(yield_per=100).execute(
+            text("select * from table")
+        )
-If the :class:`_engine.Result` is iterated directly, rows are fetched internally
+        for partition in result.partitions():
+            # partition is an iterable that will be at most 100 items
+            for row in partition:
+                print(f"{row}")
+
+The above example illustrates the combination of ``yield_per=100`` with the
+:meth:`_engine.Result.partitions` method, which processes rows in batches
+that match the size fetched from the server. The use of
+:meth:`_engine.Result.partitions` is optional; if the
+:class:`_engine.Result` is iterated directly, a new batch of rows will be
+buffered for each 100 rows fetched. A method such as
+:meth:`_engine.Result.all` should **not** be called, as this would fully
+fetch all remaining rows at once and defeat the purpose of using ``yield_per``.
+
+The :paramref:`_engine.Connection.execution_options.yield_per` option
+is portable to the ORM as well, where it is used by a :class:`_orm.Session`
+to fetch ORM objects and also limits the number of ORM objects generated
+at once. See the section :ref:`orm_queryguide_yield_per` in the
+:ref:`queryguide_toplevel` for further background on using
+:paramref:`_engine.Connection.execution_options.yield_per` with the ORM.
+
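+A minimal sketch of the ORM form, assuming a mapped ``User`` class::
+
+    from sqlalchemy import select
+    from sqlalchemy.orm import Session
+
+    with Session(engine) as session:
+        # ORM objects are also buffered in batches of 100
+        for user in session.scalars(
+            select(User).execution_options(yield_per=100)
+        ):
+            print(user)
+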
+.. versionadded:: 1.4.40 Added
+ :paramref:`_engine.Connection.execution_options.yield_per`
+ as a Core level execution option to conveniently set streaming results,
+    buffer size, and partition size all at once in a manner that is
+    transferable to that of the ORM's similar use case.
+
+.. _engine_stream_results_sr:
+
+Streaming with a dynamically growing buffer using stream_results
+-----------------------------------------------------------------
+
+To enable server side cursors without a specific partition size, the
+:paramref:`_engine.Connection.execution_options.stream_results` option may be
+used; like :paramref:`_engine.Connection.execution_options.yield_per`, it may
+be applied to the :class:`_engine.Connection` object or to the statement object.
+
+When a :class:`_engine.Result` object delivered using the
+:paramref:`_engine.Connection.execution_options.stream_results` option
+is iterated directly, rows are fetched internally
using a default buffering scheme that buffers first a small set of rows,
then a larger and larger buffer on each fetch up to a pre-configured limit
-of 1000 rows. This can be affected using the ``max_row_buffer`` execution
-option::
+of 1000 rows. The maximum size of this buffer can be configured using the
+:paramref:`_engine.Connection.execution_options.max_row_buffer` execution option::
with engine.connect() as conn:
conn = conn.execution_options(stream_results=True, max_row_buffer=100)
result = conn.execute(text("select * from table"))
for row in result:
- _process_row(row)
-
-The size of the buffer may also be set to a fixed size using the
-:meth:`_engine.Result.yield_per` method. Calling this method with a number
-of rows will cause all result-fetching methods to work from
-buffers of the given size, only fetching new rows when the buffer is empty::
-
- with engine.connect() as conn:
- result = conn.execution_options(stream_results=True).execute(text("select * from table"))
+ print(f"{row}")
- for row in result.yield_per(100):
- _process_row(row)
-
-The ``stream_results`` option is also available with the ORM. When using the
-ORM, the :meth:`_engine.Result.yield_per` method should be used to set the
-number of ORM rows to be buffered each time while yielding
-(:meth:`_engine.Result.partitions` uses the "yield per" value by default for
-partition size)::
-
- with orm.Session(engine) as session:
- result = session.execute(
- select(User).order_by(User_id).execution_options(stream_results=True),
- )
- for partition in result.yield_per(100).partitions():
- _process_rows(partition)
-
-
-.. note:: ORM result sets currently must make use of :meth:`_engine.Result.yield_per`
- in order to achieve streaming ORM results.
- If the method is not used to set the number of rows to
- fetch before yielding, the entire result is fetched before rows are yielded.
- This may change in a future release so that the automatic buffer size used
- by :class:`_engine.Connection` takes place for ORM results as well.
-
-When using a :term:`1.x style` ORM query with :class:`_orm.Query`, yield_per is
-available via :meth:`_orm.Query.yield_per` - this also sets the ``stream_results``
-execution option::
-
- for row in session.query(User).yield_per(100):
- # process row
+While the :paramref:`_engine.Connection.execution_options.stream_results`
+option may be combined with use of the :meth:`_engine.Result.partitions`
+method, a specific partition size should be passed to
+:meth:`_engine.Result.partitions` so that the entire result is not fetched.
+It is usually more straightforward to use the
+:paramref:`_engine.Connection.execution_options.yield_per` option if the
+:meth:`_engine.Result.partitions` method is to be used.
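+
+For example, a sketch combining
+:paramref:`_engine.Connection.execution_options.stream_results` with an
+explicit partition size, where ``_process_rows`` stands in for
+application-specific handling::
+
+    with engine.connect() as conn:
+        result = conn.execution_options(stream_results=True).execute(
+            text("select * from table")
+        )
+
+        # pass a size explicitly so that the entire result is not fetched
+        for partition in result.partitions(100):
+            _process_rows(partition)
+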
.. seealso::
:meth:`_engine.Result.partitions`
+ :meth:`_engine.Result.yield_per`
.. _dbengine_implicit:
.. versionadded:: 1.4 Added the :meth:`_engine.Connection.exec_driver_sql` method.
+.. _dbapi_connections_cursor:
+
Working with the DBAPI cursor directly
--------------------------------------
.. autoclass:: ChunkedIteratorResult
:members:
+.. autoclass:: FilterResult
+ :members:
+
.. autoclass:: FrozenResult
:members:
^^^^^^^^^
The ``yield_per`` execution option is an integer value which will cause the
-:class:`_engine.Result` to yield only a fixed count of rows at a time. It is
-often useful to use with a result partitioning method such as
-:meth:`_engine.Result.partitions`, e.g.::
+:class:`_engine.Result` to yield only a fixed count of rows at a time.
+When used as an execution option, ``yield_per`` is equivalent to making use
+of both the :paramref:`_engine.Connection.execution_options.stream_results`
+execution option, which selects for server side cursors to be used
+by the backend if supported, and the :meth:`_engine.Result.yield_per` method
+on the returned :class:`_engine.Result` object,
+which establishes a fixed size of rows to be fetched as well as a
+corresponding limit to how many ORM objects will be constructed at once.
+
+.. tip::
+
+ ``yield_per`` is now available as a Core execution option as well,
+ described in detail at :ref:`engine_stream_results`. This section details
+ the use of ``yield_per`` as an execution option with an ORM
+ :class:`_orm.Session`. The option behaves as similarly as possible
+ in both contexts.
+
+``yield_per`` when used with the ORM is typically established either
+via the :meth:`.Executable.execution_options` method on the given statement
+or by passing it to the :paramref:`_orm.Session.execute.execution_options`
+parameter of :meth:`_orm.Session.execute` or another similar
+:class:`_orm.Session` method. In the example below it is invoked upon a
+statement::
>>> stmt = select(User).execution_options(yield_per=10)
- {sql}>>> for partition in session.execute(stmt).partitions(10):
- ... for row in partition:
- ... print(row)
+ {sql}>>> for row in session.execute(stmt):
+ ... print(row)
+ SELECT user_account.id, user_account.name, user_account.fullname
+ FROM user_account
+ [...] (){stop}
+ (User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
+ ...
+
+The above code is mostly equivalent to making use of the
+:paramref:`_engine.Connection.execution_options.stream_results` execution
+option, setting the :paramref:`_engine.Connection.execution_options.max_row_buffer`
+to the given integer size, and then using the :meth:`_engine.Result.yield_per`
+method on the :class:`_engine.Result` returned by the
+:class:`_orm.Session`, as in the following example::
+
+ # equivalent code
+ >>> stmt = select(User).execution_options(stream_results=True, max_row_buffer=10)
+ {sql}>>> for row in session.execute(stmt).yield_per(10):
+ ... print(row)
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account
[...] (){stop}
(User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
...
-For expediency, the :meth:`_engine.Result.yield_per` method may also be used
-with an ORM-enabled result set, which will have the similar effect at result
-fetching time as if the ``yield_per`` execution option were used, with the
-exception that ``stream_results`` option, described below, is not set
-automatically. The :meth:`_engine.Result.partitions` method, if used,
-automatically uses the number sent to :meth:`_engine.Result.yield_per` as the
-number of rows in each partition::
-
- >>> stmt = select(User)
- {sql} >>> for partition in session.execute(
- ... stmt, execution_options={"stream_results": True}
- ... ).yield_per(10).partitions():
+``yield_per`` is also commonly used in combination with the
+:meth:`_engine.Result.partitions` method, which will iterate rows in grouped
+partitions. The size of each partition defaults to the integer value passed
+to ``yield_per``, as in the example below::
+
+ >>> stmt = select(User).execution_options(yield_per=10)
+ {sql}>>> for partition in session.execute(stmt).partitions():
... for row in partition:
... print(row)
SELECT user_account.id, user_account.name, user_account.fullname
(> 10K rows), to batch results in sub-collections and yield them
out partially, so that the Python interpreter doesn't need to declare
very large areas of memory, which is time consuming and leads
-to excessive memory use. The performance from fetching hundreds of
-thousands of rows can often double when a suitable yield-per setting
-(e.g. approximately 1000) is used, even with DBAPIs that buffer
-rows (which are most).
+to excessive memory use.
When ``yield_per`` is used, the
:paramref:`_engine.Connection.execution_options.stream_results` option is also
set for the Core execution, so that a streaming / server side cursor will be
-used if the backend supports it [1]_
+used if the backend supports it.
The ``yield_per`` execution option **is not compatible with subqueryload eager
loading or joinedload eager loading when using collections**. It is
-potentially compatible with selectinload eager loading, **provided the database
-driver supports multiple, independent cursors** [2]_ .
+potentially compatible with selectinload eager loading, provided the database
+driver supports multiple, independent cursors.
Additionally, the ``yield_per`` execution option is not compatible
with the :meth:`_engine.Result.unique` method; as this method relies upon
:meth:`_engine.Result.unique` filter, at the same time as the ``yield_per``
execution option is used.
-The ``yield_per`` execution option is equvialent to the
-:meth:`_orm.Query.yield_per` method in :term:`1.x style` ORM queries.
-
-.. [1] currently known are
- :mod:`_postgresql.psycopg2`,
- :mod:`_mysql.mysqldb` and
- :mod:`_mysql.pymysql`. Other backends will pre buffer
- all rows. The memory use of raw database rows is much less than that of an
- ORM-mapped object, but should still be taken into consideration when
- benchmarking.
+When using the legacy :class:`_orm.Query` object with
+:term:`1.x style` ORM use, the :meth:`_orm.Query.yield_per` method
+will have the same effect as the ``yield_per`` execution option.
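+
+A short sketch, assuming a mapped ``User`` class and an active ``session``::
+
+    for user in session.query(User).yield_per(100):
+        # rows are buffered and ORM objects built 100 at a time
+        ...
+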
-.. [2] the :mod:`_postgresql.psycopg2`
- and :mod:`_sqlite.pysqlite` drivers are
- known to work, drivers for MySQL and SQL Server ODBC drivers do not.
.. seealso::
from .mock import create_mock_engine
from .reflection import Inspector
from .result import ChunkedIteratorResult
+from .result import FilterResult
from .result import FrozenResult
from .result import IteratorResult
from .result import MappingResult
:param stream_results: Available on: Connection, statement.
Indicate to the dialect that results should be
- "streamed" and not pre-buffered, if possible. This is a limitation
- of many DBAPIs. The flag is currently understood within a subset
- of dialects within the PostgreSQL and MySQL categories, and
- may be supported by other third party dialects as well.
+ "streamed" and not pre-buffered, if possible. For backends
+ such as PostgreSQL, MySQL and MariaDB, this indicates the use of
+ a "server side cursor" as opposed to a client side cursor.
+ Other backends such as that of Oracle may already use server
+ side cursors by default.
+
+ The usage of
+ :paramref:`_engine.Connection.execution_options.stream_results` is
+        usually combined with setting a fixed number of rows to be fetched
+ in batches, to allow for efficient iteration of database rows while
+ at the same time not loading all result rows into memory at once;
+ this can be configured on a :class:`_engine.Result` object using the
+ :meth:`_engine.Result.yield_per` method, after execution has
+ returned a new :class:`_engine.Result`. If
+ :meth:`_engine.Result.yield_per` is not used,
+ the :paramref:`_engine.Connection.execution_options.stream_results`
+ mode of operation will instead use a dynamically sized buffer
+ which buffers sets of rows at a time, growing on each batch
+ based on a fixed growth size up until a limit which may
+ be configured using the
+ :paramref:`_engine.Connection.execution_options.max_row_buffer`
+ parameter.
+
+ When using the ORM to fetch ORM mapped objects from a result,
+ :meth:`_engine.Result.yield_per` should always be used with
+ :paramref:`_engine.Connection.execution_options.stream_results`,
+ so that the ORM does not fetch all rows into new ORM objects at once.
+
+ For typical use, the
+ :paramref:`_engine.Connection.execution_options.yield_per` execution
+ option should be preferred, which sets up both
+ :paramref:`_engine.Connection.execution_options.stream_results` and
+ :meth:`_engine.Result.yield_per` at once. This option is supported
+        both at the Core level by :class:`_engine.Connection` as well as by the
+        ORM :class:`_orm.Session`; the latter is described at
+        :ref:`orm_queryguide_yield_per`.
.. seealso::
+ :ref:`engine_stream_results` - background on
+ :paramref:`_engine.Connection.execution_options.stream_results`
+
+ :paramref:`_engine.Connection.execution_options.max_row_buffer`
+
+ :paramref:`_engine.Connection.execution_options.yield_per`
+
+ :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+ describing the ORM version of ``yield_per``
+
+ :param max_row_buffer: Available on: :class:`_engine.Connection`,
+ :class:`_sql.Executable`. Sets a maximum
+ buffer size to use when the
+ :paramref:`_engine.Connection.execution_options.stream_results`
+ execution option is used on a backend that supports server side
+ cursors. The default value if not specified is 1000.
+
+ .. seealso::
+
+ :paramref:`_engine.Connection.execution_options.stream_results`
+
:ref:`engine_stream_results`
+
+ :param yield_per: Available on: :class:`_engine.Connection`,
+ :class:`_sql.Executable`. Integer value applied which will
+ set the :paramref:`_engine.Connection.execution_options.stream_results`
+ execution option and invoke :meth:`_engine.Result.yield_per`
+          automatically at once, allowing functionality equivalent to
+          that available when using this parameter with the ORM.
+
+ .. versionadded:: 1.4.40
+
+ .. seealso::
+
+ :ref:`engine_stream_results` - background and examples
+ on using server side cursors with Core.
+
+ :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+ describing the ORM version of ``yield_per``
+
+ :param schema_translate_map: Available on: :class:`_engine.Connection`,
+ :class:`_engine.Engine`, :class:`_sql.Executable`.
+
-        :param schema_translate_map: Available on: Connection, Engine.
A dictionary mapping schema names to schema names, that will be
applied to the :paramref:`_schema.Table.schema` element of each
# the only feature that branching provides
self = self.__branch_from
+ if execution_options:
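+            # when the yield_per execution option is present, also set
+            # stream_results and max_row_buffer; yield_per itself remains
+            # in the options and is applied to the Result in
+            # _setup_result_proxy()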
+ yp = execution_options.get("yield_per", None)
+ if yp:
+ execution_options = execution_options.union(
+ {"stream_results": True, "max_row_buffer": yp}
+ )
+
try:
conn = self._dbapi_connection
if conn is None:
growth_factor=5,
initial_buffer=None,
):
-
self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
if initial_buffer is not None:
return self.dialect.supports_sane_multi_rowcount
def _setup_result_proxy(self):
+ exec_opt = self.execution_options
+
if self.is_crud or self.is_text:
result = self._setup_dml_or_text_result()
+ yp = sr = False
else:
+ yp = exec_opt.get("yield_per", None)
+ sr = self._is_server_side or exec_opt.get("stream_results", False)
strategy = self.cursor_fetch_strategy
- if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
+ if sr and strategy is _cursor._DEFAULT_FETCH:
strategy = _cursor.BufferedRowCursorFetchStrategy(
self.cursor, self.execution_options
)
self._soft_closed = result._soft_closed
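+        # invoke Result.yield_per() automatically when the yield_per
+        # execution option was given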
+ if yp:
+ result = result.yield_per(yp)
+
return result
def _setup_out_parameters(self, result):
@_generative
def yield_per(self, num):
- """Configure the row-fetching strategy to fetch num rows at a time.
+ """Configure the row-fetching strategy to fetch ``num`` rows at a time.
This impacts the underlying behavior of the result when iterating over
the result object, or otherwise making use of methods such as
conjunction with the
:paramref:`_engine.Connection.execution_options.stream_results`
execution option, which will allow the database dialect in use to make
- use of a server side cursor, if the DBAPI supports it.
+ use of a server side cursor, if the DBAPI supports a specific "server
+ side cursor" mode separate from its default mode of operation.
- Most DBAPIs do not use server side cursors by default, which means all
- rows will be fetched upfront from the database regardless of the
- :meth:`_engine.Result.yield_per` setting. However,
- :meth:`_engine.Result.yield_per` may still be useful in that it batches
- the SQLAlchemy-side processing of the raw data from the database, and
- additionally when used for ORM scenarios will batch the conversion of
- database rows into ORM entity rows.
+        .. tip::
+
+            Consider using the
+            :paramref:`_engine.Connection.execution_options.yield_per`
+            execution option, which will simultaneously set
+            :paramref:`_engine.Connection.execution_options.stream_results`
+            to ensure the use of server side cursors, as well as automatically
+            invoke the :meth:`_engine.Result.yield_per` method to establish
+            a fixed row buffer size at once.
+
+            The :paramref:`_engine.Connection.execution_options.yield_per`
+            execution option is available for ORM operations, with
+            :class:`_orm.Session`-oriented use described at
+            :ref:`orm_queryguide_yield_per`. The Core-only version which works
+            with :class:`_engine.Connection` is new as of SQLAlchemy 1.4.40.
.. versionadded:: 1.4
.. seealso::
- :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+ :ref:`engine_stream_results` - describes Core behavior for
+ :meth:`_engine.Result.yield_per`
- :meth:`_engine.Result.partitions`
+ :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
"""
self._yield_per = num
When using the ORM, the :meth:`_engine.Result.partitions` method
is typically more effective from a memory perspective when it is
- combined with use of the :meth:`_engine.Result.yield_per` method,
- which instructs the ORM loading internals to only build a certain
- amount of ORM objects from a result at a time before yielding
- them out.
+        combined with use of the
+        :ref:`yield_per execution option <orm_queryguide_yield_per>`,
+        which instructs the DBAPI driver to use server side cursors,
+        if available, and instructs the ORM loading internals to only
+        build a certain number of ORM objects from a result at a time
+        before yielding them out.
.. versionadded:: 1.4
:param size: indicate the maximum number of rows to be present
in each list yielded. If None, makes use of the value set by
- :meth:`_engine.Result.yield_per`, if present, otherwise uses the
- :meth:`_engine.Result.fetchmany` default which may be backend
- specific.
+          the :meth:`_engine.Result.yield_per` method, if it was called,
+          or the :paramref:`_engine.Connection.execution_options.yield_per`
+          execution option, which is equivalent in this regard. If
+          ``yield_per`` was not set, the
+          :meth:`_engine.Result.fetchmany` default is used, which may be
+          backend specific and not well defined.
:return: iterator of lists
.. seealso::
- :paramref:`.Connection.execution_options.stream_results`
+ :ref:`engine_stream_results`
:ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
"""A wrapper for a :class:`_engine.Result` that returns objects other than
:class:`_result.Row` objects, such as dictionaries or scalar objects.
+ :class:`.FilterResult` is the common base for additional result
+ APIs including :class:`.MappingResult`, :class:`.ScalarResult`
+ and :class:`.AsyncResult`.
+
"""
_post_creational_filter = None
+ @_generative
+ def yield_per(self, num):
+ """Configure the row-fetching strategy to fetch ``num`` rows at a time.
+
+ The :meth:`_engine.FilterResult.yield_per` method is a pass through
+ to the :meth:`_engine.Result.yield_per` method. See that method's
+ documentation for usage notes.
+
+ .. versionadded:: 1.4.40 - added :meth:`_engine.FilterResult.yield_per`
+ so that the method is available on all result set implementations
+
+ .. seealso::
+
+ :ref:`engine_stream_results` - describes Core behavior for
+ :meth:`_engine.Result.yield_per`
+
+ :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+
+ """
+ self._real_result = self._real_result.yield_per(num)
+
def _soft_close(self, hard=False):
self._real_result._soft_close(hard=hard)
from ...engine.result import FilterResult
from ...engine.result import FrozenResult
from ...engine.result import MergedResult
+from ...sql.base import _generative
from ...util.concurrency import greenlet_spawn
"""
return self._metadata.keys
+ @_generative
def unique(self, strategy=None):
"""Apply unique filtering to the objects returned by this
:class:`_asyncio.AsyncResult`.
"""
self._unique_filter_state = (set(), strategy)
- return self
def columns(self, *col_expressions):
r"""Establish the columns that should be returned in each row.
else:
execution_options = execution_options.union(_orm_load_exec_options)
- if "yield_per" in execution_options or load_options._yield_per:
+ if load_options._yield_per:
execution_options = execution_options.union(
- {
- "stream_results": True,
- "max_row_buffer": execution_options.get(
- "yield_per", load_options._yield_per
- ),
- }
+ {"yield_per": load_options._yield_per}
)
bind_arguments["clause"] = statement
level. See the section :ref:`orm_queryguide_yield_per` for further
background on this option.
+ .. seealso::
+
+ :ref:`orm_queryguide_yield_per`
+
"""
self.load_options += {"_yield_per": count}
# run a close all connections.
conn.close()
+ @config.fixture()
+ def close_result_when_finished(self):
+ to_close = []
+ to_consume = []
+
+ def go(result, consume=False):
+ to_close.append(result)
+ if consume:
+ to_consume.append(result)
+
+ yield go
+ for r in to_consume:
+ try:
+ r.all()
+            except Exception:
+ pass
+ for r in to_close:
+ try:
+ r.close()
+            except Exception:
+ pass
+
@config.fixture()
def registry(self, metadata):
reg = registry(metadata=metadata)
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import union_all
+from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import engine as _async_engine
@testing.combinations(
(None,), ("scalars",), ("mappings",), argnames="filter_"
)
+ @testing.combinations(None, 2, 5, 10, argnames="yield_per")
+ @testing.combinations("method", "opt", argnames="yield_per_type")
@async_test
- async def test_partitions(self, async_engine, filter_):
+ async def test_partitions(
+ self, async_engine, filter_, yield_per, yield_per_type
+ ):
users = self.tables.users
async with async_engine.connect() as conn:
- result = await conn.stream(select(users))
+ stmt = select(users)
+ if yield_per and yield_per_type == "opt":
+ stmt = stmt.execution_options(yield_per=yield_per)
+ result = await conn.stream(stmt)
if filter_ == "mappings":
result = result.mappings()
elif filter_ == "scalars":
result = result.scalars(1)
+ if yield_per and yield_per_type == "method":
+ result = result.yield_per(yield_per)
+
check_result = []
- async for partition in result.partitions(5):
- check_result.append(partition)
+
+ # stream() sets stream_results unconditionally
+ assert isinstance(
+ result._real_result.cursor_strategy,
+ _cursor.BufferedRowCursorFetchStrategy,
+ )
+
+ if yield_per:
+ partition_size = yield_per
+
+ eq_(result._real_result.cursor_strategy._bufsize, yield_per)
+
+ async for partition in result.partitions():
+ check_result.append(partition)
+ else:
+ eq_(result._real_result.cursor_strategy._bufsize, 5)
+
+ partition_size = 5
+ async for partition in result.partitions(partition_size):
+ check_result.append(partition)
+
+ ranges = [
+ (i, min(20, i + partition_size))
+ for i in range(1, 21, partition_size)
+ ]
if filter_ == "mappings":
eq_(
{"user_id": i, "user_name": "name%d" % i}
for i in range(a, b)
]
- for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)]
+ for (a, b) in ranges
],
)
elif filter_ == "scalars":
eq_(
check_result,
- [
- ["name%d" % i for i in range(a, b)]
- for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)]
- ],
+ [["name%d" % i for i in range(a, b)] for (a, b) in ranges],
)
else:
eq_(
check_result,
[
[(i, "name%d" % i) for i in range(a, b)]
- for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)]
+ for (a, b) in ranges
],
)
from sqlalchemy import Unicode
from sqlalchemy import union
from sqlalchemy import util
+from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.engine import default
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import aliased
if not k.startswith("_")
},
{
- "max_row_buffer": 15,
- "stream_results": True,
+ "yield_per": 15,
"foo": "bar",
"future_result": True,
},
if not k.startswith("_")
},
{
- "max_row_buffer": 15,
- "stream_results": True,
"yield_per": 15,
"future_result": True,
},
stmt = select(User).execution_options(yield_per=15)
result = sess.execute(stmt)
+
+ assert isinstance(
+ result.raw.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+ )
+ eq_(result.raw.cursor_strategy._max_row_buffer, 15)
+
eq_(len(result.all()), 4)
def test_no_joinedload_opt(self):
Column("user_name", VARCHAR(20)),
test_needs_acid=True,
)
+ Table(
+ "test",
+ metadata,
+ Column("x", Integer, primary_key=True),
+ Column("y", String(50)),
+ )
def test_row_iteration(self, connection):
users = self.tables.users
with expect_raises_message(Exception, "canary"):
r.lastrowid
+ @testing.combinations("plain", "mapping", "scalar", argnames="result_type")
+ @testing.combinations(
+ "stream_results", "yield_per", "yield_per_meth", argnames="optname"
+ )
+ @testing.combinations(10, 50, argnames="value")
+ @testing.combinations("meth", "stmt", argnames="send_opts_how")
+ def test_stream_options(
+ self,
+ connection,
+ optname,
+ value,
+ send_opts_how,
+ result_type,
+ close_result_when_finished,
+ ):
+ table = self.tables.test
+
+ connection.execute(
+ table.insert(),
+ [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)],
+ )
+
+ if optname == "stream_results":
+ opts = {"stream_results": True, "max_row_buffer": value}
+ elif optname == "yield_per":
+ opts = {"yield_per": value}
+ elif optname == "yield_per_meth":
+ opts = {"stream_results": True}
+ else:
+ assert False
+
+ if send_opts_how == "meth":
+ result = connection.execution_options(**opts).execute(
+ table.select()
+ )
+ elif send_opts_how == "stmt":
+ result = connection.execute(
+ table.select().execution_options(**opts)
+ )
+ else:
+ assert False
+
+ if result_type == "mapping":
+ result = result.mappings()
+ real_result = result._real_result
+ elif result_type == "scalar":
+ result = result.scalars()
+ real_result = result._real_result
+ else:
+ real_result = result
+
+ if optname == "yield_per_meth":
+ result = result.yield_per(value)
+
+ if result_type == "mapping" or result_type == "scalar":
+ real_result = result._real_result
+ else:
+ real_result = result
+
+ close_result_when_finished(result, consume=True)
+
+ if optname == "yield_per" and value is not None:
+ expected_opt = {
+ "stream_results": True,
+ "max_row_buffer": value,
+ "yield_per": value,
+ }
+ elif optname == "stream_results" and value is not None:
+ expected_opt = {
+ "stream_results": True,
+ "max_row_buffer": value,
+ }
+ else:
+ expected_opt = None
+
+ if expected_opt is not None:
+ eq_(real_result.context.execution_options, expected_opt)
+
+ if value is None:
+ assert isinstance(
+ real_result.cursor_strategy, _cursor.CursorFetchStrategy
+ )
+ return
+
+ assert isinstance(
+ real_result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+ )
+ eq_(real_result.cursor_strategy._max_row_buffer, value)
+
+ if optname == "yield_per" or optname == "yield_per_meth":
+ eq_(real_result.cursor_strategy._bufsize, value)
+ else:
+ eq_(real_result.cursor_strategy._bufsize, min(value, 5))
+ eq_(len(real_result.cursor_strategy._rowbuffer), 1)
+
+ next(result)
+ next(result)
+
+ if optname == "yield_per" or optname == "yield_per_meth":
+ eq_(len(real_result.cursor_strategy._rowbuffer), value - 1)
+ else:
+ # based on default growth of 5
+ eq_(len(real_result.cursor_strategy._rowbuffer), 4)
+
+ for i, row in enumerate(result):
+ if i == 186:
+ break
+
+ if optname == "yield_per" or optname == "yield_per_meth":
+ eq_(
+ len(real_result.cursor_strategy._rowbuffer),
+ value - (188 % value),
+ )
+ else:
+ # based on default growth of 5
+ eq_(
+ len(real_result.cursor_strategy._rowbuffer),
+ 7 if value == 10 else 42,
+ )
+
+ if optname == "yield_per" or optname == "yield_per_meth":
+ # ensure partition is set up to same size
+ partition = next(result.partitions())
+ eq_(len(partition), value)
+
class KeyTargetingTest(fixtures.TablesTest):
run_inserts = "once"