From: Mike Bayer Date: Thu, 30 Jun 2022 23:10:06 +0000 (-0400) Subject: repair yield_per for non-SS dialects and add new options X-Git-Tag: rel_1_4_40~40 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=06685f392d2e36981b4073b902539ad966c57327;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git repair yield_per for non-SS dialects and add new options Implemented new :paramref:`_engine.Connection.execution_options.yield_per` execution option for :class:`_engine.Connection` in Core, to mirror that of the same :ref:`yield_per ` option available in the ORM. The option sets both the :paramref:`_engine.Connection.execution_options.stream_results` option at the same time as invoking :meth:`_engine.Result.yield_per`, to provide the most common streaming result configuration which also mirrors that of the ORM use case in its usage pattern. Fixed bug in :class:`_engine.Result` where the usage of a buffered result strategy would not be used if the dialect in use did not support an explicit "server side cursor" setting, when using :paramref:`_engine.Connection.execution_options.stream_results`. This is in error as DBAPIs such as that of SQLite and Oracle already use a non-buffered result fetching scheme, which still benefits from usage of partial result fetching. The "buffered" strategy is now used in all cases where :paramref:`_engine.Connection.execution_options.stream_results` is set. Added :meth:`.FilterResult.yield_per` so that result implementations such as :class:`.MappingResult`, :class:`.ScalarResult` and :class:`.AsyncResult` have access to this method. Fixes: #8199 Change-Id: I6dde3cbe483a1bf81e945561b60f4b7d1c434750 (cherry picked from commit e5a0cdb2eaa1d7f381e93d0529a7f8e6d5888877) --- diff --git a/doc/build/changelog/unreleased_14/yp.rst b/doc/build/changelog/unreleased_14/yp.rst new file mode 100644 index 0000000000..74e2c6a468 --- /dev/null +++ b/doc/build/changelog/unreleased_14/yp.rst @@ -0,0 +1,38 @@ +.. 
change:: + :tags: usecase, engine + + Implemented new :paramref:`_engine.Connection.execution_options.yield_per` + execution option for :class:`_engine.Connection` in Core, to mirror that of + the same :ref:`yield_per <orm_queryguide_yield_per>` option available in + the ORM. The option sets the + :paramref:`_engine.Connection.execution_options.stream_results` option at + the same time as invoking :meth:`_engine.Result.yield_per`, to provide the + most common streaming result configuration which also mirrors that of the + ORM use case in its usage pattern. + + .. seealso:: + + :ref:`engine_stream_results` - revised documentation + + +.. change:: + :tags: bug, engine + + Fixed bug in :class:`_engine.Result` where a buffered result + strategy would not be used if the dialect in use did not support an + explicit "server side cursor" setting, when using + :paramref:`_engine.Connection.execution_options.stream_results`. This is in + error as DBAPIs such as those of SQLite and Oracle already use a + non-buffered result fetching scheme, which still benefits from usage of + partial result fetching. The "buffered" strategy is now used in all + cases where :paramref:`_engine.Connection.execution_options.stream_results` + is set. + + +.. change:: + :tags: bug, engine + :tickets: 8199 + + Added :meth:`.FilterResult.yield_per` so that result implementations + such as :class:`.MappingResult`, :class:`.ScalarResult` and + :class:`.AsyncResult` have access to this method. diff --git a/doc/build/core/connections.rst b/doc/build/core/connections.rst index b9ba6d9b39..5228235e73 100644 --- a/doc/build/core/connections.rst +++ b/doc/build/core/connections.rst @@ -633,20 +633,33 @@ To sum up: Using Server Side Cursors (a.k.a. stream results) ================================================== -A limited number of dialects have explicit support for the concept of "server -side cursors" vs. "buffered cursors". 
While a server side cursor implies a -variety of different capabilities, within SQLAlchemy's engine and dialect -implementation, it refers only to whether or not a particular set of results is -fully buffered in memory before they are fetched from the cursor, using a -method such as ``cursor.fetchall()``. SQLAlchemy has no direct support -for cursor behaviors such as scrolling; to make use of these features for -a particular DBAPI, use the cursor directly as documented at -:ref:`dbapi_connections`. - -Some DBAPIs, such as the cx_Oracle DBAPI, exclusively use server side cursors -internally. All result sets are essentially unbuffered across the total span -of a result set, utilizing only a smaller buffer that is of a fixed size such -as 100 rows at a time. +Some backends feature explicit support for the concept of "server +side cursors" versus "client side cursors". A client side cursor here +means that the database driver fully fetches all rows from a result set +into memory before returning from a statement execution. Drivers such as +those of PostgreSQL and MySQL/MariaDB generally use client side cursors +by default. A server side cursor, by contrast, indicates that result rows +remain pending within the database server's state as result rows are consumed +by the client. The drivers for Oracle generally use a "server side" model, +for example, and the SQLite dialect, while not using a real "client / server" +architecture, still uses an unbuffered result fetching approach that will +leave result rows outside of process memory before they are consumed. + +.. topic:: What we really mean is "buffered" vs. "unbuffered" results + + Server side cursors also imply a wider set of features with relational + databases, such as the ability to "scroll" a cursor forwards and backwards. 
+ SQLAlchemy does not include any explicit support for these behaviors; within + SQLAlchemy itself, the general term "server side cursors" should be considered + to mean "unbuffered results" and "client side cursors" means "result rows + are buffered into memory before the first row is returned". To work with + a richer "server side cursor" featureset specific to a certain DBAPI driver, + see the section :ref:`dbapi_connections_cursor`. + +From this basic architecture it follows that a "server side cursor" is more +memory efficient when fetching very large result sets, while at the same time +may introduce more complexity in the client/server communication process +and be less efficient for small result sets (typically less than 10000 rows). For those dialects that have conditional support for buffered or unbuffered results, there are usually caveats to the use of the "unbuffered", or server @@ -665,75 +678,119 @@ unbuffered cursors are not generally useful except in the uncommon case of an application fetching a very large number of rows in chunks, where the processing of these rows can be complete before more rows are fetched. -To make use of a server side cursor for a particular execution, the -:paramref:`_engine.Connection.execution_options.stream_results` option -is used, which may be called on the :class:`_engine.Connection` object, -on the statement object, or in the ORM-level contexts mentioned below. 
- -When using this option for a statement, it's usually appropriate to use -a method like :meth:`_engine.Result.partitions` to work on small sections -of the result set at a time, while also fetching enough rows for each -pull so that the operation is efficient:: +For database drivers that provide client and server side cursor options, +the :paramref:`_engine.Connection.execution_options.stream_results` +and :paramref:`_engine.Connection.execution_options.yield_per` execution +options provide access to "server side cursors" on a per-:class:`_engine.Connection` +or per-statement basis. Similar options exist when using an ORM +:class:`_orm.Session` as well. - with engine.connect() as conn: - result = conn.execution_options(stream_results=True).execute(text("select * from table")) +Streaming with a fixed buffer via yield_per +-------------------------------------------- - for partition in result.partitions(100): - _process_rows(partition) +As individual row-fetch operations with fully unbuffered server side cursors +are typically more expensive than fetching batches of rows at once, the +:paramref:`_engine.Connection.execution_options.yield_per` execution option +configures a :class:`_engine.Connection` or statement to make use of +server-side cursors as are available, while at the same time configuring a +fixed-size buffer of rows that will retrieve rows from the server in batches as +they are consumed. This parameter may be set to a positive integer value using the +:meth:`_engine.Connection.execution_options` method on +:class:`_engine.Connection` or on a statement using the +:meth:`.Executable.execution_options` method. + +.. versionadded:: 1.4.40 :paramref:`_engine.Connection.execution_options.yield_per` as a + Core-only option is new as of SQLAlchemy 1.4.40; for prior 1.4 versions, + use :paramref:`_engine.Connection.execution_options.stream_results` + directly in combination with :meth:`_engine.Result.yield_per`. 
+ +Using this option is equivalent to manually setting the +:paramref:`_engine.Connection.execution_options.stream_results` option, +described in the next section, and then invoking the +:meth:`_engine.Result.yield_per` method on the :class:`_engine.Result` +object with the given integer value. In both cases, this combination +has the following effects: + +* server side cursors mode is selected for the given backend, if available + and not already the default behavior for that backend +* as result rows are fetched, they will be buffered in batches, where the + size of each batch up until the last batch will be equal to the integer + argument passed to the + :paramref:`_engine.Connection.execution_options.yield_per` option or the + :meth:`_engine.Result.yield_per` method; the last batch contains the + remaining rows, which may be fewer than this size +* The default partition size used by the :meth:`_engine.Result.partitions` + method, if used, will be made equal to this integer size as well. + +These three behaviors are illustrated in the example below:: + with engine.connect() as conn: + result = ( + conn. + execution_options(yield_per=100). + execute(text("select * from table")) + ) -If the :class:`_engine.Result` is iterated directly, rows are fetched internally + for partition in result.partitions(): + # partition is an iterable that will be at most 100 items + for row in partition: + print(f"{row}") + +The above example illustrates the combination of ``yield_per=100`` along +with using the :meth:`_engine.Result.partitions` method to run processing +on rows in batches that match the size fetched from the server. The +use of :meth:`_engine.Result.partitions` is optional, and if the +:class:`_engine.Result` is iterated directly, a new batch of rows will be +buffered for each 100 rows fetched. A method such as +:meth:`_engine.Result.all` should **not** be called, as this will fully +fetch all remaining rows at once and defeat the purpose of using ``yield_per``. 
+ +The :paramref:`_engine.Connection.execution_options.yield_per` option +is portable to the ORM as well, used by a :class:`_orm.Session` to fetch +ORM objects, where it also limits the amount of ORM objects generated at once. +See the section :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` +for further background on using +:paramref:`_engine.Connection.execution_options.yield_per` with the ORM. + +.. versionadded:: 1.4.40 Added + :paramref:`_engine.Connection.execution_options.yield_per` + as a Core level execution option to conveniently set streaming results, + buffer size, and partition size all at once in a manner that is transferrable + to that of the ORM's similar use case. + +.. _engine_stream_results_sr: + +Streaming with a dynamically growing buffer using stream_results +----------------------------------------------------------------- + +To enable server side cursors without a specific partition size, the +:paramref:`_engine.Connection.execution_options.stream_results` option may be +used, which like :paramref:`_engine.Connection.execution_options.yield_per` may +be called on the :class:`_engine.Connection` object or the statement object. + +When a :class:`_engine.Result` object delivered using the +:paramref:`_engine.Connection.execution_options.stream_results` option +is iterated directly, rows are fetched internally using a default buffering scheme that buffers first a small set of rows, then a larger and larger buffer on each fetch up to a pre-configured limit -of 1000 rows. This can be affected using the ``max_row_buffer`` execution -option:: +of 1000 rows. 
The maximum size of this buffer can be affected using the +:paramref:`_engine.Connection.execution_options.max_row_buffer` execution option:: with engine.connect() as conn: conn = conn.execution_options(stream_results=True, max_row_buffer=100) result = conn.execute(text("select * from table")) for row in result: - _process_row(row) - -The size of the buffer may also be set to a fixed size using the -:meth:`_engine.Result.yield_per` method. Calling this method with a number -of rows will cause all result-fetching methods to work from -buffers of the given size, only fetching new rows when the buffer is empty:: - - with engine.connect() as conn: - result = conn.execution_options(stream_results=True).execute(text("select * from table")) + print(f"{row}") - for row in result.yield_per(100): - _process_row(row) - -The ``stream_results`` option is also available with the ORM. When using the -ORM, the :meth:`_engine.Result.yield_per` method should be used to set the -number of ORM rows to be buffered each time while yielding -(:meth:`_engine.Result.partitions` uses the "yield per" value by default for -partition size):: - - with orm.Session(engine) as session: - result = session.execute( - select(User).order_by(User_id).execution_options(stream_results=True), - ) - for partition in result.yield_per(100).partitions(): - _process_rows(partition) - - -.. note:: ORM result sets currently must make use of :meth:`_engine.Result.yield_per` - in order to achieve streaming ORM results. - If the method is not used to set the number of rows to - fetch before yielding, the entire result is fetched before rows are yielded. - This may change in a future release so that the automatic buffer size used - by :class:`_engine.Connection` takes place for ORM results as well. 
- -When using a :term:`1.x style` ORM query with :class:`_orm.Query`, yield_per is -available via :meth:`_orm.Query.yield_per` - this also sets the ``stream_results`` -execution option:: - - for row in session.query(User).yield_per(100): - # process row +While the :paramref:`_engine.Connection.execution_options.stream_results` +option may be combined with use of the :meth:`_engine.Result.partitions` +method, a specific partition size should be passed to +:meth:`_engine.Result.partitions` so that the entire result is not fetched. +It is usually more straightforward to use the +:paramref:`_engine.Connection.execution_options.yield_per` option when setting +up to use the :meth:`_engine.Result.partitions` method. .. seealso:: @@ -741,6 +798,7 @@ execution option:: :meth:`_engine.Result.partitions` + :meth:`_engine.Result.yield_per` .. _dbengine_implicit: @@ -1973,6 +2031,8 @@ method may be used:: .. versionadded:: 1.4 Added the :meth:`_engine.Connection.exec_driver_sql` method. +.. _dbapi_connections_cursor: + Working with the DBAPI cursor directly -------------------------------------- @@ -2178,6 +2238,9 @@ Result Set API .. autoclass:: ChunkedIteratorResult :members: +.. autoclass:: FilterResult + :members: + .. autoclass:: FrozenResult :members: diff --git a/doc/build/orm/queryguide.rst b/doc/build/orm/queryguide.rst index 261be27812..06d3dace90 100644 --- a/doc/build/orm/queryguide.rst +++ b/doc/build/orm/queryguide.rst @@ -1003,32 +1003,62 @@ Yield Per ^^^^^^^^^ The ``yield_per`` execution option is an integer value which will cause the -:class:`_engine.Result` to yield only a fixed count of rows at a time. It is -often useful to use with a result partitioning method such as -:meth:`_engine.Result.partitions`, e.g.:: +:class:`_engine.Result` to yield only a fixed count of rows at a time. 
+When used as an execution option, ``yield_per`` is equivalent to making use +of both the :paramref:`_engine.Connection.execution_options.stream_results` +execution option, which selects for server side cursors to be used +by the backend if supported, and the :meth:`_engine.Result.yield_per` method +on the returned :class:`_engine.Result` object, +which establishes a fixed size of rows to be fetched as well as a +corresponding limit to how many ORM objects will be constructed at once. + +.. tip:: + + ``yield_per`` is now available as a Core execution option as well, + described in detail at :ref:`engine_stream_results`. This section details + the use of ``yield_per`` as an execution option with an ORM + :class:`_orm.Session`. The option behaves as similarly as possible + in both contexts. + +``yield_per`` when used with the ORM is typically established either +via the :meth:`.Executable.execution_options` method on the given statement +or by passing it to the :paramref:`_orm.Session.execute.execution_options` +parameter of :meth:`_orm.Session.execute` or other similar :class:`_orm.Session` +method. In the example below it's invoked upon a statement:: >>> stmt = select(User).execution_options(yield_per=10) - {sql}>>> for partition in session.execute(stmt).partitions(10): - ... for row in partition: - ... print(row) + {sql}>>> for row in session.execute(stmt): + ... print(row) + SELECT user_account.id, user_account.name, user_account.fullname + FROM user_account + [...] (){stop} + (User(id=1, name='spongebob', fullname='Spongebob Squarepants'),) + ... 
+ +The above code is mostly equivalent to making use of the +:paramref:`_engine.Connection.execution_options.stream_results` execution +option, setting the :paramref:`_engine.Connection.execution_options.max_row_buffer` +to the given integer size, and then using the :meth:`_engine.Result.yield_per` +method on the :class:`_engine.Result` returned by the +:class:`_orm.Session`, as in the following example:: + + # equivalent code + >>> stmt = select(User).execution_options(stream_results=True, max_row_buffer=10) + {sql}>>> for row in session.execute(stmt).yield_per(10): + ... print(row) SELECT user_account.id, user_account.name, user_account.fullname FROM user_account [...] (){stop} (User(id=1, name='spongebob', fullname='Spongebob Squarepants'),) ... -For expediency, the :meth:`_engine.Result.yield_per` method may also be used -with an ORM-enabled result set, which will have the similar effect at result -fetching time as if the ``yield_per`` execution option were used, with the -exception that ``stream_results`` option, described below, is not set -automatically. The :meth:`_engine.Result.partitions` method, if used, -automatically uses the number sent to :meth:`_engine.Result.yield_per` as the -number of rows in each partition:: - - >>> stmt = select(User) - {sql} >>> for partition in session.execute( - ... stmt, execution_options={"stream_results": True} - ... ).yield_per(10).partitions(): +``yield_per`` is also commonly used in combination with the +:meth:`_engine.Result.partitions` method, which will iterate rows in grouped +partitions. The size of each partition defaults to the integer value passed to +``yield_per``, as in the below example:: + + >>> stmt = select(User).execution_options(yield_per=10) + {sql}>>> for partition in session.execute(stmt).partitions(): ... for row in partition: ... 
print(row) SELECT user_account.id, user_account.name, user_account.fullname @@ -1041,20 +1071,17 @@ The purpose of "yield per" is when fetching very large result sets (> 10K rows), to batch results in sub-collections and yield them out partially, so that the Python interpreter doesn't need to declare very large areas of memory which is both time consuming and leads -to excessive memory use. The performance from fetching hundreds of -thousands of rows can often double when a suitable yield-per setting -(e.g. approximately 1000) is used, even with DBAPIs that buffer -rows (which are most). +to excessive memory use. When ``yield_per`` is used, the :paramref:`_engine.Connection.execution_options.stream_results` option is also set for the Core execution, so that a streaming / server side cursor will be -used if the backend supports it [1]_ +used if the backend supports it. The ``yield_per`` execution option **is not compatible with subqueryload eager loading or joinedload eager loading when using collections**. It is -potentially compatible with selectinload eager loading, **provided the database -driver supports multiple, independent cursors** [2]_ . +potentially compatible with selectinload eager loading, provided the database +driver supports multiple, independent cursors. Additionally, the ``yield_per`` execution option is not compatible with the :meth:`_engine.Result.unique` method; as this method relies upon @@ -1067,20 +1094,10 @@ large number of rows. :meth:`_engine.Result.unique` filter, at the same time as the ``yield_per`` execution option is used. -The ``yield_per`` execution option is equvialent to the -:meth:`_orm.Query.yield_per` method in :term:`1.x style` ORM queries. - -.. [1] currently known are - :mod:`_postgresql.psycopg2`, - :mod:`_mysql.mysqldb` and - :mod:`_mysql.pymysql`. Other backends will pre buffer - all rows. 
The memory use of raw database rows is much less than that of an - ORM-mapped object, but should still be taken into consideration when - benchmarking. +When using the legacy :class:`_orm.Query` object with +:term:`1.x style` ORM use, the :meth:`_orm.Query.yield_per` method +will have the same result as that of the ``yield_per`` execution option. -.. [2] the :mod:`_postgresql.psycopg2` - and :mod:`_sqlite.pysqlite` drivers are - known to work, drivers for MySQL and SQL Server ODBC drivers do not. .. seealso:: diff --git a/lib/sqlalchemy/engine/__init__.py b/lib/sqlalchemy/engine/__init__.py index 488e41de33..2437e170df 100644 --- a/lib/sqlalchemy/engine/__init__.py +++ b/lib/sqlalchemy/engine/__init__.py @@ -44,6 +44,7 @@ from .interfaces import TypeCompiler from .mock import create_mock_engine from .reflection import Inspector from .result import ChunkedIteratorResult +from .result import FilterResult from .result import FrozenResult from .result import IteratorResult from .result import MappingResult diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 1507e159ed..8a8cab140b 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -364,15 +364,89 @@ class Connection(Connectable): :param stream_results: Available on: Connection, statement. Indicate to the dialect that results should be - "streamed" and not pre-buffered, if possible. This is a limitation - of many DBAPIs. The flag is currently understood within a subset - of dialects within the PostgreSQL and MySQL categories, and - may be supported by other third party dialects as well. + "streamed" and not pre-buffered, if possible. For backends + such as PostgreSQL, MySQL and MariaDB, this indicates the use of + a "server side cursor" as opposed to a client side cursor. + Other backends such as that of Oracle may already use server + side cursors by default. 
+ + The usage of + :paramref:`_engine.Connection.execution_options.stream_results` is + usually combined with setting a fixed number of rows to be fetched + in batches, to allow for efficient iteration of database rows while + at the same time not loading all result rows into memory at once; + this can be configured on a :class:`_engine.Result` object using the + :meth:`_engine.Result.yield_per` method, after execution has + returned a new :class:`_engine.Result`. If + :meth:`_engine.Result.yield_per` is not used, + the :paramref:`_engine.Connection.execution_options.stream_results` + mode of operation will instead use a dynamically sized buffer + which buffers sets of rows at a time, growing on each batch + based on a fixed growth size up until a limit which may + be configured using the + :paramref:`_engine.Connection.execution_options.max_row_buffer` + parameter. + + When using the ORM to fetch ORM mapped objects from a result, + :meth:`_engine.Result.yield_per` should always be used with + :paramref:`_engine.Connection.execution_options.stream_results`, + so that the ORM does not fetch all rows into new ORM objects at once. + + For typical use, the + :paramref:`_engine.Connection.execution_options.yield_per` execution + option should be preferred, which sets up both + :paramref:`_engine.Connection.execution_options.stream_results` and + :meth:`_engine.Result.yield_per` at once. This option is supported + both at a core level by :class:`_engine.Connection` as well as by the + ORM :class:`_orm.Session`; the latter is described at + :ref:`orm_queryguide_yield_per`. .. 
seealso:: + :ref:`engine_stream_results` - background on + :paramref:`_engine.Connection.execution_options.stream_results` + + :paramref:`_engine.Connection.execution_options.max_row_buffer` + + :paramref:`_engine.Connection.execution_options.yield_per` + + :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` + describing the ORM version of ``yield_per`` + + :param max_row_buffer: Available on: :class:`_engine.Connection`, + :class:`_sql.Executable`. Sets a maximum + buffer size to use when the + :paramref:`_engine.Connection.execution_options.stream_results` + execution option is used on a backend that supports server side + cursors. The default value if not specified is 1000. + + .. seealso:: + + :paramref:`_engine.Connection.execution_options.stream_results` + :ref:`engine_stream_results` + + :param yield_per: Available on: :class:`_engine.Connection`, + :class:`_sql.Executable`. Integer value applied which will + set the :paramref:`_engine.Connection.execution_options.stream_results` + execution option and invoke :meth:`_engine.Result.yield_per` + automatically at once. Allows equivalent functionality as + is present when using this parameter with the ORM. + + .. versionadded:: 1.4.40 + + .. seealso:: + + :ref:`engine_stream_results` - background and examples + on using server side cursors with Core. + + :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` + describing the ORM version of ``yield_per`` + + :param schema_translate_map: Available on: :class:`_engine.Connection`, + :class:`_engine.Engine`, :class:`_sql.Executable`. + :param schema_translate_map: Available on: Connection, Engine. 
A dictionary mapping schema names to schema names, that will be applied to the :paramref:`_schema.Table.schema` element of each @@ -1711,6 +1785,13 @@ class Connection(Connectable): # the only feature that branching provides self = self.__branch_from + if execution_options: + yp = execution_options.get("yield_per", None) + if yp: + execution_options = execution_options.union( + {"stream_results": True, "max_row_buffer": yp} + ) + try: conn = self._dbapi_connection if conn is None: diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py index e17422e1c3..abe58e2fde 100644 --- a/lib/sqlalchemy/engine/cursor.py +++ b/lib/sqlalchemy/engine/cursor.py @@ -1021,7 +1021,6 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): growth_factor=5, initial_buffer=None, ): - self._max_row_buffer = execution_options.get("max_row_buffer", 1000) if initial_buffer is not None: diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 028c4b0713..268a2d6093 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -1445,11 +1445,16 @@ class DefaultExecutionContext(interfaces.ExecutionContext): return self.dialect.supports_sane_multi_rowcount def _setup_result_proxy(self): + exec_opt = self.execution_options + if self.is_crud or self.is_text: result = self._setup_dml_or_text_result() + yp = sr = False else: + yp = exec_opt.get("yield_per", None) + sr = self._is_server_side or exec_opt.get("stream_results", False) strategy = self.cursor_fetch_strategy - if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: + if sr and strategy is _cursor._DEFAULT_FETCH: strategy = _cursor.BufferedRowCursorFetchStrategy( self.cursor, self.execution_options ) @@ -1482,6 +1487,9 @@ class DefaultExecutionContext(interfaces.ExecutionContext): self._soft_closed = result._soft_closed + if yp: + result = result.yield_per(yp) + return result def _setup_out_parameters(self, result): diff --git 
a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index 6ca8f8c9d9..912dccf4bf 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -773,7 +773,7 @@ class Result(_WithKeys, ResultInternal): @_generative def yield_per(self, num): - """Configure the row-fetching strategy to fetch num rows at a time. + """Configure the row-fetching strategy to fetch ``num`` rows at a time. This impacts the underlying behavior of the result when iterating over the result object, or otherwise making use of methods such as @@ -788,16 +788,24 @@ class Result(_WithKeys, ResultInternal): conjunction with the :paramref:`_engine.Connection.execution_options.stream_results` execution option, which will allow the database dialect in use to make - use of a server side cursor, if the DBAPI supports it. + use of a server side cursor, if the DBAPI supports a specific "server + side cursor" mode separate from its default mode of operation. - Most DBAPIs do not use server side cursors by default, which means all - rows will be fetched upfront from the database regardless of the - :meth:`_engine.Result.yield_per` setting. However, - :meth:`_engine.Result.yield_per` may still be useful in that it batches - the SQLAlchemy-side processing of the raw data from the database, and - additionally when used for ORM scenarios will batch the conversion of - database rows into ORM entity rows. + .. tip:: + Consider using the + :paramref:`_engine.Connection.execution_options.yield_per` + execution option, which will simultaneously set + :paramref:`_engine.Connection.execution_options.stream_results` + to ensure the use of server side cursors, as well as automatically + invoke the :meth:`_engine.Result.yield_per` method to establish + a fixed row buffer size at once. 
+ + The :paramref:`_engine.Connection.execution_options.yield_per` + execution option is available for ORM operations, with + :class:`_orm.Session`-oriented use described at + :ref:`orm_queryguide_yield_per`. The Core-only version which works + with :class:`_engine.Connection` is new as of SQLAlchemy 1.4.40. .. versionadded:: 1.4 @@ -806,9 +814,10 @@ .. seealso:: - :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` + :ref:`engine_stream_results` - describes Core behavior for + :meth:`_engine.Result.yield_per` - :meth:`_engine.Result.partitions` + :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` """ self._yield_per = num @@ -1005,24 +1014,29 @@ When using the ORM, the :meth:`_engine.Result.partitions` method is typically more effective from a memory perspective when it is - combined with use of the :meth:`_engine.Result.yield_per` method, - which instructs the ORM loading internals to only build a certain - amount of ORM objects from a result at a time before yielding - them out. + combined with use of the + :ref:`yield_per execution option <engine_stream_results>`, + which instructs both the DBAPI driver to use server side cursors, + if available, as well as instructs the ORM loading internals to only + build a certain amount of ORM objects from a result at a time before + yielding them out. .. versionadded:: 1.4 :param size: indicate the maximum number of rows to be present in each list yielded. If None, makes use of the value set by - :meth:`_engine.Result.yield_per`, if present, otherwise uses the - :meth:`_engine.Result.fetchmany` default which may be backend - specific. + the :meth:`_engine.Result.yield_per` method, if it was called, + or the :paramref:`_engine.Connection.execution_options.yield_per` + execution option, which is equivalent in this regard. 
If + yield_per was not set, it makes use of the + :meth:`_engine.Result.fetchmany` default, which may be backend + specific and not well defined. :return: iterator of lists .. seealso:: - :paramref:`.Connection.execution_options.stream_results` + :ref:`engine_stream_results` + :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` @@ -1283,10 +1297,35 @@ class FilterResult(ResultInternal): """A wrapper for a :class:`_engine.Result` that returns objects other than :class:`_result.Row` objects, such as dictionaries or scalar objects. + :class:`.FilterResult` is the common base for additional result + APIs including :class:`.MappingResult`, :class:`.ScalarResult` + and :class:`.AsyncResult`. + """ _post_creational_filter = None + @_generative + def yield_per(self, num): + """Configure the row-fetching strategy to fetch ``num`` rows at a time. + + The :meth:`_engine.FilterResult.yield_per` method is a pass-through + to the :meth:`_engine.Result.yield_per` method. See that method's + documentation for usage notes. + + .. versionadded:: 1.4.40 - added :meth:`_engine.FilterResult.yield_per` + so that the method is available on all result set implementations + + ..
seealso:: + + :ref:`engine_stream_results` - describes Core behavior for + :meth:`_engine.Result.yield_per` + + :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` + + """ + self._real_result = self._real_result.yield_per(num) + def _soft_close(self, hard=False): self._real_result._soft_close(hard=hard) diff --git a/lib/sqlalchemy/ext/asyncio/result.py b/lib/sqlalchemy/ext/asyncio/result.py index 62e4a9a0e5..c69fe191be 100644 --- a/lib/sqlalchemy/ext/asyncio/result.py +++ b/lib/sqlalchemy/ext/asyncio/result.py @@ -12,6 +12,7 @@ from ...engine.result import _NO_ROW from ...engine.result import FilterResult from ...engine.result import FrozenResult from ...engine.result import MergedResult +from ...sql.base import _generative from ...util.concurrency import greenlet_spawn @@ -63,6 +64,7 @@ class AsyncResult(AsyncCommon): """ return self._metadata.keys + @_generative def unique(self, strategy=None): """Apply unique filtering to the objects returned by this :class:`_asyncio.AsyncResult`. @@ -73,7 +75,6 @@ class AsyncResult(AsyncCommon): """ self._unique_filter_state = (set(), strategy) - return self def columns(self, *col_expressions): r"""Establish the columns that should be returned in each row. 
diff --git a/lib/sqlalchemy/orm/context.py b/lib/sqlalchemy/orm/context.py index ab1fc4045b..7cedc2b43c 100644 --- a/lib/sqlalchemy/orm/context.py +++ b/lib/sqlalchemy/orm/context.py @@ -286,14 +286,9 @@ class ORMCompileState(CompileState): else: execution_options = execution_options.union(_orm_load_exec_options) - if "yield_per" in execution_options or load_options._yield_per: + if load_options._yield_per: execution_options = execution_options.union( - { - "stream_results": True, - "max_row_buffer": execution_options.get( - "yield_per", load_options._yield_per - ), - } + {"yield_per": load_options._yield_per} ) bind_arguments["clause"] = statement diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 0ab3912065..99e4591431 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -850,6 +850,10 @@ class Query( level. See the section :ref:`orm_queryguide_yield_per` for further background on this option. + .. seealso:: + + :ref:`orm_queryguide_yield_per` + """ self.load_options += {"_yield_per": count} diff --git a/lib/sqlalchemy/testing/fixtures.py b/lib/sqlalchemy/testing/fixtures.py index ec7f2de4b4..0a2d63b548 100644 --- a/lib/sqlalchemy/testing/fixtures.py +++ b/lib/sqlalchemy/testing/fixtures.py @@ -85,6 +85,28 @@ class TestBase(object): # run a close all connections. 
conn.close() + @config.fixture() + def close_result_when_finished(self): + to_close = [] + to_consume = [] + + def go(result, consume=False): + to_close.append(result) + if consume: + to_consume.append(result) + + yield go + for r in to_consume: + try: + r.all() + except: + pass + for r in to_close: + try: + r.close() + except: + pass + @config.fixture() def registry(self, metadata): reg = registry(metadata=metadata) diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py index d8d9e70211..eddf4e52fc 100644 --- a/test/ext/asyncio/test_engine_py3k.py +++ b/test/ext/asyncio/test_engine_py3k.py @@ -15,6 +15,7 @@ from sqlalchemy import Table from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import union_all +from sqlalchemy.engine import cursor as _cursor from sqlalchemy.ext.asyncio import async_engine_from_config from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import engine as _async_engine @@ -873,20 +874,53 @@ class AsyncResultTest(EngineFixture): @testing.combinations( (None,), ("scalars",), ("mappings",), argnames="filter_" ) + @testing.combinations(None, 2, 5, 10, argnames="yield_per") + @testing.combinations("method", "opt", argnames="yield_per_type") @async_test - async def test_partitions(self, async_engine, filter_): + async def test_partitions( + self, async_engine, filter_, yield_per, yield_per_type + ): users = self.tables.users async with async_engine.connect() as conn: - result = await conn.stream(select(users)) + stmt = select(users) + if yield_per and yield_per_type == "opt": + stmt = stmt.execution_options(yield_per=yield_per) + result = await conn.stream(stmt) if filter_ == "mappings": result = result.mappings() elif filter_ == "scalars": result = result.scalars(1) + if yield_per and yield_per_type == "method": + result = result.yield_per(yield_per) + check_result = [] - async for partition in result.partitions(5): - check_result.append(partition) + + # 
stream() sets stream_results unconditionally + assert isinstance( + result._real_result.cursor_strategy, + _cursor.BufferedRowCursorFetchStrategy, + ) + + if yield_per: + partition_size = yield_per + + eq_(result._real_result.cursor_strategy._bufsize, yield_per) + + async for partition in result.partitions(): + check_result.append(partition) + else: + eq_(result._real_result.cursor_strategy._bufsize, 5) + + partition_size = 5 + async for partition in result.partitions(partition_size): + check_result.append(partition) + + ranges = [ + (i, min(20, i + partition_size)) + for i in range(1, 21, partition_size) + ] if filter_ == "mappings": eq_( @@ -896,23 +930,20 @@ class AsyncResultTest(EngineFixture): {"user_id": i, "user_name": "name%d" % i} for i in range(a, b) ] - for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)] + for (a, b) in ranges ], ) elif filter_ == "scalars": eq_( check_result, - [ - ["name%d" % i for i in range(a, b)] - for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)] - ], + [["name%d" % i for i in range(a, b)] for (a, b) in ranges], ) else: eq_( check_result, [ [(i, "name%d" % i) for i in range(a, b)] - for (a, b) in [(1, 6), (6, 11), (11, 16), (16, 20)] + for (a, b) in ranges ], ) diff --git a/test/orm/test_query.py b/test/orm/test_query.py index 0539e6fe65..ddaa3c60da 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -38,6 +38,7 @@ from sqlalchemy import type_coerce from sqlalchemy import Unicode from sqlalchemy import union from sqlalchemy import util +from sqlalchemy.engine import cursor as _cursor from sqlalchemy.engine import default from sqlalchemy.ext.compiler import compiles from sqlalchemy.orm import aliased @@ -5406,8 +5407,7 @@ class YieldTest(_fixtures.FixtureTest): if not k.startswith("_") }, { - "max_row_buffer": 15, - "stream_results": True, + "yield_per": 15, "foo": "bar", "future_result": True, }, @@ -5435,8 +5435,6 @@ class YieldTest(_fixtures.FixtureTest): if not k.startswith("_") }, { - "max_row_buffer": 15, - 
"stream_results": True, "yield_per": 15, "future_result": True, }, @@ -5444,6 +5442,12 @@ class YieldTest(_fixtures.FixtureTest): stmt = select(User).execution_options(yield_per=15) result = sess.execute(stmt) + + assert isinstance( + result.raw.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy + ) + eq_(result.raw.cursor_strategy._max_row_buffer, 15) + eq_(len(result.all()), 4) def test_no_joinedload_opt(self): diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 088f580747..13ffc5eebd 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -97,6 +97,12 @@ class CursorResultTest(fixtures.TablesTest): Column("user_name", VARCHAR(20)), test_needs_acid=True, ) + Table( + "test", + metadata, + Column("x", Integer, primary_key=True), + Column("y", String(50)), + ) def test_row_iteration(self, connection): users = self.tables.users @@ -1766,6 +1772,131 @@ class CursorResultTest(fixtures.TablesTest): with expect_raises_message(Exception, "canary"): r.lastrowid + @testing.combinations("plain", "mapping", "scalar", argnames="result_type") + @testing.combinations( + "stream_results", "yield_per", "yield_per_meth", argnames="optname" + ) + @testing.combinations(10, 50, argnames="value") + @testing.combinations("meth", "stmt", argnames="send_opts_how") + def test_stream_options( + self, + connection, + optname, + value, + send_opts_how, + result_type, + close_result_when_finished, + ): + table = self.tables.test + + connection.execute( + table.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)], + ) + + if optname == "stream_results": + opts = {"stream_results": True, "max_row_buffer": value} + elif optname == "yield_per": + opts = {"yield_per": value} + elif optname == "yield_per_meth": + opts = {"stream_results": True} + else: + assert False + + if send_opts_how == "meth": + result = connection.execution_options(**opts).execute( + table.select() + ) + elif send_opts_how == "stmt": + result = connection.execute( + 
table.select().execution_options(**opts) + ) + else: + assert False + + if result_type == "mapping": + result = result.mappings() + real_result = result._real_result + elif result_type == "scalar": + result = result.scalars() + real_result = result._real_result + else: + real_result = result + + if optname == "yield_per_meth": + result = result.yield_per(value) + + if result_type == "mapping" or result_type == "scalar": + real_result = result._real_result + else: + real_result = result + + close_result_when_finished(result, consume=True) + + if optname == "yield_per" and value is not None: + expected_opt = { + "stream_results": True, + "max_row_buffer": value, + "yield_per": value, + } + elif optname == "stream_results" and value is not None: + expected_opt = { + "stream_results": True, + "max_row_buffer": value, + } + else: + expected_opt = None + + if expected_opt is not None: + eq_(real_result.context.execution_options, expected_opt) + + if value is None: + assert isinstance( + real_result.cursor_strategy, _cursor.CursorFetchStrategy + ) + return + + assert isinstance( + real_result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy + ) + eq_(real_result.cursor_strategy._max_row_buffer, value) + + if optname == "yield_per" or optname == "yield_per_meth": + eq_(real_result.cursor_strategy._bufsize, value) + else: + eq_(real_result.cursor_strategy._bufsize, min(value, 5)) + eq_(len(real_result.cursor_strategy._rowbuffer), 1) + + next(result) + next(result) + + if optname == "yield_per" or optname == "yield_per_meth": + eq_(len(real_result.cursor_strategy._rowbuffer), value - 1) + else: + # based on default growth of 5 + eq_(len(real_result.cursor_strategy._rowbuffer), 4) + + for i, row in enumerate(result): + if i == 186: + break + + if optname == "yield_per" or optname == "yield_per_meth": + eq_( + len(real_result.cursor_strategy._rowbuffer), + value - (188 % value), + ) + else: + # based on default growth of 5 + eq_( + 
len(real_result.cursor_strategy._rowbuffer), + 7 if value == 10 else 42, + ) + + if optname == "yield_per" or optname == "yield_per_meth": + # ensure partition is set up to same size + partition = next(result.partitions()) + eq_(len(partition), value) + class KeyTargetingTest(fixtures.TablesTest): run_inserts = "once"