From 25b8e89d3d425656e51438b5cac7e6e1f2592817 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sun, 13 Sep 2020 10:36:16 -0400 Subject: [PATCH] Deprecate engine-wide ss cursors; repair mariadbconnector The server_side_cursors engine-wide feature relies upon regexp parsing of statements as well as general guessing as to when the feature should be used. This is not within the 2.0 way of doing things and should be removed. Additionally, mariadbconnector defaults to unbuffered cursors; add new cursor hooks so that mariadbconnector can specify buffered or unbuffered cursors without too much difficulty. This will also correctly default mariadbconnector to buffered cursors which should repair the segfaults we've been getting. Try to restore the assert_raises that was removed in 5b6dfc0c38bf1f01da4b8 to see if mariadbconnector segfaults are resolved. Change-Id: I77f1c972c742e40694972f578140bb0cac8c39eb --- .../changelog/unreleased_14/sscursor.rst | 7 ++ doc/build/core/connections.rst | 89 +++++++++++++++++++ lib/sqlalchemy/dialects/mysql/base.py | 31 +++++-- .../dialects/mysql/mariadbconnector.py | 11 ++- lib/sqlalchemy/dialects/mysql/mysqldb.py | 3 +- lib/sqlalchemy/dialects/mysql/pymysql.py | 4 - lib/sqlalchemy/dialects/postgresql/asyncpg.py | 4 - lib/sqlalchemy/dialects/postgresql/base.py | 31 ++++++- .../dialects/postgresql/psycopg2.py | 17 +--- lib/sqlalchemy/engine/base.py | 9 +- lib/sqlalchemy/engine/default.py | 25 +++++- lib/sqlalchemy/orm/query.py | 4 +- lib/sqlalchemy/testing/suite/test_results.py | 19 +++- test/dialect/mysql/test_dialect.py | 24 +++-- test/dialect/mysql/test_types.py | 9 ++ test/engine/test_execute.py | 11 ++- 16 files changed, 245 insertions(+), 53 deletions(-) create mode 100644 doc/build/changelog/unreleased_14/sscursor.rst diff --git a/doc/build/changelog/unreleased_14/sscursor.rst b/doc/build/changelog/unreleased_14/sscursor.rst new file mode 100644 index 0000000000..8f5040bce7 --- /dev/null +++ 
b/doc/build/changelog/unreleased_14/sscursor.rst @@ -0,0 +1,7 @@ +.. change:: + :tags: change, engine + + The ``server_side_cursors`` engine-wide parameter is deprecated and will be + removed in a future release. For unbuffered cursors, the + :paramref:`_engine.Connection.execution_options.stream_results` execution + option should be used on a per-execution basis. diff --git a/doc/build/core/connections.rst b/doc/build/core/connections.rst index 5597b83f14..85dddeab01 100644 --- a/doc/build/core/connections.rst +++ b/doc/build/core/connections.rst @@ -475,6 +475,95 @@ reverted when a connection is returned to the connection pool. :ref:`session_transaction_isolation` - for the ORM +.. _engine_stream_results: + +Using Server Side Cursors (a.k.a. stream results) +================================================== + +A limited number of dialects have explicit support for the concept of "server +side cursors" vs. "buffered cursors". While a server side cursor implies a +variety of different capabilities, within SQLAlchemy's engine and dialect +implementation, it refers only to whether or not a particular set of results is +fully buffered in memory before they are fetched from the cursor, using a +method such as ``cursor.fetchall()``. SQLAlchemy has no direct support +for cursor behaviors such as scrolling; to make use of these features for +a particular DBAPI, use the cursor directly as documented at +:ref:`dbapi_connections`. + +Some DBAPIs, such as the cx_Oracle DBAPI, exclusively use server side cursors +internally. All result sets are essentially unbuffered across the total span +of a result set, utilizing only a smaller buffer that is of a fixed size such +as 100 rows at a time. + +For those dialects that have conditional support for buffered or unbuffered +results, there are usually caveats to the use of the "unbuffered", or server +side cursor mode. 
When using the psycopg2 dialect for example, an error is +raised if a server side cursor is used with any kind of DML or DDL statement. +When using MySQL drivers with a server side cursor, the DBAPI connection is in +a more fragile state and does not recover as gracefully from error conditions +nor will it allow a rollback to proceed until the cursor is fully closed. + +For this reason, SQLAlchemy's dialects will always default to the less error +prone version of a cursor, which means for PostgreSQL and MySQL dialects +it defaults to a buffered, "client side" cursor where the full set of results +is pulled into memory before any fetch methods are called from the cursor. +This mode of operation is appropriate in the **vast majority** of cases; +unbuffered cursors are not generally useful except in the uncommon case +of an application fetching a very large number of rows in chunks, where +the processing of these rows can be complete before more rows are fetched. + +To make use of a server side cursor for a particular execution, the +:paramref:`_engine.Connection.execution_options.stream_results` option +is used, which may be called on the :class:`_engine.Connection` object, +on the statement object, or in the ORM-level contexts mentioned below. + +When using this option for a statement, it's usually appropriate to use +a method like :meth:`_engine.Result.partitions` to work on small sections +of the result set at a time, while also fetching enough rows for each +pull so that the operation is efficient:: + + + with engine.connect() as conn: + result = conn.execution_options(stream_results=True).execute(text("select * from table")) + + for partition in result.partitions(100): + _process_rows(partition) + +If the :class:`_engine.Result` is iterated directly, rows are fetched internally +using a default buffering scheme that buffers first a small set of rows, +then a larger and larger buffer on each fetch up to a pre-configured limit +of 1000 rows. 
This can be affected using the ``max_row_buffer`` execution +option:: + + with engine.connect() as conn: + conn = conn.execution_options(stream_results=True, max_row_buffer=100) + result = conn.execute(text("select * from table")) + + for row in result: + _process_row(row) + +The option may also be set on statements. For example, when using +:term:`1.x style` ORM use with :class:`_orm.Query`, the internal buffering +approach will be used while iterating:: + + for row in session.query(User).execution_options(stream_results=True): + # process row + +The option may also be passed to :meth:`_future.Connection.execute` for a +:term:`2.0 style` connection as well as to :meth:`_orm.Session.execute`:: + + + with engine_20.connect() as conn: + result = conn.execute(text("select * from table"), execution_options={"stream_results": True}) + + + with orm.Session(engine) as session: + result = session.execute( + select(User).order_by(User.id), + execution_options={"stream_results": True} + ) + + .. _dbengine_implicit: diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index 130aeeb0ff..b9067f3840 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -271,16 +271,31 @@ multi-column key for some storage engines:: Server Side Cursors ------------------- -Server-side cursor support is available for the MySQLdb and PyMySQL dialects. -From a database driver point of view this means that the ``MySQLdb.cursors.SSCursor`` or -``pymysql.cursors.SSCursor`` class is used when building up the cursor which -will receive results. The most typical way of invoking this feature is via the +Server-side cursor support is available for the mysqlclient, PyMySQL, +mariadbconnector dialects and may also be available in others. This makes use +of either the "buffered=True/False" flag if available or by using a class such +as ``MySQLdb.cursors.SSCursor`` or ``pymysql.cursors.SSCursor`` internally. 
+ + +Server side cursors are enabled on a per-statement basis by using the :paramref:`.Connection.execution_options.stream_results` connection execution -option. Server side cursors can also be enabled for all SELECT statements -unconditionally by passing ``server_side_cursors=True`` to -:func:`_sa.create_engine`. +option:: + + with engine.connect() as conn: + result = conn.execution_options(stream_results=True).execute(text("select * from table")) + +Note that some kinds of SQL statements may not be supported with +server side cursors; generally, only SQL statements that return rows should be +used with this option. + +.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated + and will be removed in a future release. Please use the + :paramref:`_engine.Connection.execution_options.stream_results` execution option for + unbuffered cursor support. + +.. seealso:: -.. versionadded:: 1.1.4 - added server-side cursor support. + :ref:`engine_stream_results` .. _mysql_unicode: diff --git a/lib/sqlalchemy/dialects/mysql/mariadbconnector.py b/lib/sqlalchemy/dialects/mysql/mariadbconnector.py index aa28ffc67b..4e0b4e0a97 100644 --- a/lib/sqlalchemy/dialects/mysql/mariadbconnector.py +++ b/lib/sqlalchemy/dialects/mysql/mariadbconnector.py @@ -40,7 +40,11 @@ mariadb_cpy_minimum_version = (1, 0, 1) class MySQLExecutionContext_mariadbconnector(MySQLExecutionContext): - pass + def create_server_side_cursor(self): + return self._dbapi_connection.cursor(buffered=False) + + def create_default_cursor(self): + return self._dbapi_connection.cursor(buffered=True) class MySQLCompiler_mariadbconnector(MySQLCompiler): @@ -75,6 +79,8 @@ class MySQLDialect_mariadbconnector(MySQLDialect): statement_compiler = MySQLCompiler_mariadbconnector preparer = MySQLIdentifierPreparer_mariadbconnector + supports_server_side_cursors = True + @util.memoized_property def _dbapi_version(self): if self.dbapi and hasattr(self.dbapi, "__version__"): @@ -89,9 +95,8 @@ class 
MySQLDialect_mariadbconnector(MySQLDialect): else: return (99, 99, 99) - def __init__(self, server_side_cursors=False, **kwargs): + def __init__(self, **kwargs): super(MySQLDialect_mariadbconnector, self).__init__(**kwargs) - self.server_side_cursors = True self.paramstyle = "qmark" if self.dbapi is not None: if self._dbapi_version < mariadb_cpy_minimum_version: diff --git a/lib/sqlalchemy/dialects/mysql/mysqldb.py b/lib/sqlalchemy/dialects/mysql/mysqldb.py index 03c1779c3b..b20e061fb5 100644 --- a/lib/sqlalchemy/dialects/mysql/mysqldb.py +++ b/lib/sqlalchemy/dialects/mysql/mysqldb.py @@ -88,9 +88,8 @@ class MySQLDialect_mysqldb(MySQLDialect): statement_compiler = MySQLCompiler_mysqldb preparer = MySQLIdentifierPreparer_mysqldb - def __init__(self, server_side_cursors=False, **kwargs): + def __init__(self, **kwargs): super(MySQLDialect_mysqldb, self).__init__(**kwargs) - self.server_side_cursors = server_side_cursors self._mysql_dbapi_version = ( self._parse_dbapi_version(self.dbapi.__version__) if self.dbapi is not None and hasattr(self.dbapi, "__version__") diff --git a/lib/sqlalchemy/dialects/mysql/pymysql.py b/lib/sqlalchemy/dialects/mysql/pymysql.py index e4d5d6206e..7d7770105b 100644 --- a/lib/sqlalchemy/dialects/mysql/pymysql.py +++ b/lib/sqlalchemy/dialects/mysql/pymysql.py @@ -44,10 +44,6 @@ class MySQLDialect_pymysql(MySQLDialect_mysqldb): supports_unicode_statements = True supports_unicode_binds = True - def __init__(self, server_side_cursors=False, **kwargs): - super(MySQLDialect_pymysql, self).__init__(**kwargs) - self.server_side_cursors = server_side_cursors - @langhelpers.memoized_property def supports_server_side_cursors(self): try: diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index eb87249b40..780e238449 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -723,10 +723,6 @@ class PGDialect_asyncpg(PGDialect): }, ) - def 
__init__(self, server_side_cursors=False, **kwargs): - PGDialect.__init__(self, **kwargs) - self.server_side_cursors = server_side_cursors - @util.memoized_property def _dbapi_version(self): if self.dbapi and hasattr(self.dbapi, "__version__"): diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index 3ef87620fd..5e81586b49 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -112,6 +112,34 @@ The CREATE TABLE for the above :class:`_schema.Table` object would be: PRIMARY KEY (id) ) +.. _postgresql_ss_cursors: + +Server Side Cursors +------------------- + +Server-side cursor support is available for the psycopg2, asyncpg +dialects and may also be available in others. + +Server side cursors are enabled on a per-statement basis by using the +:paramref:`.Connection.execution_options.stream_results` connection execution +option:: + + with engine.connect() as conn: + result = conn.execution_options(stream_results=True).execute(text("select * from table")) + +Note that some kinds of SQL statements may not be supported with +server side cursors; generally, only SQL statements that return rows should be +used with this option. + +.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated + and will be removed in a future release. Please use the + :paramref:`_engine.Connection.execution_options.stream_results` execution option for + unbuffered cursor support. + +.. seealso:: + + :ref:`engine_stream_results` + .. 
_postgresql_isolation_level: Transaction Isolation Level @@ -1075,7 +1103,8 @@ E.g.:: ) -""" +""" # noqa E501 + from collections import defaultdict import datetime as dt import re diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 3cc62fc93a..91576c4d20 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -17,18 +17,6 @@ psycopg2 Connect Arguments psycopg2-specific keyword arguments which are accepted by :func:`_sa.create_engine()` are: -* ``server_side_cursors``: Enable the usage of "server side cursors" for SQL - statements which support this feature. What this essentially means from a - psycopg2 point of view is that the cursor is created using a name, e.g. - ``connection.cursor('some name')``, which has the effect that result rows - are not immediately pre-fetched and buffered after statement execution, but - are instead left on the server and only retrieved as needed. SQLAlchemy's - :class:`~sqlalchemy.engine.CursorResult` uses special row-buffering - behavior when this feature is enabled, such that groups of 100 rows at a - time are fetched over the wire to reduce conversational overhead. - Note that the :paramref:`.Connection.execution_options.stream_results` - execution option is a more targeted - way of enabling this mode on a per-execution basis. * ``use_native_unicode``: Enable the usage of Psycopg2 "native unicode" mode per connection. True by default. @@ -129,8 +117,7 @@ in addition to those not specific to DBAPIs: * ``stream_results`` - Enable or disable usage of psycopg2 server side cursors - this feature makes use of "named" cursors in combination with special result handling methods so that result rows are not fully buffered. - If ``None`` or not set, the ``server_side_cursors`` option of the - :class:`_engine.Engine` is used. + Defaults to False, meaning cursors are buffered by default. 
* ``max_row_buffer`` - when using ``stream_results``, an integer value that specifies the maximum number of rows to buffer at a time. This is @@ -691,7 +678,6 @@ class PGDialect_psycopg2(PGDialect): def __init__( self, - server_side_cursors=False, use_native_unicode=True, client_encoding=None, use_native_hstore=True, @@ -702,7 +688,6 @@ class PGDialect_psycopg2(PGDialect): **kwargs ): PGDialect.__init__(self, **kwargs) - self.server_side_cursors = server_side_cursors self.use_native_unicode = use_native_unicode if not use_native_hstore: self._has_native_hstore = False diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index afab8e7b4e..55266fae7c 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -307,8 +307,13 @@ class Connection(Connectable): :param stream_results: Available on: Connection, statement. Indicate to the dialect that results should be "streamed" and not pre-buffered, if possible. This is a limitation - of many DBAPIs. The flag is currently understood only by the - psycopg2, mysqldb and pymysql dialects. + of many DBAPIs. The flag is currently understood within a subset + of dialects within the PostgreSQL and MySQL categories, and + may be supported by other third party dialects as well. + + .. seealso:: + + :ref:`engine_stream_results` :param schema_translate_map: Available on: Connection, Engine. 
A dictionary mapping schema names to schema names, that will be diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 9d2eaf6063..ff29c3b9dd 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -144,6 +144,8 @@ class DefaultDialect(interfaces.Dialect): supports_server_side_cursors = False + server_side_cursors = False + # extra record-level locking features (#4860) supports_for_update_of = False @@ -235,6 +237,14 @@ class DefaultDialect(interfaces.Dialect): "Applications should work with result column names in a case " "sensitive fashion.", ), + server_side_cursors=( + "1.4", + "The :paramref:`_sa.create_engine.server_side_cursors` parameter " + "is deprecated and will be removed in a future release. Please " + "use the " + ":paramref:`_engine.Connection.execution_options.stream_results` " + "parameter.", + ), ) def __init__( self, @@ -250,6 +260,7 @@ class DefaultDialect(interfaces.Dialect): # int() is because the @deprecated_params decorator cannot accommodate # the direct reference to the "NO_LINTING" object compiler_linting=int(compiler.NO_LINTING), + server_side_cursors=False, **kwargs ): @@ -259,6 +270,14 @@ class DefaultDialect(interfaces.Dialect): % self.name ) + if server_side_cursors: + if not self.supports_server_side_cursors: + raise exc.ArgumentError( + "Dialect %s does not support server side cursors" % self + ) + else: + self.server_side_cursors = True + self.convert_unicode = convert_unicode self.encoding = encoding self.positional = False @@ -1189,6 +1208,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): return False if self.dialect.server_side_cursors: + # this is deprecated use_server_side = self.execution_options.get( "stream_results", True ) and ( @@ -1232,7 +1252,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext): return self.create_server_side_cursor() else: self._is_server_side = False - return self._dbapi_connection.cursor() + return 
self.create_default_cursor() + + def create_default_cursor(self): + return self._dbapi_connection.cursor() def create_server_side_cursor(self): raise NotImplementedError() diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 26a5a89c78..91a638e4b5 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -803,7 +803,7 @@ class Query( .. seealso:: - :meth:`_query.Query.enable_eagerloads` + :ref:`engine_stream_results` """ self.load_options += {"_yield_per": count} @@ -1528,6 +1528,8 @@ class Query( .. seealso:: + :ref:`engine_stream_results` + :meth:`_query.Query.get_execution_options` """ diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py index 6d28a207ea..1c1b20cf01 100644 --- a/lib/sqlalchemy/testing/suite/test_results.py +++ b/lib/sqlalchemy/testing/suite/test_results.py @@ -236,15 +236,28 @@ class ServerSideCursorsTest( elif self.engine.dialect.driver == "mysqldb": sscursor = __import__("MySQLdb.cursors").cursors.SSCursor return isinstance(cursor, sscursor) + elif self.engine.dialect.driver == "mariadbconnector": + return not cursor.buffered elif self.engine.dialect.driver == "asyncpg": return cursor.server_side else: return False def _fixture(self, server_side_cursors): - self.engine = engines.testing_engine( - options={"server_side_cursors": server_side_cursors} - ) + if server_side_cursors: + with testing.expect_deprecated( + "The create_engine.server_side_cursors parameter is " + "deprecated and will be removed in a future release. " + "Please use the Connection.execution_options.stream_results " + "parameter." 
+ ): + self.engine = engines.testing_engine( + options={"server_side_cursors": server_side_cursors} + ) + else: + self.engine = engines.testing_engine( + options={"server_side_cursors": server_side_cursors} + ) return self.engine def tearDown(self): diff --git a/test/dialect/mysql/test_dialect.py b/test/dialect/mysql/test_dialect.py index c3bd94ffad..a555e53960 100644 --- a/test/dialect/mysql/test_dialect.py +++ b/test/dialect/mysql/test_dialect.py @@ -227,7 +227,14 @@ class DialectTest(fixtures.TestBase): )[1] assert "raise_on_warnings" not in kw - @testing.only_on(["mysql", "mariadb"]) + @testing.only_on( + [ + "mysql+mysqldb", + "mysql+pymysql", + "mariadb+mysqldb", + "mariadb+pymysql", + ] + ) def test_random_arg(self): dialect = testing.db.dialect kw = dialect.create_connect_args( @@ -235,9 +242,14 @@ class DialectTest(fixtures.TestBase): )[1] eq_(kw["foo"], "true") - @testing.only_on(["mysql", "mariadb"]) - @testing.skip_if("mysql+mysqlconnector", "totally broken for the moment") - @testing.fails_on("mysql+oursql", "unsupported") + @testing.only_on( + [ + "mysql+mysqldb", + "mysql+pymysql", + "mariadb+mysqldb", + "mariadb+pymysql", + ] + ) def test_special_encodings(self): for enc in ["utf8mb4", "utf8"]: @@ -487,7 +499,9 @@ class ExecutionTest(fixtures.TestBase): assert isinstance(d, datetime.datetime) -class AutocommitTextTest(test_execute.AutocommitTextTest): +class AutocommitTextTest( + test_execute.AutocommitKeywordFixture, fixtures.TestBase +): __only_on__ = "mysql", "mariadb" def test_load_data(self): diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py index 60c4f0ec06..8983522c14 100644 --- a/test/dialect/mysql/test_types.py +++ b/test/dialect/mysql/test_types.py @@ -852,6 +852,15 @@ class EnumSetTest( ) enum_table.create(connection) + assert_raises( + exc.DBAPIError, + enum_table.insert().execute, + e1=None, + e2=None, + e3=None, + e4=None, + ) + assert enum_table.c.e2generic.type.validate_strings assert_raises( diff --git 
a/test/engine/test_execute.py b/test/engine/test_execute.py index 61e25488e1..3a4b58df6d 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -3103,9 +3103,7 @@ class DialectEventTest(fixtures.TestBase): eq_(conn.info["boom"], "one") -class AutocommitTextTest(fixtures.TestBase): - __backend__ = True - +class AutocommitKeywordFixture(object): def _test_keyword(self, keyword, expected=True): dbapi = Mock( connect=Mock( @@ -3122,11 +3120,18 @@ class AutocommitTextTest(fixtures.TestBase): with engine.connect() as conn: conn.exec_driver_sql("%s something table something" % keyword) + for _call in dbapi.connect().mock_calls: + _call.kwargs.clear() + if expected: eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()]) else: eq_(dbapi.connect().mock_calls, [call.cursor()]) + +class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase): + __backend__ = True + def test_update(self): self._test_keyword("UPDATE") -- 2.47.3