]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Deprecate engine-wise ss cursors; repair mariadbconnector
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 13 Sep 2020 14:36:16 +0000 (10:36 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 13 Sep 2020 17:06:02 +0000 (13:06 -0400)
The server_side_cursors engine-wide feature relies upon
regexp parsing of statements a well as general guessing as
to when the feature should be used.   This is not within the
2.0 way of doing things and should be removed.

Additionally, mariadbconnector defaults to unbuffered cursors;
add new cursor hooks so that mariadbconnector can specify
buffered or unbuffered cursors without too much difficulty.
This will also correctly default mariadbconnector to buffered
cursors which should repair the segfaults we've been getting.

Try to restore the assert_raises that was removed in
5b6dfc0c38bf1f01da4b8 to see if mariadbconnector segfaults
are resolved.

Change-Id: I77f1c972c742e40694972f578140bb0cac8c39eb

16 files changed:
doc/build/changelog/unreleased_14/sscursor.rst [new file with mode: 0644]
doc/build/core/connections.rst
lib/sqlalchemy/dialects/mysql/base.py
lib/sqlalchemy/dialects/mysql/mariadbconnector.py
lib/sqlalchemy/dialects/mysql/mysqldb.py
lib/sqlalchemy/dialects/mysql/pymysql.py
lib/sqlalchemy/dialects/postgresql/asyncpg.py
lib/sqlalchemy/dialects/postgresql/base.py
lib/sqlalchemy/dialects/postgresql/psycopg2.py
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/orm/query.py
lib/sqlalchemy/testing/suite/test_results.py
test/dialect/mysql/test_dialect.py
test/dialect/mysql/test_types.py
test/engine/test_execute.py

diff --git a/doc/build/changelog/unreleased_14/sscursor.rst b/doc/build/changelog/unreleased_14/sscursor.rst
new file mode 100644 (file)
index 0000000..8f5040b
--- /dev/null
@@ -0,0 +1,7 @@
+.. change::
+    :tags: change, engine
+
+    The ``server_side_cursors`` engine-wide parameter is deprecated and will be
+    removed in a future release.  For unbuffered cursors, the
+    :paramref:`_engine.Connection.execution_options.stream_results` execution
+    option should be used on a per-execution basis.
index 5597b83f14187e1b9186096930f87d4e8f4fb6ba..85dddeab0115e09f720a8b0696e778818c1606b2 100644 (file)
@@ -475,6 +475,95 @@ reverted when a connection is returned to the connection pool.
 
       :ref:`session_transaction_isolation` - for the ORM
 
+.. _engine_stream_results:
+
+Using Server Side Cursors (a.k.a. stream results)
+==================================================
+
+A limited number of dialects have explicit support for the concept of "server
+side cursors" vs. "buffered cursors".    While a server side cursor implies a
+variety of different capabilities, within SQLAlchemy's engine and dialect
+implementation, it refers only to whether or not a particular set of results is
+fully buffered in memory before they are fetched from the cursor, using a
+method such as ``cursor.fetchall()``.   SQLAlchemy has no direct support
+for cursor behaviors such as scrolling; to make use of these features for
+a particular DBAPI, use the cursor directly as documented at
+:ref:`dbapi_connections`.
+
+Some DBAPIs, such as the cx_Oracle DBAPI, exclusively use server side cursors
+internally.  All result sets are essentially unbuffered across the total span
+of a result set, utilizing only a smaller buffer that is of a fixed size such
+as 100 rows at a time.
+
+For those dialects that have conditional support for buffered or unbuffered
+results, there are usually caveats to the use of the "unbuffered", or server
+side cursor mode.   When using the psycopg2 dialect for example, an error is
+raised if a server side cursor is used with any kind of DML or DDL statement.
+When using MySQL drivers with a server side cursor, the DBAPI connection is in
+a more fragile state and does not recover as gracefully from error conditions
+nor will it allow a rollback to proceed until the cursor is fully closed.
+
+For this reason, SQLAlchemy's dialects will always default to the less error
+prone version of a cursor, which means for PostgreSQL and MySQL dialects
+it defaults to a buffered, "client side" cursor where the full set of results
+is pulled into memory before any fetch methods are called from the cursor.
+This mode of operation is appropriate in the **vast majority** of cases;
+unbuffered cursors are not generally useful except in the uncommon case
+of an application fetching a very large number of rows in chunks, where
+the processing of these rows can be complete before more rows are fetched.
+
+To make use of a server side cursor for a particular execution, the
+:paramref:`_engine.Connection.execution_options.stream_results` option
+is used, which may be called on the :class:`_engine.Connection` object,
+on the statement object, or in the ORM-level contexts mentioned below.
+
+When using this option for a statement, it's usually appropriate to use
+a method like :meth:`_engine.Result.partitions` to work on small sections
+of the result set at a time, while also fetching enough rows for each
+pull so that the operation is efficient::
+
+
+    with engine.connect() as conn:
+        result = conn.execution_options(stream_results=True).execute(text("select * from table"))
+
+        for partition in result.partitions(100):
+            _process_rows(partition)
+
+If the :class:`_engine.Result` is iterated directly, rows are fetched internally
+using a default buffering scheme that buffers first a small set of rows,
+then a larger and larger buffer on each fetch up to a pre-configured limit
+of 1000 rows.   This can be affected using the ``max_row_buffer`` execution
+option::
+
+    with engine.connect() as conn:
+        conn = conn.execution_options(stream_results=True, max_row_buffer=100)
+        result = conn.execute(text("select * from table"))
+
+        for row in result:
+            _process_row(row)
+
+The option may also be set on statements.   Such as when using
+:term:`1.x style` ORM use with :class:`_orm.Query`, the internal buffering
+approach will be used while iterating::
+
+    for row in session.query(User).execution_options(stream_results=True):
+        # process row
+
+The option may also be passed to :meth:`_future.Connection.execute` for a
+:term:`2.0 style` connection as well as to :meth:`_orm.Session.execute`::
+
+
+    with engine_20.connect() as conn:
+        result = engine.execute(text("select * from table"), execution_options={"stream_results": True})
+
+
+    with orm.Session(engine) as session:
+        result = session.execute(
+            select(User).order_by(User_id),
+            execution_options={"stream_results": True}
+        )
+
+
 .. _dbengine_implicit:
 
 
index 130aeeb0ff8fcd15cac8bb2e42663ecc08d8a002..b9067f38409a14125e1dd35134f0c280c3e62fc0 100644 (file)
@@ -271,16 +271,31 @@ multi-column key for some storage engines::
 Server Side Cursors
 -------------------
 
-Server-side cursor support is available for the MySQLdb and PyMySQL dialects.
-From a database driver point of view this means that the ``MySQLdb.cursors.SSCursor`` or
-``pymysql.cursors.SSCursor`` class is used when building up the cursor which
-will receive results.  The most typical way of invoking this feature is via the
+Server-side cursor support is available for the mysqlclient, PyMySQL,
+maridbconnector dialects and may also be available in others.   This makes use
+of either the "buffered=True/False" flag if available or by using a class such
+as ``MySQLdb.cursors.SSCursor`` or ``pymysql.cursors.SSCursor`` internally.
+
+
+Server side cursors are enabled on a per-statement basis by using the
 :paramref:`.Connection.execution_options.stream_results` connection execution
-option.   Server side cursors can also be enabled for all SELECT statements
-unconditionally by passing ``server_side_cursors=True`` to
-:func:`_sa.create_engine`.
+option::
+
+    with engine.connect() as conn:
+        result = conn.execution_options(stream_resuls=True).execute(text("select * from table"))
+
+Note that some kinds of SQL statements may not be supported with
+server side cursors; generally, only SQL statements that return rows should be
+used with this option.
+
+.. deprecated:: 1.4  The dialect-level server_side_cursors flag is deprecated
+   and will be removed in a future release.  Please use the
+   :paramref:`_engine.Connection.stream_results` execution option for
+   unbuffered cursor support.
+
+.. seealso::
 
-.. versionadded:: 1.1.4 - added server-side cursor support.
+    :ref:`engine_stream_results`
 
 .. _mysql_unicode:
 
index aa28ffc67bbf9cdc92f378a89e09d3a7e06bc4cb..4e0b4e0a97a4c0d8146b154881ab554f8f8251e3 100644 (file)
@@ -40,7 +40,11 @@ mariadb_cpy_minimum_version = (1, 0, 1)
 
 
 class MySQLExecutionContext_mariadbconnector(MySQLExecutionContext):
-    pass
+    def create_server_side_cursor(self):
+        return self._dbapi_connection.cursor(buffered=False)
+
+    def create_default_cursor(self):
+        return self._dbapi_connection.cursor(buffered=True)
 
 
 class MySQLCompiler_mariadbconnector(MySQLCompiler):
@@ -75,6 +79,8 @@ class MySQLDialect_mariadbconnector(MySQLDialect):
     statement_compiler = MySQLCompiler_mariadbconnector
     preparer = MySQLIdentifierPreparer_mariadbconnector
 
+    supports_server_side_cursors = True
+
     @util.memoized_property
     def _dbapi_version(self):
         if self.dbapi and hasattr(self.dbapi, "__version__"):
@@ -89,9 +95,8 @@ class MySQLDialect_mariadbconnector(MySQLDialect):
         else:
             return (99, 99, 99)
 
-    def __init__(self, server_side_cursors=False, **kwargs):
+    def __init__(self, **kwargs):
         super(MySQLDialect_mariadbconnector, self).__init__(**kwargs)
-        self.server_side_cursors = True
         self.paramstyle = "qmark"
         if self.dbapi is not None:
             if self._dbapi_version < mariadb_cpy_minimum_version:
index 03c1779c3bbb6486d275310c61973500801c85ba..b20e061fb50e50fa8a2841823986e9d54da016f0 100644 (file)
@@ -88,9 +88,8 @@ class MySQLDialect_mysqldb(MySQLDialect):
     statement_compiler = MySQLCompiler_mysqldb
     preparer = MySQLIdentifierPreparer_mysqldb
 
-    def __init__(self, server_side_cursors=False, **kwargs):
+    def __init__(self, **kwargs):
         super(MySQLDialect_mysqldb, self).__init__(**kwargs)
-        self.server_side_cursors = server_side_cursors
         self._mysql_dbapi_version = (
             self._parse_dbapi_version(self.dbapi.__version__)
             if self.dbapi is not None and hasattr(self.dbapi, "__version__")
index e4d5d6206e02e8ad88bfdf6a3a8d7a880686e326..7d7770105bf6f110bc45699010843b3fdf959afc 100644 (file)
@@ -44,10 +44,6 @@ class MySQLDialect_pymysql(MySQLDialect_mysqldb):
     supports_unicode_statements = True
     supports_unicode_binds = True
 
-    def __init__(self, server_side_cursors=False, **kwargs):
-        super(MySQLDialect_pymysql, self).__init__(**kwargs)
-        self.server_side_cursors = server_side_cursors
-
     @langhelpers.memoized_property
     def supports_server_side_cursors(self):
         try:
index eb87249b40ad7adc10844386cf02bb1ddebe6539..780e238449cb28c18fa66fb0c5e0009f0a877fbd 100644 (file)
@@ -723,10 +723,6 @@ class PGDialect_asyncpg(PGDialect):
         },
     )
 
