-# See https://pre-commit.com for more information
-# See https://pre-commit.com/hooks.html for more hooks
+
+default_language_version:
+ python: python3.14
+
repos:
- repo: https://github.com/python/black
rev: 25.11.0
New Features and Improvements - Core
=====================================
+.. _change_12548:
+
+Template String (t-string) Support for Python 3.14+
+----------------------------------------------------
+
+SQLAlchemy 2.1 adds support for Python 3.14+ template strings (t-strings)
+via the new :func:`_sql.tstring` construct, as defined in :pep:`750`.
+This feature provides a more ergonomic way to construct SQL statements by
+automatically interpolating Python values and SQLAlchemy expressions within
+template strings.
+
+The :func:`_sql.tstring` function works similarly to :func:`_sql.text`, but
+automatically handles different types of interpolated values:
+
+* **String literals** from the template are rendered directly as SQL
+* **SQLAlchemy expressions** (columns, functions, subqueries, etc.) are
+ embedded as clause elements
+* **Plain Python values** are automatically wrapped with :func:`_sql.literal`
+
+Example usage::
+
+ from sqlalchemy import tstring, select, literal, JSON
+
+ # Python values become bound values
+ user_id = 42
+ stmt = tstring(t"SELECT * FROM users WHERE id = {user_id}")
+ # renders: SELECT * FROM users WHERE id = :param_1
+
+ # SQLAlchemy expressions are embedded
+ from sqlalchemy import table, column
+
+ stmt = tstring(t"SELECT {column('q')} FROM {table('t')}")
+ # renders: SELECT q FROM t
+
+ # Apply explicit SQL types to bound values using literal()
+ some_json = {"foo": "bar"}
+ stmt = tstring(t"SELECT {literal(some_json, JSON)}")
+
+Like :func:`_sql.text`, the :class:`_sql.TString` construct supports the
+:meth:`_sql.TString.columns` method to specify return columns and their types::
+
+ from sqlalchemy import column, Integer, String
+
+ stmt = tstring(t"SELECT id, name FROM users").columns(
+ column("id", Integer), column("name", String)
+ )
+
+ for id, name in connection.execute(stmt):
+ print(id, name)
+
+The :func:`_sql.tstring` construct is fully compatible with SQLAlchemy's
+statement caching system. Statements with the same structure but different
+literal values will share the same cache key, providing optimal performance.
+
+.. seealso::
+
+ :func:`_sql.tstring`
+
+ :class:`_sql.TString`
+
+ `PEP 750 <https://peps.python.org/pep-0750/>`_ - Template Strings
+
+:ticket:`12548`
+
.. _change_10635:
``Row`` now represents individual column types directly without ``Tuple``
--- /dev/null
+.. change::
+ :tags: feature, sql
+ :tickets: 12548
+
+ Added support for Python 3.14+ template strings (t-strings) via the new
+ :func:`_sql.tstring` construct. This feature makes use of Python 3.14
+ template strings as defined in :pep:`750`, allowing for ergonomic SQL
+ statement construction by automatically interpolating Python values and
+ SQLAlchemy expressions within template strings.
+
+ .. seealso::
+
+ :ref:`change_12548` - in :ref:`migration_21_toplevel`
.. autofunction:: text
+.. autofunction:: tstring
+
.. autofunction:: true
.. autofunction:: try_cast
.. autoclass:: TextClause
:members:
+ :inherited-members: columns
+
+.. autoclass:: TString
+ :members:
+ :inherited-members: columns
.. autoclass:: TryCast
:members:
from .sql.expression import True_ as True_
from .sql.expression import try_cast as try_cast
from .sql.expression import TryCast as TryCast
+from .sql.expression import TString as TString
+from .sql.expression import tstring as tstring
from .sql.expression import Tuple as Tuple
from .sql.expression import tuple_ as tuple_
from .sql.expression import type_coerce as type_coerce
self.order_by = None
- if isinstance(self.statement, expression.TextClause):
- # TextClause has no "column" objects at all. for this case,
- # we generate columns from our _QueryEntity objects, then
- # flip on all the "please match no matter what" parameters.
+ if self.statement._is_text_clause:
+ # AbstractTextClause (TextClause, TString) has no "column"
+ # objects at all. for this case, we generate columns from our
+ # _QueryEntity objects, then flip on all the
+ # "please match no matter what" parameters.
self.extra_criteria_entities = {}
for entity in self._entities:
@_generative
@_assertions(_no_clauseelement_condition)
- def from_statement(self, statement: ExecutableReturnsRows) -> Self:
+ def from_statement(self, statement: roles.SelectStatementRole) -> Self:
"""Execute the given SELECT statement and return results.
This method bypasses all internal statement compilation, and the
:meth:`_sql.Select.from_statement` - v2 comparable method.
"""
- statement = coercions.expect(
+ _statement = coercions.expect(
roles.SelectStatementRole, statement, apply_propagate_attrs=self
)
- self._statement = statement
+ self._statement = _statement
return self
def first(self) -> Optional[_T]:
from .expression import TableSample as TableSample
from .expression import tablesample as tablesample
from .expression import text as text
+from .expression import TextClause as TextClause
from .expression import true as true
from .expression import True_ as True_
from .expression import try_cast as try_cast
+from .expression import TString as TString
+from .expression import tstring as tstring
from .expression import tuple_ as tuple_
from .expression import type_coerce as type_coerce
from .expression import union as union
from .elements import TextClause
from .elements import True_
from .elements import TryCast
+from .elements import TString
from .elements import Tuple
from .elements import TypeCoerce
from .elements import UnaryExpression
from .functions import FunctionElement
if typing.TYPE_CHECKING:
+ from string.templatelib import Template
+
from ._typing import _ByArgument
from ._typing import _ColumnExpressionArgument
from ._typing import _ColumnExpressionOrLiteralArgument
return TextClause(text)
+def tstring(template: Template) -> TString:
+ r"""Construct a new :class:`_expression.TString` clause,
+ representing a SQL template string using Python 3.14+ t-strings.
+
+ .. versionadded:: 2.1
+
+ E.g.::
+
+ from sqlalchemy import tstring
+
+ a = 5
+ b = 10
+ stmt = tstring(t"select {a}, {b}")
+ result = connection.execute(stmt)
+
+ The :func:`_expression.tstring` function accepts a Python 3.14+
+ template string (t-string) and processes it to create a SQL statement.
+ Unlike :func:`_expression.text`, which requires manual bind parameter
+ specification, :func:`_expression.tstring` automatically handles
+ interpolation of Python values and SQLAlchemy expressions.
+
+ **Interpolation Behavior**:
+
+ - **SQL content** expressed in the plain string portions of the template
+ are rendered directly as SQL
+ - **SQLAlchemy expressions** (columns, functions, etc.) are embedded
+ as clause elements
+ - **Plain Python values** are automatically wrapped in
+ :func:`_expression.literal`
+
+ For example::
+
+ from sqlalchemy import tstring, select, literal, JSON, table, column
+
+ # Python values become bound parameters
+ user_id = 42
+ stmt = tstring(t"SELECT * FROM users WHERE id = {user_id}")
+ # renders: SELECT * FROM users WHERE id = :param_1
+
+ # SQLAlchemy expressions are embedded
+ stmt = tstring(t"SELECT {column('q')} FROM {table('t')}")
+ # renders: SELECT q FROM t
+
+ # Apply explicit SQL types to bound values using literal()
+ some_json = {"foo": "bar"}
+ stmt = tstring(t"SELECT {literal(some_json, JSON)}")
+
+ **Column Specification**:
+
+ Like :func:`_expression.text`, the :func:`_expression.tstring` construct
+ supports the :meth:`_expression.TString.columns` method to specify
+ return columns and their types::
+
+ from sqlalchemy import tstring, column, Integer, String
+
+ stmt = tstring(t"SELECT id, name FROM users").columns(
+ column("id", Integer), column("name", String)
+ )
+
+ for id, name in connection.execute(stmt):
+ print(id, name)
+
+ :param template:
+ a Python 3.14+ template string (t-string) containing SQL fragments
+ and Python expressions to be interpolated.
+
+ .. seealso::
+
+ :ref:`tutorial_select_arbitrary_text` - in the :ref:`unified_tutorial`
+
+ :class:`_expression.TString`
+
+ :func:`_expression.text`
+
+ `PEP 750 <https://peps.python.org/pep-0750/>`_ - Template Strings
+
+ """
+ return TString(template)
+
+
def true() -> True_:
"""Return a constant :class:`.True_` construct.
)
+class TStringElementImpl(ExpressionElementImpl, RoleImpl):
+ __slots__ = ()
+
+
class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
__slots__ = ()
within_columns_clause=False,
render_label_as_label=None,
result_map_targets=(),
+ within_tstring=False,
**kw,
):
+ if within_tstring:
+ raise exc.CompileError(
+ "Using label() directly inside tstring is not supported "
+ "as it is ambiguous how the label expression should be "
+ "rendered without knowledge of how it's being used in SQL"
+ )
# only render labels within the columns clause
# or ORDER BY clause of a select. dialect-specific compilers
# can modify this behavior.
),
)
+ def visit_tstring(self, tstring, add_to_result_map=None, **kw):
+ if self._collect_params:
+ self._add_to_params(tstring)
+
+ if not self.stack:
+ self.isplaintext = True
+
+ if add_to_result_map:
+ # tstring() object is present in the columns clause of a
+ # select(). Add a no-name entry to the result map so that
+ # row[tstring()] produces a result
+ add_to_result_map(None, None, (tstring,), sqltypes.NULLTYPE)
+
+ # Process each part and concatenate
+ kw["within_tstring"] = True
+ return "".join(self.process(part, **kw) for part in tstring.parts)
+
def visit_textual_select(
self, taf, compound_index=None, asfrom=False, **kw
):
lateral=False,
enclosing_alias=None,
from_linter=None,
+ within_tstring=False,
**kwargs,
):
if lateral:
else:
kwargs["enclosing_alias"] = alias
- if asfrom or ashint:
+ if asfrom or ashint or within_tstring:
if isinstance(alias.name, elements._truncated_label):
alias_name = self._truncated_identifier("alias", alias.name)
else:
if ashint:
return self.preparer.format_alias(alias, alias_name)
- elif asfrom:
+ elif asfrom or within_tstring:
if from_linter:
from_linter.froms[alias._de_clone()] = alias_name
within_columns_clause=within_columns_clause,
add_to_result_map=add_to_result_map,
include_table=include_table,
+ within_tstring=False,
)
return result_expr._compiler_dispatch(self, **column_clause_args)
from_linter=None,
ambiguous_table_name_map=None,
enclosing_alias=None,
+ within_tstring=False,
**kwargs,
):
if from_linter:
from_linter.froms[table] = table.fullname
- if asfrom or ashint:
+ if asfrom or ashint or within_tstring:
effective_schema = self.preparer.schema_for_object(table)
if use_schema and effective_schema:
from ..util import deprecated
from ..util import HasMemoized_ro_memoized_attribute
from ..util import TypingOnly
+from ..util.compat import Template
from ..util.typing import Self
from ..util.typing import TupleAny
from ..util.typing import Unpack
self.type = type_
-class TextClause(
+class AbstractTextClause(
roles.DDLConstraintColumnRole,
roles.DDLExpressionRole,
roles.StatementOptionRole,
ExecutableStatement,
DQLDMLClauseElement,
roles.BinaryElementRole[Any],
- inspection.Inspectable["TextClause"],
):
+ """Base class for textual SQL constructs like TextClause and TString."""
+
+ __visit_name__: str
+
+ _is_text_clause = True
+ _is_textual = True
+ _is_implicitly_boolean = False
+ _render_label_in_columns_clause = False
+ _omit_from_statements = False
+ _is_collection_aggregate = False
+
+ @property
+ def _hide_froms(self) -> Iterable[FromClause]:
+ return ()
+
+ def __and__(self, other):
+ # support use in select.where(), query.filter()
+ return and_(self, other)
+
+ @property
+ def _select_iterable(self) -> _SelectIterable:
+ return (self,)
+
+ # help in those cases where text/tstring() is
+ # interpreted in a column expression situation
+ key: Optional[str] = None
+ _label: Optional[str] = None
+
+ _allow_label_resolve = False
+
+ @property
+ def type(self) -> TypeEngine[Any]:
+ return type_api.NULLTYPE
+
+ @property
+ def comparator(self):
+ return self.type.comparator_factory(self) # type: ignore
+
+ def self_group(
+ self, against: Optional[OperatorType] = None
+ ) -> Union[Self, Grouping[Any]]:
+ if against is operators.in_op:
+ return Grouping(self)
+ else:
+ return self
+
+ def bindparams(
+ self,
+ *binds: BindParameter[Any],
+ **names_to_values: Any,
+ ) -> Self:
+ """Establish the values and/or types of bound parameters within
+ this :class:`_expression.AbstractTextClause` construct.
+
+ This is implemented only for :class:`.TextClause` will raise
+ ``NotImplementedError`` for :class:`.TString`.
+
+ """
+ raise NotImplementedError()
+
+ @util.preload_module("sqlalchemy.sql.selectable")
+ def columns(
+ self,
+ *cols: _ColumnExpressionArgument[Any],
+ **types: _TypeEngineArgument[Any],
+ ) -> TextualSelect:
+ r"""Turn this :class:`_expression.AbstractTextClause` object into a
+ :class:`_expression.TextualSelect`
+ object that serves the same role as a SELECT
+ statement.
+
+ The :class:`_expression.TextualSelect` is part of the
+ :class:`_expression.SelectBase`
+ hierarchy and can be embedded into another statement by using the
+ :meth:`_expression.TextualSelect.subquery` method to produce a
+ :class:`.Subquery`
+ object, which can then be SELECTed from.
+
+ This function essentially bridges the gap between an entirely
+ textual SELECT statement and the SQL expression language concept
+ of a "selectable"::
+
+ from sqlalchemy.sql import column, text
+
+ stmt = text("SELECT id, name FROM some_table")
+ stmt = stmt.columns(column("id"), column("name")).subquery("st")
+
+ stmt = (
+ select(mytable)
+ .select_from(mytable.join(stmt, mytable.c.name == stmt.c.name))
+ .where(stmt.c.id > 5)
+ )
+
+ Above, we pass a series of :func:`_expression.column` elements to the
+ :meth:`_expression.AbstractTextClause.columns` method positionally.
+ These :func:`_expression.column` elements now become first class
+ elements upon the :attr:`_expression.TextualSelect.selected_columns`
+ column collection, which then become part of the :attr:`.Subquery.c`
+ collection after :meth:`_expression.TextualSelect.subquery` is invoked.
+
+ The column expressions we pass to
+ :meth:`_expression.AbstractTextClause.columns` may also be typed; when
+ we do so, these :class:`.TypeEngine` objects become the effective
+ return type of the column, so that SQLAlchemy's result-set-processing
+ systems may be used on the return values. This is often needed for
+ types such as date or boolean types, as well as for unicode processing
+ on some dialect configurations::
+
+ stmt = text("SELECT id, name, timestamp FROM some_table")
+ stmt = stmt.columns(
+ column("id", Integer),
+ column("name", Unicode),
+ column("timestamp", DateTime),
+ )
+
+ for id, name, timestamp in connection.execute(stmt):
+ print(id, name, timestamp)
+
+ As a shortcut to the above syntax, keyword arguments referring to
+ types alone may be used, if only type conversion is needed::
+
+ stmt = text("SELECT id, name, timestamp FROM some_table")
+ stmt = stmt.columns(id=Integer, name=Unicode, timestamp=DateTime)
+
+ for id, name, timestamp in connection.execute(stmt):
+ print(id, name, timestamp)
+
+ The positional form of :meth:`_expression.AbstractTextClause.columns`
+ also provides the unique feature of **positional column targeting**,
+ which is particularly useful when using the ORM with complex textual
+ queries. If we specify the columns from our model to
+ :meth:`_expression.AbstractTextClause.columns`, the result set will
+ match to those columns positionally, meaning the name or origin of the
+ column in the textual SQL doesn't matter::
+
+ stmt = text(
+ "SELECT users.id, addresses.id, users.id, "
+ "users.name, addresses.email_address AS email "
+ "FROM users JOIN addresses ON users.id=addresses.user_id "
+ "WHERE users.id = 1"
+ ).columns(
+ User.id,
+ Address.id,
+ Address.user_id,
+ User.name,
+ Address.email_address,
+ )
+
+ query = (
+ session.query(User)
+ .from_statement(stmt)
+ .options(contains_eager(User.addresses))
+ )
+
+ The :meth:`_expression.AbstractTextClause.columns` method provides a
+ direct route to calling :meth:`_expression.FromClause.subquery` as well
+ as :meth:`_expression.SelectBase.cte` against a textual SELECT
+ statement::
+
+ stmt = stmt.columns(id=Integer, name=String).cte("st")
+
+ stmt = select(sometable).where(sometable.c.id == stmt.c.id)
+
+ :param \*cols: A series of :class:`_expression.ColumnElement` objects,
+ typically
+ :class:`_schema.Column` objects from a :class:`_schema.Table`
+ or ORM level
+ column-mapped attributes, representing a set of columns that this
+ textual string will SELECT from.
+
+ :param \**types: A mapping of string names to :class:`.TypeEngine`
+ type objects indicating the datatypes to use for names that are
+ SELECTed from the textual string. Prefer to use the ``*cols``
+ argument as it also indicates positional ordering.
+
+ """
+ selectable = util.preloaded.sql_selectable
+
+ input_cols: List[NamedColumn[Any]] = [
+ coercions.expect(roles.LabeledColumnExprRole, col) for col in cols
+ ]
+
+ positional_input_cols = [
+ (
+ ColumnClause(col.key, types.pop(col.key))
+ if col.key in types
+ else col
+ )
+ for col in input_cols
+ ]
+ keyed_input_cols: List[NamedColumn[Any]] = [
+ ColumnClause(key, type_) for key, type_ in types.items()
+ ]
+
+ elem = selectable.TextualSelect.__new__(selectable.TextualSelect)
+ elem._init(
+ self,
+ positional_input_cols + keyed_input_cols,
+ positional=bool(positional_input_cols) and not keyed_input_cols,
+ )
+ return elem
+
+
+class TextClause(AbstractTextClause, inspection.Inspectable["TextClause"]):
"""Represent a literal SQL text fragment.
E.g.::
("text", InternalTraversal.dp_string),
] + ExecutableStatement._executable_traverse_internals
- _is_text_clause = True
-
- _is_textual = True
-
_bind_params_regex = re.compile(r"(?<![:\w\x5c]):(\w+)(?!:)", re.UNICODE)
- _is_implicitly_boolean = False
-
- _render_label_in_columns_clause = False
-
- _omit_from_statements = False
-
- _is_collection_aggregate = False
-
- @property
- def _hide_froms(self) -> Iterable[FromClause]:
- return ()
-
- def __and__(self, other):
- # support use in select.where(), query.filter()
- return and_(self, other)
-
- @property
- def _select_iterable(self) -> _SelectIterable:
- return (self,)
-
- # help in those cases where text() is
- # interpreted in a column expression situation
- key: Optional[str] = None
- _label: Optional[str] = None
-
- _allow_label_resolve = False
@property
- def _is_star(self): # type: ignore[override]
+ def _is_star(self) -> bool: # type: ignore[override]
return self.text == "*"
def __init__(self, text: str):
new_params[key] = existing._with_value(value, required=False)
return self
- @util.preload_module("sqlalchemy.sql.selectable")
- def columns(
- self,
- *cols: _ColumnExpressionArgument[Any],
- **types: _TypeEngineArgument[Any],
- ) -> TextualSelect:
- r"""Turn this :class:`_expression.TextClause` object into a
- :class:`_expression.TextualSelect`
- object that serves the same role as a SELECT
- statement.
-
- The :class:`_expression.TextualSelect` is part of the
- :class:`_expression.SelectBase`
- hierarchy and can be embedded into another statement by using the
- :meth:`_expression.TextualSelect.subquery` method to produce a
- :class:`.Subquery`
- object, which can then be SELECTed from.
-
- This function essentially bridges the gap between an entirely
- textual SELECT statement and the SQL expression language concept
- of a "selectable"::
-
- from sqlalchemy.sql import column, text
+ @property
+ def type(self) -> TypeEngine[Any]:
+ return type_api.NULLTYPE
- stmt = text("SELECT id, name FROM some_table")
- stmt = stmt.columns(column("id"), column("name")).subquery("st")
+ @property
+ def comparator(self):
+ # TODO: this seems wrong, it seems like we might not
+ # be using this method.
+ return self.type.comparator_factory(self) # type: ignore
- stmt = (
- select(mytable)
- .select_from(mytable.join(stmt, mytable.c.name == stmt.c.name))
- .where(stmt.c.id > 5)
- )
+ def self_group(
+ self, against: Optional[OperatorType] = None
+ ) -> Union[Self, Grouping[Any]]:
+ if against is operators.in_op:
+ return Grouping(self)
+ else:
+ return self
- Above, we pass a series of :func:`_expression.column` elements to the
- :meth:`_expression.TextClause.columns` method positionally. These
- :func:`_expression.column`
- elements now become first class elements upon the
- :attr:`_expression.TextualSelect.selected_columns` column collection,
- which then
- become part of the :attr:`.Subquery.c` collection after
- :meth:`_expression.TextualSelect.subquery` is invoked.
- The column expressions we pass to
- :meth:`_expression.TextClause.columns` may
- also be typed; when we do so, these :class:`.TypeEngine` objects become
- the effective return type of the column, so that SQLAlchemy's
- result-set-processing systems may be used on the return values.
- This is often needed for types such as date or boolean types, as well
- as for unicode processing on some dialect configurations::
+class TString(AbstractTextClause, inspection.Inspectable["TString"]):
+ """Represent a SQL template string using Python 3.14+ t-strings.
- stmt = text("SELECT id, name, timestamp FROM some_table")
- stmt = stmt.columns(
- column("id", Integer),
- column("name", Unicode),
- column("timestamp", DateTime),
- )
+ E.g.::
- for id, name, timestamp in connection.execute(stmt):
- print(id, name, timestamp)
+ from sqlalchemy import tstring, column
- As a shortcut to the above syntax, keyword arguments referring to
- types alone may be used, if only type conversion is needed::
+ a = 5
+ b = 10
+ stmt = tstring(t"select {a}, {b}")
+ result = connection.execute(stmt)
- stmt = text("SELECT id, name, timestamp FROM some_table")
- stmt = stmt.columns(id=Integer, name=Unicode, timestamp=DateTime)
+ The :class:`_expression.TString` construct is produced using the
+ :func:`_expression.tstring` function; see that function for full
+ documentation.
- for id, name, timestamp in connection.execute(stmt):
- print(id, name, timestamp)
+ .. versionadded:: 2.1
- The positional form of :meth:`_expression.TextClause.columns`
- also provides the
- unique feature of **positional column targeting**, which is
- particularly useful when using the ORM with complex textual queries. If
- we specify the columns from our model to
- :meth:`_expression.TextClause.columns`,
- the result set will match to those columns positionally, meaning the
- name or origin of the column in the textual SQL doesn't matter::
+ .. seealso::
- stmt = text(
- "SELECT users.id, addresses.id, users.id, "
- "users.name, addresses.email_address AS email "
- "FROM users JOIN addresses ON users.id=addresses.user_id "
- "WHERE users.id = 1"
- ).columns(
- User.id,
- Address.id,
- Address.user_id,
- User.name,
- Address.email_address,
- )
+ :func:`_expression.tstring`
- query = (
- session.query(User)
- .from_statement(stmt)
- .options(contains_eager(User.addresses))
- )
+ """
- The :meth:`_expression.TextClause.columns` method provides a direct
- route to calling :meth:`_expression.FromClause.subquery` as well as
- :meth:`_expression.SelectBase.cte`
- against a textual SELECT statement::
+ __visit_name__ = "tstring"
- stmt = stmt.columns(id=Integer, name=String).cte("st")
+ _traverse_internals: _TraverseInternalsType = [
+ ("parts", InternalTraversal.dp_clauseelement_list)
+ ] + ExecutableStatement._executable_traverse_internals
- stmt = select(sometable).where(sometable.c.id == stmt.c.id)
+ @property
+ def _is_star(self) -> bool: # type: ignore[override]
+ return (
+ len(self.parts) == 1
+ and isinstance(self.parts[0], TextClause)
+ and self.parts[0]._is_star
+ )
- :param \*cols: A series of :class:`_expression.ColumnElement` objects,
- typically
- :class:`_schema.Column` objects from a :class:`_schema.Table`
- or ORM level
- column-mapped attributes, representing a set of columns that this
- textual string will SELECT from.
+ def __init__(self, template: Template):
+ """Construct a :class:`_expression.TString` from a Python 3.14+
+ template string.
- :param \**types: A mapping of string names to :class:`.TypeEngine`
- type objects indicating the datatypes to use for names that are
- SELECTed from the textual string. Prefer to use the ``*cols``
- argument as it also indicates positional ordering.
+ :param template: a Python 3.14+ template string (t-string) that
+ contains SQL fragments and Python expressions to be interpolated.
"""
- selectable = util.preloaded.sql_selectable
+ self.parts: List[ClauseElement] = []
- input_cols: List[NamedColumn[Any]] = [
- coercions.expect(roles.LabeledColumnExprRole, col) for col in cols
- ]
+ if not isinstance(template, Template):
+ raise exc.ArgumentError("pep-750 Tstring (e.g. t'...') expected")
- positional_input_cols = [
- (
- ColumnClause(col.key, types.pop(col.key))
- if col.key in types
- else col
- )
- for col in input_cols
- ]
- keyed_input_cols: List[NamedColumn[Any]] = [
- ColumnClause(key, type_) for key, type_ in types.items()
- ]
-
- elem = selectable.TextualSelect.__new__(selectable.TextualSelect)
- elem._init(
- self,
- positional_input_cols + keyed_input_cols,
- positional=bool(positional_input_cols) and not keyed_input_cols,
- )
- return elem
+ for part in template:
+ if isinstance(part, str):
+ self.parts.append(TextClause(part))
+ else:
+ assert hasattr(part, "value")
+ self.parts.append(
+ coercions.expect(roles.TStringElementRole, part.value)
+ )
- @property
- def type(self) -> TypeEngine[Any]:
- return type_api.NULLTYPE
+ def bindparams(
+ self,
+ *binds: BindParameter[Any],
+ **names_to_values: Any,
+ ) -> Self:
+ """Not supported for TString constructs.
- @property
- def comparator(self):
- # TODO: this seems wrong, it seems like we might not
- # be using this method.
- return self.type.comparator_factory(self) # type: ignore
+ TString constructs do not support .bindparams(). Bind parameters
+ are automatically created from interpolated values.
- def self_group(
- self, against: Optional[OperatorType] = None
- ) -> Union[Self, Grouping[Any]]:
- if against is operators.in_op:
- return Grouping(self)
- else:
- return self
+ """
+ raise NotImplementedError(
+ "TString constructs do not support .bindparams(). "
+ "Bind parameters are automatically created "
+ "from interpolated values."
+ )
class Null(SingletonConstant, roles.ConstExprRole[None], ColumnElement[None]):
]
element: Union[
- TextClause, ClauseList, ColumnElement[_T], CompilerColumnElement
+ AbstractTextClause,
+ ClauseList,
+ ColumnElement[_T],
+ CompilerColumnElement,
]
def __init__(
self,
element: Union[
- TextClause, ClauseList, ColumnElement[_T], CompilerColumnElement
+ AbstractTextClause,
+ ClauseList,
+ ColumnElement[_T],
+ CompilerColumnElement,
],
):
self.element = element
from ._elements_constructors import text as text
from ._elements_constructors import true as true
from ._elements_constructors import try_cast as try_cast
+from ._elements_constructors import tstring as tstring
from ._elements_constructors import tuple_ as tuple_
from ._elements_constructors import type_coerce as type_coerce
from ._elements_constructors import within_group as within_group
from .elements import TextClause as TextClause
from .elements import True_ as True_
from .elements import TryCast as TryCast
+from .elements import TString as TString
from .elements import Tuple as Tuple
from .elements import TypeClause as TypeClause
from .elements import TypeCoerce as TypeCoerce
_role_name = "String SQL identifier"
-class ColumnsClauseRole(AllowsLambdaRole, UsesInspection, ColumnListRole):
+class TStringElementRole(UsesInspection, SQLRole):
+ """Role for elements that can be interpolated into a TString."""
+
+ __slots__ = ()
+ _role_name = "TString interpolatable element"
+
+
+class ColumnsClauseRole(
+ TStringElementRole, AllowsLambdaRole, UsesInspection, ColumnListRole
+):
__slots__ = ()
_role_name = (
"Column expression, FROM clause, or other columns clause element"
from .elements import GroupedElement
from .elements import literal_column
from .elements import TableValuedColumn
+from .elements import TextClause
from .elements import UnaryExpression
from .operators import OperatorType
from .sqltypes import NULLTYPE
from .ddl import CreateTableAs
from .dml import Delete
from .dml import Update
+ from .elements import AbstractTextClause
from .elements import BinaryExpression
from .elements import KeyedColumnElement
from .elements import Label
from .elements import NamedColumn
- from .elements import TextClause
from .functions import Function
from .schema import ForeignKey
from .schema import ForeignKeyConstraint
_ColumnsClauseElement = Union["FromClause", ColumnElement[Any], "TextClause"]
_LabelConventionCallable = Callable[
- [Union["ColumnElement[Any]", "TextClause"]], Optional[str]
+ [Union["ColumnElement[Any]", "AbstractTextClause"]], Optional[str]
]
]
-_SelectIterable = Iterable[Union["ColumnElement[Any]", "TextClause"]]
+_SelectIterable = Iterable[Union["ColumnElement[Any]", "AbstractTextClause"]]
class _OffsetLimitParam(BindParameter[int]):
required_label_name was not given
"""
- column: Union[ColumnElement[Any], TextClause]
+ column: Union[ColumnElement[Any], AbstractTextClause]
"""
the ColumnElement itself
"""
names = set()
def go(
- c: Union[ColumnElement[Any], TextClause],
+ c: Union[ColumnElement[Any], AbstractTextClause],
col_name: Optional[str] = None,
) -> Optional[str]:
if is_text_clause(c):
def _init(
self,
- text: TextClause,
+ text: AbstractTextClause,
columns: List[NamedColumn[Any]],
positional: bool = False,
) -> None:
from ._typing import _EquivalentColumnMap
from ._typing import _LimitOffsetType
from ._typing import _TypeEngineArgument
+ from .elements import AbstractTextClause
from .elements import BinaryExpression
- from .elements import TextClause
from .selectable import _JoinTargetElement
from .selectable import _SelectIterable
from .selectable import Selectable
columns: _SelectIterable,
*clauses: Optional[ClauseElement],
**kw: bool,
-) -> Sequence[Union[ColumnElement[Any], TextClause]]: ...
+) -> Sequence[Union[ColumnElement[Any], AbstractTextClause]]: ...
def reduce_columns(
columns: _SelectIterable,
*clauses: Optional[ClauseElement],
**kw: bool,
-) -> Collection[Union[ColumnElement[Any], TextClause]]:
+) -> Collection[Union[ColumnElement[Any], AbstractTextClause]]:
r"""given a list of columns, return a 'reduced' set based on natural
equivalents.
from typing import Set
from typing import Tuple
from typing import Type
+from typing import TYPE_CHECKING
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
py314 = sys.version_info >= (3, 14)
dottedgetter = operator.attrgetter
+if py314 or TYPE_CHECKING:
+ from string.templatelib import Template as Template
+else:
+
+ class Template: # type: ignore[no-redef]
+ """Minimal Template for Python < 3.14 (test usage only)."""
+
+ def __init__(self, *parts: Any):
+ self._parts = parts
+
+ @property
+ def strings(self) -> Tuple[str, ...]:
+ return tuple(p for p in self._parts if isinstance(p, str))
+
+ @property
+ def interpolations(self) -> Tuple[Any, ...]:
+ return tuple(p for p in self._parts if not isinstance(p, str))
+
+ def __iter__(self) -> Any:
+ return iter(self._parts)
+
class FullArgSpec(typing.NamedTuple):
args: List[str]
)
-@nox.session(name="mypy")
+@nox.session(name="mypy", python="3.14")
def test_mypy(session: nox.Session) -> None:
"""run the typing integration test suite"""
session.run(*cmd, *posargs)
-@nox.session(name="pep8")
+@nox.session(name="pep8", python="3.14")
def test_pep8(session: nox.Session) -> None:
"""Run linting and formatting checks."""
"A003","A005",
"D",
"E203","E305","E701","E704","E711","E712","E721","E722","E741",
+ # F542: t-string without any placeholders (used in documentation examples)
+ "F542",
"I300",
"N801","N802","N806",
"RST304","RST303","RST299","RST399",
collect_ignore_glob = []
+# omit py314.py test files on earlier versions of python
+if sys.version_info < (3, 14):
+ collect_ignore_glob.append("*_py314.py")
+
# this requires that sqlalchemy.testing was not already
# imported in order to work
pytest.register_assert_rewrite("sqlalchemy.testing.assertions")
from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy import String
-from sqlalchemy import table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import true
from sqlalchemy.orm import lazyload
from sqlalchemy.orm import Query
from sqlalchemy.orm import relationship
-from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from sqlalchemy.orm import subqueryload
from sqlalchemy.orm import synonym
)
-class TextTest(QueryTest, AssertsCompiledSQL):
- __dialect__ = "default"
-
- def test_needs_text(self):
- User = self.classes.User
-
- assert_raises_message(
- sa_exc.ArgumentError,
- "Textual SQL expression",
- fixture_session().query(User).from_statement,
- "select * from users order by id",
- )
-
- def test_select_star(self):
- User = self.classes.User
-
- eq_(
- fixture_session()
- .query(User)
- .from_statement(text("select * from users order by id"))
- .first(),
- User(id=7),
- )
- eq_(
- fixture_session()
- .query(User)
- .from_statement(
- text("select * from users where name='nonexistent'")
- )
- .first(),
- None,
- )
-
- def test_select_star_future(self):
- User = self.classes.User
-
- sess = fixture_session()
- eq_(
- sess.execute(
- select(User).from_statement(
- text("select * from users order by id")
- )
- )
- .scalars()
- .first(),
- User(id=7),
- )
- eq_(
- sess.execute(
- select(User).from_statement(
- text("select * from users where name='nonexistent'")
- )
- )
- .scalars()
- .first(),
- None,
- )
-
- def test_columns_mismatched(self):
- # test that columns using column._label match, as well as that
- # ordering doesn't matter
- User = self.classes.User
-
- s = fixture_session()
- q = s.query(User).from_statement(
- text(
- "select name, 27 as foo, id as users_id from users order by id"
- )
- )
- eq_(
- q.all(),
- [
- User(id=7, name="jack"),
- User(id=8, name="ed"),
- User(id=9, name="fred"),
- User(id=10, name="chuck"),
- ],
- )
-
- def test_columns_mismatched_future(self):
- # test that columns using column._label match, as well as that
- # ordering doesn't matter
- User = self.classes.User
-
- s = fixture_session()
- q = select(User).from_statement(
- text(
- "select name, 27 as foo, id as users_id from users order by id"
- )
- )
- eq_(
- s.execute(q).scalars().all(),
- [
- User(id=7, name="jack"),
- User(id=8, name="ed"),
- User(id=9, name="fred"),
- User(id=10, name="chuck"),
- ],
- )
-
- def test_columns_multi_table_uselabels(self):
- # test that columns using column._label match, as well as that
- # ordering doesn't matter.
- User = self.classes.User
- Address = self.classes.Address
-
- s = fixture_session()
- q = s.query(User, Address).from_statement(
- text(
- "select users.name AS users_name, users.id AS users_id, "
- "addresses.id AS addresses_id FROM users JOIN addresses "
- "ON users.id = addresses.user_id WHERE users.id=8 "
- "ORDER BY addresses.id"
- )
- )
-
- eq_(
- q.all(),
- [
- (User(id=8), Address(id=2)),
- (User(id=8), Address(id=3)),
- (User(id=8), Address(id=4)),
- ],
- )
-
- def test_columns_multi_table_uselabels_future(self):
- # test that columns using column._label match, as well as that
- # ordering doesn't matter.
- User = self.classes.User
- Address = self.classes.Address
-
- s = fixture_session()
- q = select(User, Address).from_statement(
- text(
- "select users.name AS users_name, users.id AS users_id, "
- "addresses.id AS addresses_id FROM users JOIN addresses "
- "ON users.id = addresses.user_id WHERE users.id=8 "
- "ORDER BY addresses.id"
- )
- )
-
- eq_(
- s.execute(q).all(),
- [
- (User(id=8), Address(id=2)),
- (User(id=8), Address(id=3)),
- (User(id=8), Address(id=4)),
- ],
- )
-
- def test_columns_multi_table_uselabels_contains_eager(self):
- # test that columns using column._label match, as well as that
- # ordering doesn't matter.
- User = self.classes.User
- Address = self.classes.Address
-
- s = fixture_session()
- q = (
- s.query(User)
- .from_statement(
- text(
- "select users.name AS users_name, users.id AS users_id, "
- "addresses.id AS addresses_id FROM users JOIN addresses "
- "ON users.id = addresses.user_id WHERE users.id=8 "
- "ORDER BY addresses.id"
- )
- )
- .options(contains_eager(User.addresses))
- )
-
- def go():
- r = q.all()
- eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
-
- self.assert_sql_count(testing.db, go, 1)
-
- def test_columns_multi_table_uselabels_contains_eager_future(self):
- # test that columns using column._label match, as well as that
- # ordering doesn't matter.
- User = self.classes.User
- Address = self.classes.Address
-
- s = fixture_session()
- q = (
- select(User)
- .from_statement(
- text(
- "select users.name AS users_name, users.id AS users_id, "
- "addresses.id AS addresses_id FROM users JOIN addresses "
- "ON users.id = addresses.user_id WHERE users.id=8 "
- "ORDER BY addresses.id"
- )
- )
- .options(contains_eager(User.addresses))
- )
-
- def go():
- r = s.execute(q).unique().scalars().all()
- eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
-
- self.assert_sql_count(testing.db, go, 1)
-
- def test_columns_multi_table_uselabels_cols_contains_eager(self):
- # test that columns using column._label match, as well as that
- # ordering doesn't matter.
- User = self.classes.User
- Address = self.classes.Address
-
- s = fixture_session()
- q = (
- s.query(User)
- .from_statement(
- text(
- "select users.name AS users_name, users.id AS users_id, "
- "addresses.id AS addresses_id FROM users JOIN addresses "
- "ON users.id = addresses.user_id WHERE users.id=8 "
- "ORDER BY addresses.id"
- ).columns(User.name, User.id, Address.id)
- )
- .options(contains_eager(User.addresses))
- )
-
- def go():
- r = q.all()
- eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
-
- self.assert_sql_count(testing.db, go, 1)
-
- def test_columns_multi_table_uselabels_cols_contains_eager_future(self):
- # test that columns using column._label match, as well as that
- # ordering doesn't matter.
- User = self.classes.User
- Address = self.classes.Address
-
- s = fixture_session()
- q = (
- select(User)
- .from_statement(
- text(
- "select users.name AS users_name, users.id AS users_id, "
- "addresses.id AS addresses_id FROM users JOIN addresses "
- "ON users.id = addresses.user_id WHERE users.id=8 "
- "ORDER BY addresses.id"
- ).columns(User.name, User.id, Address.id)
- )
- .options(contains_eager(User.addresses))
- )
-
- def go():
- r = s.execute(q).unique().scalars().all()
- eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
-
- self.assert_sql_count(testing.db, go, 1)
-
- def test_textual_select_orm_columns(self):
- # test that columns using column._label match, as well as that
- # ordering doesn't matter.
- User = self.classes.User
- Address = self.classes.Address
- users = self.tables.users
- addresses = self.tables.addresses
-
- s = fixture_session()
- q = s.query(User.name, User.id, Address.id).from_statement(
- text(
- "select users.name AS users_name, users.id AS users_id, "
- "addresses.id AS addresses_id FROM users JOIN addresses "
- "ON users.id = addresses.user_id WHERE users.id=8 "
- "ORDER BY addresses.id"
- ).columns(users.c.name, users.c.id, addresses.c.id)
- )
-
- eq_(q.all(), [("ed", 8, 2), ("ed", 8, 3), ("ed", 8, 4)])
-
- @testing.combinations(
- (
- False,
- subqueryload,
- ),
- (
- True,
- subqueryload,
- ),
- (False, selectinload),
- (True, selectinload),
- )
- def test_related_eagerload_against_text(self, add_columns, loader_option):
- # new in 1.4. textual selects have columns so subqueryloaders
- # and selectinloaders can join onto them. we add columns
- # automatiacally to TextClause as well, however subqueryloader
- # is not working at the moment due to execution model refactor,
- # it creates a subquery w/ adapter before those columns are
- # available. this is a super edge case and as we want to rewrite
- # the loaders to use select(), maybe we can get it then.
- User = self.classes.User
-
- text_clause = text("select * from users")
- if add_columns:
- text_clause = text_clause.columns(User.id, User.name)
-
- s = fixture_session()
- q = (
- s.query(User)
- .from_statement(text_clause)
- .options(loader_option(User.addresses))
- )
-
- def go():
- eq_(set(q.all()), set(self.static.user_address_result))
-
- if loader_option is subqueryload:
- # subqueryload necessarily degrades to lazy loads for a text
- # statement.
- self.assert_sql_count(testing.db, go, 5)
- else:
- self.assert_sql_count(testing.db, go, 2)
-
- def test_whereclause(self):
- User = self.classes.User
-
- eq_(
- fixture_session().query(User).filter(text("id in (8, 9)")).all(),
- [User(id=8), User(id=9)],
- )
-
- eq_(
- fixture_session()
- .query(User)
- .filter(text("name='fred'"))
- .filter(text("id=9"))
- .all(),
- [User(id=9)],
- )
- eq_(
- fixture_session()
- .query(User)
- .filter(text("name='fred'"))
- .filter(User.id == 9)
- .all(),
- [User(id=9)],
- )
-
- def test_whereclause_future(self):
- User = self.classes.User
-
- s = fixture_session()
- eq_(
- s.execute(select(User).filter(text("id in (8, 9)")))
- .scalars()
- .all(),
- [User(id=8), User(id=9)],
- )
-
- eq_(
- s.execute(
- select(User).filter(text("name='fred'")).filter(text("id=9"))
- )
- .scalars()
- .all(),
- [User(id=9)],
- )
- eq_(
- s.execute(
- select(User).filter(text("name='fred'")).filter(User.id == 9)
- )
- .scalars()
- .all(),
- [User(id=9)],
- )
-
- def test_binds_coerce(self):
- User = self.classes.User
-
- assert_raises_message(
- sa_exc.ArgumentError,
- r"Textual SQL expression 'id in \(:id1, :id2\)' "
- "should be explicitly declared",
- fixture_session().query(User).filter,
- "id in (:id1, :id2)",
- )
-
- def test_plain_textual_column(self):
- User = self.classes.User
-
- s = fixture_session()
-
- self.assert_compile(
- s.query(User.id, text("users.name")),
- "SELECT users.id AS users_id, users.name FROM users",
- )
-
- eq_(
- s.query(User.id, text("users.name")).all(),
- [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
- )
-
- eq_(
- s.query(User.id, literal_column("name")).order_by(User.id).all(),
- [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
- )
-
- def test_via_select(self):
- User = self.classes.User
- s = fixture_session()
- eq_(
- s.query(User)
- .from_statement(
- select(column("id"), column("name"))
- .select_from(table("users"))
- .order_by("id")
- )
- .all(),
- [User(id=7), User(id=8), User(id=9), User(id=10)],
- )
-
- def test_via_textasfrom_from_statement(self):
- User = self.classes.User
- s = fixture_session()
-
- eq_(
- s.query(User)
- .from_statement(
- text("select * from users order by id").columns(
- id=Integer, name=String
- )
- )
- .all(),
- [User(id=7), User(id=8), User(id=9), User(id=10)],
- )
-
- def test_columns_via_textasfrom_from_statement(self):
- User = self.classes.User
- s = fixture_session()
-
- eq_(
- s.query(User.id, User.name)
- .from_statement(
- text("select * from users order by id").columns(
- id=Integer, name=String
- )
- )
- .all(),
- [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
- )
-
- def test_via_textasfrom_use_mapped_columns(self):
- User = self.classes.User
- s = fixture_session()
-
- eq_(
- s.query(User)
- .from_statement(
- text("select * from users order by id").columns(
- User.id, User.name
- )
- )
- .all(),
- [User(id=7), User(id=8), User(id=9), User(id=10)],
- )
-
- def test_via_textasfrom_aliased(self):
- User = self.classes.User
- s = fixture_session()
-
- ua = aliased(
- User,
- text("select * from users").columns(User.id, User.name).subquery(),
- )
-
- eq_(
- s.query(ua).order_by(ua.id).all(),
- [User(id=7), User(id=8), User(id=9), User(id=10)],
- )
-
- def test_group_by_accepts_text(self):
- User = self.classes.User
- s = fixture_session()
-
- q = s.query(User).group_by(text("name"))
- self.assert_compile(
- q,
- "SELECT users.id AS users_id, users.name AS users_name "
- "FROM users GROUP BY name",
- )
-
- def test_order_by_w_eager_one(self):
- User = self.classes.User
- s = fixture_session()
-
- # from 1.0.0 thru 1.0.2, the "name" symbol here was considered
- # to be part of the things we need to ORDER BY and it was being
- # placed into the inner query's columns clause, as part of
- # query._compound_eager_statement where we add unwrap_order_by()
- # to the columns clause. However, as #3392 illustrates, unlocatable
- # string expressions like "name desc" will only fail in this scenario,
- # so in general the changing of the query structure with string labels
- # is dangerous.
- #
- # the queries here are again "invalid" from a SQL perspective, as the
- # "name" field isn't matched up to anything.
- #
-
- q = (
- s.query(User)
- .options(joinedload(User.addresses))
- .order_by(desc("name"))
- .limit(1)
- )
- assert_raises_message(
- sa_exc.CompileError,
- "Can't resolve label reference for ORDER BY / GROUP BY.",
- q.set_label_style(
- LABEL_STYLE_TABLENAME_PLUS_COL
- ).statement.compile,
- )
-
- def test_order_by_w_eager_two(self):
- User = self.classes.User
- s = fixture_session()
-
- q = (
- s.query(User)
- .options(joinedload(User.addresses))
- .order_by("name")
- .limit(1)
- )
- assert_raises_message(
- sa_exc.CompileError,
- "Can't resolve label reference for ORDER BY / GROUP BY.",
- q.set_label_style(
- LABEL_STYLE_TABLENAME_PLUS_COL
- ).statement.compile,
- )
-
- def test_order_by_w_eager_three(self):
- User = self.classes.User
- s = fixture_session()
-
- self.assert_compile(
- s.query(User)
- .options(joinedload(User.addresses))
- .order_by("users_name")
- .limit(1),
- "SELECT anon_1.users_id AS anon_1_users_id, "
- "anon_1.users_name AS anon_1_users_name, "
- "addresses_1.id AS addresses_1_id, "
- "addresses_1.user_id AS addresses_1_user_id, "
- "addresses_1.email_address AS addresses_1_email_address "
- "FROM (SELECT users.id AS users_id, users.name AS users_name "
- "FROM users ORDER BY users.name "
- "LIMIT :param_1) AS anon_1 "
- "LEFT OUTER JOIN addresses AS addresses_1 "
- "ON anon_1.users_id = addresses_1.user_id "
- "ORDER BY anon_1.users_name, addresses_1.id",
- )
-
- # however! this works (again?)
- eq_(
- s.query(User)
- .options(joinedload(User.addresses))
- .order_by("users_name")
- .first(),
- User(name="chuck", addresses=[]),
- )
-
- def test_order_by_w_eager_four(self):
- User = self.classes.User
- Address = self.classes.Address
- s = fixture_session()
-
- self.assert_compile(
- s.query(User)
- .options(joinedload(User.addresses))
- .order_by(desc("users_name"))
- .limit(1),
- "SELECT anon_1.users_id AS anon_1_users_id, "
- "anon_1.users_name AS anon_1_users_name, "
- "addresses_1.id AS addresses_1_id, "
- "addresses_1.user_id AS addresses_1_user_id, "
- "addresses_1.email_address AS addresses_1_email_address "
- "FROM (SELECT users.id AS users_id, users.name AS users_name "
- "FROM users ORDER BY users.name DESC "
- "LIMIT :param_1) AS anon_1 "
- "LEFT OUTER JOIN addresses AS addresses_1 "
- "ON anon_1.users_id = addresses_1.user_id "
- "ORDER BY anon_1.users_name DESC, addresses_1.id",
- )
-
- # however! this works (again?)
- eq_(
- s.query(User)
- .options(joinedload(User.addresses))
- .order_by(desc("users_name"))
- .first(),
- User(name="jack", addresses=[Address()]),
- )
-
- def test_order_by_w_eager_five(self):
- """essentially the same as test_eager_relations -> test_limit_3,
- but test for textual label elements that are freeform.
- this is again #3392."""
-
- User = self.classes.User
- Address = self.classes.Address
-
- sess = fixture_session()
-
- q = sess.query(User, Address.email_address.label("email_address"))
-
- result = (
- q.join(User.addresses)
- .options(joinedload(User.orders))
- .order_by("email_address desc")
- .limit(1)
- .offset(0)
- )
-
- assert_raises_message(
- sa_exc.CompileError,
- "Can't resolve label reference for ORDER BY / GROUP BY",
- result.all,
- )
-
-
-class TextErrorTest(QueryTest, AssertsCompiledSQL):
- def _test(self, fn, arg, offending_clause):
- assert_raises_message(
- sa.exc.ArgumentError,
- r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
- r"explicitly declared (?:with|as) text\(%(stmt)r\)"
- % {"stmt": util.ellipses_string(offending_clause)},
- fn,
- arg,
- )
-
- def test_filter(self):
- User = self.classes.User
- self._test(
- fixture_session().query(User.id).filter, "myid == 5", "myid == 5"
- )
-
- def test_having(self):
- User = self.classes.User
- self._test(
- fixture_session().query(User.id).having, "myid == 5", "myid == 5"
- )
-
- def test_from_statement(self):
- User = self.classes.User
- self._test(
- fixture_session().query(User.id).from_statement,
- "select id from user",
- "select id from user",
- )
-
-
class ParentTest(QueryTest, AssertsCompiledSQL):
__dialect__ = "default"
--- /dev/null
+import sqlalchemy as sa
+from sqlalchemy import column
+from sqlalchemy import desc
+from sqlalchemy import exc as sa_exc
+from sqlalchemy import Integer
+from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
+from sqlalchemy import literal_column
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import table
+from sqlalchemy import testing
+from sqlalchemy import text
+from sqlalchemy import util
+from sqlalchemy.orm import aliased
+from sqlalchemy.orm import contains_eager
+from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import selectinload
+from sqlalchemy.orm import subqueryload
+from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing.assertions import assert_raises_message
+from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.fixtures import fixture_session
+from test.orm import _fixtures
+
+
+class QueryTest(_fixtures.FixtureTest):
+ run_setup_mappers = "once"
+ run_inserts = "once"
+ run_deletes = None
+
+ @classmethod
+ def setup_mappers(cls):
+ cls._setup_stock_mapping()
+
+
+class TextTest(QueryTest, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def test_needs_text(self):
+ User = self.classes.User
+
+ assert_raises_message(
+ sa_exc.ArgumentError,
+ "Textual SQL expression",
+ fixture_session().query(User).from_statement,
+ "select * from users order by id",
+ )
+
+ def test_select_star(self):
+ User = self.classes.User
+
+ eq_(
+ fixture_session()
+ .query(User)
+ .from_statement(text("select * from users order by id"))
+ .first(),
+ User(id=7),
+ )
+ eq_(
+ fixture_session()
+ .query(User)
+ .from_statement(
+ text("select * from users where name='nonexistent'")
+ )
+ .first(),
+ None,
+ )
+
+ def test_select_star_future(self):
+ User = self.classes.User
+
+ sess = fixture_session()
+ eq_(
+ sess.execute(
+ select(User).from_statement(
+ text("select * from users order by id")
+ )
+ )
+ .scalars()
+ .first(),
+ User(id=7),
+ )
+ eq_(
+ sess.execute(
+ select(User).from_statement(
+ text("select * from users where name='nonexistent'")
+ )
+ )
+ .scalars()
+ .first(),
+ None,
+ )
+
+ def test_columns_mismatched(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter
+ User = self.classes.User
+
+ s = fixture_session()
+ q = s.query(User).from_statement(
+ text(
+ "select name, 27 as foo, id as users_id from users order by id"
+ )
+ )
+ eq_(
+ q.all(),
+ [
+ User(id=7, name="jack"),
+ User(id=8, name="ed"),
+ User(id=9, name="fred"),
+ User(id=10, name="chuck"),
+ ],
+ )
+
+ def test_columns_mismatched_future(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter
+ User = self.classes.User
+
+ s = fixture_session()
+ q = select(User).from_statement(
+ text(
+ "select name, 27 as foo, id as users_id from users order by id"
+ )
+ )
+ eq_(
+ s.execute(q).scalars().all(),
+ [
+ User(id=7, name="jack"),
+ User(id=8, name="ed"),
+ User(id=9, name="fred"),
+ User(id=10, name="chuck"),
+ ],
+ )
+
+ def test_columns_multi_table_uselabels(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter.
+ User = self.classes.User
+ Address = self.classes.Address
+
+ s = fixture_session()
+ q = s.query(User, Address).from_statement(
+ text(
+ "select users.name AS users_name, users.id AS users_id, "
+ "addresses.id AS addresses_id FROM users JOIN addresses "
+ "ON users.id = addresses.user_id WHERE users.id=8 "
+ "ORDER BY addresses.id"
+ )
+ )
+
+ eq_(
+ q.all(),
+ [
+ (User(id=8), Address(id=2)),
+ (User(id=8), Address(id=3)),
+ (User(id=8), Address(id=4)),
+ ],
+ )
+
+ def test_columns_multi_table_uselabels_future(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter.
+ User = self.classes.User
+ Address = self.classes.Address
+
+ s = fixture_session()
+ q = select(User, Address).from_statement(
+ text(
+ "select users.name AS users_name, users.id AS users_id, "
+ "addresses.id AS addresses_id FROM users JOIN addresses "
+ "ON users.id = addresses.user_id WHERE users.id=8 "
+ "ORDER BY addresses.id"
+ )
+ )
+
+ eq_(
+ s.execute(q).all(),
+ [
+ (User(id=8), Address(id=2)),
+ (User(id=8), Address(id=3)),
+ (User(id=8), Address(id=4)),
+ ],
+ )
+
+ def test_columns_multi_table_uselabels_contains_eager(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter.
+ User = self.classes.User
+ Address = self.classes.Address
+
+ s = fixture_session()
+ q = (
+ s.query(User)
+ .from_statement(
+ text(
+ "select users.name AS users_name, users.id AS users_id, "
+ "addresses.id AS addresses_id FROM users JOIN addresses "
+ "ON users.id = addresses.user_id WHERE users.id=8 "
+ "ORDER BY addresses.id"
+ )
+ )
+ .options(contains_eager(User.addresses))
+ )
+
+ def go():
+ r = q.all()
+ eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_columns_multi_table_uselabels_contains_eager_future(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter.
+ User = self.classes.User
+ Address = self.classes.Address
+
+ s = fixture_session()
+ q = (
+ select(User)
+ .from_statement(
+ text(
+ "select users.name AS users_name, users.id AS users_id, "
+ "addresses.id AS addresses_id FROM users JOIN addresses "
+ "ON users.id = addresses.user_id WHERE users.id=8 "
+ "ORDER BY addresses.id"
+ )
+ )
+ .options(contains_eager(User.addresses))
+ )
+
+ def go():
+ r = s.execute(q).unique().scalars().all()
+ eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_columns_multi_table_uselabels_cols_contains_eager(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter.
+ User = self.classes.User
+ Address = self.classes.Address
+
+ s = fixture_session()
+ q = (
+ s.query(User)
+ .from_statement(
+ text(
+ "select users.name AS users_name, users.id AS users_id, "
+ "addresses.id AS addresses_id FROM users JOIN addresses "
+ "ON users.id = addresses.user_id WHERE users.id=8 "
+ "ORDER BY addresses.id"
+ ).columns(User.name, User.id, Address.id)
+ )
+ .options(contains_eager(User.addresses))
+ )
+
+ def go():
+ r = q.all()
+ eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_columns_multi_table_uselabels_cols_contains_eager_future(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter.
+ User = self.classes.User
+ Address = self.classes.Address
+
+ s = fixture_session()
+ q = (
+ select(User)
+ .from_statement(
+ text(
+ "select users.name AS users_name, users.id AS users_id, "
+ "addresses.id AS addresses_id FROM users JOIN addresses "
+ "ON users.id = addresses.user_id WHERE users.id=8 "
+ "ORDER BY addresses.id"
+ ).columns(User.name, User.id, Address.id)
+ )
+ .options(contains_eager(User.addresses))
+ )
+
+ def go():
+ r = s.execute(q).unique().scalars().all()
+ eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_textual_select_orm_columns(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter.
+ User = self.classes.User
+ Address = self.classes.Address
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+ s = fixture_session()
+ q = s.query(User.name, User.id, Address.id).from_statement(
+ text(
+ "select users.name AS users_name, users.id AS users_id, "
+ "addresses.id AS addresses_id FROM users JOIN addresses "
+ "ON users.id = addresses.user_id WHERE users.id=8 "
+ "ORDER BY addresses.id"
+ ).columns(users.c.name, users.c.id, addresses.c.id)
+ )
+
+ eq_(q.all(), [("ed", 8, 2), ("ed", 8, 3), ("ed", 8, 4)])
+
+ @testing.combinations(
+ (
+ False,
+ subqueryload,
+ ),
+ (
+ True,
+ subqueryload,
+ ),
+ (False, selectinload),
+ (True, selectinload),
+ )
+ def test_related_eagerload_against_text(self, add_columns, loader_option):
+ # new in 1.4. textual selects have columns so subqueryloaders
+ # and selectinloaders can join onto them. we add columns
+ # automatiacally to TextClause as well, however subqueryloader
+ # is not working at the moment due to execution model refactor,
+ # it creates a subquery w/ adapter before those columns are
+ # available. this is a super edge case and as we want to rewrite
+ # the loaders to use select(), maybe we can get it then.
+ User = self.classes.User
+
+ text_clause = text("select * from users")
+ if add_columns:
+ text_clause = text_clause.columns(User.id, User.name)
+
+ s = fixture_session()
+ q = (
+ s.query(User)
+ .from_statement(text_clause)
+ .options(loader_option(User.addresses))
+ )
+
+ def go():
+ eq_(set(q.all()), set(self.static.user_address_result))
+
+ if loader_option is subqueryload:
+ # subqueryload necessarily degrades to lazy loads for a text
+ # statement.
+ self.assert_sql_count(testing.db, go, 5)
+ else:
+ self.assert_sql_count(testing.db, go, 2)
+
+ def test_whereclause(self):
+ User = self.classes.User
+
+ eq_(
+ fixture_session().query(User).filter(text("id in (8, 9)")).all(),
+ [User(id=8), User(id=9)],
+ )
+
+ eq_(
+ fixture_session()
+ .query(User)
+ .filter(text("name='fred'"))
+ .filter(text("id=9"))
+ .all(),
+ [User(id=9)],
+ )
+ eq_(
+ fixture_session()
+ .query(User)
+ .filter(text("name='fred'"))
+ .filter(User.id == 9)
+ .all(),
+ [User(id=9)],
+ )
+
+ def test_whereclause_future(self):
+ User = self.classes.User
+
+ s = fixture_session()
+ eq_(
+ s.execute(select(User).filter(text("id in (8, 9)")))
+ .scalars()
+ .all(),
+ [User(id=8), User(id=9)],
+ )
+
+ eq_(
+ s.execute(
+ select(User).filter(text("name='fred'")).filter(text("id=9"))
+ )
+ .scalars()
+ .all(),
+ [User(id=9)],
+ )
+ eq_(
+ s.execute(
+ select(User).filter(text("name='fred'")).filter(User.id == 9)
+ )
+ .scalars()
+ .all(),
+ [User(id=9)],
+ )
+
+ def test_binds_coerce(self):
+ User = self.classes.User
+
+ assert_raises_message(
+ sa_exc.ArgumentError,
+ r"Textual SQL expression 'id in \(:id1, :id2\)' "
+ "should be explicitly declared",
+ fixture_session().query(User).filter,
+ "id in (:id1, :id2)",
+ )
+
+ def test_plain_textual_column(self):
+ User = self.classes.User
+
+ s = fixture_session()
+
+ self.assert_compile(
+ s.query(User.id, text("users.name")),
+ "SELECT users.id AS users_id, users.name FROM users",
+ )
+
+ eq_(
+ s.query(User.id, text("users.name")).all(),
+ [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
+ )
+
+ eq_(
+ s.query(User.id, literal_column("name")).order_by(User.id).all(),
+ [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
+ )
+
+ def test_via_select(self):
+ User = self.classes.User
+ s = fixture_session()
+ eq_(
+ s.query(User)
+ .from_statement(
+ select(column("id"), column("name"))
+ .select_from(table("users"))
+ .order_by("id")
+ )
+ .all(),
+ [User(id=7), User(id=8), User(id=9), User(id=10)],
+ )
+
+ def test_via_textasfrom_from_statement(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ eq_(
+ s.query(User)
+ .from_statement(
+ text("select * from users order by id").columns(
+ id=Integer, name=String
+ )
+ )
+ .all(),
+ [User(id=7), User(id=8), User(id=9), User(id=10)],
+ )
+
+ def test_columns_via_textasfrom_from_statement(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ eq_(
+ s.query(User.id, User.name)
+ .from_statement(
+ text("select * from users order by id").columns(
+ id=Integer, name=String
+ )
+ )
+ .all(),
+ [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
+ )
+
+ def test_via_textasfrom_use_mapped_columns(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ eq_(
+ s.query(User)
+ .from_statement(
+ text("select * from users order by id").columns(
+ User.id, User.name
+ )
+ )
+ .all(),
+ [User(id=7), User(id=8), User(id=9), User(id=10)],
+ )
+
+ def test_via_textasfrom_aliased(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ ua = aliased(
+ User,
+ text("select * from users").columns(User.id, User.name).subquery(),
+ )
+
+ eq_(
+ s.query(ua).order_by(ua.id).all(),
+ [User(id=7), User(id=8), User(id=9), User(id=10)],
+ )
+
+ def test_group_by_accepts_text(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ q = s.query(User).group_by(text("name"))
+ self.assert_compile(
+ q,
+ "SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users GROUP BY name",
+ )
+
+ def test_order_by_w_eager_one(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ # from 1.0.0 thru 1.0.2, the "name" symbol here was considered
+ # to be part of the things we need to ORDER BY and it was being
+ # placed into the inner query's columns clause, as part of
+ # query._compound_eager_statement where we add unwrap_order_by()
+ # to the columns clause. However, as #3392 illustrates, unlocatable
+ # string expressions like "name desc" will only fail in this scenario,
+ # so in general the changing of the query structure with string labels
+ # is dangerous.
+ #
+ # the queries here are again "invalid" from a SQL perspective, as the
+ # "name" field isn't matched up to anything.
+ #
+
+ q = (
+ s.query(User)
+ .options(joinedload(User.addresses))
+ .order_by(desc("name"))
+ .limit(1)
+ )
+ assert_raises_message(
+ sa_exc.CompileError,
+ "Can't resolve label reference for ORDER BY / GROUP BY.",
+ q.set_label_style(
+ LABEL_STYLE_TABLENAME_PLUS_COL
+ ).statement.compile,
+ )
+
+ def test_order_by_w_eager_two(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ q = (
+ s.query(User)
+ .options(joinedload(User.addresses))
+ .order_by("name")
+ .limit(1)
+ )
+ assert_raises_message(
+ sa_exc.CompileError,
+ "Can't resolve label reference for ORDER BY / GROUP BY.",
+ q.set_label_style(
+ LABEL_STYLE_TABLENAME_PLUS_COL
+ ).statement.compile,
+ )
+
+ def test_order_by_w_eager_three(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ self.assert_compile(
+ s.query(User)
+ .options(joinedload(User.addresses))
+ .order_by("users_name")
+ .limit(1),
+ "SELECT anon_1.users_id AS anon_1_users_id, "
+ "anon_1.users_name AS anon_1_users_name, "
+ "addresses_1.id AS addresses_1_id, "
+ "addresses_1.user_id AS addresses_1_user_id, "
+ "addresses_1.email_address AS addresses_1_email_address "
+ "FROM (SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users ORDER BY users.name "
+ "LIMIT :param_1) AS anon_1 "
+ "LEFT OUTER JOIN addresses AS addresses_1 "
+ "ON anon_1.users_id = addresses_1.user_id "
+ "ORDER BY anon_1.users_name, addresses_1.id",
+ )
+
+ # however! this works (again?)
+ eq_(
+ s.query(User)
+ .options(joinedload(User.addresses))
+ .order_by("users_name")
+ .first(),
+ User(name="chuck", addresses=[]),
+ )
+
+ def test_order_by_w_eager_four(self):
+ User = self.classes.User
+ Address = self.classes.Address
+ s = fixture_session()
+
+ self.assert_compile(
+ s.query(User)
+ .options(joinedload(User.addresses))
+ .order_by(desc("users_name"))
+ .limit(1),
+ "SELECT anon_1.users_id AS anon_1_users_id, "
+ "anon_1.users_name AS anon_1_users_name, "
+ "addresses_1.id AS addresses_1_id, "
+ "addresses_1.user_id AS addresses_1_user_id, "
+ "addresses_1.email_address AS addresses_1_email_address "
+ "FROM (SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users ORDER BY users.name DESC "
+ "LIMIT :param_1) AS anon_1 "
+ "LEFT OUTER JOIN addresses AS addresses_1 "
+ "ON anon_1.users_id = addresses_1.user_id "
+ "ORDER BY anon_1.users_name DESC, addresses_1.id",
+ )
+
+ # however! this works (again?)
+ eq_(
+ s.query(User)
+ .options(joinedload(User.addresses))
+ .order_by(desc("users_name"))
+ .first(),
+ User(name="jack", addresses=[Address()]),
+ )
+
+ def test_order_by_w_eager_five(self):
+ """essentially the same as test_eager_relations -> test_limit_3,
+ but test for textual label elements that are freeform.
+ this is again #3392."""
+
+ User = self.classes.User
+ Address = self.classes.Address
+
+ sess = fixture_session()
+
+ q = sess.query(User, Address.email_address.label("email_address"))
+
+ result = (
+ q.join(User.addresses)
+ .options(joinedload(User.orders))
+ .order_by("email_address desc")
+ .limit(1)
+ .offset(0)
+ )
+
+ assert_raises_message(
+ sa_exc.CompileError,
+ "Can't resolve label reference for ORDER BY / GROUP BY",
+ result.all,
+ )
+
+
+class TextErrorTest(QueryTest, AssertsCompiledSQL):
+ def _test(self, fn, arg, offending_clause):
+ assert_raises_message(
+ sa.exc.ArgumentError,
+ r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
+ r"explicitly declared (?:with|as) text\(%(stmt)r\)"
+ % {"stmt": util.ellipses_string(offending_clause)},
+ fn,
+ arg,
+ )
+
+ def test_filter(self):
+ User = self.classes.User
+ self._test(
+ fixture_session().query(User.id).filter, "myid == 5", "myid == 5"
+ )
+
+ def test_having(self):
+ User = self.classes.User
+ self._test(
+ fixture_session().query(User.id).having, "myid == 5", "myid == 5"
+ )
+
+ def test_from_statement(self):
+ User = self.classes.User
+ self._test(
+ fixture_session().query(User.id).from_statement,
+ "select id from user",
+ "select id from user",
+ )
--- /dev/null
+"""Test the TString construct in ORM context for Python 3.14+
+template strings."""
+
+from sqlalchemy import Integer
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import tstring
+from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.fixtures import fixture_session
+from test.orm import _fixtures
+
+
+class QueryTest(_fixtures.FixtureTest):
+ run_setup_mappers = "once"
+ run_inserts = "once"
+ run_deletes = None
+
+ @classmethod
+ def setup_mappers(cls):
+ cls._setup_stock_mapping()
+
+
+class TStringTest(QueryTest, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def test_select_star(self):
+ User = self.classes.User
+
+ eq_(
+ fixture_session()
+ .query(User)
+ .from_statement(
+ tstring(t"select * from users where users.id={7} order by id")
+ )
+ .first(),
+ User(id=7),
+ )
+ eq_(
+ fixture_session()
+ .query(User)
+ .from_statement(
+ tstring(t"select * from users where name={'nonexistent'}")
+ )
+ .first(),
+ None,
+ )
+
+ def test_select_star_future(self):
+ User = self.classes.User
+
+ sess = fixture_session()
+ eq_(
+ sess.execute(
+ select(User).from_statement(
+ tstring(
+ t"select * from users where users.id={7} order by id"
+ )
+ )
+ )
+ .scalars()
+ .first(),
+ User(id=7),
+ )
+ eq_(
+ sess.execute(
+ select(User).from_statement(
+ tstring(t"select * from users where name={'nonexistent'}")
+ )
+ )
+ .scalars()
+ .first(),
+ None,
+ )
+
+ def test_entity_interpolation(self):
+ User = self.classes.User
+
+ # Test interpolating entity columns and table in select and from clause
+ sess = fixture_session()
+ result = (
+ sess.execute(
+ select(User).from_statement(
+ tstring(t"select * from {User} order by {User.id}")
+ )
+ )
+ .scalars()
+ .all()
+ )
+
+ eq_([u.name for u in result], ["jack", "ed", "fred", "chuck"])
+
+ def test_whereclause(self):
+ User = self.classes.User
+
+ eq_(
+ fixture_session()
+ .query(User)
+ .filter(tstring(t"id in (8, 9)"))
+ .all(),
+ [User(id=8), User(id=9)],
+ )
+
+ eq_(
+ fixture_session()
+ .query(User)
+ .filter(tstring(t"name='fred'"))
+ .filter(tstring(t"id=9"))
+ .all(),
+ [User(id=9)],
+ )
+ eq_(
+ fixture_session()
+ .query(User)
+ .filter(tstring(t"name='fred'"))
+ .filter(User.id == 9)
+ .all(),
+ [User(id=9)],
+ )
+
+ def test_whereclause_future(self):
+ User = self.classes.User
+
+ s = fixture_session()
+ eq_(
+ s.execute(select(User).filter(tstring(t"id in (8, 9)")))
+ .scalars()
+ .all(),
+ [User(id=8), User(id=9)],
+ )
+
+ eq_(
+ s.execute(
+ select(User)
+ .filter(tstring(t"name='fred'"))
+ .filter(tstring(t"id=9"))
+ )
+ .scalars()
+ .all(),
+ [User(id=9)],
+ )
+ eq_(
+ s.execute(
+ select(User)
+ .filter(tstring(t"name='fred'"))
+ .filter(User.id == 9)
+ )
+ .scalars()
+ .all(),
+ [User(id=9)],
+ )
+
+ def test_via_textasfrom_from_statement(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ eq_(
+ s.query(User)
+ .from_statement(
+ tstring(t"select * from users order by id").columns(
+ id=Integer, name=String
+ )
+ )
+ .all(),
+ [User(id=7), User(id=8), User(id=9), User(id=10)],
+ )
+
+ def test_columns_via_textasfrom_from_statement(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ eq_(
+ s.query(User.id, User.name)
+ .from_statement(
+ tstring(t"select * from users order by id").columns(
+ id=Integer, name=String
+ )
+ )
+ .all(),
+ [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
+ )
+
+ def test_via_textasfrom_use_mapped_columns(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ eq_(
+ s.query(User)
+ .from_statement(
+ tstring(t"select * from users order by id").columns(
+ User.id, User.name
+ )
+ )
+ .all(),
+ [User(id=7), User(id=8), User(id=9), User(id=10)],
+ )
+
+ def test_group_by_accepts_tstring(self):
+ User = self.classes.User
+ s = fixture_session()
+
+ q = s.query(User).group_by(tstring(t"name"))
+ self.assert_compile(
+ q,
+ "SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users GROUP BY name",
+ )
# TEST: test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached
-test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 4003
+test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 4403
test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 7603
-test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 4003
+test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 4403
test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 7203
# TEST: test.aaa_profiling.test_misc.EnumTest.test_create_enum_from_pep_435_w_expensive_members
from sqlalchemy.sql.elements import Null
from sqlalchemy.sql.elements import OrderByList
from sqlalchemy.sql.elements import Slice
+from sqlalchemy.sql.elements import TString
from sqlalchemy.sql.elements import TypeClause
from sqlalchemy.sql.elements import UnaryExpression
from sqlalchemy.sql.functions import FunctionElement
SyntaxExtension,
DialectKWArgs,
Executable,
+ TString,
]
)
)
need = set(
cls
for cls in all_hascachekey_subclasses(
- ignore_subclasses=[Annotated, NoInit, SingletonConstant]
+ ignore_subclasses=[
+ Annotated,
+ NoInit,
+ SingletonConstant,
+ TString,
+ ]
)
if "orm" not in cls.__module__
and "compiler" not in cls.__module__
from sqlalchemy.sql import func
from sqlalchemy.sql import select
from sqlalchemy.sql import text
+from sqlalchemy.sql import tstring
from sqlalchemy.sql.base import ExecutableStatement
from sqlalchemy.sql.elements import literal
from sqlalchemy.testing import eq_
from sqlalchemy.testing.schema import Table
from sqlalchemy.types import Integer
from sqlalchemy.types import Text
+from sqlalchemy.util.compat import Template
from sqlalchemy.util.langhelpers import class_hierarchy
def _relevant_impls():
return (
text("select 1 + 2"),
+ tstring(Template("select 1 + 2")),
text("select 42 as q").columns(column("q", Integer)),
func.max(42),
select(1, 2).union(select(3, 4)),
--- /dev/null
+"""Test the TString construct for Python 3.14+ template strings."""
+
+from itertools import zip_longest
+
+from sqlalchemy import column
+from sqlalchemy import exc
+from sqlalchemy import Integer
+from sqlalchemy import JSON
+from sqlalchemy import literal
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import tstring
+from sqlalchemy.engine.interfaces import CacheStats
+from sqlalchemy.sql import table
+from sqlalchemy.sql.elements import ColumnClause
+from sqlalchemy.sql.sqltypes import TypeEngine
+from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing import eq_
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.assertions import expect_raises_message
+
+table1 = table(
+ "mytable",
+ column("myid", Integer),
+ column("name", String),
+ column("description", String),
+)
+
+table2 = table(
+ "myothertable", column("otherid", Integer), column("othername", String)
+)
+
+
+class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def test_basic_literal_interpolation(self):
+ a = 5
+ b = 10
+ stmt = tstring(t"select {a}, {b}")
+ self.assert_compile(
+ stmt,
+ "select :param_1, :param_2",
+ checkparams={"param_1": 5, "param_2": 10},
+ )
+
+ def test_no_strings(self):
+ with expect_raises_message(
+ exc.ArgumentError, r"pep-750 Tstring \(e.g. t'...'\) expected"
+ ):
+ tstring("select * from table") # type: ignore
+
+ def test_tstring_literal_passthrough(self):
+ stmt = tstring(t"select * from foo where lala = bar")
+ self.assert_compile(stmt, "select * from foo where lala = bar")
+
+ def test_sqlalchemy_expression_interpolation(self):
+ subq = select(literal(1)).scalar_subquery()
+ stmt = tstring(t"SELECT {subq}")
+ self.assert_compile(
+ stmt,
+ "SELECT (SELECT :param_1 AS anon_1)",
+ checkparams={"param_1": 1},
+ )
+
+ def test_column_interpolation(self):
+ stmt = tstring(t"SELECT {table1.c.myid}, {table1.c.name} FROM mytable")
+ self.assert_compile(
+ stmt, "SELECT mytable.myid, mytable.name FROM mytable"
+ )
+
+ def test_column_interpolation_labeled(self):
+ # Labels are not supported inside tstring as they're ambiguous
+ # (should they render with AS in all contexts?)
+ label1 = table1.c.myid.label("label1")
+ label2 = table1.c.name.label("label2")
+
+ with expect_raises_message(
+ exc.CompileError,
+ "Using label\\(\\) directly inside tstring is not supported",
+ ):
+ tstring(t"SELECT {label1}, {label2} FROM mytable").compile()
+
+ def test_arithmetic_expression(self):
+ # Python arithmetic is evaluated before being passed to tstring
+ a = 1
+ stmt = tstring(t"SELECT {a + 7}")
+ self.assert_compile(
+ stmt, "SELECT :param_1", checkparams={"param_1": 8}
+ )
+
+ def test_embed_tstring_as_select_criteria(self):
+ user_id = 123
+ stmt = select(table1).where(tstring(t"{table1.c.myid} = {user_id}"))
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable WHERE mytable.myid = :param_1",
+ checkparams={"param_1": 123},
+ )
+
+ def test_embed_tstring_as_fromclause(self):
+ status = "some status"
+ stmt = select(column("x")).select_from(
+ tstring(
+ t"foobar left outer join lala on foobar.foo = lala.foo "
+ t"AND foobar.status = {status}"
+ )
+ )
+ self.assert_compile(
+ stmt,
+ "SELECT x FROM foobar left outer join "
+ "lala on foobar.foo = lala.foo AND foobar.status = :param_1",
+ checkparams={"param_1": "some status"},
+ )
+
+ def test_and_operator(self):
+ stmt = tstring(t"1 = 1") & tstring(t"2 = 2")
+ self.assert_compile(stmt, "1 = 1 AND 2 = 2")
+
+ def test_multiple_literals(self):
+ a, b, c, d = 1, 2, 3, 4
+ stmt = tstring(t"SELECT {a}, {b}, {c}, {d}")
+ self.assert_compile(
+ stmt,
+ "SELECT :param_1, :param_2, :param_3, :param_4",
+ checkparams={
+ "param_1": 1,
+ "param_2": 2,
+ "param_3": 3,
+ "param_4": 4,
+ },
+ )
+
+ def test_nested_tstring_execution(self):
+ inner = tstring(t"(SELECT {'some value'} AS anon_1)")
+ self.assert_compile(
+ tstring(t"select {inner}"),
+ "select (SELECT :param_1 AS anon_1)",
+ checkparams={"param_1": "some value"},
+ )
+
+ def test_nested_scalar_subquery_execution(self):
+ inner = select(literal("some value")).scalar_subquery()
+ self.assert_compile(
+ tstring(t"select {inner}"),
+ "select (SELECT :param_1 AS anon_1)",
+ checkparams={"param_1": "some value"},
+ )
+
+ def test_nested_subquery_execution(self):
+ inner = select(literal("some value")).subquery()
+ self.assert_compile(
+ tstring(t"select * from {inner}"),
+ "select * from (SELECT :param_1 AS anon_2) AS anon_1",
+ checkparams={"param_1": "some value"},
+ )
+
+
+class ColumnsTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def _assert_columns(self, stmt, columns):
+ """Assert that stmt.selected_columns matches the given columns.
+
+ Also verifies that the result map structure matches what we'd get
+ from a regular select() statement with the same columns.
+ """
+ # Check that selected_columns matches
+ eq_(
+ [c.name for c in stmt.selected_columns],
+ [c.name for c in columns],
+ )
+ for stmt_col, expected_col in zip(stmt.selected_columns, columns):
+ eq_(stmt_col.type._type_affinity, expected_col.type._type_affinity)
+
+ # Verify result map structure matches what select() would produce
+ stmt_compiled = stmt.compile()
+ select_compiled = select(*columns).compile()
+ stmt_map = stmt_compiled._create_result_map()
+ select_map = select_compiled._create_result_map()
+
+ # Compare result map structure using recursive comparison
+ eq_(list(stmt_map.keys()), list(select_map.keys()))
+ for key in stmt_map:
+ stmt_entry = stmt_map[key]
+ select_entry = select_map[key]
+ # Use recursive comparison for the entire entry tuple
+ assert self._compare_recursive(
+ stmt_entry, select_entry
+ ), f"Result map entries differ:\n {stmt_entry}\n {select_entry}"
+
+ def _compare_recursive(self, left, right):
+ if isinstance(left, ColumnClause) and isinstance(right, ColumnClause):
+ return (
+ left.name == right.name
+ and left.type._type_affinity == right.type._type_affinity
+ )
+ elif isinstance(left, TypeEngine) and isinstance(right, TypeEngine):
+ return left._type_affinity == right._type_affinity
+ elif isinstance(left, (tuple, list)) and isinstance(
+ right, (tuple, list)
+ ):
+ return all(
+ self._compare_recursive(l, r)
+ for l, r in zip_longest(left, right)
+ )
+ else:
+ return left == right
+
+ def test_columns_positional(self):
+ cols = [column("id", Integer), column("name", String)]
+ stmt = tstring(t"SELECT id, name FROM users").columns(*cols)
+ self.assert_compile(stmt, "SELECT id, name FROM users")
+ self._assert_columns(stmt, cols)
+
+ def test_columns_keyword(self):
+ stmt = tstring(t"SELECT id, name FROM users").columns(
+ id=Integer, name=String
+ )
+ self.assert_compile(stmt, "SELECT id, name FROM users")
+ cols = [column("id", Integer), column("name", String)]
+ self._assert_columns(stmt, cols)
+
+ def test_columns_mixed(self):
+ cols = [
+ column("id", Integer),
+ column("name", String),
+ column("age", Integer),
+ ]
+ stmt = tstring(t"SELECT id, name, age FROM users").columns(
+ cols[0], name=String, age=Integer
+ )
+ self.assert_compile(stmt, "SELECT id, name, age FROM users")
+ self._assert_columns(stmt, cols)
+
+ def test_columns_subquery(self):
+ stmt = (
+ tstring(t"SELECT id, name FROM users")
+ .columns(column("id", Integer), column("name", String))
+ .subquery("st")
+ )
+ outer = select(table1).select_from(
+ table1.join(stmt, table1.c.name == stmt.c.name)
+ )
+ self.assert_compile(
+ outer,
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable JOIN (SELECT id, name FROM users) AS st ON "
+ "mytable.name = st.name",
+ )
+
+
+class ExecutionTest(fixtures.TestBase):
+ __backend__ = True
+
+ def test_basic_execution(self, connection):
+ a = 1
+ b = 2
+ result = connection.execute(tstring(t"select {a + 7}, {b}"))
+ eq_(result.all(), [(8, 2)])
+
+ def test_json_literal_execution(self, connection):
+ some_json = {"foo": "bar"}
+ stmt = tstring(t"select {literal(some_json, JSON)}").columns(
+ column("jj", JSON)
+ )
+ result = connection.execute(stmt)
+ row = result.scalar()
+ eq_(row, {"foo": "bar"})
+
+ def test_statement_caching(self, connection):
+ """Test that tstring statements are properly cached."""
+ some_json = {"foo": "bar"}
+ stmt1 = tstring(t"select {literal(some_json, JSON)}").columns(
+ column("jj", JSON)
+ )
+ result1 = connection.execute(stmt1)
+ eq_(result1.scalar(), {"foo": "bar"})
+
+ # Execute same structure with different value
+ some_json = {"foo": "newbar", "bat": "hoho"}
+ stmt2 = tstring(t"select {literal(some_json, JSON)}").columns(
+ column("jj", JSON)
+ )
+ result2 = connection.execute(stmt2)
+
+ # Should hit cache
+ if hasattr(result2.context, "cache_hit"):
+ eq_(result2.context.cache_hit, CacheStats.CACHE_HIT)
+
+ eq_(result2.scalar(), {"foo": "newbar", "bat": "hoho"})
+
+ def test_nested_scalar_subquery_execution(self, connection):
+ inner = select(literal("some value")).scalar_subquery()
+ result = connection.execute(tstring(t"select {inner}"))
+ eq_(result.all(), [("some value",)])
+
+ def test_nested_subquery_execution(self, connection):
+ inner = select(literal("some value")).subquery()
+ result = connection.execute(tstring(t"select * from {inner}"))
+ eq_(result.all(), [("some value",)])
+
+ def test_multiple_values(self, connection):
+ values = [1, 2, 3, 4, 5]
+ result = connection.execute(
+ tstring(
+ t"select {values[0]}, {values[1]}, {values[2]}, "
+ t"{values[3]}, {values[4]}"
+ )
+ )
+ eq_(result.all(), [(1, 2, 3, 4, 5)])
+
+
+class IntegrationTest(fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ from sqlalchemy import Column
+ from sqlalchemy import Table
+
+ Table(
+ "users",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(50)),
+ )
+
+ @classmethod
+ def insert_data(cls, connection):
+ connection.execute(
+ cls.tables.users.insert(),
+ [
+ {"id": 1, "name": "alice"},
+ {"id": 2, "name": "bob"},
+ {"id": 3, "name": "charlie"},
+ ],
+ )
+
+ def test_select_from_real_table(self, connection):
+ user_id = 2
+ stmt = tstring(t"SELECT * FROM users WHERE id = {user_id}").columns(
+ column("id", Integer), column("name", String)
+ )
+ result = connection.execute(stmt)
+ row = result.one()
+ eq_(row.id, 2)
+ eq_(row.name, "bob")
+
+ def test_where_clause_with_real_table(self, connection):
+ users = self.tables.users
+ name_filter = "alice"
+ stmt = select(users).where(
+ tstring(t"{users.c.name} = {literal(name_filter)}")
+ )
+ result = connection.execute(stmt)
+ row = result.one()
+ eq_(row.id, 1)
+ eq_(row.name, "alice")
+
+ def test_complex_query(self, connection):
+ min_id = 1
+ max_id = 2
+ stmt = tstring(
+ t"SELECT id, name FROM users WHERE id >= {min_id} "
+ t"AND id <= {max_id}"
+ ).columns(column("id", Integer), column("name", String))
+ result = connection.execute(stmt)
+ rows = result.all()
+ eq_(len(rows), 2)
+ eq_(rows[0].name, "alice")
+ eq_(rows[1].name, "bob")
+
+
+class CacheKeyTest(fixtures.CacheKeyFixture, fixtures.TestBase):
+ """Test cache key generation for tstring constructs."""
+
+ @fixtures.CacheKeySuite.run_suite_tests
+ def test_tstring_cache_key(self):
+
+ def stmt1():
+ # Basic tstring with literal
+ a = 5
+ return tstring(t"SELECT {a}")
+
+ def stmt2():
+ # Different structure - two literals
+ a = 5
+ b = 10
+ return tstring(t"SELECT {a}, {b}")
+
+ def stmt3():
+ # With column reference
+ return tstring(t"SELECT {table1.c.myid}")
+
+ def stmt4():
+ # Different column - different cache key
+ return tstring(t"SELECT {table1.c.name}")
+
+ def stmt5():
+ # With .columns()
+ a = 5
+ return tstring(t"SELECT {a}").columns(column("val", Integer))
+
+ def stmt6():
+ # String literal passthrough
+ return tstring(t"SELECT * FROM users")
+
+ def stmt7():
+ # Different string literal
+ return tstring(t"SELECT id FROM users")
+
+ def stmt8():
+ # With SQLAlchemy scalar subquery
+ return tstring(t"SELECT {select(literal(1)).scalar_subquery()}")
+
+ def stmt9():
+ # Mixed: text and literal
+ user_id = 42
+ return tstring(t"SELECT * FROM users WHERE id = {user_id}")
+
+ def stmt10():
+ # Mixed: text and column
+ return tstring(t"SELECT * FROM users WHERE id = {table1.c.myid}")
+
+ return lambda: [
+ stmt1(),
+ stmt2(),
+ stmt3(),
+ stmt4(),
+ stmt5(),
+ stmt6(),
+ stmt7(),
+ stmt8(),
+ stmt9(),
+ stmt10(),
+ ]
from __future__ import annotations
+from string.templatelib import Template
from typing import Any
from typing import assert_type
from typing import Unpack
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import text
+from sqlalchemy import tstring
from sqlalchemy import update
from sqlalchemy.engine import Result
from sqlalchemy.engine.row import Row
from sqlalchemy.orm.query import RowReturningQuery
from sqlalchemy.sql.dml import ReturningInsert
from sqlalchemy.sql.elements import KeyedColumnElement
+from sqlalchemy.sql.elements import TString
from sqlalchemy.sql.expression import FromClause
from sqlalchemy.sql.expression import TextClause
from sqlalchemy.sql.selectable import ScalarSelect
assert_type(r1, Result[int, str])
-def t_from_statement() -> None:
+def t_from_statement_text() -> None:
t = text("select * from user")
assert_type(t, TextClause)
select(User).from_statement(t)
+ session.query(User).from_statement(t)
+
ts = text("select * from user").columns(User.id, User.name)
assert_type(ts, TextualSelect)
select(User).from_statement(ts)
+ session.query(User).from_statement(ts)
+
ts2 = text("select * from user").columns(
user_table.c.id, user_table.c.name
)
select(User).from_statement(ts2)
+ session.query(User).from_statement(ts2)
+
+
+def t_from_statement_tstring(templ: Template) -> None:
+ t = tstring(templ)
+
+ assert_type(t, TString)
+
+ select(User).from_statement(t)
+
+ session.query(User).from_statement(t)
+
+ ts = tstring(templ).columns(User.id, User.name)
+
+ assert_type(ts, TextualSelect)
+
+ select(User).from_statement(ts)
+
+ session.query(User).from_statement(ts)
+
+ ts2 = tstring(templ).columns(user_table.c.id, user_table.c.name)
+
+ assert_type(ts2, TextualSelect)
+
+ select(User).from_statement(ts2)
+
+ session.query(User).from_statement(ts2)
+
def t_aliased_fromclause() -> None:
a1 = aliased(User, user_table)