series as well. For changes that are specific to 1.0 with an emphasis
on compatibility concerns, see :doc:`/changelog/migration_10`.
+ .. change::
+ :tags: changed, sql
+
+ The :func:`~.expression.column` and :func:`~.expression.table`
+ constructs are now importable from the "from sqlalchemy" namespace,
+ just like every other Core construct.
+
+ .. change::
+ :tags: changed, sql
+ :tickets: 2992
+
+ The implicit conversion of strings to :func:`.text` constructs
+ when passed to most builder methods of :func:`.select` as
+ well as :class:`.Query` now emits a warning with just the
+ plain string sent. The textual conversion still proceeds normally,
+ however. The only method that accepts a string without a warning
+ are the "label reference" methods like order_by(), group_by();
+ these functions will now at compile time attempt to resolve a single
+ string argument to a column or label expression present in the
+ selectable; if none is located, the expression still renders, but
+ you get the warning again. The rationale here is that the implicit
+ conversion from string to text is more unexpected than not these days,
+ and it is better that the user send more direction to the Core / ORM
+ when passing a raw string as to what direction should be taken.
+ Core/ORM tutorials have been updated to go more in depth as to how text
+ is handled.
+
+ .. seealso::
+
+ :ref:`migration_2992`
+
+
.. change::
:tags: feature, engine
:tickets: 3178
:ref:`migration_3177`
.. change::
- :tags: change, orm
+ :tags: changed, orm
The ``proc()`` callable passed to the ``create_row_processor()``
method of custom :class:`.Bundle` classes now accepts only a single
:ref:`bundle_api_change`
.. change::
- :tags: change, orm
+ :tags: changed, orm
Deprecated event hooks removed: ``populate_instance``,
``create_instance``, ``translate_row``, ``append_result``
:ticket:`3061`
+.. _migration_2992:
+
+Warnings emitted when coercing full SQL fragments into text()
+-------------------------------------------------------------
+
+Since SQLAlchemy's inception, there has always been an emphasis on not getting
+in the way of the usage of plain text. The Core and ORM expression systems
+were intended to allow any number of points at which the user can just
+use plain text SQL expressions, not just in the sense that you can send a
+full SQL string to :meth:`.Connection.execute`, but that you can send strings
+with SQL expressions into many functions, such as :meth:`.Select.where`,
+:meth:`.Query.filter`, and :meth:`.Select.order_by`.
+
+Note that by "SQL expressions" we mean a **full fragment of a SQL string**,
+such as::
+
+ # the argument sent to where() is a full SQL expression
+ stmt = select([sometable]).where("somecolumn = 'value'")
+
+and we are **not talking about string arguments**, that is, the normal
+behavior of passing string values that become parameterized::
+
+ # This is a normal Core expression with a string argument -
+ # we aren't talking about this!!
+ stmt = select([sometable]).where(sometable.c.somecolumn == 'value')
+
+The Core tutorial has long featured an example of the use of this technique,
+using a :func:`.select` construct where virtually all components of it
+are specified as straight strings. However, despite this long-standing
+behavior and example, users are apparently surprised that this behavior
+exists, and when asking around the community, I was unable to find any user
+that was in fact *not* surprised that you can send a full string into a method
+like :meth:`.Query.filter`.
+
+So the change here is to encourage the user to qualify textual strings when
+composing SQL that is partially or fully composed from textual fragments.
+When composing a select as below::
+
+ stmt = select(["a", "b"]).where("a = b").select_from("sometable")
+
+The statement is built up normally, with all the same coercions as before.
+However, one will see the following warnings emitted::
+
+ SAWarning: Textual column expression 'a' should be explicitly declared
+ with text('a'), or use column('a') for more specificity
+ (this warning may be suppressed after 10 occurrences)
+
+ SAWarning: Textual column expression 'b' should be explicitly declared
+ with text('b'), or use column('b') for more specificity
+ (this warning may be suppressed after 10 occurrences)
+
+ SAWarning: Textual SQL expression 'a = b' should be explicitly declared
+ as text('a = b') (this warning may be suppressed after 10 occurrences)
+
+ SAWarning: Textual SQL FROM expression 'sometable' should be explicitly
+ declared as text('sometable'), or use table('sometable') for more
+ specificity (this warning may be suppressed after 10 occurrences)
+
+These warnings attempt to show exactly where the issue is by displaying
+the parameters as well as where the string was received.
+The warnings make use of the :ref:`feature_3178` so that parameterized warnings
+can be emitted safely without running out of memory, and as always, if
+one wishes the warnings to be exceptions, the
+`Python Warnings Filter <https://docs.python.org/2/library/warnings.html>`_
+should be used::
+
+ import warnings
+ warnings.simplefilter("error") # all warnings raise an exception
+
+Given the above warnings, our statement works just fine, but
+to get rid of the warnings we would rewrite our statement as follows::
+
+ from sqlalchemy import select, text
+ stmt = select([
+ text("a"),
+ text("b")
+ ]).where(text("a = b")).select_from(text("sometable"))
+
+and as the warnings suggest, we can give our statement more specificity
+about the text if we use :func:`.column` and :func:`.table`::
+
+ from sqlalchemy import select, text, column, table
+
+ stmt = select([column("a"), column("b")]).\\
+ where(text("a = b")).select_from(table("sometable"))
+
+Where note also that :func:`.table` and :func:`.column` can now
+be imported from "sqlalchemy" without the "sql" part.
+
+The behavior here applies to :func:`.select` as well as to key methods
+on :class:`.Query`, including :meth:`.Query.filter`,
+:meth:`.Query.from_statement` and :meth:`.Query.having`.
+
+ORDER BY and GROUP BY are special cases
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+There is one case where usage of a string has special meaning, and as part
+of this change we have enhanced its functionality. When we have a
+:func:`.select` or :class:`.Query` that refers to some column name or named
+label, we might want to GROUP BY and/or ORDER BY known columns or labels::
+
+ stmt = select([
+ user.c.name,
+ func.count(user.c.id).label("id_count")
+ ]).group_by("name").order_by("id_count")
+
+In the above statement we expect to see "ORDER BY id_count", as opposed to a
+re-statement of the function. The string argument given is actively
+matched to an entry in the columns clause during compilation, so the above
+statement would produce as we expect, without warnings::
+
+ SELECT users.name, count(users.id) AS id_count
+ FROM users GROUP BY users.name ORDER BY id_count
+
+However, if we refer to a name that cannot be located, then we get
+the warning again, as below::
+
+ stmt = select([
+ user.c.name,
+ func.count(user.c.id).label("id_count")
+ ]).order_by("some_label")
+
+The output does what we say, but again it warns us::
+
+ SAWarning: Can't resolve label reference 'some_label'; converting to
+ text() (this warning may be suppressed after 10 occurrences)
+
+ SELECT users.name, count(users.id) AS id_count
+ FROM users ORDER BY some_label
+
+The above behavior applies to all those places where we might want to refer
+to a so-called "label reference"; ORDER BY and GROUP BY, but also within an
+OVER clause as well as a DISTINCT ON clause that refers to columns (e.g. the
+Postgresql syntax).
+
+We can still specify any arbitrary expression for ORDER BY or others using
+:func:`.text`::
+
+ stmt = select([users]).order_by(text("some special expression"))
+
+The upshot of the whole change is that SQLAlchemy now would like us
+to tell it when a string is sent that this string is explicitly
+a :func:`.text` construct, or a column, table, etc., and if we use it as a
+label name in an order by, group by, or other expression, SQLAlchemy expects
+that the string resolves to something known, else it should again
+be qualified with :func:`.text` or similar.
+
+:ticket:`2992`
+
.. _migration_yield_per_eager_loading:
Joined/Subquery eager loading explicitly disallowed with yield_per
them as duplicates.
To illustrate, the following test script will show only ten warnings being
-emitted for ten of the parameter sets, out of a total of 1000:
+emitted for ten of the parameter sets, out of a total of 1000::
from sqlalchemy import create_engine, Unicode, select, cast
import random
"postgresql", "mysql", "sqlite", "mssql", \
"oracle", "firebird"]
# tags to sort on inside of sections
-changelog_inner_tag_sort = ["feature", "bug", "moved", "changed", "removed"]
+changelog_inner_tag_sort = ["feature", "changed", "removed", "bug", "moved"]
# how to render changelog links
changelog_render_ticket = "http://www.sqlalchemy.org/trac/ticket/%s"
the less flexibility and ability for manipulation/transformation
the statement will have.
+.. versionchanged:: 1.0.0
+ The :func:`.select` construct emits warnings when string SQL
+ fragments are coerced to :func:`.text`, and :func:`.text` should
+ be used explicitly. See :ref:`migration_2992` for background.
+
.. _sqlexpression_literal_column:
Using More Specific Text with :func:`.table`, :func:`.literal_column`, and :func:`.column`
('ed',)
{stop}[(1, u'ed', 12)]
+.. versionchanged:: 1.0.0
+ The :class:`.Query` construct emits warnings when string SQL
+ fragments are coerced to :func:`.text`, and :func:`.text` should
+ be used explicitly. See :ref:`migration_2992` for background.
+
.. seealso::
:ref:`sqlexpression_text` - Core description of textual segments. The
case,
cast,
collate,
+ column,
delete,
desc,
distinct,
over,
select,
subquery,
+ table,
text,
true,
tuple_,
"Flushing object %s with "
"incompatible polymorphic identity %r; the "
"object may not refresh and/or load correctly",
- state_str(state),
- dict_[polymorphic_key]
+ (state_str(state), dict_[polymorphic_key])
)
self._set_polymorphic_identity = _set_polymorphic_identity
def _adapt_col_list(self, cols):
return [
self._adapt_clause(
- expression._literal_as_text(o),
+ expression._literal_as_label_reference(o),
True, True)
for o in cols
]
"""
for criterion in list(criterion):
- criterion = expression._literal_as_text(criterion)
+ criterion = expression._expression_literal_as_text(criterion)
criterion = self._adapt_clause(criterion, True, True)
"""
- if isinstance(criterion, util.string_types):
- criterion = sql.text(criterion)
+ criterion = expression._expression_literal_as_text(criterion)
if criterion is not None and \
not isinstance(criterion, sql.ClauseElement):
ORM tutorial
"""
- if isinstance(statement, util.string_types):
- statement = sql.text(statement)
+ statement = expression._expression_literal_as_text(statement)
if not isinstance(statement,
(expression.TextClause,
# .with_only_columns() after we have a core select() so that
# we get just "SELECT 1" without any entities.
return sql.exists(self.add_columns('1').with_labels().
- statement.with_only_columns(['1']))
+ statement.with_only_columns([1]))
def count(self):
"""Return a count of rows this Query would return.
def visit_grouping(self, grouping, asfrom=False, **kwargs):
return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
+ def visit_label_reference(self, element, **kwargs):
+ selectable = self.stack[-1]['selectable']
+ try:
+ col = selectable._inner_column_dict[element.text]
+ except KeyError:
+ # treat it like text()
+ util.warn_limited(
+ "Can't resolve label reference %r; converting to text()",
+ util.ellipses_string(element.text))
+ return self.process(
+ elements.TextClause._create_text(element.text)
+ )
+ else:
+ kwargs['render_label_as_label'] = col
+ return self.process(col, **kwargs)
+
def visit_label(self, label,
add_to_result_map=None,
within_label_clause=False,
{
'correlate_froms': entry['correlate_froms'],
'iswrapper': toplevel,
- 'asfrom_froms': entry['asfrom_froms']
+ 'asfrom_froms': entry['asfrom_froms'],
+ 'selectable': cs
})
keyword = self.compound_keywords.get(cs.keyword)
new_entry = {
'asfrom_froms': new_correlate_froms,
'iswrapper': iswrapper,
- 'correlate_froms': all_correlate_froms
+ 'correlate_froms': all_correlate_froms,
+ 'selectable': select,
}
self.stack.append(new_entry)
self.stack.append(
{'correlate_froms': set([update_stmt.table]),
"iswrapper": False,
- "asfrom_froms": set([update_stmt.table])})
+ "asfrom_froms": set([update_stmt.table]),
+ "selectable": update_stmt})
self.isupdate = True
def visit_delete(self, delete_stmt, **kw):
self.stack.append({'correlate_froms': set([delete_stmt.table]),
"iswrapper": False,
- "asfrom_froms": set([delete_stmt.table])})
+ "asfrom_froms": set([delete_stmt.table]),
+ "selectable": delete_stmt})
self.isdelete = True
text = "DELETE "
from .annotation import Annotated
import itertools
from .base import Executable, PARSE_AUTOCOMMIT, Immutable, NO_ARG
-from .base import _generative, Generative
+from .base import _generative
+import numbers
import re
import operator
__visit_name__ = 'column'
primary_key = False
foreign_keys = []
- _label = None
+ _label = _columns_clause_label = None
_key_label = key = None
_alt_names = ()
_hide_froms = []
+ # help in those cases where text() is
+ # interpreted in a column expression situation
+ key = _label = _columns_clause_label = None
+
def __init__(
self,
text,
self.operator = kwargs.pop('operator', operators.comma_op)
self.group = kwargs.pop('group', True)
self.group_contents = kwargs.pop('group_contents', True)
+ text_converter = kwargs.pop(
+ '_literal_as_text',
+ _expression_literal_as_text)
if self.group_contents:
self.clauses = [
- _literal_as_text(clause).self_group(against=self.operator)
+ text_converter(clause).self_group(against=self.operator)
for clause in clauses]
else:
self.clauses = [
- _literal_as_text(clause)
+ text_converter(clause)
for clause in clauses]
def __iter__(self):
clauses = util.coerce_generator_arg(clauses)
for clause in clauses:
- clause = _literal_as_text(clause)
+ clause = _expression_literal_as_text(clause)
if isinstance(clause, continue_on):
continue
return self.expr._from_objects
+class _label_reference(ColumnElement):
+ __visit_name__ = 'label_reference'
+
+ def __init__(self, text):
+ self.text = text
+
+
class UnaryExpression(ColumnElement):
"""Define a 'unary' expression.
"""
return UnaryExpression(
- _literal_as_text(column), modifier=operators.nullsfirst_op)
+ _literal_as_label_reference(column),
+ modifier=operators.nullsfirst_op)
@classmethod
def _create_nullslast(cls, column):
"""
return UnaryExpression(
- _literal_as_text(column), modifier=operators.nullslast_op)
+ _literal_as_label_reference(column),
+ modifier=operators.nullslast_op)
@classmethod
def _create_desc(cls, column):
"""
return UnaryExpression(
- _literal_as_text(column), modifier=operators.desc_op)
+ _literal_as_label_reference(column), modifier=operators.desc_op)
@classmethod
def _create_asc(cls, column):
"""
return UnaryExpression(
- _literal_as_text(column), modifier=operators.asc_op)
+ _literal_as_label_reference(column), modifier=operators.asc_op)
@classmethod
def _create_distinct(cls, expr):
"""
self.func = func
if order_by is not None:
- self.order_by = ClauseList(*util.to_list(order_by))
+ self.order_by = ClauseList(
+ *util.to_list(order_by),
+ _literal_as_text=_literal_as_label_reference)
if partition_by is not None:
- self.partition_by = ClauseList(*util.to_list(partition_by))
+ self.partition_by = ClauseList(
+ *util.to_list(partition_by),
+ _literal_as_text=_literal_as_label_reference)
@util.memoized_property
def type(self):
self.name = _anonymous_label(
'%%(%d %s)s' % (id(self), getattr(element, 'name', 'anon'))
)
- self.key = self._label = self._key_label = self.name
+ self.key = self._label = self._key_label = \
+ self._columns_clause_label = self.name
self._element = element
self._type = type_
self._proxies = [element]
:class:`.Column` class, is typically invoked using the
:func:`.column` function, as in::
- from sqlalchemy.sql import column
+ from sqlalchemy import column
id, name = column("id"), column("name")
stmt = select([id, name]).select_from("user")
:class:`.Column` class. The :func:`.column` function can
be invoked with just a name alone, as in::
- from sqlalchemy.sql import column
+ from sqlalchemy import column
id, name = column("id"), column("name")
stmt = select([id, name]).select_from("user")
(which is the lightweight analogue to :class:`.Table`) to produce
a working table construct with minimal boilerplate::
- from sqlalchemy.sql import table, column
+ from sqlalchemy import table, column, select
user = table("user",
column("id"),
:class:`.schema.MetaData`, DDL, or events, unlike its
:class:`.Table` counterpart.
+ .. versionchanged:: 1.0.0 :func:`.expression.column` can now
+ be imported from the plain ``sqlalchemy`` namespace like any
+ other SQL element.
+
:param text: the text of the element.
:param type: :class:`.types.TypeEngine` object which can associate
def _label(self):
return self._gen_label(self.name)
+ @_memoized_property
+ def _columns_clause_label(self):
+ if self.table is None:
+ return None
+ else:
+ return self._label
+
def _gen_label(self, name):
t = self.table
return element
-def _literal_as_text(element):
+def _literal_as_label_reference(element):
+ if isinstance(element, util.string_types):
+ return _label_reference(element)
+ else:
+ return _literal_as_text(element)
+
+
+def _expression_literal_as_text(element):
+ return _literal_as_text(element, warn=True)
+
+
+def _literal_as_text(element, warn=False):
if isinstance(element, Visitable):
return element
elif hasattr(element, '__clause_element__'):
return element.__clause_element__()
elif isinstance(element, util.string_types):
+ if warn:
+ util.warn_limited(
+ "Textual SQL expression %(expr)r should be "
+ "explicitly declared as text(%(expr)r)",
+ {"expr": util.ellipses_string(element)})
+
return TextClause(util.text_type(element))
elif isinstance(element, (util.NoneType, bool)):
return _const_expr(element)
else:
return element
+_guess_straight_column = re.compile(r'^\w\S*$', re.I)
+
def _interpret_as_column_or_from(element):
if isinstance(element, Visitable):
elif hasattr(insp, "selectable"):
return insp.selectable
- return ColumnClause(str(element), is_literal=True)
+ # be forgiving as this is an extremely common
+ # and known expression
+ if element == "*":
+ guess_is_literal = True
+ elif isinstance(element, (numbers.Number)):
+ return ColumnClause(str(element), is_literal=True)
+ else:
+ element = str(element)
+ # give into temptation, as this fact we are guessing about
+ # is not one we've previously ever needed our users tell us;
+ # but let them know we are not happy about it
+ guess_is_literal = not _guess_straight_column.match(element)
+ util.warn_limited(
+ "Textual column expression %(column)r should be "
+ "explicitly declared with text(%(column)r), "
+ "or use %(literal_column)s(%(column)r) "
+ "for more specificity",
+ {
+ "column": util.ellipses_string(element),
+ "literal_column": "literal_column"
+ if guess_is_literal else "column"
+ })
+ return ColumnClause(
+ element,
+ is_literal=guess_is_literal)
def _const_expr(element):
_is_column, _labeled, _only_column_elements, _string_or_unprintable, \
_truncated_label, _clone, _cloned_difference, _cloned_intersection,\
_column_as_key, _literal_as_binds, _select_iterables, \
- _corresponding_column_or_error
+ _corresponding_column_or_error, _literal_as_label_reference, \
+ _expression_literal_as_text
from .selectable import _interpret_as_from
super(CheckConstraint, self).\
__init__(name, deferrable, initially, _create_rule, info=info)
- self.sqltext = _literal_as_text(sqltext)
+ self.sqltext = _literal_as_text(sqltext, warn=False)
if table is not None:
self._set_parent_with_dispatch(table)
elif _autoattach:
from .elements import _clone, \
_literal_as_text, _interpret_as_column_or_from, _expand_cloned,\
_select_iterables, _anonymous_label, _clause_element_as_expr,\
- _cloned_intersection, _cloned_difference, True_, _only_column_elements,\
- TRUE
+ _cloned_intersection, _cloned_difference, True_, \
+ _literal_as_label_reference
from .base import Immutable, Executable, _generative, \
ColumnCollection, ColumnSet, _from_objects, Generative
from . import type_api
insp = inspection.inspect(element, raiseerr=False)
if insp is None:
if isinstance(element, util.string_types):
+ util.warn_limited(
+ "Textual SQL FROM expression %(expr)r should be "
+ "explicitly declared as text(%(expr)r), "
+ "or use table(%(expr)r) for more specificity",
+ {"expr": util.ellipses_string(element)})
+
return TextClause(util.text_type(element))
elif hasattr(insp, "selectable"):
return insp.selectable
collection of columns, which are typically produced
by the :func:`.expression.column` function::
- from sqlalchemy.sql import table, column
+ from sqlalchemy import table, column
user = table("user",
column("id"),
:class:`~.schema.Table` object.
It may be used to construct lightweight table constructs.
- Note that the :func:`.expression.table` function is not part of
- the ``sqlalchemy`` namespace. It must be imported from the
- ``sql`` package::
-
- from sqlalchemy.sql import table, column
+ .. versionchanged:: 1.0.0 :func:`.expression.table` can now
+ be imported from the plain ``sqlalchemy`` namespace like any
+ other SQL element.
:param name: Name of the table.
self._bind = bind
if order_by is not None:
- self._order_by_clause = ClauseList(*util.to_list(order_by))
+ self._order_by_clause = ClauseList(
+ *util.to_list(order_by),
+ _literal_as_text=_literal_as_label_reference)
if group_by is not None:
- self._group_by_clause = ClauseList(*util.to_list(group_by))
+ self._group_by_clause = ClauseList(
+ *util.to_list(group_by),
+ _literal_as_text=_literal_as_label_reference)
@property
def for_update(self):
else:
if getattr(self, '_order_by_clause', None) is not None:
clauses = list(self._order_by_clause) + list(clauses)
- self._order_by_clause = ClauseList(*clauses)
+ self._order_by_clause = ClauseList(
+ *clauses, _literal_as_text=_literal_as_label_reference)
def append_group_by(self, *clauses):
"""Append the given GROUP BY criterion applied to this selectable.
else:
if getattr(self, '_group_by_clause', None) is not None:
clauses = list(self._group_by_clause) + list(clauses)
- self._group_by_clause = ClauseList(*clauses)
+ self._group_by_clause = ClauseList(
+ *clauses, _literal_as_text=_literal_as_label_reference)
+
+ @property
+ def _inner_column_dict(self):
+ raise NotImplementedError()
def _copy_internals(self, clone=_clone, **kw):
if self._limit_clause is not None:
GenerativeSelect.__init__(self, **kwargs)
+ @property
+ def _inner_column_dict(self):
+ return dict(
+ (c.key, c) for c in self.c
+ )
+
@classmethod
def _create_union(cls, *selects, **kwargs):
"""Return a ``UNION`` of multiple selectables.
def _setup_prefixes(self, prefixes, dialect=None):
self._prefixes = self._prefixes + tuple(
- [(_literal_as_text(p), dialect) for p in prefixes])
+ [(_literal_as_text(p, warn=False), dialect) for p in prefixes])
class Select(HasPrefixes, GenerativeSelect):
"""
return _select_iterables(self._raw_columns)
+ @_memoized_property
+ def _inner_column_dict(self):
+ d = dict(
+ (c._label or c.key, c)
+ for c in _select_iterables(self._raw_columns))
+ d.update((c.key, c) for c in _select_iterables(self.froms))
+
+ return d
+
def is_derived_from(self, fromclause):
if self in fromclause._cloned_set:
return True
"""
if expr:
- expr = [_literal_as_text(e) for e in expr]
+ expr = [_literal_as_label_reference(e) for e in expr]
if isinstance(self._distinct, list):
self._distinct = self._distinct + expr
else:
names = set()
def name_for_col(c):
- if c._label is None:
+ if c._columns_clause_label is None:
return (None, c)
- name = c._label
+ name = c._columns_clause_label
if name in names:
name = c.anon_label
else:
util.warn_limited(
"Unicode type received non-unicode "
"bind param value %r.",
- util.ellipses_string(value))
+ (util.ellipses_string(value),))
return value
return process
else:
util.warn_limited(
"Unicode type received non-unicode bind "
"param value %r.",
- util.ellipses_string(value))
+ (util.ellipses_string(value),))
return value
return process
else:
from .assertions import emits_warning, emits_warning_on, uses_deprecated, \
eq_, ne_, is_, is_not_, startswith_, assert_raises, \
assert_raises_message, AssertsCompiledSQL, ComparesTables, \
- AssertsExecutionResults, expect_deprecated
+ AssertsExecutionResults, expect_deprecated, expect_warnings
from .util import run_as_contextmanager, rowset, fail, provide_metadata, adict
"""
- def __new__(cls, value, args, num):
- interpolated = value % args + \
+ def __new__(cls, value, num, args):
+ interpolated = (value % args) + \
(" (this warning may be suppressed after %d occurrences)" % num)
self = super(_hash_limit_string, cls).__new__(cls, interpolated)
self._hash = hash("%s_%d" % (value, hash(interpolated) % num))
warnings.warn(msg, exc.SAWarning, stacklevel=2)
-def warn_limited(msg, *args):
+def warn_limited(msg, args):
"""Issue a warning with a paramterized string, limiting the number
of registrations.
"""
if args:
- msg = _hash_limit_string(msg, args, 10)
+ msg = _hash_limit_string(msg, 10, args)
warnings.warn(msg, exc.SAWarning, stacklevel=2)
def go():
util.warn_limited(
"memusage warning, param1: %s, param2: %s",
- next(counter), next(counter))
+ (next(counter), next(counter)))
go()
def test_mapper_reset(self):
kw['distinct'] = distinct
if prefixes is not None:
kw['prefixes'] = prefixes
- return str(select(['q'], **kw).compile(dialect=dialect))
+ return str(select([column('q')], **kw).compile(dialect=dialect))
eq_(gen(None), 'SELECT q')
eq_(gen(True), 'SELECT DISTINCT q')
import datetime
from sqlalchemy import Table, MetaData, Column, Integer, Enum, Float, select, \
func, DateTime, Numeric, exc, String, cast, REAL, TypeDecorator, Unicode, \
- Text, null
+ Text, null, text
from sqlalchemy.sql import operators
from sqlalchemy import types
from sqlalchemy.dialects.postgresql import base as postgresql
engine = testing.db
connection = engine.connect()
- s = select(["timestamp '2007-12-25'"])
+ s = select([text("timestamp '2007-12-25'")])
result = connection.execute(s).first()
eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0))
query = select([table1, table2], or_(table1.c.name == 'fred',
table1.c.myid == 10, table2.c.othername != 'jack',
- 'EXISTS (select yay from foo where boo = lar)'
+ text('EXISTS (select yay from foo where boo = lar)')
), from_obj=[outerjoin(table1, table2,
table1.c.myid == table2.c.otherid)])
self.assert_compile(query,
import sqlalchemy as sa
from sqlalchemy import testing
-from sqlalchemy import Integer, String, ForeignKey
+from sqlalchemy import Integer, String, ForeignKey, table, text
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, backref, create_session
from sqlalchemy.testing import eq_
q = s.query(Thing).options(sa.orm.joinedload('category'))
l = (q.filter(
(tests.c.owner_id==1) &
- ('options.someoption is null or options.someoption=%s' % false)).
+ text('options.someoption is null or options.someoption=%s' % false)).
join('owner_option'))
result = ["%d %s" % ( t.id,t.category.name ) for t in l]
be migrated directly to the wiki, docs, etc.
"""
-from sqlalchemy import testing
-from sqlalchemy import Integer, String, ForeignKey, func
+from sqlalchemy import Integer, String, ForeignKey, func, text
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.schema import Column
-from sqlalchemy.orm import mapper, relationship, relation, create_session, sessionmaker
+from sqlalchemy.orm import mapper, relationship, create_session, sessionmaker
from sqlalchemy.testing import fixtures
session = create_session()
users = (session.query(User).
- from_statement('SELECT * FROM users_table')).all()
+ from_statement(text('SELECT * FROM users_table'))).all()
assert len(users) == 4
def test_select_whereclause(self):
users = session.query(User).filter(User.name=='ed').all()
assert len(users) == 1 and users[0].name == 'ed'
- users = session.query(User).filter("name='ed'").all()
+ users = session.query(User).filter(text("name='ed'")).all()
assert len(users) == 1 and users[0].name == 'ed'
joinedload_all, backref, Session,\
defaultload, Load
from sqlalchemy import Integer, String, Date, ForeignKey, and_, select, \
- func
+ func, text
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, create_session, \
lazyload, aliased, column_property
# they should be required to locate only their aliased/fully table
# qualified column name.
noeagers = create_session().query(User).\
- from_statement("select * from users").all()
+ from_statement(text("select * from users")).all()
assert 'orders' not in noeagers[0].__dict__
assert 'addresses' not in noeagers[0].__dict__
q = create_session().query(User)
- l = q.filter("users.id in (7, 8, 9)").order_by("users.id")
+ l = q.filter(text("users.id in (7, 8, 9)")).order_by(text("users.id"))
def go():
eq_(self.static.user_order_result[0:3], l.all())
fixtures, eq_, assert_raises, assert_raises_message, AssertsCompiledSQL)
from sqlalchemy import (
exc as sa_exc, util, Integer, Table, String, ForeignKey, select, func,
- and_, asc, desc, inspect, literal_column, cast, exists)
+ and_, asc, desc, inspect, literal_column, cast, exists, text)
from sqlalchemy.orm import (
configure_mappers, Session, mapper, create_session, relationship,
column_property, joinedload_all, contains_eager, contains_alias,
self.assert_compile(
q3.order_by(c1),
"SELECT anon_1.c1 AS anon_1_c1, anon_1.c2 "
- "AS anon_1_c2 FROM (SELECT c1 AS c1, c2 AS c2 WHERE "
- "c1 = :c1_1 UNION SELECT c1 AS c1, c2 AS c2 "
+ "AS anon_1_c2 FROM (SELECT c1, c2 WHERE "
+ "c1 = :c1_1 UNION SELECT c1, c2 "
"WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.c1"
)
"SELECT anon_1.anon_2_c1 AS anon_1_anon_2_c1, anon_1.anon_2_c2 AS "
"anon_1_anon_2_c2 FROM (SELECT anon_2.c1 AS anon_2_c1, anon_2.c2 "
"AS anon_2_c2 "
- "FROM (SELECT c1 AS c1, c2 AS c2 WHERE c1 = :c1_1) AS "
+ "FROM (SELECT c1, c2 WHERE c1 = :c1_1) AS "
"anon_2) AS anon_1 ORDER BY anon_1.anon_2_c1"
)
self.assert_compile(
q3.order_by(c1),
"SELECT anon_1.c1 AS anon_1_c1, anon_1.c2 "
- "AS anon_1_c2 FROM (SELECT c1 AS c1, c2 AS c2 WHERE "
- "c1 = :c1_1 UNION SELECT c1 AS c1, c2 AS c2 "
+ "AS anon_1_c2 FROM (SELECT c1, c2 WHERE "
+ "c1 = :c1_1 UNION SELECT c1, c2 "
"WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.c1"
)
query = users.select(users.c.id == 7).\
union(users.select(users.c.id > 7)).alias('ulist').\
outerjoin(addresses).\
- select(use_labels=True, order_by=['ulist.id', addresses.c.id])
+ select(
+ use_labels=True,
+ order_by=[text('ulist.id'), addresses.c.id])
sess = create_session()
q = sess.query(User)
query = users.select(users.c.id == 7).\
union(users.select(users.c.id > 7)).alias('ulist').\
outerjoin(addresses). \
- select(use_labels=True, order_by=['ulist.id', addresses.c.id])
+ select(
+ use_labels=True,
+ order_by=[text('ulist.id'), addresses.c.id])
sess = create_session()
q = sess.query(User)
query = users.select(users.c.id == 7).\
union(users.select(users.c.id > 7)).alias('ulist').\
outerjoin(addresses). \
- select(use_labels=True, order_by=['ulist.id', addresses.c.id])
+ select(
+ use_labels=True,
+ order_by=[text('ulist.id'), addresses.c.id])
sess = create_session()
# better way. use select_entity_from()
query = users.select(users.c.id == 7).\
union(users.select(users.c.id > 7)).\
alias('ulist').outerjoin(adalias).\
- select(use_labels=True, order_by=['ulist.id', adalias.c.id])
+ select(use_labels=True, order_by=[text('ulist.id'), adalias.c.id])
def go():
l = sess.query(User).select_entity_from(query).\
testing, null, exists, text, union, literal, literal_column, func, between,
Unicode, desc, and_, bindparam, select, distinct, or_, collate, insert,
Integer, String, Boolean, exc as sa_exc, util, cast)
-from sqlalchemy.sql import operators, column, expression
+from sqlalchemy.sql import operators, expression
+from sqlalchemy import column, table
from sqlalchemy.engine import default
from sqlalchemy.orm import (
attributes, mapper, relationship, create_session, synonym, Session,
from sqlalchemy.testing.schema import Table, Column
import sqlalchemy as sa
from sqlalchemy.testing.assertions import (
- eq_, assert_raises, assert_raises_message)
+ eq_, assert_raises, assert_raises_message, expect_warnings)
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
from test.orm import _fixtures
from sqlalchemy.orm.util import join, with_parent
q = meth(q, *arg, **kw)
assert_raises(
sa_exc.InvalidRequestError,
- q.from_statement, "x"
+ q.from_statement, text("x")
)
q = s.query(User)
- q = q.from_statement("x")
+ q = q.from_statement(text("x"))
assert_raises(
sa_exc.InvalidRequestError,
meth, q, *arg, **kw
User = self.classes.User
s = create_session()
self.assert_compile(
- s.query(User).filter("name='ed'"),
+ s.query(User).filter(text("name='ed'")),
"SELECT users.id AS users_id, users.name "
"AS users_name FROM users WHERE name='ed'"
)
)
for q in (
- q3.order_by(User.id, "anon_1_param_1"),
+ q3.order_by(User.id, text("anon_1_param_1")),
q6.order_by(User.id, "foo")):
eq_(
q.all(),
sess = create_session()
q = iter(
sess.query(User).yield_per(1).from_statement(
- "select * from users"))
+ text("select * from users")))
ret = []
eq_(len(sess.identity_map), 0)
def test_fulltext(self):
User = self.classes.User
- eq_(
- create_session().query(User).
- from_statement("select * from users order by id").all(),
- [User(id=7), User(id=8), User(id=9), User(id=10)]
- )
+ with expect_warnings("Textual SQL"):
+ eq_(
+ create_session().query(User).
+ from_statement("select * from users order by id").all(),
+ [User(id=7), User(id=8), User(id=9), User(id=10)]
+ )
eq_(
create_session().query(User).from_statement(
- "select * from users order by id").first(), User(id=7)
+ text("select * from users order by id")).first(), User(id=7)
)
eq_(
create_session().query(User).from_statement(
- "select * from users where name='nonexistent'").first(), None)
+ text("select * from users where name='nonexistent'")).first(),
+ None)
def test_fragment(self):
User = self.classes.User
- eq_(
- create_session().query(User).filter("id in (8, 9)").all(),
- [User(id=8), User(id=9)]
+ with expect_warnings("Textual SQL expression"):
+ eq_(
+ create_session().query(User).filter("id in (8, 9)").all(),
+ [User(id=8), User(id=9)]
- )
+ )
- eq_(
- create_session().query(User).filter("name='fred'").
- filter("id=9").all(), [User(id=9)]
- )
- eq_(
- create_session().query(User).filter("name='fred'").
- filter(User.id == 9).all(), [User(id=9)]
- )
+ eq_(
+ create_session().query(User).filter("name='fred'").
+ filter("id=9").all(), [User(id=9)]
+ )
+ eq_(
+ create_session().query(User).filter("name='fred'").
+ filter(User.id == 9).all(), [User(id=9)]
+ )
- def test_binds(self):
+ def test_binds_coerce(self):
User = self.classes.User
- eq_(
- create_session().query(User).filter("id in (:id1, :id2)").
- params(id1=8, id2=9).all(), [User(id=8), User(id=9)]
- )
+ with expect_warnings("Textual SQL expression"):
+ eq_(
+ create_session().query(User).filter("id in (:id1, :id2)").
+ params(id1=8, id2=9).all(), [User(id=8), User(id=9)]
+ )
def test_as_column(self):
User = self.classes.User
s = create_session()
- assert_raises(sa_exc.InvalidRequestError, s.query,
- User.id, text("users.name"))
+ assert_raises(
+ sa_exc.InvalidRequestError, s.query,
+ User.id, text("users.name"))
eq_(
s.query(User.id, "name").order_by(User.id).all(),
s = create_session()
eq_(
s.query(User).from_statement(
- select(['id', 'name']).select_from('users').order_by('id'),
+ select([column('id'), column('name')]).
+ select_from(table('users')).order_by('id'),
).all(),
[User(id=7), User(id=8), User(id=9), User(id=10)]
)
)
+class TextWarningTest(QueryTest, AssertsCompiledSQL):
+ def _test(self, fn, arg, offending_clause, expected):
+ assert_raises_message(
+ sa.exc.SAWarning,
+ r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
+ r"explicitly declared (?:with|as) text\(%(stmt)r\)" % {
+ "stmt": util.ellipses_string(offending_clause),
+ },
+ fn, arg
+ )
+
+ with expect_warnings("Textual "):
+ stmt = fn(arg)
+ self.assert_compile(stmt, expected)
+
+ def test_filter(self):
+ User = self.classes.User
+ self._test(
+ Session().query(User.id).filter, "myid == 5", "myid == 5",
+ "SELECT users.id AS users_id FROM users WHERE myid == 5"
+ )
+
+ def test_having(self):
+ User = self.classes.User
+ self._test(
+ Session().query(User.id).having, "myid == 5", "myid == 5",
+ "SELECT users.id AS users_id FROM users HAVING myid == 5"
+ )
+
+ def test_from_statement(self):
+ User = self.classes.User
+ self._test(
+ Session().query(User.id).from_statement,
+ "select id from user",
+ "select id from user",
+ "select id from user",
+ )
+
+
class ParentTest(QueryTest, AssertsCompiledSQL):
__dialect__ = 'default'
c = column('x', Boolean)
self.assert_compile(
s.query(c).filter(c),
- "SELECT x AS x WHERE x",
+ "SELECT x WHERE x",
dialect=self._dialect(True)
)
c = column('x', Boolean)
self.assert_compile(
s.query(c).filter(c),
- "SELECT x AS x WHERE x = 1",
+ "SELECT x WHERE x = 1",
dialect=self._dialect(False)
)
c = column('x', Boolean)
self.assert_compile(
s.query(c).filter(~c),
- "SELECT x AS x WHERE x = 0",
+ "SELECT x WHERE x = 0",
dialect=self._dialect(False)
)
c = column('x', Boolean)
self.assert_compile(
s.query(c).filter(~c),
- "SELECT x AS x WHERE NOT x",
+ "SELECT x WHERE NOT x",
dialect=self._dialect(True)
)
c = column('x', Boolean)
self.assert_compile(
s.query(c).having(c),
- "SELECT x AS x HAVING x = 1",
+ "SELECT x HAVING x = 1",
dialect=self._dialect(False)
)
from sqlalchemy.testing import assert_raises, assert_raises_message
import sqlalchemy as sa
from sqlalchemy import testing
-from sqlalchemy import String, Integer, select
+from sqlalchemy import String, Integer, select, column
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.orm import mapper, Session
from sqlalchemy.testing import eq_, AssertsCompiledSQL
Subset = self.classes.Subset
- selectable = select(["x", "y", "z"]).alias()
+ selectable = select([column("x"), column("y"), column("z")]).alias()
mapper(Subset, selectable, primary_key=[selectable.c.x])
self.assert_compile(
Session().query(Subset),
- "SELECT anon_1.x, anon_1.y, anon_1.z FROM (SELECT x, y, z) AS anon_1",
+ "SELECT anon_1.x AS anon_1_x, anon_1.y AS anon_1_y, "
+ "anon_1.z AS anon_1_z FROM (SELECT x, y, z) AS anon_1",
use_default_dialect=True
)
Subset = self.classes.Subset
- selectable = select(["x", "y", "z"]).alias()
+ selectable = select([column("x"), column("y"), column("z")]).alias()
assert_raises_message(
sa.exc.ArgumentError,
"could not assemble any primary key columns",
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy import Integer, String, ForeignKey, or_, and_, exc, \
- select, func, Boolean, case
+ select, func, Boolean, case, text
from sqlalchemy.orm import mapper, relationship, backref, Session, \
joinedload, aliased
from sqlalchemy import testing
sess = Session()
john, jack, jill, jane = sess.query(User).order_by(User.id).all()
- sess.query(User).filter('name = :name').params(
+ sess.query(User).filter(text('name = :name')).params(
name='john').delete('fetch')
assert john not in sess
john, jack, jill, jane = sess.query(User).order_by(User.id).all()
- sess.query(User).filter('age > :x').params(x=29).\
+ sess.query(User).filter(text('age > :x')).params(x=29).\
update({'age': User.age - 10}, synchronize_session='fetch')
eq_([john.age, jack.age, jill.age, jane.age], [25, 37, 29, 27])
)
def test_select_precol_compile_ordering(self):
- s1 = select([column('x')]).select_from('a').limit(5).as_scalar()
+ s1 = select([column('x')]).select_from(text('a')).limit(5).as_scalar()
s2 = select([s1]).limit(10)
class MyCompiler(compiler.SQLCompiler):
def test_select_from_clauselist(self):
self.assert_compile(
select([ClauseList(column('a'), column('b'))]
- ).select_from('sometable'),
+ ).select_from(text('sometable')),
'SELECT a, b FROM sometable'
)
)
self.assert_compile(
- select(["a", "a", "a"]),
+ select([column("a"), column("a"), column("a")]),
"SELECT a, a, a"
)
)
def test_conjunctions(self):
- a, b, c = 'a', 'b', 'c'
+ a, b, c = text('a'), text('b'), text('c')
x = and_(a, b, c)
assert isinstance(x.type, Boolean)
assert str(x) == 'a AND b AND c'
self.assert_compile(
and_(table1.c.myid == 12, table1.c.name == 'asdf',
- table2.c.othername == 'foo', "sysdate() = today()"),
+ table2.c.othername == 'foo', text("sysdate() = today()")),
"mytable.myid = :myid_1 AND mytable.name = :name_1 "
"AND myothertable.othername = "
":othername_1 AND sysdate() = today()"
table1.c.myid == 12,
or_(table2.c.othername == 'asdf',
table2.c.othername == 'foo', table2.c.otherid == 9),
- "sysdate() = today()",
+ text("sysdate() = today()"),
),
'mytable.myid = :myid_1 AND (myothertable.othername = '
':othername_1 OR myothertable.othername = :othername_2 OR '
def test_multiple_col_binds(self):
self.assert_compile(
- select(["*"], or_(table1.c.myid == 12, table1.c.myid == 'asdf',
- table1.c.myid == 'foo')),
+ select(
+ [literal_column("*")],
+ or_(
+ table1.c.myid == 12, table1.c.myid == 'asdf',
+ table1.c.myid == 'foo')
+ ),
"SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
"OR mytable.myid = :myid_2 OR mytable.myid = :myid_3"
)
table1.c.name == 'fred',
table1.c.myid == 10,
table2.c.othername != 'jack',
- "EXISTS (select yay from foo where boo = lar)"
+ text("EXISTS (select yay from foo where boo = lar)")
),
from_obj=[outerjoin(table1, table2,
table1.c.myid == table2.c.otherid)]
"SELECT mytable.myid, mytable.name "
"FROM mytable UNION SELECT myothertable.otherid, "
"myothertable.othername "
- "FROM myothertable ORDER BY myid LIMIT :param_1 OFFSET :param_2",
+ "FROM myothertable ORDER BY myid " # note table name is omitted
+ "LIMIT :param_1 OFFSET :param_2",
{'param_1': 5, 'param_2': 10}
)
)
def test_compound_grouping(self):
- s = select([column('foo'), column('bar')]).select_from('bat')
+ s = select([column('foo'), column('bar')]).select_from(text('bat'))
self.assert_compile(
union(union(union(s, s), s), s),
self.assert_compile(
select([
func.max(table1.c.name).over(
- partition_by=['foo']
+ partition_by=['description']
)
]),
- "SELECT max(mytable.name) OVER (PARTITION BY foo) "
+ "SELECT max(mytable.name) OVER (PARTITION BY mytable.description) "
"AS anon_1 FROM mytable"
)
# from partition_by
def test_literal_as_text_fromstring(self):
self.assert_compile(
- and_("a", "b"),
+ and_(text("a"), text("b")),
"a AND b"
)
def test_select_method_three(self):
expr = func.rows("foo")
self.assert_compile(
- select(['foo']).select_from(expr),
+ select([column('foo')]).select_from(expr),
"SELECT foo FROM rows(:rows_1)"
)
from sqlalchemy.sql.expression import _clone, _from_objects
from sqlalchemy import func, select, Integer, Table, \
Column, MetaData, extract, String, bindparam, tuple_, and_, union, text,\
- case, ForeignKey
+ case, ForeignKey, literal_column
from sqlalchemy.testing import fixtures, AssertsExecutionResults, \
AssertsCompiledSQL
from sqlalchemy import testing
assert orig == str(s) == str(s5)
def test_correlated_select(self):
- s = select(['*'], t1.c.col1 == t2.c.col1,
+ s = select([literal_column('*')], t1.c.col1 == t2.c.col1,
from_obj=[t1, t2]).correlate(t2)
class Vis(CloningVisitor):
t2alias = t2.alias('t2alias')
vis = sql_util.ClauseAdapter(t1alias)
- s = select(['*'], from_obj=[t1alias, t2alias]).as_scalar()
+ s = select([literal_column('*')], from_obj=[t1alias, t2alias]).as_scalar()
assert t2alias in s._froms
assert t1alias in s._froms
- self.assert_compile(select(['*'], t2alias.c.col1 == s),
+ self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s),
'SELECT * FROM table2 AS t2alias WHERE '
't2alias.col1 = (SELECT * FROM table1 AS '
't1alias)')
# correlate list on "s" needs to take into account the full
# _cloned_set for each element in _froms when correlating
- self.assert_compile(select(['*'], t2alias.c.col1 == s),
+ self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s),
'SELECT * FROM table2 AS t2alias WHERE '
't2alias.col1 = (SELECT * FROM table1 AS '
't1alias)')
- s = select(['*'], from_obj=[t1alias,
+ s = select([literal_column('*')], from_obj=[t1alias,
t2alias]).correlate(t2alias).as_scalar()
- self.assert_compile(select(['*'], t2alias.c.col1 == s),
+ self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s),
'SELECT * FROM table2 AS t2alias WHERE '
't2alias.col1 = (SELECT * FROM table1 AS '
't1alias)')
s = vis.traverse(s)
- self.assert_compile(select(['*'], t2alias.c.col1 == s),
+ self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s),
'SELECT * FROM table2 AS t2alias WHERE '
't2alias.col1 = (SELECT * FROM table1 AS '
't1alias)')
s = CloningVisitor().traverse(s)
- self.assert_compile(select(['*'], t2alias.c.col1 == s),
+ self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s),
'SELECT * FROM table2 AS t2alias WHERE '
't2alias.col1 = (SELECT * FROM table1 AS '
't1alias)')
- s = select(['*']).where(t1.c.col1 == t2.c.col1).as_scalar()
+ s = select([literal_column('*')]).where(t1.c.col1 == t2.c.col1).as_scalar()
self.assert_compile(select([t1.c.col1, s]),
'SELECT table1.col1, (SELECT * FROM table2 '
'WHERE table1.col1 = table2.col1) AS '
'SELECT t1alias.col1, (SELECT * FROM '
'table2 WHERE t1alias.col1 = table2.col1) '
'AS anon_1 FROM table1 AS t1alias')
- s = select(['*']).where(t1.c.col1
+ s = select([literal_column('*')]).where(t1.c.col1
== t2.c.col1).correlate(t1).as_scalar()
self.assert_compile(select([t1.c.col1, s]),
'SELECT table1.col1, (SELECT * FROM table2 '
def test_table_to_alias_2(self):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
- self.assert_compile(vis.traverse(select(['*'], from_obj=[t1])),
+ self.assert_compile(vis.traverse(select([literal_column('*')], from_obj=[t1])),
'SELECT * FROM table1 AS t1alias')
def test_table_to_alias_3(self):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
- self.assert_compile(select(['*'], t1.c.col1 == t2.c.col2),
+ self.assert_compile(select([literal_column('*')], t1.c.col1 == t2.c.col2),
'SELECT * FROM table1, table2 WHERE '
'table1.col1 = table2.col2')
def test_table_to_alias_4(self):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1
+ self.assert_compile(vis.traverse(select([literal_column('*')], t1.c.col1
== t2.c.col2)),
'SELECT * FROM table1 AS t1alias, table2 '
'WHERE t1alias.col1 = table2.col2')
self.assert_compile(
vis.traverse(
select(
- ['*'],
+ [literal_column('*')],
t1.c.col1 == t2.c.col2,
from_obj=[
t1,
self.assert_compile(
select([t1alias, t2]).where(
t1alias.c.col1 == vis.traverse(
- select(['*'], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]).
+ select([literal_column('*')], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]).
correlate(t1)
)
),
self.assert_compile(
select([t1alias, t2]).
where(t1alias.c.col1 == vis.traverse(
- select(['*'], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]).
+ select([literal_column('*')], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]).
correlate(t2))),
"SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
"table2.col1, table2.col2, table2.col3 "
't1alias.col2 ELSE t1alias.col1 END')
def test_table_to_alias_10(self):
- s = select(['*'], from_obj=[t1]).alias('foo')
+ s = select([literal_column('*')], from_obj=[t1]).alias('foo')
self.assert_compile(s.select(),
'SELECT foo.* FROM (SELECT * FROM table1) '
'AS foo')
def test_table_to_alias_11(self):
- s = select(['*'], from_obj=[t1]).alias('foo')
+ s = select([literal_column('*')], from_obj=[t1]).alias('foo')
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(vis.traverse(s.select()),
'AS t1alias) AS foo')
def test_table_to_alias_12(self):
- s = select(['*'], from_obj=[t1]).alias('foo')
+ s = select([literal_column('*')], from_obj=[t1]).alias('foo')
self.assert_compile(s.select(),
'SELECT foo.* FROM (SELECT * FROM table1) '
'AS foo')
vis = sql_util.ClauseAdapter(t1alias)
t2alias = t2.alias('t2alias')
vis.chain(sql_util.ClauseAdapter(t2alias))
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1
+ self.assert_compile(vis.traverse(select([literal_column('*')], t1.c.col1
== t2.c.col2)),
'SELECT * FROM table1 AS t1alias, table2 '
'AS t2alias WHERE t1alias.col1 = '
self.assert_compile(s,
'SELECT table1.col1, table1.col2, '
'table1.col3 FROM table1')
- select_copy = s.column('yyy')
+ select_copy = s.column(column('yyy'))
self.assert_compile(select_copy,
'SELECT table1.col1, table1.col2, '
'table1.col3, yyy FROM table1')
#! coding:utf-8
from sqlalchemy import Column, Integer, MetaData, String, Table,\
- bindparam, exc, func, insert, select
+ bindparam, exc, func, insert, select, column
from sqlalchemy.dialects import mysql, postgresql
from sqlalchemy.engine import default
from sqlalchemy.testing import AssertsCompiledSQL,\
def test_insert_from_select_union(self):
mytable = self.tables.mytable
- name = 'name'
- description = 'desc'
+ name = column('name')
+ description = column('desc')
sel = select(
[name, mytable.c.description],
).union(
ins,
"INSERT INTO mytable (name, description) "
"SELECT name, mytable.description FROM mytable "
- "UNION SELECT name, desc"
+ 'UNION SELECT name, "desc"'
)
def test_insert_from_select_col_values(self):
exc, sql, func, select, String, Integer, MetaData, and_, ForeignKey,
union, intersect, except_, union_all, VARCHAR, INT, CHAR, text, Sequence,
bindparam, literal, not_, type_coerce, literal_column, desc, asc,
- TypeDecorator, or_, cast)
+ TypeDecorator, or_, cast, table, column)
from sqlalchemy.engine import default, result as _result
from sqlalchemy.testing.schema import Table, Column
# this will create column() objects inside
# the select(), these need to match on name anyway
r = testing.db.execute(
- select(['user_id', 'user_name']).select_from('query_users').
- where('user_id=2')
+ select([
+ column('user_id'), column('user_name')
+ ]).select_from(table('query_users')).
+ where(text('user_id=2'))
).first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(
# columns which the statement is against to be lightweight
# cols, which results in a more liberal comparison scheme
a, b = sql.column('a'), sql.column('b')
- stmt = select([a, b]).select_from("keyed2")
+ stmt = select([a, b]).select_from(table("keyed2"))
row = testing.db.execute(stmt).first()
assert keyed2.c.a in row
"""Test the TextClause and related constructs."""
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_, assert_raises_message
+from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_, \
+ assert_raises_message, expect_warnings
from sqlalchemy import text, select, Integer, String, Float, \
- bindparam, and_, func, literal_column, exc, MetaData, Table, Column
+ bindparam, and_, func, literal_column, exc, MetaData, Table, Column,\
+ asc, func, desc, union
from sqlalchemy.types import NullType
from sqlalchemy.sql import table, column
+from sqlalchemy import util
table1 = table('mytable',
column('myid', Integer),
def test_select_composition_one(self):
self.assert_compile(select(
- ["foobar(a)", "pk_foo_bar(syslaal)"],
- "a = 12",
- from_obj=["foobar left outer join lala on foobar.foo = lala.foo"]
+ [
+ literal_column("foobar(a)"),
+ literal_column("pk_foo_bar(syslaal)")
+ ],
+ text("a = 12"),
+ from_obj=[
+ text("foobar left outer join lala on foobar.foo = lala.foo")
+ ]
),
"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
"left outer join lala on foobar.foo = lala.foo WHERE a = 12"
def test_select_composition_two(self):
s = select()
- s.append_column("column1")
- s.append_column("column2")
- s.append_whereclause("column1=12")
- s.append_whereclause("column2=19")
+ s.append_column(column("column1"))
+ s.append_column(column("column2"))
+ s.append_whereclause(text("column1=12"))
+ s.append_whereclause(text("column2=19"))
s = s.order_by("column1")
- s.append_from("table1")
+ s.append_from(text("table1"))
self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE "
"column1=12 AND column2=19 ORDER BY column1")
def test_select_composition_three(self):
self.assert_compile(
- select(["column1", "column2"],
+ select([column("column1"), column("column2")],
from_obj=table1).alias('somealias').select(),
"SELECT somealias.column1, somealias.column2 FROM "
"(SELECT column1, column2 FROM mytable) AS somealias"
def test_select_composition_four(self):
# test that use_labels doesn't interfere with literal columns
self.assert_compile(
- select(["column1", "column2", table1.c.myid], from_obj=table1,
- use_labels=True),
- "SELECT column1, column2, mytable.myid AS mytable_myid "
+ select([
+ text("column1"), column("column2"),
+ column("column3").label("bar"), table1.c.myid],
+ from_obj=table1,
+ use_labels=True),
+ "SELECT column1, column2, column3 AS bar, "
+ "mytable.myid AS mytable_myid "
"FROM mytable"
)
# test that use_labels doesn't interfere
# with literal columns that have textual labels
self.assert_compile(
- select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
- from_obj=table1, use_labels=True),
+ select([
+ text("column1 AS foobar"), text("column2 AS hoho"),
+ table1.c.myid],
+ from_obj=table1, use_labels=True),
"SELECT column1 AS foobar, column2 AS hoho, "
"mytable.myid AS mytable_myid FROM mytable"
)
# doesn't interfere with literal columns,
# exported columns don't get quoted
self.assert_compile(
- select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
- from_obj=[table1]).select(),
+ select([
+ literal_column("column1 AS foobar"),
+ literal_column("column2 AS hoho"), table1.c.myid],
+ from_obj=[table1]).select(),
"SELECT column1 AS foobar, column2 AS hoho, myid FROM "
"(SELECT column1 AS foobar, column2 AS hoho, "
"mytable.myid AS myid FROM mytable)"
def test_select_composition_seven(self):
self.assert_compile(
- select(['col1', 'col2'], from_obj='tablename').alias('myalias'),
+ select([
+ literal_column('col1'),
+ literal_column('col2')
+ ], from_obj=table('tablename')).alias('myalias'),
"SELECT col1, col2 FROM tablename"
)
def test_select_composition_eight(self):
self.assert_compile(select(
- [table1.alias('t'), "foo.f"],
- "foo.f = t.id",
- from_obj=["(select f from bar where lala=heyhey) foo"]
+ [table1.alias('t'), text("foo.f")],
+ text("foo.f = t.id"),
+ from_obj=[text("(select f from bar where lala=heyhey) foo")]
),
"SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
"(select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
def test_select_bundle_columns(self):
self.assert_compile(select(
- [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"],
+ [table1, table2.c.otherid,
+ text("sysdate()"), text("foo, bar, lala")],
and_(
- "foo.id = foofoo(lala)",
- "datetime(foo) = Today",
+ text("foo.id = foofoo(lala)"),
+ text("datetime(foo) = Today"),
table1.c.myid == table2.c.otherid,
)
),
set(t.element._bindparams),
set(["bat", "foo", "bar"])
)
+
+
+class TextWarningsTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def _test(self, fn, arg, offending_clause, expected):
+ assert_raises_message(
+ exc.SAWarning,
+ r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
+ r"explicitly declared (?:with|as) text\(%(stmt)r\)" % {
+ "stmt": util.ellipses_string(offending_clause),
+ },
+ fn, arg
+ )
+
+ with expect_warnings("Textual "):
+ stmt = fn(arg)
+ self.assert_compile(stmt, expected)
+
+ def test_where(self):
+ self._test(
+ select([table1.c.myid]).where, "myid == 5", "myid == 5",
+ "SELECT mytable.myid FROM mytable WHERE myid == 5"
+ )
+
+ def test_column(self):
+ self._test(
+ select, ["myid"], "myid",
+ "SELECT myid"
+ )
+
+ def test_having(self):
+ self._test(
+ select([table1.c.myid]).having, "myid == 5", "myid == 5",
+ "SELECT mytable.myid FROM mytable HAVING myid == 5"
+ )
+
+ def test_from(self):
+ self._test(
+ select([table1.c.myid]).select_from, "mytable", "mytable",
+ "SELECT mytable.myid FROM mytable, mytable" # two FROMs
+ )
+
+
+class OrderByLabelResolutionTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def _test_warning(self, stmt, offending_clause, expected):
+ with expect_warnings(
+ "Can't resolve label reference %r;" % offending_clause):
+ self.assert_compile(
+ stmt,
+ expected
+ )
+ assert_raises_message(
+ exc.SAWarning,
+ "Can't resolve label reference %r; converting to text" %
+ offending_clause,
+ stmt.compile
+ )
+
+ def test_order_by_label(self):
+ stmt = select([table1.c.myid.label('foo')]).order_by('foo')
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.myid AS foo FROM mytable ORDER BY foo"
+ )
+
+ def test_order_by_colname(self):
+ stmt = select([table1.c.myid]).order_by('name')
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.myid FROM mytable ORDER BY mytable.name"
+ )
+
+ def test_order_by_alias_colname(self):
+ t1 = table1.alias()
+ stmt = select([t1.c.myid]).apply_labels().order_by('name')
+ self.assert_compile(
+ stmt,
+ "SELECT mytable_1.myid AS mytable_1_myid "
+ "FROM mytable AS mytable_1 ORDER BY mytable_1.name"
+ )
+
+ def test_unresolvable_warning_order_by(self):
+ stmt = select([table1.c.myid]).order_by('foobar')
+ self._test_warning(
+ stmt, "foobar",
+ "SELECT mytable.myid FROM mytable ORDER BY foobar"
+ )
+
+ def test_group_by_label(self):
+ stmt = select([table1.c.myid.label('foo')]).group_by('foo')
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.myid AS foo FROM mytable GROUP BY foo"
+ )
+
+ def test_group_by_colname(self):
+ stmt = select([table1.c.myid]).group_by('name')
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.myid FROM mytable GROUP BY mytable.name"
+ )
+
+ def test_unresolvable_warning_group_by(self):
+ stmt = select([table1.c.myid]).group_by('foobar')
+ self._test_warning(
+ stmt, "foobar",
+ "SELECT mytable.myid FROM mytable GROUP BY foobar"
+ )
+
+ def test_asc(self):
+ stmt = select([table1.c.myid]).order_by(asc('name'), 'description')
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.myid FROM mytable "
+ "ORDER BY mytable.name ASC, mytable.description"
+ )
+
+ def test_group_by_subquery(self):
+ stmt = select([table1]).alias()
+ stmt = select([stmt]).apply_labels().group_by("myid")
+ self.assert_compile(
+ stmt,
+ "SELECT anon_1.myid AS anon_1_myid, anon_1.name AS anon_1_name, "
+ "anon_1.description AS anon_1_description FROM "
+ "(SELECT mytable.myid AS myid, mytable.name AS name, "
+ "mytable.description AS description FROM mytable) AS anon_1 "
+ "GROUP BY anon_1.myid"
+ )
+
+ def test_order_by_func_label_desc(self):
+ stmt = select([func.foo('bar').label('fb'), table1]).\
+ order_by(desc('fb'))
+
+ self.assert_compile(
+ stmt,
+ "SELECT foo(:foo_1) AS fb, mytable.myid, mytable.name, "
+ "mytable.description FROM mytable ORDER BY fb DESC"
+ )
+
+ def test_pg_distinct(self):
+ stmt = select([table1]).distinct('name')
+ self.assert_compile(
+ stmt,
+ "SELECT DISTINCT ON (mytable.name) mytable.myid, "
+ "mytable.name, mytable.description FROM mytable",
+ dialect="postgresql"
+ )
+
+ def test_over(self):
+ stmt = select([column("foo"), column("bar")])
+ stmt = select(
+ [func.row_number().
+ over(order_by='foo', partition_by='bar')]
+ ).select_from(stmt)
+
+ self.assert_compile(
+ stmt,
+ "SELECT row_number() OVER (PARTITION BY bar ORDER BY foo) "
+ "AS anon_1 FROM (SELECT foo, bar)"
+ )
+
+ def test_union_column(self):
+ s1 = select([table1])
+ s2 = select([table1])
+ stmt = union(s1, s2).order_by("name")
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable UNION SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable ORDER BY name"
+ )
+
+ def test_union_label(self):
+ s1 = select([func.foo("hoho").label('x')])
+ s2 = select([func.foo("Bar").label('y')])
+ stmt = union(s1, s2).order_by("x")
+ self.assert_compile(
+ stmt,
+ "SELECT foo(:foo_1) AS x UNION SELECT foo(:foo_2) AS y ORDER BY x"
+ )
+