This allows to skip buffering of the results on the client side, e.g.
the following snippet:
table = sa.Table(
'testtbl', sa.MetaData(),
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('a', sa.Integer),
sa.Column('b', sa.String(512))
)
table.create(eng, checkfirst=True)
with eng.connect() as conn:
result = conn.execute(table.select().limit(1)).fetchone()
if result is None:
for _ in range(1000):
conn.execute(
table.insert(),
[{'a': random.randint(1, 100000),
'b': ''.join(random.choice(string.ascii_letters) for _ in range(100))}
for _ in range(1000)]
)
with eng.connect() as conn:
for row in conn.execution_options(stream_results=True).execute(table.select()):
pass
now uses ~23 MB of memory instead of ~327 MB on CPython 3.5.2 and
PyMySQL 0.7.9.
psycopg2 implementation and execution options (stream_results,
server_side_cursors) are reused.
Change-Id: I4dc23ce3094f027bdff51b896b050361991c62e2
strategy would cause backrefs and/or back_populates options to be
ignored.
+ .. change::
+ :tags: feature, mysql
+
+ Added support for server side cursors to the mysqlclient and
+ pymysql dialects. This feature is available via the
+ :paramref:`.Connection.execution_options.stream_results` flag as well
+ as the ``server_side_cursors=True`` dialect argument in the
+ same way that it has been for psycopg2 on Postgresql. Pull request
+ courtesy Roman Podoliaka.
+
.. change::
:tags: bug, mysql
:tickets: 3841
Column('id', Integer, primary_key=True)
)
+.. _mysql_ss_cursors:
+
+Server Side Cursors
+-------------------
+
+Server-side cursor support is available for the MySQLdb and PyMySQL dialects.
+From a MySQL point of view this means that the ``MySQLdb.cursors.SSCursor`` or
+``pymysql.cursors.SSCursor`` class is used when building up the cursor which
+will receive results. The most typical way of invoking this feature is via the
+:paramref:`.Connection.execution_options.stream_results` connection execution
+option. Server side cursors can also be enabled for all SELECT statements
+unconditionally by passing ``server_side_cursors=True`` to
+:func:`.create_engine`.
+
+.. versionadded:: 1.1.4 - added server-side cursor support.
+
.. _mysql_unicode:
Unicode
def should_autocommit_text(self, statement):
return AUTOCOMMIT_RE.match(statement)
+ def create_server_side_cursor(self):
+ if self.dialect.supports_server_side_cursors:
+ return self._dbapi_connection.cursor(self.dialect._sscursor)
+ else:
+ raise NotImplementedError()
+
class MySQLCompiler(compiler.SQLCompiler):
mysql+mysqldb://root@/<dbname>?unix_socket=/cloudsql/<projectid>:<instancename>
+Server Side Cursors
+-------------------
+
+The mysqldb dialect supports server-side cursors. See :ref:`mysql_ss_cursors`.
+
"""
from .base import (MySQLDialect, MySQLExecutionContext,
statement_compiler = MySQLCompiler_mysqldb
preparer = MySQLIdentifierPreparer_mysqldb
+ def __init__(self, server_side_cursors=False, **kwargs):
+ super(MySQLDialect_mysqldb, self).__init__(**kwargs)
+ self.server_side_cursors = server_side_cursors
+
+ @util.langhelpers.memoized_property
+ def supports_server_side_cursors(self):
+ try:
+ cursors = __import__('MySQLdb.cursors').cursors
+ self._sscursor = cursors.SSCursor
+ return True
+ except (ImportError, AttributeError):
+ return False
+
@classmethod
def dbapi(cls):
return __import__('MySQLdb')
"""
from .mysqldb import MySQLDialect_mysqldb
-from ...util import py3k
+from ...util import langhelpers, py3k
class MySQLDialect_pymysql(MySQLDialect_mysqldb):
supports_unicode_statements = True
supports_unicode_binds = True
+ def __init__(self, server_side_cursors=False, **kwargs):
+ super(MySQLDialect_pymysql, self).__init__(**kwargs)
+ self.server_side_cursors = server_side_cursors
+
+ @langhelpers.memoized_property
+ def supports_server_side_cursors(self):
+ try:
+ cursors = __import__('pymysql.cursors').cursors
+ self._sscursor = cursors.SSCursor
+ return True
+ except (ImportError, AttributeError):
+ return False
+
@classmethod
def dbapi(cls):
return __import__('pymysql')
:class:`~sqlalchemy.engine.ResultProxy` uses special row-buffering
behavior when this feature is enabled, such that groups of 100 rows at a
time are fetched over the wire to reduce conversational overhead.
- Note that the ``stream_results=True`` execution option is a more targeted
+ Note that the :paramref:`.Connection.execution_options.stream_results`
+ execution option is a more targeted
way of enabling this mode on a per-execution basis.
* ``use_native_unicode``: Enable the usage of Psycopg2 "native unicode" mode
per connection. True by default.
return value
return process
-# When we're handed literal SQL, ensure it's a SELECT query. Since
-# 8.3, combining cursors and "FOR UPDATE" has been fine.
-SERVER_SIDE_CURSOR_RE = re.compile(
- r'\s*SELECT',
- re.I | re.UNICODE)
_server_side_id = util.counter()
class PGExecutionContext_psycopg2(PGExecutionContext):
- def create_cursor(self):
- # TODO: coverage for server side cursors + select.for_update()
-
- if self.dialect.server_side_cursors:
- is_server_side = \
- self.execution_options.get('stream_results', True) and (
- (self.compiled and isinstance(self.compiled.statement,
- expression.Selectable)
- or
- (
- (not self.compiled or
- isinstance(self.compiled.statement,
- expression.TextClause))
- and self.statement and SERVER_SIDE_CURSOR_RE.match(
- self.statement))
- )
- )
- else:
- is_server_side = \
- self.execution_options.get('stream_results', False)
-
- self.__is_server_side = is_server_side
- if is_server_side:
- # use server-side cursors:
- # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html
- ident = "c_%s_%s" % (hex(id(self))[2:],
- hex(_server_side_id())[2:])
- return self._dbapi_connection.cursor(ident)
- else:
- return self._dbapi_connection.cursor()
+ def create_server_side_cursor(self):
+ # use server-side cursors:
+ # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html
+ ident = "c_%s_%s" % (hex(id(self))[2:],
+ hex(_server_side_id())[2:])
+ return self._dbapi_connection.cursor(ident)
def get_result_proxy(self):
# TODO: ouch
if logger.isEnabledFor(logging.INFO):
self._log_notices(self.cursor)
- if self.__is_server_side:
+ if self._is_server_side:
return _result.BufferedRowResultProxy(self)
else:
return _result.ResultProxy(self)
if util.py2k:
supports_unicode_statements = False
+ supports_server_side_cursors = True
+
default_paramstyle = 'pyformat'
# set to true based on psycopg2 version
supports_sane_multi_rowcount = False
Indicate to the dialect that results should be
"streamed" and not pre-buffered, if possible. This is a limitation
of many DBAPIs. The flag is currently understood only by the
- psycopg2 dialect.
+ psycopg2, mysqldb and pymysql dialects.
:param schema_translate_map: Available on: Connection, Engine.
A dictionary mapping schema names to schema names, that will be
r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)',
re.I | re.UNICODE)
+# When we're handed literal SQL, ensure it's a SELECT query
+SERVER_SIDE_CURSOR_RE = re.compile(
+ r'\s*SELECT',
+ re.I | re.UNICODE)
+
class DefaultDialect(interfaces.Dialect):
"""Default implementation of Dialect"""
supports_empty_insert = True
supports_multivalues_insert = False
+ supports_server_side_cursors = False
+
server_version_info = None
construct_arguments = None
def should_autocommit_text(self, statement):
return AUTOCOMMIT_REGEXP.match(statement)
+ def _use_server_side_cursor(self):
+ if not self.dialect.supports_server_side_cursors:
+ return False
+
+ if self.dialect.server_side_cursors:
+ use_server_side = \
+ self.execution_options.get('stream_results', True) and (
+ (self.compiled and isinstance(self.compiled.statement,
+ expression.Selectable)
+ or
+ (
+ (not self.compiled or
+ isinstance(self.compiled.statement,
+ expression.TextClause))
+ and self.statement and SERVER_SIDE_CURSOR_RE.match(
+ self.statement))
+ )
+ )
+ else:
+ use_server_side = \
+ self.execution_options.get('stream_results', False)
+
+ return use_server_side
+
def create_cursor(self):
- return self._dbapi_connection.cursor()
+ if self._use_server_side_cursor():
+ self._is_server_side = True
+ return self.create_server_side_cursor()
+ else:
+ self._is_server_side = False
+ return self._dbapi_connection.cursor()
+
+ def create_server_side_cursor(self):
+ raise NotImplementedError()
def pre_exec(self):
pass
pass
def get_result_proxy(self):
- return result.ResultProxy(self)
+ if self._is_server_side:
+ return result.BufferedRowResultProxy(self)
+ else:
+ return result.ResultProxy(self)
@property
def rowcount(self):
:meth:`~sqlalchemy.orm.query.Query.yield_per` will set the
``stream_results`` execution option to True, currently
this is only understood by
- :mod:`~sqlalchemy.dialects.postgresql.psycopg2` dialect
+ :mod:`~sqlalchemy.dialects.postgresql.psycopg2`,
+ :mod:`~sqlalchemy.dialects.mysql.mysqldb` and
+ :mod:`~sqlalchemy.dialects.mysql.pymysql` dialects
which will stream results using server side cursors
instead of pre-buffer all rows for this query. Other
DBAPIs **pre-buffer all rows** before making them
return exclusions.closed()
+ @property
+ def server_side_cursors(self):
+ """Target dialect must support server side cursors."""
+
+ return exclusions.only_if([
+ lambda config: config.db.dialect.supports_server_side_cursors
+ ], "no server side cursors support")
+
@property
def sequences(self):
"""Target database must support SEQUENCEs."""
from .. import exclusions
from ..assertions import eq_
from .. import engines
+from ... import testing
-from sqlalchemy import Integer, String, select, util, sql, DateTime
+from sqlalchemy import Integer, String, select, util, sql, DateTime, text, func
import datetime
from ..schema import Table, Column
),
[(5, 15), (7, 15), (9, 15), (11, 15)]
)
+
+
+class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
+
+ __requires__ = ('server_side_cursors', )
+
+ __backend__ = True
+
+ def _is_server_side(self, cursor):
+ if self.engine.url.drivername == 'postgresql':
+ return cursor.name
+ elif self.engine.url.drivername == 'mysql':
+ sscursor = __import__('MySQLdb.cursors').cursors.SSCursor
+ return isinstance(cursor, sscursor)
+ elif self.engine.url.drivername == 'mysql+pymysql':
+ sscursor = __import__('pymysql.cursors').cursors.SSCursor
+ return isinstance(cursor, sscursor)
+ else:
+ return False
+
+ def _fixture(self, server_side_cursors):
+ self.engine = engines.testing_engine(
+ options={'server_side_cursors': server_side_cursors}
+ )
+ return self.engine
+
+ def tearDown(self):
+ engines.testing_reaper.close_all()
+ self.engine.dispose()
+
+ def test_global_string(self):
+ engine = self._fixture(True)
+ result = engine.execute('select 1')
+ assert self._is_server_side(result.cursor)
+
+ def test_global_text(self):
+ engine = self._fixture(True)
+ result = engine.execute(text('select 1'))
+ assert self._is_server_side(result.cursor)
+
+ def test_global_expr(self):
+ engine = self._fixture(True)
+ result = engine.execute(select([1]))
+ assert self._is_server_side(result.cursor)
+
+ def test_global_off_explicit(self):
+ engine = self._fixture(False)
+ result = engine.execute(text('select 1'))
+
+ # It should be off globally ...
+
+ assert not self._is_server_side(result.cursor)
+
+ def test_stmt_option(self):
+ engine = self._fixture(False)
+
+ s = select([1]).execution_options(stream_results=True)
+ result = engine.execute(s)
+
+ # ... but enabled for this one.
+
+ assert self._is_server_side(result.cursor)
+
+ def test_conn_option(self):
+ engine = self._fixture(False)
+
+ # and this one
+ result = \
+ engine.connect().execution_options(stream_results=True).\
+ execute('select 1'
+ )
+ assert self._is_server_side(result.cursor)
+
+ def test_stmt_enabled_conn_option_disabled(self):
+ engine = self._fixture(False)
+
+ s = select([1]).execution_options(stream_results=True)
+
+ # not this one
+ result = \
+ engine.connect().execution_options(stream_results=False).\
+ execute(s)
+ assert not self._is_server_side(result.cursor)
+
+ def test_stmt_option_disabled(self):
+ engine = self._fixture(True)
+ s = select([1]).execution_options(stream_results=False)
+ result = engine.execute(s)
+ assert not self._is_server_side(result.cursor)
+
+ def test_aliases_and_ss(self):
+ engine = self._fixture(False)
+ s1 = select([1]).execution_options(stream_results=True).alias()
+ result = engine.execute(s1)
+ assert self._is_server_side(result.cursor)
+
+ # s1's options shouldn't affect s2 when s2 is used as a
+ # from_obj.
+ s2 = select([1], from_obj=s1)
+ result = engine.execute(s2)
+ assert not self._is_server_side(result.cursor)
+
+ def test_for_update_expr(self):
+ engine = self._fixture(True)
+ s1 = select([1], for_update=True)
+ result = engine.execute(s1)
+ assert self._is_server_side(result.cursor)
+
+ def test_for_update_string(self):
+ engine = self._fixture(True)
+ result = engine.execute('SELECT 1 FOR UPDATE')
+ assert self._is_server_side(result.cursor)
+
+ def test_text_no_ss(self):
+ engine = self._fixture(False)
+ s = text('select 42')
+ result = engine.execute(s)
+ assert not self._is_server_side(result.cursor)
+
+ def test_text_ss_option(self):
+ engine = self._fixture(False)
+ s = text('select 42').execution_options(stream_results=True)
+ result = engine.execute(s)
+ assert self._is_server_side(result.cursor)
+
+ @testing.provide_metadata
+ def test_roundtrip(self):
+ md = self.metadata
+
+ engine = self._fixture(True)
+ test_table = Table('test_table', md,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)))
+ test_table.create(checkfirst=True)
+ test_table.insert().execute(data='data1')
+ test_table.insert().execute(data='data2')
+ eq_(test_table.select().execute().fetchall(), [(1, 'data1'
+ ), (2, 'data2')])
+ test_table.update().where(
+ test_table.c.id == 2).values(
+ data=test_table.c.data +
+ ' updated').execute()
+ eq_(test_table.select().execute().fetchall(),
+ [(1, 'data1'), (2, 'data2 updated')])
+ test_table.delete().execute()
+ eq_(select([func.count('*')]).select_from(test_table).scalar(), 0)
(33, 'd4')])
-class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults):
-
- __requires__ = 'psycopg2_compatibility',
-
- def _fixture(self, server_side_cursors):
- self.engine = engines.testing_engine(
- options={'server_side_cursors': server_side_cursors}
- )
- return self.engine
-
- def tearDown(self):
- engines.testing_reaper.close_all()
- self.engine.dispose()
-
- def test_global_string(self):
- engine = self._fixture(True)
- result = engine.execute('select 1')
- assert result.cursor.name
-
- def test_global_text(self):
- engine = self._fixture(True)
- result = engine.execute(text('select 1'))
- assert result.cursor.name
-
- def test_global_expr(self):
- engine = self._fixture(True)
- result = engine.execute(select([1]))
- assert result.cursor.name
-
- def test_global_off_explicit(self):
- engine = self._fixture(False)
- result = engine.execute(text('select 1'))
-
- # It should be off globally ...
-
- assert not result.cursor.name
-
- def test_stmt_option(self):
- engine = self._fixture(False)
-
- s = select([1]).execution_options(stream_results=True)
- result = engine.execute(s)
-
- # ... but enabled for this one.
-
- assert result.cursor.name
-
- def test_conn_option(self):
- engine = self._fixture(False)
-
- # and this one
- result = \
- engine.connect().execution_options(stream_results=True).\
- execute('select 1'
- )
- assert result.cursor.name
-
- def test_stmt_enabled_conn_option_disabled(self):
- engine = self._fixture(False)
-
- s = select([1]).execution_options(stream_results=True)
-
- # not this one
- result = \
- engine.connect().execution_options(stream_results=False).\
- execute(s)
- assert not result.cursor.name
-
- def test_stmt_option_disabled(self):
- engine = self._fixture(True)
- s = select([1]).execution_options(stream_results=False)
- result = engine.execute(s)
- assert not result.cursor.name
-
- def test_aliases_and_ss(self):
- engine = self._fixture(False)
- s1 = select([1]).execution_options(stream_results=True).alias()
- result = engine.execute(s1)
- assert result.cursor.name
-
- # s1's options shouldn't affect s2 when s2 is used as a
- # from_obj.
- s2 = select([1], from_obj=s1)
- result = engine.execute(s2)
- assert not result.cursor.name
-
- def test_for_update_expr(self):
- engine = self._fixture(True)
- s1 = select([1], for_update=True)
- result = engine.execute(s1)
- assert result.cursor.name
-
- def test_for_update_string(self):
- engine = self._fixture(True)
- result = engine.execute('SELECT 1 FOR UPDATE')
- assert result.cursor.name
-
- def test_text_no_ss(self):
- engine = self._fixture(False)
- s = text('select 42')
- result = engine.execute(s)
- assert not result.cursor.name
-
- def test_text_ss_option(self):
- engine = self._fixture(False)
- s = text('select 42').execution_options(stream_results=True)
- result = engine.execute(s)
- assert result.cursor.name
-
- @testing.provide_metadata
- def test_roundtrip(self):
- md = self.metadata
-
- engine = self._fixture(True)
- test_table = Table('test_table', md,
- Column('id', Integer, primary_key=True),
- Column('data', String(50)))
- test_table.create(checkfirst=True)
- test_table.insert().execute(data='data1')
- nextid = engine.execute(Sequence('test_table_id_seq'))
- test_table.insert().execute(id=nextid, data='data2')
- eq_(test_table.select().execute().fetchall(), [(1, 'data1'
- ), (2, 'data2')])
- test_table.update().where(
- test_table.c.id == 2).values(
- data=test_table.c.data +
- ' updated').execute()
- eq_(test_table.select().execute().fetchall(),
- [(1, 'data1'), (2, 'data2 updated')])
- test_table.delete().execute()
- eq_(select([func.count('*')]).select_from(test_table).scalar(), 0)
-
-
class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
__only_on__ = 'postgresql >= 8.3'