-    def __init__(self, server_side_cursors=False, **kwargs):
-        PGDialect.__init__(self, **kwargs)
-        self.server_side_cursors = server_side_cursors
-
     @util.memoized_property
     def _dbapi_version(self):
         if self.dbapi and hasattr(self.dbapi, "__version__"):
index 3ef87620fdb6a4d6314048004443be3c08f085d1..5e81586b49cefeadeea7dee55de2d3743ed5fe7f 100644 (file)
@@ -112,6 +112,34 @@ The CREATE TABLE for the above :class:`_schema.Table` object would be:
            PRIMARY KEY (id)
        )
 
+.. _postgresql_ss_cursors:
+
+Server Side Cursors
+-------------------
+
+Server-side cursor support is available for the psycopg2, asyncpg
+dialects and may also be available in others.
+
+Server side cursors are enabled on a per-statement basis by using the
+:paramref:`.Connection.execution_options.stream_results` connection execution
+option::
+
+    with engine.connect() as conn:
+        result = conn.execution_options(stream_resuls=True).execute(text("select * from table"))
+
+Note that some kinds of SQL statements may not be supported with
+server side cursors; generally, only SQL statements that return rows should be
+used with this option.
+
+.. deprecated:: 1.4  The dialect-level server_side_cursors flag is deprecated
+   and will be removed in a future release.  Please use the
+   :paramref:`_engine.Connection.stream_results` execution option for
+   unbuffered cursor support.
+
+.. seealso::
+
+    :ref:`engine_stream_results`
+
 .. _postgresql_isolation_level:
 
 Transaction Isolation Level
