Key Changes - Core
==================
+.. _change_4481:
+
+Coercion of string SQL fragments to text() fully removed
+---------------------------------------------------------
+
+The warnings that were first added in version 1.0, described at
+:ref:`migration_2992`, have now been converted into exceptions. Continued
+concerns have been raised regarding the automatic coercion of string fragments
+passed to methods like :meth:`.Query.filter` and :meth:`.Select.order_by` being
+converted to :func:`.text` constructs, even though this has emitted a warning.
+In the case of :meth:`.Select.order_by`, :meth:`.Query.order_by`,
+:meth:`.Select.group_by`, and :meth:`.Query.group_by`, a string label or column
+name is still resolved into the corresponding expression construct, however if
+the resolution fails, a :class:`.CompileError` is raised, thus preventing raw
+SQL text from being rendered directly.
+
+:ticket:`4481`
+
.. _change_4393_threadlocal:
"threadlocal" engine strategy deprecated
--- /dev/null
+.. change::
+ :tags: bug, sql
+ :tickets: 4467
+
+ Quoting is applied to :class:`.Function` names, those which are usually but
+ not necessarily generated from the :attr:`.sql.func` construct, at compile
+ time if they contain illegal characters, such as spaces or punctuation. The
+ names are as before treated as case insensitive however, meaning if the
+ names contain uppercase or mixed case characters, that alone does not
+ trigger quoting. The case insensitivity is currently maintained for
+ backwards compatibility.
+
--- /dev/null
+.. change::
+ :tags: bug, postgresql
+ :tickets: 4473
+
+ Fixed issue where using an uppercase name for an index type (e.g. GIST,
+ BTREE, etc. ) or an EXCLUDE constraint would treat it as an identifier to
+ be quoted, rather than rendering it as is. The new behavior converts these
+ types to lowercase and ensures they contain only valid SQL characters.
--- /dev/null
+.. change::
+ :tags: bug, sql
+ :tickets: 4481
+
+ Fully removed the behavior of strings passed directly as components of a
+ :func:`.select` or :class:`.Query` object being coerced to :func:`.text`
+ constructs automatically; the warning that has been emitted is now an
+ ArgumentError or in the case of order_by() / group_by() a CompileError.
+ This has emitted a warning since version 1.0 however its presence continues
+ to create concerns for the potential of mis-use of this behavior.
+
+ Note that public CVEs have been posted for order_by() / group_by() which
+ are resolved by this commit: CVE-2019-7164 CVE-2019-7548
+
+
+ .. seealso::
+
+ :ref:`change_4481`
\ No newline at end of file
--- /dev/null
+.. change::
+ :tags: bug, sql
+ :tickets: 4481
+
+ Added "SQL phrase validation" to key DDL phrases that are accepted as plain
+ strings, including :paramref:`.ForeignKeyConstraint.on_delete`,
+ :paramref:`.ForeignKeyConstraint.on_update`,
+ :paramref:`.ExcludeConstraint.using`,
+ :paramref:`.ForeignKeyConstraint.initially`, for areas where a series of SQL
+ keywords only are expected.Any non-space characters that suggest the phrase
+ would need to be quoted will raise a :class:`.CompileError`. This change
+ is related to the series of changes committed as part of :ticket:`4481`.
from .engine import engine_from_config # noqa nosort
-__version__ = '1.3.0b3'
+__version__ = "1.3.0b3"
def __go(lcls):
_python_UUID = None
+IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)
+
AUTOCOMMIT_REGEXP = re.compile(
r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER|GRANT|REVOKE|"
"IMPORT FOREIGN SCHEMA|REFRESH MATERIALIZED VIEW|TRUNCATE)",
using = index.dialect_options["postgresql"]["using"]
if using:
- text += "USING %s " % preparer.quote(using)
+ text += (
+ "USING %s "
+ % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
+ )
ops = index.dialect_options["postgresql"]["ops"]
text += "(%s)" % (
"%s WITH %s" % (self.sql_compiler.process(expr, **kw), op)
)
text += "EXCLUDE USING %s (%s)" % (
- constraint.using,
+ self.preparer.validate_sql_phrase(
+ constraint.using, IDX_USING
+ ).lower(),
", ".join(elements),
)
if constraint.where is not None:
where = None
+ @elements._document_text_coercion(
+ "where",
+ ":class:`.ExcludeConstraint`",
+ ":paramref:`.ExcludeConstraint.where`",
+ )
def __init__(self, *elements, **kw):
r"""
Create an :class:`.ExcludeConstraint` object.
)
:param \*elements:
+
A sequence of two tuples of the form ``(column, operator)`` where
"column" is a SQL expression element or a raw SQL string, most
- typically a :class:`.Column` object,
- and "operator" is a string containing the operator to use.
-
- .. note::
-
- A plain string passed for the value of "column" is interpreted
- as an arbitrary SQL expression; when passing a plain string,
- any necessary quoting and escaping syntaxes must be applied
- manually. In order to specify a column name when a
- :class:`.Column` object is not available, while ensuring that
- any necessary quoting rules take effect, an ad-hoc
- :class:`.Column` or :func:`.sql.expression.column` object may
- be used.
+ typically a :class:`.Column` object, and "operator" is a string
+ containing the operator to use. In order to specify a column name
+ when a :class:`.Column` object is not available, while ensuring
+ that any necessary quoting rules take effect, an ad-hoc
+ :class:`.Column` or :func:`.sql.expression.column` object should be
+ used.
:param name:
Optional, the in-database name of this constraint.
If set, emit WHERE <predicate> when issuing DDL
for this constraint.
- .. note::
-
- A plain string passed here is interpreted as an arbitrary SQL
- expression; when passing a plain string, any necessary quoting
- and escaping syntaxes must be applied manually.
-
"""
columns = []
render_exprs = []
# backwards compat
self.operators[name] = operator
- expr = expression._literal_as_text(expr)
+ expr = expression._literal_as_column(expr)
render_exprs.append((expr, name, operator))
self._render_exprs = render_exprs
+
ColumnCollectionConstraint.__init__(
self,
*columns,
self.using = kw.get("using", "gist")
where = kw.get("where")
if where is not None:
- self.where = expression._literal_as_text(where)
+ self.where = expression._literal_as_text(
+ where, allow_coercion_to_text=True
+ )
def copy(self, **kw):
elements = [(col, self.operators[col]) for col in self.columns.keys()]
in order to execute the statement.
"""
- clause = expression._literal_as_text(clause)
+ clause = expression._literal_as_text(
+ clause, allow_coercion_to_text=True
+ )
if bind is None:
bind = self.get_bind(mapper, clause=clause, **kw)
)
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
+LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])
+FK_ON_DELETE = re.compile(
+ r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
+)
+FK_ON_UPDATE = re.compile(
+ r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
+)
+FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
else:
col = with_cols[element.element]
except KeyError:
- # treat it like text()
- util.warn_limited(
- "Can't resolve label reference %r; converting to text()",
- util.ellipses_string(element.element),
+ elements._no_text_coercion(
+ element.element,
+ exc.CompileError,
+ "Can't resolve label reference for ORDER BY / GROUP BY.",
)
- return self.process(element._text_clause)
else:
kwargs["render_label_as_label"] = col
return self.process(
if func._has_args:
name += "%(expr)s"
else:
- name = func.name + "%(expr)s"
- return ".".join(list(func.packagenames) + [name]) % {
- "expr": self.function_argspec(func, **kwargs)
- }
+ name = func.name
+ name = (
+ self.preparer.quote(name)
+ if self.preparer._requires_quotes_illegal_chars(name)
+ else name
+ )
+ name = name + "%(expr)s"
+ return ".".join(
+ [
+ (
+ self.preparer.quote(tok)
+ if self.preparer._requires_quotes_illegal_chars(tok)
+ else tok
+ )
+ for tok in func.packagenames
+ ]
+ + [name]
+ ) % {"expr": self.function_argspec(func, **kwargs)}
def visit_next_value_func(self, next_value, **kw):
return self.visit_sequence(next_value.sequence)
def define_constraint_cascades(self, constraint):
text = ""
if constraint.ondelete is not None:
- text += " ON DELETE %s" % constraint.ondelete
+ text += " ON DELETE %s" % self.preparer.validate_sql_phrase(
+ constraint.ondelete, FK_ON_DELETE
+ )
if constraint.onupdate is not None:
- text += " ON UPDATE %s" % constraint.onupdate
+ text += " ON UPDATE %s" % self.preparer.validate_sql_phrase(
+ constraint.onupdate, FK_ON_UPDATE
+ )
return text
def define_constraint_deferrability(self, constraint):
else:
text += " NOT DEFERRABLE"
if constraint.initially is not None:
- text += " INITIALLY %s" % constraint.initially
+ text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
+ constraint.initially, FK_INITIALLY
+ )
return text
def define_constraint_match(self, constraint):
return value.replace(self.escape_to_quote, self.escape_quote)
+ def validate_sql_phrase(self, element, reg):
+ """keyword sequence filter.
+
+ a filter for elements that are intended to represent keyword sequences,
+ such as "INITIALLY", "INTIALLY DEFERRED", etc. no special characters
+ should be present.
+
+ .. versionadded:: 1.3
+
+ """
+
+ if element is not None and not reg.match(element):
+ raise exc.CompileError(
+ "Unexpected SQL phrase: %r (matching against %r)"
+ % (element, reg.pattern)
+ )
+ return element
+
def quote_identifier(self, value):
"""Quote an identifier.
or (lc_value != value)
)
+ def _requires_quotes_illegal_chars(self, value):
+ """Return True if the given identifier requires quoting, but
+ not taking case convention into account."""
+ return not self.legal_characters.match(util.text_type(value))
+
def quote_schema(self, schema, force=None):
"""Conditionally quote a schema name.
return element._clone()
+def _document_text_coercion(paramname, meth_rst, param_rst):
+ return util.add_parameter_text(
+ paramname,
+ (
+ ".. warning:: "
+ "The %s argument to %s can be passed as a Python string argument, "
+ "which will be treated "
+ "as **trusted SQL text** and rendered as given. **DO NOT PASS "
+ "UNTRUSTED INPUT TO THIS PARAMETER**."
+ )
+ % (param_rst, meth_rst),
+ )
+
+
def collate(expression, collation):
"""Return the clause ``expression COLLATE collation``.
"refer to the :meth:`.TextClause.columns` method.",
),
)
+ @_document_text_coercion("text", ":func:`.text`", ":paramref:`.text.text`")
def _create_text(
self, text, bind=None, bindparams=None, typemap=None, autocommit=None
):
def _expression_literal_as_text(element):
- return _literal_as_text(element, warn=True)
+ return _literal_as_text(element)
-def _literal_as_text(element, warn=False):
+def _literal_as(element, text_fallback):
if isinstance(element, Visitable):
return element
elif hasattr(element, "__clause_element__"):
return element.__clause_element__()
elif isinstance(element, util.string_types):
- if warn:
- util.warn_limited(
- "Textual SQL expression %(expr)r should be "
- "explicitly declared as text(%(expr)r)",
- {"expr": util.ellipses_string(element)},
- )
-
- return TextClause(util.text_type(element))
+ return text_fallback(element)
elif isinstance(element, (util.NoneType, bool)):
return _const_expr(element)
else:
raise exc.ArgumentError(
- "SQL expression object or string expected, got object of type %r "
+ "SQL expression object expected, got object of type %r "
"instead" % type(element)
)
+def _literal_as_text(element, allow_coercion_to_text=False):
+ if allow_coercion_to_text:
+ return _literal_as(element, TextClause)
+ else:
+ return _literal_as(element, _no_text_coercion)
+
+
+def _literal_as_column(element):
+ return _literal_as(element, ColumnClause)
+
+
+def _no_column_coercion(element):
+ element = str(element)
+ guess_is_literal = not _guess_straight_column.match(element)
+ raise exc.ArgumentError(
+ "Textual column expression %(column)r should be "
+ "explicitly declared with text(%(column)r), "
+ "or use %(literal_column)s(%(column)r) "
+ "for more specificity"
+ % {
+ "column": util.ellipses_string(element),
+ "literal_column": "literal_column"
+ if guess_is_literal
+ else "column",
+ }
+ )
+
+
+def _no_text_coercion(element, exc_cls=exc.ArgumentError, extra=None):
+ raise exc_cls(
+ "%(extra)sTextual SQL expression %(expr)r should be "
+ "explicitly declared as text(%(expr)r)"
+ % {
+ "expr": util.ellipses_string(element),
+ "extra": "%s " % extra if extra else "",
+ }
+ )
+
+
def _no_literals(element):
if hasattr(element, "__clause_element__"):
return element.__clause_element__()
elif isinstance(element, (numbers.Number)):
return ColumnClause(str(element), is_literal=True)
else:
- element = str(element)
- # give into temptation, as this fact we are guessing about
- # is not one we've previously ever needed our users tell us;
- # but let them know we are not happy about it
- guess_is_literal = not _guess_straight_column.match(element)
- util.warn_limited(
- "Textual column expression %(column)r should be "
- "explicitly declared with text(%(column)r), "
- "or use %(literal_column)s(%(column)r) "
- "for more specificity",
- {
- "column": util.ellipses_string(element),
- "literal_column": "literal_column"
- if guess_is_literal
- else "column",
- },
- )
+ _no_column_coercion(element)
return ColumnClause(element, is_literal=guess_is_literal)
from .elements import _is_column # noqa
from .elements import _labeled # noqa
from .elements import _literal_as_binds # noqa
+from .elements import _literal_as_column # noqa
from .elements import _literal_as_label_reference # noqa
from .elements import _literal_as_text # noqa
from .elements import _only_column_elements # noqa
from .base import DialectKWArgs
from .base import SchemaEventTarget
from .elements import _as_truncated
+from .elements import _document_text_coercion
from .elements import _literal_as_text
from .elements import ClauseElement
from .elements import ColumnClause
_allow_multiple_tables = True
+ @_document_text_coercion(
+ "sqltext",
+ ":class:`.CheckConstraint`",
+ ":paramref:`.CheckConstraint.sqltext`",
+ )
def __init__(
self,
sqltext,
"""
- self.sqltext = _literal_as_text(sqltext, warn=False)
+ self.sqltext = _literal_as_text(sqltext, allow_coercion_to_text=True)
columns = []
visitors.traverse(self.sqltext, {}, {"column": columns.append})
from .elements import _clone
from .elements import _cloned_difference
from .elements import _cloned_intersection
+from .elements import _document_text_coercion
from .elements import _expand_cloned
from .elements import _interpret_as_column_or_from
from .elements import _literal_and_labels_as_label_reference
from .elements import _literal_as_label_reference
from .elements import _literal_as_text
+from .elements import _no_text_coercion
from .elements import _select_iterables
from .elements import and_
from .elements import BindParameter
from .elements import ClauseList
from .elements import Grouping
from .elements import literal_column
-from .elements import TextClause
from .elements import True_
from .elements import UnaryExpression
from .. import exc
insp = inspection.inspect(element, raiseerr=False)
if insp is None:
if isinstance(element, util.string_types):
- util.warn_limited(
- "Textual SQL FROM expression %(expr)r should be "
- "explicitly declared as text(%(expr)r), "
- "or use table(%(expr)r) for more specificity",
- {"expr": util.ellipses_string(element)},
- )
-
- return TextClause(util.text_type(element))
+ _no_text_coercion(element)
try:
return insp.selectable
except AttributeError:
_prefixes = ()
@_generative
+ @_document_text_coercion(
+ "expr",
+ ":meth:`.HasPrefixes.prefix_with`",
+ ":paramref:`.HasPrefixes.prefix_with.*expr`",
+ )
def prefix_with(self, *expr, **kw):
r"""Add one or more expressions following the statement keyword, i.e.
SELECT, INSERT, UPDATE, or DELETE. Generative.
def _setup_prefixes(self, prefixes, dialect=None):
self._prefixes = self._prefixes + tuple(
- [(_literal_as_text(p, warn=False), dialect) for p in prefixes]
+ [
+ (_literal_as_text(p, allow_coercion_to_text=True), dialect)
+ for p in prefixes
+ ]
)
_suffixes = ()
@_generative
+ @_document_text_coercion(
+ "expr",
+ ":meth:`.HasSuffixes.suffix_with`",
+ ":paramref:`.HasSuffixes.suffix_with.*expr`",
+ )
def suffix_with(self, *expr, **kw):
r"""Add one or more expressions following the statement as a whole.
def _setup_suffixes(self, suffixes, dialect=None):
self._suffixes = self._suffixes + tuple(
- [(_literal_as_text(p, warn=False), dialect) for p in suffixes]
+ [
+ (_literal_as_text(p, allow_coercion_to_text=True), dialect)
+ for p in suffixes
+ ]
)
from .deprecations import pending_deprecation # noqa
from .deprecations import warn_deprecated # noqa
from .deprecations import warn_pending_deprecation # noqa
+from .langhelpers import add_parameter_text # noqa
from .langhelpers import as_interface # noqa
from .langhelpers import asbool # noqa
from .langhelpers import asint # noqa
functionality."""
import re
-import textwrap
import warnings
from . import compat
from .langhelpers import decorator
+from .langhelpers import inject_docstring_text
+from .langhelpers import inject_param_text
from .. import exc
decorated.__doc__ = doc
decorated._sa_warn = lambda: warnings.warn(message, wtype, stacklevel=3)
return decorated
-
-
-def _dedent_docstring(text):
- split_text = text.split("\n", 1)
- if len(split_text) == 1:
- return text
- else:
- firstline, remaining = split_text
- if not firstline.startswith(" "):
- return firstline + "\n" + textwrap.dedent(remaining)
- else:
- return textwrap.dedent(text)
-
-
-def inject_docstring_text(doctext, injecttext, pos):
- doctext = _dedent_docstring(doctext or "")
- lines = doctext.split("\n")
- injectlines = textwrap.dedent(injecttext).split("\n")
- if injectlines[0]:
- injectlines.insert(0, "")
-
- blanks = [num for num, line in enumerate(lines) if not line.strip()]
- blanks.insert(0, 0)
-
- inject_pos = blanks[min(pos, len(blanks) - 1)]
-
- lines = lines[0:inject_pos] + injectlines + lines[inject_pos:]
- return "\n".join(lines)
-
-
-def inject_param_text(doctext, inject_params):
- doclines = doctext.splitlines()
- lines = []
-
- to_inject = None
- while doclines:
- line = doclines.pop(0)
- if to_inject is None:
- m = re.match(r"(\s+):param (.+?):", line)
- if m:
- param = m.group(2)
- if param in inject_params:
- # default indent to that of :param: plus one
- indent = " " * len(m.group(1)) + " "
-
- # but if the next line has text, use that line's
- # indentntation
- if doclines:
- m2 = re.match(r"(\s+)\S", doclines[0])
- if m2:
- indent = " " * len(m2.group(1))
-
- to_inject = indent + inject_params[param]
- elif not line.rstrip():
- lines.append(line)
- lines.append(to_inject)
- lines.append("\n")
- to_inject = None
- lines.append(line)
-
- return "\n".join(lines)
import operator
import re
import sys
+import textwrap
import types
import warnings
idx += 1
return ["".join(token) for token in result]
+
+
+def add_parameter_text(params, text):
+ params = _collections.to_list(params)
+
+ def decorate(fn):
+ doc = fn.__doc__ is not None and fn.__doc__ or ""
+ if doc:
+ doc = inject_param_text(doc, {param: text for param in params})
+ fn.__doc__ = doc
+ return fn
+
+ return decorate
+
+
+def _dedent_docstring(text):
+ split_text = text.split("\n", 1)
+ if len(split_text) == 1:
+ return text
+ else:
+ firstline, remaining = split_text
+ if not firstline.startswith(" "):
+ return firstline + "\n" + textwrap.dedent(remaining)
+ else:
+ return textwrap.dedent(text)
+
+
+def inject_docstring_text(doctext, injecttext, pos):
+ doctext = _dedent_docstring(doctext or "")
+ lines = doctext.split("\n")
+ injectlines = textwrap.dedent(injecttext).split("\n")
+ if injectlines[0]:
+ injectlines.insert(0, "")
+
+ blanks = [num for num, line in enumerate(lines) if not line.strip()]
+ blanks.insert(0, 0)
+
+ inject_pos = blanks[min(pos, len(blanks) - 1)]
+
+ lines = lines[0:inject_pos] + injectlines + lines[inject_pos:]
+ return "\n".join(lines)
+
+
+def inject_param_text(doctext, inject_params):
+ doclines = doctext.splitlines()
+ lines = []
+
+ to_inject = None
+ while doclines:
+ line = doclines.pop(0)
+ if to_inject is None:
+ m = re.match(r"(\s+):param (?:\\\*\*?)?(.+?):", line)
+ if m:
+ param = m.group(2)
+ if param in inject_params:
+ # default indent to that of :param: plus one
+ indent = " " * len(m.group(1)) + " "
+
+ # but if the next line has text, use that line's
+ # indentntation
+ if doclines:
+ m2 = re.match(r"(\s+)\S", doclines[0])
+ if m2:
+ indent = " " * len(m2.group(1))
+
+ to_inject = indent + inject_params[param]
+ elif line.lstrip().startswith(":param "):
+ lines.append("\n")
+ lines.append(to_inject)
+ lines.append("\n")
+ to_inject = None
+ elif not line.rstrip():
+ lines.append(line)
+ lines.append(to_inject)
+ lines.append("\n")
+ to_inject = None
+ lines.append(line)
+
+ return "\n".join(lines)
Column(
"id",
Integer,
- ForeignKey("t1.id", deferrable=True, initially="XYZ"),
+ ForeignKey("t1.id", deferrable=True, initially="DEFERRED"),
primary_key=True,
),
)
schema.CreateTable(t2),
"CREATE TABLE t2 (id INTEGER NOT NULL, "
"PRIMARY KEY (id), FOREIGN KEY(id) REFERENCES t1 (id) "
- "DEFERRABLE INITIALLY XYZ)",
+ "DEFERRABLE INITIALLY DEFERRED)",
)
def test_match_kw_raises(self):
"WITH (buffering = off)",
)
+ def test_create_index_with_using_unusual_conditions(self):
+ m = MetaData()
+ tbl = Table("testtbl", m, Column("data", String))
+
+ self.assert_compile(
+ schema.CreateIndex(
+ Index("test_idx1", tbl.c.data, postgresql_using="GIST")
+ ),
+ "CREATE INDEX test_idx1 ON testtbl " "USING gist (data)",
+ )
+
+ self.assert_compile(
+ schema.CreateIndex(
+ Index(
+ "test_idx1",
+ tbl.c.data,
+ postgresql_using="some_custom_method",
+ )
+ ),
+ "CREATE INDEX test_idx1 ON testtbl "
+ "USING some_custom_method (data)",
+ )
+
+ assert_raises_message(
+ exc.CompileError,
+ "Unexpected SQL phrase: 'gin invalid sql'",
+ schema.CreateIndex(
+ Index(
+ "test_idx2", tbl.c.data, postgresql_using="gin invalid sql"
+ )
+ ).compile,
+ dialect=postgresql.dialect(),
+ )
+
def test_create_index_with_tablespace(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", String))
"(room::TEXT WITH =)",
)
+ def test_exclude_constraint_colname_needs_quoting(self):
+ m = MetaData()
+ cons = ExcludeConstraint(("Some Column Name", "="))
+ Table("testtbl", m, Column("Some Column Name", String), cons)
+ self.assert_compile(
+ schema.AddConstraint(cons),
+ "ALTER TABLE testtbl ADD EXCLUDE USING gist "
+ '("Some Column Name" WITH =)',
+ )
+
+ def test_exclude_constraint_with_using_unusual_conditions(self):
+ m = MetaData()
+ cons = ExcludeConstraint(("q", "="), using="not a keyword")
+ Table("testtbl", m, Column("q", String), cons)
+ assert_raises_message(
+ exc.CompileError,
+ "Unexpected SQL phrase: 'not a keyword'",
+ schema.AddConstraint(cons).compile,
+ dialect=postgresql.dialect(),
+ )
+
def test_exclude_constraint_cast(self):
m = MetaData()
tbl = Table("testtbl", m, Column("room", String))
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
-from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import is_
.order_by("email_address")
)
- with expect_warnings("Can't resolve label reference 'email_address'"):
- self.assert_compile(
- q,
- "SELECT users.id AS users_id, users.name AS users_name, "
- "addresses_1.id AS addresses_1_id, addresses_1.user_id AS "
- "addresses_1_user_id, addresses_1.email_address AS "
- "addresses_1_email_address FROM users LEFT OUTER JOIN "
- "addresses AS addresses_1 ON users.id = addresses_1.user_id "
- "ORDER BY email_address",
- )
+ assert_raises_message(
+ sa.exc.CompileError,
+ "Can't resolve label reference for ORDER BY / GROUP BY.",
+ q.all,
+ )
def test_deferred_fk_col(self):
users, Dingaling, User, dingalings, Address, addresses = (
from sqlalchemy.orm.util import with_parent
from sqlalchemy.sql import expression
from sqlalchemy.sql import operators
-from sqlalchemy.testing import assert_warnings
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
ua = aliased(User)
q = s.query(ua).order_by("email_ad")
- def go():
- self.assert_compile(
- q,
- "SELECT (SELECT max(addresses.email_address) AS max_1 "
- "FROM addresses WHERE addresses.user_id = users_1.id) "
- "AS anon_1, users_1.id AS users_1_id, "
- "users_1.name AS users_1_name FROM users AS users_1 "
- "ORDER BY email_ad",
- )
-
- assert_warnings(
- go, ["Can't resolve label reference 'email_ad'"], regex=True
+ assert_raises_message(
+ sa.exc.CompileError,
+ "Can't resolve label reference for ORDER BY / GROUP BY",
+ q.with_labels().statement.compile,
)
def test_order_by_column_labeled_prop_attr_aliased_one(self):
def test_fulltext(self):
User = self.classes.User
- with expect_warnings("Textual SQL"):
- eq_(
- create_session()
- .query(User)
- .from_statement("select * from users order by id")
- .all(),
- [User(id=7), User(id=8), User(id=9), User(id=10)],
- )
+ assert_raises_message(
+ sa_exc.ArgumentError,
+ "Textual SQL expression",
+ create_session().query(User).from_statement,
+ "select * from users order by id",
+ )
eq_(
create_session()
def test_binds_coerce(self):
User = self.classes.User
- with expect_warnings("Textual SQL expression"):
- eq_(
- create_session()
- .query(User)
- .filter("id in (:id1, :id2)")
- .params(id1=8, id2=9)
- .all(),
- [User(id=8), User(id=9)],
- )
+ assert_raises_message(
+ sa_exc.ArgumentError,
+ r"Textual SQL expression 'id in \(:id1, :id2\)' "
+ "should be explicitly declared",
+ create_session().query(User).filter,
+ "id in (:id1, :id2)",
+ )
def test_as_column(self):
User = self.classes.User
# the queries here are again "invalid" from a SQL perspective, as the
# "name" field isn't matched up to anything.
#
- with expect_warnings("Can't resolve label reference 'name';"):
- self.assert_compile(
- s.query(User)
- .options(joinedload("addresses"))
- .order_by(desc("name"))
- .limit(1),
- "SELECT anon_1.users_id AS anon_1_users_id, "
- "anon_1.users_name AS anon_1_users_name, "
- "addresses_1.id AS addresses_1_id, "
- "addresses_1.user_id AS addresses_1_user_id, "
- "addresses_1.email_address AS addresses_1_email_address "
- "FROM (SELECT users.id AS users_id, users.name AS users_name "
- "FROM users ORDER BY users.name "
- "DESC LIMIT :param_1) AS anon_1 "
- "LEFT OUTER JOIN addresses AS addresses_1 "
- "ON anon_1.users_id = addresses_1.user_id "
- "ORDER BY name DESC, addresses_1.id",
- )
+
+ q = (
+ s.query(User)
+ .options(joinedload("addresses"))
+ .order_by(desc("name"))
+ .limit(1)
+ )
+ assert_raises_message(
+ sa_exc.CompileError,
+ "Can't resolve label reference for ORDER BY / GROUP BY.",
+ q.with_labels().statement.compile,
+ )
def test_order_by_w_eager_two(self):
User = self.classes.User
s = create_session()
- with expect_warnings("Can't resolve label reference 'name';"):
- self.assert_compile(
- s.query(User)
- .options(joinedload("addresses"))
- .order_by("name")
- .limit(1),
- "SELECT anon_1.users_id AS anon_1_users_id, "
- "anon_1.users_name AS anon_1_users_name, "
- "addresses_1.id AS addresses_1_id, "
- "addresses_1.user_id AS addresses_1_user_id, "
- "addresses_1.email_address AS addresses_1_email_address "
- "FROM (SELECT users.id AS users_id, users.name AS users_name "
- "FROM users ORDER BY users.name "
- "LIMIT :param_1) AS anon_1 "
- "LEFT OUTER JOIN addresses AS addresses_1 "
- "ON anon_1.users_id = addresses_1.user_id "
- "ORDER BY name, addresses_1.id",
- )
+ q = (
+ s.query(User)
+ .options(joinedload("addresses"))
+ .order_by("name")
+ .limit(1)
+ )
+ assert_raises_message(
+ sa_exc.CompileError,
+ "Can't resolve label reference for ORDER BY / GROUP BY.",
+ q.with_labels().statement.compile,
+ )
def test_order_by_w_eager_three(self):
User = self.classes.User
.limit(1)
.offset(0)
)
- with expect_warnings(
- "Can't resolve label reference 'email_address desc'"
- ):
- eq_(
- [
- (
- User(
- id=7,
- orders=[Order(id=1), Order(id=3), Order(id=5)],
- addresses=[Address(id=1)],
- ),
- "jack@bean.com",
- )
- ],
- result.all(),
- )
+
+ assert_raises_message(
+ sa_exc.CompileError,
+ "Can't resolve label reference for ORDER BY / GROUP BY",
+ result.all,
+ )
-class TextWarningTest(QueryTest, AssertsCompiledSQL):
- def _test(self, fn, arg, offending_clause, expected):
+class TextErrorTest(QueryTest, AssertsCompiledSQL):
+ def _test(self, fn, arg, offending_clause):
assert_raises_message(
- sa.exc.SAWarning,
+ sa.exc.ArgumentError,
r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
r"explicitly declared (?:with|as) text\(%(stmt)r\)"
% {"stmt": util.ellipses_string(offending_clause)},
arg,
)
- with expect_warnings("Textual "):
- stmt = fn(arg)
- self.assert_compile(stmt, expected)
-
def test_filter(self):
User = self.classes.User
- self._test(
- Session().query(User.id).filter,
- "myid == 5",
- "myid == 5",
- "SELECT users.id AS users_id FROM users WHERE myid == 5",
- )
+ self._test(Session().query(User.id).filter, "myid == 5", "myid == 5")
def test_having(self):
User = self.classes.User
- self._test(
- Session().query(User.id).having,
- "myid == 5",
- "myid == 5",
- "SELECT users.id AS users_id FROM users HAVING myid == 5",
- )
+ self._test(Session().query(User.id).having, "myid == 5", "myid == 5")
def test_from_statement(self):
User = self.classes.User
Session().query(User.id).from_statement,
"select id from user",
"select id from user",
- "select id from user",
)
schema_translate_map=schema_translate_map,
)
+ def test_fk_render(self):
+ a = Table("a", MetaData(), Column("q", Integer))
+ b = Table("b", MetaData(), Column("p", Integer))
+
+ self.assert_compile(
+ schema.AddConstraint(
+ schema.ForeignKeyConstraint([a.c.q], [b.c.p])
+ ),
+ "ALTER TABLE a ADD FOREIGN KEY(q) REFERENCES b (p)",
+ )
+
+ self.assert_compile(
+ schema.AddConstraint(
+ schema.ForeignKeyConstraint(
+ [a.c.q], [b.c.p], onupdate="SET NULL", ondelete="CASCADE"
+ )
+ ),
+ "ALTER TABLE a ADD FOREIGN KEY(q) REFERENCES b (p) "
+ "ON DELETE CASCADE ON UPDATE SET NULL",
+ )
+
+ self.assert_compile(
+ schema.AddConstraint(
+ schema.ForeignKeyConstraint(
+ [a.c.q], [b.c.p], initially="DEFERRED"
+ )
+ ),
+ "ALTER TABLE a ADD FOREIGN KEY(q) REFERENCES b (p) "
+ "INITIALLY DEFERRED",
+ )
+
+ def test_fk_illegal_sql_phrases(self):
+ a = Table("a", MetaData(), Column("q", Integer))
+ b = Table("b", MetaData(), Column("p", Integer))
+
+ for kw in ("onupdate", "ondelete", "initially"):
+ for phrase in (
+ "NOT SQL",
+ "INITALLY NOT SQL",
+ "FOO RESTRICT",
+ "CASCADE WRONG",
+ "SET NULL",
+ ):
+ const = schema.AddConstraint(
+ schema.ForeignKeyConstraint(
+ [a.c.q], [b.c.p], **{kw: phrase}
+ )
+ )
+ assert_raises_message(
+ exc.CompileError,
+ r"Unexpected SQL phrase: '%s'" % phrase,
+ const.compile,
+ )
+
class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
):
assert_raises_message(
sa.exc.ArgumentError,
- "SQL expression object or string expected, got object of type "
+ "SQL expression object expected, got object of type "
"<.* 'list'> instead",
t.select,
[const],
def test_underscores(self):
self.assert_compile(func.if_(), "if()")
+ def test_underscores_packages(self):
+ self.assert_compile(func.foo_.bar_.if_(), "foo.bar.if()")
+
+ def test_uppercase(self):
+ # for now, we need to keep case insensitivity
+ self.assert_compile(func.NOW(), "NOW()")
+
+ def test_uppercase_packages(self):
+ # for now, we need to keep case insensitivity
+ self.assert_compile(func.FOO.BAR.NOW(), "FOO.BAR.NOW()")
+
+ def test_mixed_case(self):
+ # for now, we need to keep case insensitivity
+ self.assert_compile(func.SomeFunction(), "SomeFunction()")
+
+ def test_mixed_case_packages(self):
+ # for now, we need to keep case insensitivity
+ self.assert_compile(
+ func.Foo.Bar.SomeFunction(), "Foo.Bar.SomeFunction()"
+ )
+
+ def test_quote_special_chars(self):
+ # however we need to be quoting any other identifiers
+ self.assert_compile(
+ getattr(func, "im a function")(), '"im a function"()'
+ )
+
+ def test_quote_special_chars_packages(self):
+ # however we need to be quoting any other identifiers
+ self.assert_compile(
+ getattr(
+ getattr(getattr(func, "im foo package"), "im bar package"),
+ "im a function",
+ )(),
+ '"im foo package"."im bar package"."im a function"()',
+ )
+
def test_generic_now(self):
assert isinstance(func.now().type, sqltypes.DateTime)
from sqlalchemy.sql import table
from sqlalchemy.sql import util as sql_util
from sqlalchemy.testing import assert_raises_message
-from sqlalchemy.testing import assert_warnings
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
-from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.types import NullType
eq_(set(t.element._bindparams), set(["bat", "foo", "bar"]))
-class TextWarningsTest(fixtures.TestBase, AssertsCompiledSQL):
+class TextErrorsTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
- def _test(self, fn, arg, offending_clause, expected):
- with expect_warnings("Textual "):
- stmt = fn(arg)
- self.assert_compile(stmt, expected)
-
+ def _test(self, fn, arg, offending_clause):
assert_raises_message(
- exc.SAWarning,
+ exc.ArgumentError,
r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
r"explicitly declared (?:with|as) text\(%(stmt)r\)"
% {"stmt": util.ellipses_string(offending_clause)},
)
def test_where(self):
- self._test(
- select([table1.c.myid]).where,
- "myid == 5",
- "myid == 5",
- "SELECT mytable.myid FROM mytable WHERE myid == 5",
- )
+ self._test(select([table1.c.myid]).where, "myid == 5", "myid == 5")
def test_column(self):
- self._test(select, ["myid"], "myid", "SELECT myid")
+ self._test(select, ["myid"], "myid")
def test_having(self):
- self._test(
- select([table1.c.myid]).having,
- "myid == 5",
- "myid == 5",
- "SELECT mytable.myid FROM mytable HAVING myid == 5",
- )
+ self._test(select([table1.c.myid]).having, "myid == 5", "myid == 5")
def test_from(self):
- self._test(
- select([table1.c.myid]).select_from,
- "mytable",
- "mytable",
- "SELECT mytable.myid FROM mytable, mytable", # two FROMs
- )
+ self._test(select([table1.c.myid]).select_from, "mytable", "mytable")
class OrderByLabelResolutionTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
- def _test_warning(self, stmt, offending_clause, expected):
- with expect_warnings(
- "Can't resolve label reference %r;" % offending_clause
- ):
- self.assert_compile(stmt, expected)
+ def _test_exception(self, stmt, offending_clause):
assert_raises_message(
- exc.SAWarning,
- "Can't resolve label reference %r; converting to text"
- % offending_clause,
+ exc.CompileError,
+ r"Can't resolve label reference for ORDER BY / GROUP BY. "
+ "Textual SQL "
+ "expression %r should be explicitly "
+ r"declared as text\(%r\)" % (offending_clause, offending_clause),
stmt.compile,
)
def test_unresolvable_warning_order_by(self):
stmt = select([table1.c.myid]).order_by("foobar")
- self._test_warning(
- stmt, "foobar", "SELECT mytable.myid FROM mytable ORDER BY foobar"
- )
+ self._test_exception(stmt, "foobar")
def test_group_by_label(self):
stmt = select([table1.c.myid.label("foo")]).group_by("foo")
def test_unresolvable_warning_group_by(self):
stmt = select([table1.c.myid]).group_by("foobar")
- self._test_warning(
- stmt, "foobar", "SELECT mytable.myid FROM mytable GROUP BY foobar"
- )
+ self._test_exception(stmt, "foobar")
def test_asc(self):
stmt = select([table1.c.myid]).order_by(asc("name"), "description")
.order_by("myid", "t1name", "x")
)
- def go():
- # the labels here are anonymized, so label naming
- # can't catch these.
- self.assert_compile(
- s1,
- "SELECT mytable_1.myid AS mytable_1_myid, "
- "mytable_1.name AS name_1, foo(:foo_2) AS foo_1 "
- "FROM mytable AS mytable_1 ORDER BY mytable_1.myid, t1name, x",
- )
-
- assert_warnings(
- go,
- [
- "Can't resolve label reference 't1name'",
- "Can't resolve label reference 'x'",
- ],
- regex=True,
+ assert_raises_message(
+ exc.CompileError,
+ r"Can't resolve label reference for ORDER BY / GROUP BY. "
+ "Textual SQL "
+ "expression 't1name' should be explicitly "
+ r"declared as text\('t1name'\)",
+ s1.compile,
)
def test_columnadapter_non_anonymized(self):