--- /dev/null
+.. change::
+ :tags: change, engine
+
+ The ``server_side_cursors`` engine-wide parameter is deprecated and will be
+ removed in a future release. For unbuffered cursors, the
+ :paramref:`_engine.Connection.execution_options.stream_results` execution
+ option should be used on a per-execution basis.
:ref:`session_transaction_isolation` - for the ORM
+.. _engine_stream_results:
+
+Using Server Side Cursors (a.k.a. stream results)
+==================================================
+
+A limited number of dialects have explicit support for the concept of "server
+side cursors" vs. "buffered cursors". While a server side cursor implies a
+variety of different capabilities, within SQLAlchemy's engine and dialect
+implementation, it refers only to whether or not a particular set of results is
+fully buffered in memory before it is fetched from the cursor, using a
+method such as ``cursor.fetchall()``. SQLAlchemy has no direct support
+for cursor behaviors such as scrolling; to make use of these features for
+a particular DBAPI, use the cursor directly as documented at
+:ref:`dbapi_connections`.
+
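+As a brief illustration of that approach, a raw DBAPI cursor may be obtained
+from a :class:`_engine.Connection` via the :attr:`_engine.Connection.connection`
+attribute and driven directly; the sketch below uses only standard DBAPI
+methods and an illustrative table name::
+
+    with engine.connect() as conn:
+        # the raw DBAPI connection proxied by this Connection
+        dbapi_connection = conn.connection
+
+        # driver-specific cursor features, such as scrolling where the
+        # DBAPI implements it, are available from this cursor directly
+        cursor = dbapi_connection.cursor()
+        cursor.execute("select * from some_table")
+        rows = cursor.fetchmany(100)
+        cursor.close()
+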
+Some DBAPIs, such as the cx_Oracle DBAPI, exclusively use server side cursors
+internally. Result sets are essentially unbuffered across their total span,
+utilizing only a smaller buffer of a fixed size, such as 100 rows at a time.
+
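+When using the cx_Oracle dialect, for example, the size of this internal
+buffer may be tuned with the dialect's ``arraysize`` parameter, which sets
+``cursor.arraysize`` on the driver; a brief sketch, with an illustrative
+connection URL::
+
+    from sqlalchemy import create_engine
+
+    # cursor.arraysize controls how many rows cx_Oracle fetches per round trip
+    engine = create_engine(
+        "oracle+cx_oracle://scott:tiger@hostname/?service_name=xe",
+        arraysize=1000,
+    )
+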
+For those dialects that have conditional support for buffered or unbuffered
+results, there are usually caveats to the use of the "unbuffered", or server
+side cursor mode. When using the psycopg2 dialect for example, an error is
+raised if a server side cursor is used with any kind of DML or DDL statement.
+When using MySQL drivers with a server side cursor, the DBAPI connection is in
+a more fragile state and does not recover as gracefully from error conditions
+nor will it allow a rollback to proceed until the cursor is fully closed.
+
+For this reason, SQLAlchemy's dialects will always default to the less
+error-prone version of a cursor, which means for PostgreSQL and MySQL dialects
+it defaults to a buffered, "client side" cursor where the full set of results
+is pulled into memory before any fetch methods are called from the cursor.
+This mode of operation is appropriate in the **vast majority** of cases;
+unbuffered cursors are not generally useful except in the uncommon case
+of an application fetching a very large number of rows in chunks, where
+the processing of these rows can be complete before more rows are fetched.
+
+To make use of a server side cursor for a particular execution, the
+:paramref:`_engine.Connection.execution_options.stream_results` option
+is used; it may be applied to the :class:`_engine.Connection` object,
+to the statement object, or in the ORM-level contexts mentioned below.
+
+When using this option for a statement, it's usually appropriate to use
+a method like :meth:`_engine.Result.partitions` to work on small sections
+of the result set at a time, while also fetching enough rows for each
+pull so that the operation is efficient::
+
+    with engine.connect() as conn:
+        result = conn.execution_options(stream_results=True).execute(
+            text("select * from table")
+        )
+
+        for partition in result.partitions(100):
+            _process_rows(partition)
+
+If the :class:`_engine.Result` is iterated directly, rows are fetched internally
+using a default buffering scheme that buffers first a small set of rows,
+then a larger and larger buffer on each fetch up to a pre-configured limit
+of 1000 rows. This limit can be adjusted using the ``max_row_buffer``
+execution option::
+
+ with engine.connect() as conn:
+ conn = conn.execution_options(stream_results=True, max_row_buffer=100)
+ result = conn.execute(text("select * from table"))
+
+ for row in result:
+ _process_row(row)
+
+The option may also be set on individual statements. When using
+:term:`1.x style` ORM queries with :class:`_orm.Query`, the internal
+buffering approach will be used while iterating::
+
+    for row in session.query(User).execution_options(stream_results=True):
+        _process_row(row)
+
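+The same execution option may be associated with a Core statement ahead of
+time as well; a brief sketch, where ``some_table`` is an illustrative
+:class:`_schema.Table`::
+
+    stmt = select(some_table).execution_options(stream_results=True)
+
+    with engine.connect() as conn:
+        for partition in conn.execute(stmt).partitions(100):
+            _process_rows(partition)
+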
+The option may also be passed to :meth:`_future.Connection.execute` for a
+:term:`2.0 style` connection as well as to :meth:`_orm.Session.execute`::
+
+    with engine_20.connect() as conn:
+        result = conn.execute(
+            text("select * from table"),
+            execution_options={"stream_results": True},
+        )
+
+    with orm.Session(engine) as session:
+        result = session.execute(
+            select(User).order_by(User.id),
+            execution_options={"stream_results": True},
+        )
+
+
.. _dbengine_implicit:
Server Side Cursors
-------------------
-Server-side cursor support is available for the MySQLdb and PyMySQL dialects.
-From a database driver point of view this means that the ``MySQLdb.cursors.SSCursor`` or
-``pymysql.cursors.SSCursor`` class is used when building up the cursor which
-will receive results. The most typical way of invoking this feature is via the
+Server-side cursor support is available for the mysqlclient, PyMySQL, and
+mariadbconnector dialects and may also be available in others. This makes use
+of either the "buffered=True/False" flag if available, or a class such
+as ``MySQLdb.cursors.SSCursor`` or ``pymysql.cursors.SSCursor`` internally.
+
+
+Server side cursors are enabled on a per-statement basis by using the
:paramref:`.Connection.execution_options.stream_results` connection execution
-option. Server side cursors can also be enabled for all SELECT statements
-unconditionally by passing ``server_side_cursors=True`` to
-:func:`_sa.create_engine`.
+option::
+
+    with engine.connect() as conn:
+        result = conn.execution_options(stream_results=True).execute(
+            text("select * from table")
+        )
+
+Note that some kinds of SQL statements may not be supported with
+server side cursors; generally, only SQL statements that return rows should be
+used with this option.
+
+.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
+    and will be removed in a future release. Please use the
+    :paramref:`_engine.Connection.execution_options.stream_results` execution
+    option for unbuffered cursor support.
+
+.. seealso::
-.. versionadded:: 1.1.4 - added server-side cursor support.
+ :ref:`engine_stream_results`
.. _mysql_unicode:
class MySQLExecutionContext_mariadbconnector(MySQLExecutionContext):
- pass
+ def create_server_side_cursor(self):
+ return self._dbapi_connection.cursor(buffered=False)
+
+ def create_default_cursor(self):
+ return self._dbapi_connection.cursor(buffered=True)
class MySQLCompiler_mariadbconnector(MySQLCompiler):
statement_compiler = MySQLCompiler_mariadbconnector
preparer = MySQLIdentifierPreparer_mariadbconnector
+ supports_server_side_cursors = True
+
@util.memoized_property
def _dbapi_version(self):
if self.dbapi and hasattr(self.dbapi, "__version__"):
else:
return (99, 99, 99)
- def __init__(self, server_side_cursors=False, **kwargs):
+ def __init__(self, **kwargs):
super(MySQLDialect_mariadbconnector, self).__init__(**kwargs)
- self.server_side_cursors = True
self.paramstyle = "qmark"
if self.dbapi is not None:
if self._dbapi_version < mariadb_cpy_minimum_version:
statement_compiler = MySQLCompiler_mysqldb
preparer = MySQLIdentifierPreparer_mysqldb
- def __init__(self, server_side_cursors=False, **kwargs):
+ def __init__(self, **kwargs):
super(MySQLDialect_mysqldb, self).__init__(**kwargs)
- self.server_side_cursors = server_side_cursors
self._mysql_dbapi_version = (
self._parse_dbapi_version(self.dbapi.__version__)
if self.dbapi is not None and hasattr(self.dbapi, "__version__")
supports_unicode_statements = True
supports_unicode_binds = True
- def __init__(self, server_side_cursors=False, **kwargs):
- super(MySQLDialect_pymysql, self).__init__(**kwargs)
- self.server_side_cursors = server_side_cursors
-
@langhelpers.memoized_property
def supports_server_side_cursors(self):
try:
},
)
- def __init__(self, server_side_cursors=False, **kwargs):
- PGDialect.__init__(self, **kwargs)
- self.server_side_cursors = server_side_cursors
-
@util.memoized_property
def _dbapi_version(self):
if self.dbapi and hasattr(self.dbapi, "__version__"):
PRIMARY KEY (id)
)
+.. _postgresql_ss_cursors:
+
+Server Side Cursors
+-------------------
+
+Server-side cursor support is available for the psycopg2 and asyncpg
+dialects and may also be available in others.
+
+Server side cursors are enabled on a per-statement basis by using the
+:paramref:`.Connection.execution_options.stream_results` connection execution
+option::
+
+    with engine.connect() as conn:
+        result = conn.execution_options(stream_results=True).execute(
+            text("select * from table")
+        )
+
+Note that some kinds of SQL statements may not be supported with
+server side cursors; generally, only SQL statements that return rows should be
+used with this option.
+
+.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
+    and will be removed in a future release. Please use the
+    :paramref:`_engine.Connection.execution_options.stream_results` execution
+    option for unbuffered cursor support.
+
+.. seealso::
+
+ :ref:`engine_stream_results`
+
.. _postgresql_isolation_level:
Transaction Isolation Level
)
-"""
+""" # noqa E501
+
from collections import defaultdict
import datetime as dt
import re
psycopg2-specific keyword arguments which are accepted by
:func:`_sa.create_engine()` are:
-* ``server_side_cursors``: Enable the usage of "server side cursors" for SQL
- statements which support this feature. What this essentially means from a
- psycopg2 point of view is that the cursor is created using a name, e.g.
- ``connection.cursor('some name')``, which has the effect that result rows
- are not immediately pre-fetched and buffered after statement execution, but
- are instead left on the server and only retrieved as needed. SQLAlchemy's
- :class:`~sqlalchemy.engine.CursorResult` uses special row-buffering
- behavior when this feature is enabled, such that groups of 100 rows at a
- time are fetched over the wire to reduce conversational overhead.
- Note that the :paramref:`.Connection.execution_options.stream_results`
- execution option is a more targeted
- way of enabling this mode on a per-execution basis.
* ``use_native_unicode``: Enable the usage of Psycopg2 "native unicode" mode
per connection. True by default.
* ``stream_results`` - Enable or disable usage of psycopg2 server side
cursors - this feature makes use of "named" cursors in combination with
special result handling methods so that result rows are not fully buffered.
- If ``None`` or not set, the ``server_side_cursors`` option of the
- :class:`_engine.Engine` is used.
+ Defaults to False, meaning cursors are buffered by default.
* ``max_row_buffer`` - when using ``stream_results``, an integer value that
specifies the maximum number of rows to buffer at a time. This is
def __init__(
self,
- server_side_cursors=False,
use_native_unicode=True,
client_encoding=None,
use_native_hstore=True,
**kwargs
):
PGDialect.__init__(self, **kwargs)
- self.server_side_cursors = server_side_cursors
self.use_native_unicode = use_native_unicode
if not use_native_hstore:
self._has_native_hstore = False
:param stream_results: Available on: Connection, statement.
Indicate to the dialect that results should be
"streamed" and not pre-buffered, if possible. This is a limitation
- of many DBAPIs. The flag is currently understood only by the
- psycopg2, mysqldb and pymysql dialects.
+ of many DBAPIs. The flag is currently understood within a subset
+ of dialects within the PostgreSQL and MySQL categories, and
+ may be supported by other third party dialects as well.
+
+ .. seealso::
+
+ :ref:`engine_stream_results`
:param schema_translate_map: Available on: Connection, Engine.
A dictionary mapping schema names to schema names, that will be
supports_server_side_cursors = False
+ server_side_cursors = False
+
# extra record-level locking features (#4860)
supports_for_update_of = False
"Applications should work with result column names in a case "
"sensitive fashion.",
),
+ server_side_cursors=(
+ "1.4",
+ "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
+ "is deprecated and will be removed in a future release. Please "
+ "use the "
+ ":paramref:`_engine.Connection.execution_options.stream_results` "
+ "parameter.",
+ ),
)
def __init__(
self,
# int() is because the @deprecated_params decorator cannot accommodate
# the direct reference to the "NO_LINTING" object
compiler_linting=int(compiler.NO_LINTING),
+ server_side_cursors=False,
**kwargs
):
% self.name
)
+ if server_side_cursors:
+ if not self.supports_server_side_cursors:
+ raise exc.ArgumentError(
+ "Dialect %s does not support server side cursors" % self
+ )
+ else:
+ self.server_side_cursors = True
+
self.convert_unicode = convert_unicode
self.encoding = encoding
self.positional = False
return False
if self.dialect.server_side_cursors:
+ # this is deprecated
use_server_side = self.execution_options.get(
"stream_results", True
) and (
return self.create_server_side_cursor()
else:
self._is_server_side = False
- return self._dbapi_connection.cursor()
+ return self.create_default_cursor()
+
+ def create_default_cursor(self):
+ return self._dbapi_connection.cursor()
def create_server_side_cursor(self):
raise NotImplementedError()
.. seealso::
- :meth:`_query.Query.enable_eagerloads`
+ :ref:`engine_stream_results`
"""
self.load_options += {"_yield_per": count}
.. seealso::
+ :ref:`engine_stream_results`
+
:meth:`_query.Query.get_execution_options`
"""
elif self.engine.dialect.driver == "mysqldb":
sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
+ elif self.engine.dialect.driver == "mariadbconnector":
+ return not cursor.buffered
elif self.engine.dialect.driver == "asyncpg":
return cursor.server_side
else:
return False
def _fixture(self, server_side_cursors):
- self.engine = engines.testing_engine(
- options={"server_side_cursors": server_side_cursors}
- )
+ if server_side_cursors:
+ with testing.expect_deprecated(
+ "The create_engine.server_side_cursors parameter is "
+ "deprecated and will be removed in a future release. "
+ "Please use the Connection.execution_options.stream_results "
+ "parameter."
+ ):
+ self.engine = engines.testing_engine(
+ options={"server_side_cursors": server_side_cursors}
+ )
+ else:
+ self.engine = engines.testing_engine(
+ options={"server_side_cursors": server_side_cursors}
+ )
return self.engine
def tearDown(self):
)[1]
assert "raise_on_warnings" not in kw
- @testing.only_on(["mysql", "mariadb"])
+ @testing.only_on(
+ [
+ "mysql+mysqldb",
+ "mysql+pymysql",
+ "mariadb+mysqldb",
+ "mariadb+pymysql",
+ ]
+ )
def test_random_arg(self):
dialect = testing.db.dialect
kw = dialect.create_connect_args(
)[1]
eq_(kw["foo"], "true")
- @testing.only_on(["mysql", "mariadb"])
- @testing.skip_if("mysql+mysqlconnector", "totally broken for the moment")
- @testing.fails_on("mysql+oursql", "unsupported")
+ @testing.only_on(
+ [
+ "mysql+mysqldb",
+ "mysql+pymysql",
+ "mariadb+mysqldb",
+ "mariadb+pymysql",
+ ]
+ )
def test_special_encodings(self):
for enc in ["utf8mb4", "utf8"]:
assert isinstance(d, datetime.datetime)
-class AutocommitTextTest(test_execute.AutocommitTextTest):
+class AutocommitTextTest(
+ test_execute.AutocommitKeywordFixture, fixtures.TestBase
+):
__only_on__ = "mysql", "mariadb"
def test_load_data(self):
)
enum_table.create(connection)
+ assert_raises(
+ exc.DBAPIError,
+ enum_table.insert().execute,
+ e1=None,
+ e2=None,
+ e3=None,
+ e4=None,
+ )
+
assert enum_table.c.e2generic.type.validate_strings
assert_raises(
eq_(conn.info["boom"], "one")
-class AutocommitTextTest(fixtures.TestBase):
- __backend__ = True
-
+class AutocommitKeywordFixture(object):
def _test_keyword(self, keyword, expected=True):
dbapi = Mock(
connect=Mock(
with engine.connect() as conn:
conn.exec_driver_sql("%s something table something" % keyword)
+ for _call in dbapi.connect().mock_calls:
+ _call.kwargs.clear()
+
if expected:
eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()])
else:
eq_(dbapi.connect().mock_calls, [call.cursor()])
+
+class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase):
+ __backend__ = True
+
def test_update(self):
self._test_keyword("UPDATE")