@@ -1075,7 +1103,8 @@ E.g.::
     )
 
 
-"""
+"""  # noqa E501
+
 from collections import defaultdict
 import datetime as dt
 import re
index 3cc62fc93a8e99be1c66b2192f5e4bf1ac5d7fb6..91576c4d2019d6e66e08226b6d7f49d81e5ca55c 100644 (file)
@@ -17,18 +17,6 @@ psycopg2 Connect Arguments
 psycopg2-specific keyword arguments which are accepted by
 :func:`_sa.create_engine()` are:
 
-* ``server_side_cursors``: Enable the usage of "server side cursors" for SQL
-  statements which support this feature. What this essentially means from a
-  psycopg2 point of view is that the cursor is created using a name, e.g.
-  ``connection.cursor('some name')``, which has the effect that result rows
-  are not immediately pre-fetched and buffered after statement execution, but
-  are instead left on the server and only retrieved as needed. SQLAlchemy's
-  :class:`~sqlalchemy.engine.CursorResult` uses special row-buffering
-  behavior when this feature is enabled, such that groups of 100 rows at a
-  time are fetched over the wire to reduce conversational overhead.
-  Note that the :paramref:`.Connection.execution_options.stream_results`
-  execution option is a more targeted
-  way of enabling this mode on a per-execution basis.
 
 * ``use_native_unicode``: Enable the usage of Psycopg2 "native unicode" mode
   per connection.  True by default.
