From abccc0624228def744b0382e84f01cf95e0d3aed Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 16 Jan 2010 22:44:04 +0000 Subject: [PATCH] - added "statement_options()" to Query, to so options can be passed to the resulting statement. Currently only Select-statements have these options, and the only option used is "stream_results", and the only dialect which knows "stream_results" is psycopg2. - Query.yield_per() will set the "stream_results" statement option automatically. - Added "statement_options()" to Selects, which set statement specific options. These enable e.g. dialect specific options such as whether to enable using server side cursors, etc. - The psycopg2 now respects the statement option "stream_results". This option overrides the connection setting "server_side_cursors". If true, server side cursors will be used for the statement. If false, they will not be used, even if "server_side_cursors" is true on the connection. [ticket:1619] - added a "frozendict" from http://code.activestate.com/recipes/414283/, adding more default collections as immutable class vars on Query, Insert, Select --- CHANGES | 20 ++++++ .../dialects/postgresql/psycopg2.py | 40 +++++++---- lib/sqlalchemy/engine/default.py | 4 +- lib/sqlalchemy/orm/query.py | 40 ++++++++--- lib/sqlalchemy/schema.py | 3 +- lib/sqlalchemy/sql/expression.py | 65 +++++++++++------ lib/sqlalchemy/util.py | 20 +++++- test/aaa_profiling/test_orm.py | 2 +- test/dialect/test_postgresql.py | 72 +++++++++++++++++++ test/orm/test_query.py | 27 +++++++ test/sql/test_generative.py | 21 ++++++ 11 files changed, 267 insertions(+), 47 deletions(-) diff --git a/CHANGES b/CHANGES index 68d497ca78..aa7001a821 100644 --- a/CHANGES +++ b/CHANGES @@ -176,6 +176,15 @@ CHANGES - The version_id_col feature on mapper() will raise a warning when used with dialects that don't support "rowcount" adequately. [ticket:1569] + + - added "statement_options()" to Query, to so options can be + passed to the resulting statement. Currently only + Select-statements have these options, and the only option + used is "stream_results", and the only dialect which knows + "stream_results" is psycopg2. + + - Query.yield_per() will set the "stream_results" statement + option automatically. - Deprecated or removed: * 'allow_null_pks' flag on mapper() is deprecated. It does @@ -294,6 +303,10 @@ CHANGES instead simply not querying, or modifying the criterion as appropriate for more complex situations. [ticket:1628] + + - Added "statement_options()" to Selects, which set statement + specific options. These enable e.g. dialect specific options + such as whether to enable using server side cursors, etc. - Deprecated or removed: * "scalar" flag on select() is removed, use @@ -596,6 +609,13 @@ CHANGES - postgresql dialect can properly detect pg "devel" version strings, i.e. "8.5devel" [ticket:1636] + + - The psycopg2 now respects the statement option + "stream_results". This option overrides the connection setting + "server_side_cursors". If true, server side cursors will be + used for the statement. If false, they will not be used, even + if "server_side_cursors" is true on the + connection. [ticket:1619] - mysql - New dialects: oursql, a new native dialect, diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index a46fdbddbf..7733aadcd0 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -35,6 +35,15 @@ Transactions The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations. +Statement options +----------------- + +The following statement options are respected: + +* *stream_results* - Enable or disable usage of server side cursors for the SELECT-statement. + If *None* or not set, the *server_side_cursors* option of the connection is used. If + auto-commit is enabled, the option is ignored. + """ import decimal, random, re @@ -93,8 +102,9 @@ class _PGArray(ARRAY): if isinstance(self.item_type, sqltypes.String) and \ self.item_type.convert_unicode: self.item_type.convert_unicode = "force" - -# TODO: filter out 'FOR UPDATE' statements + +# When we're handed literal SQL, ensure it's a SELECT-query. Since +# 8.3, combining cursors and "FOR UPDATE" has been fine. SERVER_SIDE_CURSOR_RE = re.compile( r'\s*SELECT', re.I | re.UNICODE) @@ -102,16 +112,22 @@ SERVER_SIDE_CURSOR_RE = re.compile( class PostgreSQL_psycopg2ExecutionContext(PGExecutionContext): def create_cursor(self): # TODO: coverage for server side cursors + select.for_update() - is_server_side = \ - self.dialect.server_side_cursors and \ - not self.should_autocommit and \ - ((self.compiled and isinstance(self.compiled.statement, expression.Selectable) - and not getattr(self.compiled.statement, 'for_update', False)) \ - or \ - ( - (not self.compiled or isinstance(self.compiled.statement, expression._TextClause)) - and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement)) - ) + stream_results_option = self.statement_options.get('stream_results') + is_server_side = ( + # Enabled for this statement ... + (stream_results_option or + # ... or enabled for all statements + (self.dialect.server_side_cursors and + # ... and not explicitly disabled for this one. + (stream_results_option or stream_results_option is None)) + ) and ( + # But don't use SS-cursors when autocommit is on ... + (not self.should_autocommit and + self.compiled and isinstance(self.compiled.statement, expression.Selectable)) + or ( + # ... or if it's not even a SELECT. + (not self.compiled or isinstance(self.compiled.statement, expression._TextClause)) + and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement)))) self.__is_server_side = is_server_side if is_server_side: diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 1bdeca377e..baa92b5d40 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -223,6 +223,7 @@ class DefaultDialect(base.Dialect): class DefaultExecutionContext(base.ExecutionContext): + statement_options = util.frozendict() def __init__(self, dialect, connection, compiled_sql=None, compiled_ddl=None, statement=None, parameters=None): self.dialect = dialect @@ -249,7 +250,7 @@ class DefaultExecutionContext(base.ExecutionContext): if not compiled.can_execute: raise exc.ArgumentError("Not an executable clause: %s" % compiled) - + self.processors = dict( (key, value) for key, value in ( (compiled.bind_names[bindparam], @@ -268,6 +269,7 @@ class DefaultExecutionContext(base.ExecutionContext): self.isupdate = compiled.isupdate self.isdelete = compiled.isdelete self.should_autocommit = compiled.statement._autocommit + self.statement_options = compiled.statement._statement_options if self.should_autocommit is expression.PARSE_AUTOCOMMIT: self.should_autocommit = self.should_autocommit_text(self.statement) diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 7be0680197..444ad37bc6 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -79,14 +79,14 @@ class Query(object): _from_obj = () _filter_aliases = None _from_obj_alias = None - _joinpath = _joinpoint = {} + _joinpath = _joinpoint = util.frozendict() + _statement_options = util.frozendict() + _params = util.frozendict() + _attributes = util.frozendict() + _with_options = () def __init__(self, entities, session=None): self.session = session - - self._with_options = [] - self._params = {} - self._attributes = {} self._polymorphic_adapters = {} self._set_entities(entities) @@ -488,9 +488,17 @@ class Query(object): collections will be cleared for a new load when encountered in a subsequent result batch. + Also note that many DBAPIs do not "stream" results, pre-buffering + all rows before making them available, including mysql-python and + psycopg2. yield_per() will also set the ``stream_results`` statement + option to ``True``, which currently is only understood by psycopg2 + and causes server side cursors to be used. + """ self._yield_per = count - + self._statement_options = self._statement_options.copy() + self._statement_options['stream_results'] = True + def get(self, ident): """Return an instance of the object based on the given identifier, or None if not found. @@ -658,7 +666,7 @@ class Query(object): # most MapperOptions write to the '_attributes' dictionary, # so copy that as well self._attributes = self._attributes.copy() - opts = list(util.flatten_iterator(args)) + opts = tuple(util.flatten_iterator(args)) self._with_options = self._with_options + opts if conditional: for opt in opts: @@ -667,6 +675,21 @@ class Query(object): for opt in opts: opt.process_query(self) + @_generative() + def statement_options(self, **kwargs): + """ Set non-SQL options for the resulting statement, such as dialect-specific options. + + The only option currently understood is ``stream_results=True``, + only used by Psycopg2 to enable "server side cursors". This option + only has a useful effect if used in conjunction with :meth:`~sqlalchemy.orm.query.Query.yield_per()`, + which currently sets ``stream_results`` to ``True`` automatically. + + """ + _statement_options = self._statement_options.copy() + for key, value in kwargs.items(): + _statement_options[key] = value + self._statement_options = _statement_options + @_generative() def with_lockmode(self, mode): """Return a new Query object with the specified locking mode.""" @@ -1915,7 +1938,7 @@ class Query(object): context.adapter = sql_util.ColumnAdapter(inner, equivs) - statement = sql.select([inner] + context.secondary_columns, for_update=for_update, use_labels=labels) + statement = sql.select([inner] + context.secondary_columns, for_update=for_update, use_labels=labels, statement_options=self._statement_options) from_clause = inner for eager_join in eager_joins: @@ -1947,6 +1970,7 @@ class Query(object): for_update=for_update, correlate=False, order_by=context.order_by, + statement_options=self._statement_options, **self._select_args ) diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index 7c9fa58fec..e40b6f5927 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -2027,10 +2027,9 @@ class SchemaVisitor(visitors.ClauseVisitor): __traverse_options__ = {'schema_visitor':True} -class DDLElement(expression.ClauseElement): +class DDLElement(expression._Executable, expression.ClauseElement): """Base class for DDL expression constructs.""" - supports_execution = True _autocommit = True target = None diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py index 742746cbeb..2dc13ee823 100644 --- a/lib/sqlalchemy/sql/expression.py +++ b/lib/sqlalchemy/sql/expression.py @@ -2178,8 +2178,13 @@ class _TypeClause(ClauseElement): def __init__(self, type): self.type = type +class _Executable(object): + """Mark a ClauseElement as supporting execution.""" -class _TextClause(ClauseElement): + supports_execution = True + _statement_options = util.frozendict() + +class _TextClause(_Executable, ClauseElement): """Represent a literal SQL text fragment. Public constructor is the :func:`text()` function. @@ -2189,7 +2194,6 @@ class _TextClause(ClauseElement): __visit_name__ = 'textclause' _bind_params_regex = re.compile(r'(?