@@ -129,8 +117,7 @@ in addition to those not specific to DBAPIs:
 * ``stream_results`` - Enable or disable usage of psycopg2 server side
   cursors - this feature makes use of "named" cursors in combination with
   special result handling methods so that result rows are not fully buffered.
-  If ``None`` or not set, the ``server_side_cursors`` option of the
-  :class:`_engine.Engine` is used.
+  Defaults to False, meaning cursors are buffered by default.
 
 * ``max_row_buffer`` - when using ``stream_results``, an integer value that
   specifies the maximum number of rows to buffer at a time.  This is
@@ -691,7 +678,6 @@ class PGDialect_psycopg2(PGDialect):
 
     def __init__(
         self,
-        server_side_cursors=False,
         use_native_unicode=True,
         client_encoding=None,
         use_native_hstore=True,
@@ -702,7 +688,6 @@ class PGDialect_psycopg2(PGDialect):
         **kwargs
     ):
         PGDialect.__init__(self, **kwargs)
-        self.server_side_cursors = server_side_cursors
         self.use_native_unicode = use_native_unicode
         if not use_native_hstore:
             self._has_native_hstore = False
index afab8e7b4ed434d0790180ea0dbf4487e1d99400..55266fae7c885e31a3c38629b0fa971df504feb4 100644 (file)
@@ -307,8 +307,13 @@ class Connection(Connectable):
         :param stream_results: Available on: Connection, statement.
           Indicate to the dialect that results should be
           "streamed" and not pre-buffered, if possible.  This is a limitation
-          of many DBAPIs.  The flag is currently understood only by the
-          psycopg2, mysqldb and pymysql dialects.
+          of many DBAPIs.  The flag is currently understood within a subset
+          of dialects within the PostgreSQL and MySQL categories, and
+          may be supported by other third party dialects as well.
+
+          .. seealso::
+
+            :ref:`engine_stream_results`
 
         :param schema_translate_map: Available on: Connection, Engine.
           A dictionary mapping schema names to schema names, that will be
index 9d2eaf6063516db3273abd406fa1b8cea8402a2c..ff29c3b9dd77c451262aa585959f454f7f0e50a3 100644 (file)
@@ -144,6 +144,8 @@ class DefaultDialect(interfaces.Dialect):
 
     supports_server_side_cursors = False
 
+    server_side_cursors = False
+
     # extra record-level locking features (#4860)
     supports_for_update_of = False
 
@@ -235,6 +237,14 @@ class DefaultDialect(interfaces.Dialect):
             "Applications should work with result column names in a case "
             "sensitive fashion.",
         ),
+        server_side_cursors=(
+            "1.4",
+            "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
+            "is deprecated and will be removed in a future release.  Please "
+            "use the "
+            ":paramref:`_engine.Connection.execution_options.stream_results` "
+            "parameter.",
+        ),
     )
     def __init__(
         self,
@@ -250,6 +260,7 @@ class DefaultDialect(interfaces.Dialect):
         # int() is because the @deprecated_params decorator cannot accommodate
         # the direct reference to the "NO_LINTING" object
         compiler_linting=int(compiler.NO_LINTING),
+        server_side_cursors=False,
         **kwargs
     ):
 
@@ -259,6 +270,14 @@ class DefaultDialect(interfaces.Dialect):
                 % self.name
             )
 
+        if server_side_cursors:
+            if not self.supports_server_side_cursors:
+                raise exc.ArgumentError(
+                    "Dialect %s does not support server side cursors" % self
+                )
+            else:
+                self.server_side_cursors = True
+
         self.convert_unicode = convert_unicode
         self.encoding = encoding
         self.positional = False
@@ -1189,6 +1208,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
             return False
 
         if self.dialect.server_side_cursors:
+            # this is deprecated
             use_server_side = self.execution_options.get(
                 "stream_results", True
             ) and (
@@ -1232,7 +1252,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
             return self.create_server_side_cursor()
         else:
             self._is_server_side = False
-            return self._dbapi_connection.cursor()
+            return self.create_default_cursor()
+
+    def create_default_cursor(self):
+        return self._dbapi_connection.cursor()
 
     def create_server_side_cursor(self):
         raise NotImplementedError()
index 26a5a89c78ca1eedc9b184317c5f608bc510680c..91a638e4b52501771396740426778d1ceec919fb 100644 (file)
@@ -803,7 +803,7 @@ class Query(
 
         .. seealso::
 
-            :meth:`_query.Query.enable_eagerloads`
+            :ref:`engine_stream_results`
 
         """
         self.load_options += {"_yield_per": count}
@@ -1528,6 +1528,8 @@ class Query(
 
         .. seealso::
 
+            :ref:`engine_stream_results`
+
             :meth:`_query.Query.get_execution_options`
 
         """
index 6d28a207eab106ac580687b47f1125d65a4db100..1c1b20cf0133c7f96a21c26c3435599be23d31c3 100644 (file)
@@ -236,15 +236,28 @@ class ServerSideCursorsTest(
         elif self.engine.dialect.driver == "mysqldb":
             sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
             return isinstance(cursor, sscursor)
+        elif self.engine.dialect.driver == "mariadbconnector":
+            return not cursor.buffered
         elif self.engine.dialect.driver == "asyncpg":
             return cursor.server_side
         else:
             return False
 
     def _fixture(self, server_side_cursors):
-        self.engine = engines.testing_engine(
-            options={"server_side_cursors": server_side_cursors}
-        )
+        if server_side_cursors:
+            with testing.expect_deprecated(
+                "The create_engine.server_side_cursors parameter is "
+                "deprecated and will be removed in a future release.  "
+                "Please use the Connection.execution_options.stream_results "
+                "parameter."
+            ):
+                self.engine = engines.testing_engine(
+                    options={"server_side_cursors": server_side_cursors}
+                )
+        else:
+            self.engine = engines.testing_engine(
+                options={"server_side_cursors": server_side_cursors}
+            )
         return self.engine
 
     def tearDown(self):
index c3bd94ffad1e9ca0e64f9f32e0af7bf1e4fc9516..a555e539605612580ab40f2f41d15011f705c648 100644 (file)
@@ -227,7 +227,14 @@ class DialectTest(fixtures.TestBase):
         )[1]
         assert "raise_on_warnings" not in kw
 
-    @testing.only_on(["mysql", "mariadb"])
+    @testing.only_on(
+        [
+            "mysql+mysqldb",
+            "mysql+pymysql",
+            "mariadb+mysqldb",
+            "mariadb+pymysql",
+        ]
+    )
     def test_random_arg(self):
         dialect = testing.db.dialect
         kw = dialect.create_connect_args(
@@ -235,9 +242,14 @@ class DialectTest(fixtures.TestBase):
         )[1]
         eq_(kw["foo"], "true")
 
-    @testing.only_on(["mysql", "mariadb"])
-    @testing.skip_if("mysql+mysqlconnector", "totally broken for the moment")
-    @testing.fails_on("mysql+oursql", "unsupported")
+    @testing.only_on(
+        [
+            "mysql+mysqldb",
+            "mysql+pymysql",
+            "mariadb+mysqldb",
+            "mariadb+pymysql",
+        ]
+    )
     def test_special_encodings(self):
 
         for enc in ["utf8mb4", "utf8"]:
@@ -487,7 +499,9 @@ class ExecutionTest(fixtures.TestBase):
         assert isinstance(d, datetime.datetime)
 
 
-class AutocommitTextTest(test_execute.AutocommitTextTest):
+class AutocommitTextTest(
+    test_execute.AutocommitKeywordFixture, fixtures.TestBase
+):
     __only_on__ = "mysql", "mariadb"
 
     def test_load_data(self):
index 60c4f0ec066e5aed461f90f08c7e2fa5470ae4f7..8983522c14b40c65e3aa060637d5eab84b484e38 100644 (file)
@@ -852,6 +852,15 @@ class EnumSetTest(
         )
         enum_table.create(connection)
 
+        assert_raises(
+            exc.DBAPIError,
+            enum_table.insert().execute,
+            e1=None,
+            e2=None,
+            e3=None,
+            e4=None,
+        )
+
         assert enum_table.c.e2generic.type.validate_strings
 
         assert_raises(
index 61e25488e156dee0d482785f417dab4ea3afd70c..3a4b58df6d3cc2737290cf4d9ce459ecd542fb7c 100644 (file)
@@ -3103,9 +3103,7 @@ class DialectEventTest(fixtures.TestBase):
         eq_(conn.info["boom"], "one")
 
 
-class AutocommitTextTest(fixtures.TestBase):
-    __backend__ = True
-
+class AutocommitKeywordFixture(object):
     def _test_keyword(self, keyword, expected=True):
         dbapi = Mock(
             connect=Mock(
@@ -3122,11 +3120,18 @@ class AutocommitTextTest(fixtures.TestBase):
         with engine.connect() as conn:
             conn.exec_driver_sql("%s something table something" % keyword)
 
+            for _call in dbapi.connect().mock_calls:
+                _call.kwargs.clear()
+
             if expected:
                 eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()])
             else:
                 eq_(dbapi.connect().mock_calls, [call.cursor()])
 
+
+class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase):
+    __backend__ = True
+
     def test_update(self):
         self._test_keyword("UPDATE")