]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
add TString support
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 17 Nov 2025 18:11:24 +0000 (13:11 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 30 Nov 2025 19:38:13 +0000 (14:38 -0500)
Added support for Python 3.14+ template strings (t-strings) via the new
:func:`_sql.tstring` construct, as defined in :pep:`750`. This feature
allows for ergonomic SQL statement construction by automatically
interpolating Python values and SQLAlchemy expressions within template
strings.

Part of the challenge here is the syntax only works on py314, so we have
to exclude the test file at many levels when py314 is not used.  not
sure yet how i want to adjust pep8 tests and rules for this.

Fixes: #12548
Change-Id: Ia060d1387ff452fe4f5d924f683529a22a8e1f72

28 files changed:
.pre-commit-config.yaml
doc/build/changelog/migration_21.rst
doc/build/changelog/unreleased_21/12548.rst [new file with mode: 0644]
doc/build/core/sqlelement.rst
lib/sqlalchemy/__init__.py
lib/sqlalchemy/orm/context.py
lib/sqlalchemy/orm/query.py
lib/sqlalchemy/sql/__init__.py
lib/sqlalchemy/sql/_elements_constructors.py
lib/sqlalchemy/sql/coercions.py
lib/sqlalchemy/sql/compiler.py
lib/sqlalchemy/sql/elements.py
lib/sqlalchemy/sql/expression.py
lib/sqlalchemy/sql/roles.py
lib/sqlalchemy/sql/selectable.py
lib/sqlalchemy/sql/util.py
lib/sqlalchemy/util/compat.py
noxfile.py
pyproject.toml
test/conftest.py
test/orm/test_query.py
test/orm/test_text.py [new file with mode: 0644]
test/orm/test_tstring_py314.py [new file with mode: 0644]
test/profiles.txt
test/sql/test_compare.py
test/sql/test_statement_params.py
test/sql/test_tstrings_py314.py [new file with mode: 0644]
test/typing/plain_files/orm/typed_queries.py

index dfbc82115bdc33f8508e0d7e8d96bad6fbaffce7..07d00fbfb082db5ec0647bd4520cc10fbbfde755 100644 (file)
@@ -1,5 +1,7 @@
-# See https://pre-commit.com for more information
-# See https://pre-commit.com/hooks.html for more hooks
+
+default_language_version:
+    python: python3.14
+
 repos:
 -   repo: https://github.com/python/black
     rev: 25.11.0
index d8e7da5cef854b69abe9919deb039f34318c1992..bf0a29cdc2813b08a76a8e502f3a1c526abe2688 100644 (file)
@@ -497,6 +497,70 @@ E.g.::
 New Features and Improvements - Core
 =====================================
 
+.. _change_12548:
+
+Template String (t-string) Support for Python 3.14+
+----------------------------------------------------
+
+SQLAlchemy 2.1 adds support for Python 3.14+ template strings (t-strings)
+via the new :func:`_sql.tstring` construct, as defined in :pep:`750`.
+This feature provides a more ergonomic way to construct SQL statements by
+automatically interpolating Python values and SQLAlchemy expressions within
+template strings.
+
+The :func:`_sql.tstring` function works similarly to :func:`_sql.text`, but
+automatically handles different types of interpolated values:
+
+* **String literals** from the template are rendered directly as SQL
+* **SQLAlchemy expressions** (columns, functions, subqueries, etc.) are
+  embedded as clause elements
+* **Plain Python values** are automatically wrapped with :func:`_sql.literal`
+
+Example usage::
+
+    from sqlalchemy import tstring, select, literal, JSON
+
+    # Python values become bound values
+    user_id = 42
+    stmt = tstring(t"SELECT * FROM users WHERE id = {user_id}")
+    # renders: SELECT * FROM users WHERE id = :param_1
+
+    # SQLAlchemy expressions are embedded
+    from sqlalchemy import table, column
+
+    stmt = tstring(t"SELECT {column('q')} FROM {table('t')}")
+    # renders: SELECT q FROM t
+
+    # Apply explicit SQL types to bound values using literal()
+    some_json = {"foo": "bar"}
+    stmt = tstring(t"SELECT {literal(some_json, JSON)}")
+
+Like :func:`_sql.text`, the :class:`_sql.TString` construct supports the
+:meth:`_sql.TString.columns` method to specify return columns and their types::
+
+    from sqlalchemy import column, Integer, String
+
+    stmt = tstring(t"SELECT id, name FROM users").columns(
+        column("id", Integer), column("name", String)
+    )
+
+    for id, name in connection.execute(stmt):
+        print(id, name)
+
+The :func:`_sql.tstring` construct is fully compatible with SQLAlchemy's
+statement caching system. Statements with the same structure but different
+literal values will share the same cache key, providing optimal performance.
+
+.. seealso::
+
+    :func:`_sql.tstring`
+
+    :class:`_sql.TString`
+
+    `PEP 750 <https://peps.python.org/pep-0750/>`_ - Template Strings
+
+:ticket:`12548`
+
 .. _change_10635:
 
 ``Row`` now represents individual column types directly without ``Tuple``
diff --git a/doc/build/changelog/unreleased_21/12548.rst b/doc/build/changelog/unreleased_21/12548.rst
new file mode 100644 (file)
index 0000000..5ca668f
--- /dev/null
@@ -0,0 +1,13 @@
+.. change::
+    :tags: feature, sql
+    :tickets: 12548
+
+    Added support for Python 3.14+ template strings (t-strings) via the new
+    :func:`_sql.tstring` construct. This feature makes use of Python 3.14
+    template strings as defined in :pep:`750`, allowing for ergonomic SQL
+    statement construction by automatically interpolating Python values and
+    SQLAlchemy expressions within template strings.
+
+    .. seealso::
+
+        :ref:`change_12548` - in :ref:`migration_21_toplevel`
index 88dc810efaf2e4db9ab8b3984d629d9fcefaabcc..047d8594cda2fd0d0f1e611e45067b5bb4803934 100644 (file)
@@ -65,6 +65,8 @@ used when building up SQLAlchemy Expression Language constructs.
 
 .. autofunction:: text
 
+.. autofunction:: tstring
+
 .. autofunction:: true
 
 .. autofunction:: try_cast
@@ -221,6 +223,11 @@ The classes here are generated using the constructors listed at
 
 .. autoclass:: TextClause
    :members:
+   :inherited-members: columns
+
+.. autoclass:: TString
+   :members:
+   :inherited-members: columns
 
 .. autoclass:: TryCast
    :members:
index 08035c408c993a3da9387f3130d00d3546446e59..0e93c24e83c24f4a2cac80859fda3787c0a6ab2c 100644 (file)
@@ -212,6 +212,8 @@ from .sql.expression import true as true
 from .sql.expression import True_ as True_
 from .sql.expression import try_cast as try_cast
 from .sql.expression import TryCast as TryCast
+from .sql.expression import TString as TString
+from .sql.expression import tstring as tstring
 from .sql.expression import Tuple as Tuple
 from .sql.expression import tuple_ as tuple_
 from .sql.expression import type_coerce as type_coerce
index 3ac216babfceead2e81591a94f64624b126a1504..848547a9dde0e315c8fec3b0e1262bcf15244fed 100644 (file)
@@ -880,10 +880,11 @@ class _ORMFromStatementCompileState(_ORMCompileState):
 
         self.order_by = None
 
-        if isinstance(self.statement, expression.TextClause):
-            # TextClause has no "column" objects at all.  for this case,
-            # we generate columns from our _QueryEntity objects, then
-            # flip on all the "please match no matter what" parameters.
+        if self.statement._is_text_clause:
+            # AbstractTextClause (TextClause, TString) has no "column"
+            # objects at all. for this case, we generate columns from our
+            # _QueryEntity objects, then flip on all the
+            # "please match no matter what" parameters.
             self.extra_criteria_entities = {}
 
             for entity in self._entities:
index 2eb2c5e008f23520219145bb928d3e5e370370f5..c28c0a45d47e24e588c3f48cf91de5af426d5df0 100644 (file)
@@ -2754,7 +2754,7 @@ class Query(
 
     @_generative
     @_assertions(_no_clauseelement_condition)
-    def from_statement(self, statement: ExecutableReturnsRows) -> Self:
+    def from_statement(self, statement: roles.SelectStatementRole) -> Self:
         """Execute the given SELECT statement and return results.
 
         This method bypasses all internal statement compilation, and the
@@ -2771,10 +2771,10 @@ class Query(
             :meth:`_sql.Select.from_statement` - v2 comparable method.
 
         """
-        statement = coercions.expect(
+        _statement = coercions.expect(
             roles.SelectStatementRole, statement, apply_propagate_attrs=self
         )
-        self._statement = statement
+        self._statement = _statement
         return self
 
     def first(self) -> Optional[_T]:
index c49c0f6c3ad7ddd89807a7c43bc3143a990ef2e6..c19fec59d0be4598aa7a99d397efdf4ba3ee41ac 100644 (file)
@@ -98,9 +98,12 @@ from .expression import TableClause as TableClause
 from .expression import TableSample as TableSample
 from .expression import tablesample as tablesample
 from .expression import text as text
+from .expression import TextClause as TextClause
 from .expression import true as true
 from .expression import True_ as True_
 from .expression import try_cast as try_cast
+from .expression import TString as TString
+from .expression import tstring as tstring
 from .expression import tuple_ as tuple_
 from .expression import type_coerce as type_coerce
 from .expression import union as union
index 76669cb7ca57c7b96d52f06a6d3356a52ace6535..6fddb590b54807664da7de489c975753912ff5ef 100644 (file)
@@ -45,6 +45,7 @@ from .elements import Over
 from .elements import TextClause
 from .elements import True_
 from .elements import TryCast
+from .elements import TString
 from .elements import Tuple
 from .elements import TypeCoerce
 from .elements import UnaryExpression
@@ -52,6 +53,8 @@ from .elements import WithinGroup
 from .functions import FunctionElement
 
 if typing.TYPE_CHECKING:
+    from string.templatelib import Template
+
     from ._typing import _ByArgument
     from ._typing import _ColumnExpressionArgument
     from ._typing import _ColumnExpressionOrLiteralArgument
@@ -1817,6 +1820,86 @@ def text(text: str) -> TextClause:
     return TextClause(text)
 
 
+def tstring(template: Template) -> TString:
+    r"""Construct a new :class:`_expression.TString` clause,
+    representing a SQL template string using Python 3.14+ t-strings.
+
+    .. versionadded:: 2.1
+
+    E.g.::
+
+        from sqlalchemy import tstring
+
+        a = 5
+        b = 10
+        stmt = tstring(t"select {a}, {b}")
+        result = connection.execute(stmt)
+
+    The :func:`_expression.tstring` function accepts a Python 3.14+
+    template string (t-string) and processes it to create a SQL statement.
+    Unlike :func:`_expression.text`, which requires manual bind parameter
+    specification, :func:`_expression.tstring` automatically handles
+    interpolation of Python values and SQLAlchemy expressions.
+
+    **Interpolation Behavior**:
+
+    - **SQL content** expressed in the plain string portions of the template
+      are rendered directly as SQL
+    - **SQLAlchemy expressions** (columns, functions, etc.) are embedded
+      as clause elements
+    - **Plain Python values** are automatically wrapped in
+      :func:`_expression.literal`
+
+    For example::
+
+        from sqlalchemy import tstring, select, literal, JSON, table, column
+
+        # Python values become bound parameters
+        user_id = 42
+        stmt = tstring(t"SELECT * FROM users WHERE id = {user_id}")
+        # renders: SELECT * FROM users WHERE id = :param_1
+
+        # SQLAlchemy expressions are embedded
+        stmt = tstring(t"SELECT {column('q')} FROM {table('t')}")
+        # renders: SELECT q FROM t
+
+        # Apply explicit SQL types to bound values using literal()
+        some_json = {"foo": "bar"}
+        stmt = tstring(t"SELECT {literal(some_json, JSON)}")
+
+    **Column Specification**:
+
+    Like :func:`_expression.text`, the :func:`_expression.tstring` construct
+    supports the :meth:`_expression.TString.columns` method to specify
+    return columns and their types::
+
+        from sqlalchemy import tstring, column, Integer, String
+
+        stmt = tstring(t"SELECT id, name FROM users").columns(
+            column("id", Integer), column("name", String)
+        )
+
+        for id, name in connection.execute(stmt):
+            print(id, name)
+
+    :param template:
+      a Python 3.14+ template string (t-string) containing SQL fragments
+      and Python expressions to be interpolated.
+
+    .. seealso::
+
+        :ref:`tutorial_select_arbitrary_text` - in the :ref:`unified_tutorial`
+
+        :class:`_expression.TString`
+
+        :func:`_expression.text`
+
+        `PEP 750 <https://peps.python.org/pep-0750/>`_ - Template Strings
+
+    """
+    return TString(template)
+
+
 def true() -> True_:
     """Return a constant :class:`.True_` construct.
 
index c967ab0c9873c56347a63ab9831334ae8c46c729..7d510564227e7b5dbf86f64c9c86d3723f78adae 100644 (file)
@@ -795,6 +795,10 @@ class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
         )
 
 
+class TStringElementImpl(ExpressionElementImpl, RoleImpl):
+    __slots__ = ()
+
+
 class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
     __slots__ = ()
 
index d03167dbce8c042e75a9516f2f6b5412516c50d6..bda6b8166668ee283edceee15fa6ed925dffc8a1 100644 (file)
@@ -2597,8 +2597,15 @@ class SQLCompiler(Compiled):
         within_columns_clause=False,
         render_label_as_label=None,
         result_map_targets=(),
+        within_tstring=False,
         **kw,
     ):
+        if within_tstring:
+            raise exc.CompileError(
+                "Using label() directly inside tstring is not supported "
+                "as it is ambiguous how the label expression should be "
+                "rendered without knowledge of how it's being used in SQL"
+            )
         # only render labels within the columns clause
         # or ORDER BY clause of a select.  dialect-specific compilers
         # can modify this behavior.
@@ -2760,6 +2767,23 @@ class SQLCompiler(Compiled):
             ),
         )
 
+    def visit_tstring(self, tstring, add_to_result_map=None, **kw):
+        if self._collect_params:
+            self._add_to_params(tstring)
+
+        if not self.stack:
+            self.isplaintext = True
+
+        if add_to_result_map:
+            # tstring() object is present in the columns clause of a
+            # select().   Add a no-name entry to the result map so that
+            # row[tstring()] produces a result
+            add_to_result_map(None, None, (tstring,), sqltypes.NULLTYPE)
+
+        # Process each part and concatenate
+        kw["within_tstring"] = True
+        return "".join(self.process(part, **kw) for part in tstring.parts)
+
     def visit_textual_select(
         self, taf, compound_index=None, asfrom=False, **kw
     ):
@@ -4394,6 +4418,7 @@ class SQLCompiler(Compiled):
         lateral=False,
         enclosing_alias=None,
         from_linter=None,
+        within_tstring=False,
         **kwargs,
     ):
         if lateral:
@@ -4430,7 +4455,7 @@ class SQLCompiler(Compiled):
         else:
             kwargs["enclosing_alias"] = alias
 
-        if asfrom or ashint:
+        if asfrom or ashint or within_tstring:
             if isinstance(alias.name, elements._truncated_label):
                 alias_name = self._truncated_identifier("alias", alias.name)
             else:
@@ -4438,7 +4463,7 @@ class SQLCompiler(Compiled):
 
         if ashint:
             return self.preparer.format_alias(alias, alias_name)
-        elif asfrom:
+        elif asfrom or within_tstring:
             if from_linter:
                 from_linter.froms[alias._de_clone()] = alias_name
 
@@ -4825,6 +4850,7 @@ class SQLCompiler(Compiled):
             within_columns_clause=within_columns_clause,
             add_to_result_map=add_to_result_map,
             include_table=include_table,
+            within_tstring=False,
         )
         return result_expr._compiler_dispatch(self, **column_clause_args)
 
@@ -5483,12 +5509,13 @@ class SQLCompiler(Compiled):
         from_linter=None,
         ambiguous_table_name_map=None,
         enclosing_alias=None,
+        within_tstring=False,
         **kwargs,
     ):
         if from_linter:
             from_linter.froms[table] = table.fullname
 
-        if asfrom or ashint:
+        if asfrom or ashint or within_tstring:
             effective_schema = self.preparer.schema_for_object(table)
 
             if use_schema and effective_schema:
index 7a640ccacc8a0b1188fda7382a523051d8b6a1c5..8f0d7e0a283b4f54a3b269f6a756bf7d738725c6 100644 (file)
@@ -81,6 +81,7 @@ from .. import util
 from ..util import deprecated
 from ..util import HasMemoized_ro_memoized_attribute
 from ..util import TypingOnly
+from ..util.compat import Template
 from ..util.typing import Self
 from ..util.typing import TupleAny
 from ..util.typing import Unpack
@@ -2323,7 +2324,7 @@ class TypeClause(DQLDMLClauseElement):
         self.type = type_
 
 
-class TextClause(
+class AbstractTextClause(
     roles.DDLConstraintColumnRole,
     roles.DDLExpressionRole,
     roles.StatementOptionRole,
@@ -2336,8 +2337,211 @@ class TextClause(
     ExecutableStatement,
     DQLDMLClauseElement,
     roles.BinaryElementRole[Any],
-    inspection.Inspectable["TextClause"],
 ):
+    """Base class for textual SQL constructs like TextClause and TString."""
+
+    __visit_name__: str
+
+    _is_text_clause = True
+    _is_textual = True
+    _is_implicitly_boolean = False
+    _render_label_in_columns_clause = False
+    _omit_from_statements = False
+    _is_collection_aggregate = False
+
+    @property
+    def _hide_froms(self) -> Iterable[FromClause]:
+        return ()
+
+    def __and__(self, other):
+        # support use in select.where(), query.filter()
+        return and_(self, other)
+
+    @property
+    def _select_iterable(self) -> _SelectIterable:
+        return (self,)
+
+    # help in those cases where text/tstring() is
+    # interpreted in a column expression situation
+    key: Optional[str] = None
+    _label: Optional[str] = None
+
+    _allow_label_resolve = False
+
+    @property
+    def type(self) -> TypeEngine[Any]:
+        return type_api.NULLTYPE
+
+    @property
+    def comparator(self):
+        return self.type.comparator_factory(self)  # type: ignore
+
+    def self_group(
+        self, against: Optional[OperatorType] = None
+    ) -> Union[Self, Grouping[Any]]:
+        if against is operators.in_op:
+            return Grouping(self)
+        else:
+            return self
+
+    def bindparams(
+        self,
+        *binds: BindParameter[Any],
+        **names_to_values: Any,
+    ) -> Self:
+        """Establish the values and/or types of bound parameters within
+        this :class:`_expression.AbstractTextClause` construct.
+
+        This is implemented only for :class:`.TextClause` will raise
+        ``NotImplementedError`` for :class:`.TString`.
+
+        """
+        raise NotImplementedError()
+
+    @util.preload_module("sqlalchemy.sql.selectable")
+    def columns(
+        self,
+        *cols: _ColumnExpressionArgument[Any],
+        **types: _TypeEngineArgument[Any],
+    ) -> TextualSelect:
+        r"""Turn this :class:`_expression.AbstractTextClause` object into a
+        :class:`_expression.TextualSelect`
+        object that serves the same role as a SELECT
+        statement.
+
+        The :class:`_expression.TextualSelect` is part of the
+        :class:`_expression.SelectBase`
+        hierarchy and can be embedded into another statement by using the
+        :meth:`_expression.TextualSelect.subquery` method to produce a
+        :class:`.Subquery`
+        object, which can then be SELECTed from.
+
+        This function essentially bridges the gap between an entirely
+        textual SELECT statement and the SQL expression language concept
+        of a "selectable"::
+
+            from sqlalchemy.sql import column, text
+
+            stmt = text("SELECT id, name FROM some_table")
+            stmt = stmt.columns(column("id"), column("name")).subquery("st")
+
+            stmt = (
+                select(mytable)
+                .select_from(mytable.join(stmt, mytable.c.name == stmt.c.name))
+                .where(stmt.c.id > 5)
+            )
+
+        Above, we pass a series of :func:`_expression.column` elements to the
+        :meth:`_expression.AbstractTextClause.columns` method positionally.
+        These :func:`_expression.column` elements now become first class
+        elements upon the :attr:`_expression.TextualSelect.selected_columns`
+        column collection, which then become part of the :attr:`.Subquery.c`
+        collection after :meth:`_expression.TextualSelect.subquery` is invoked.
+
+        The column expressions we pass to
+        :meth:`_expression.AbstractTextClause.columns` may also be typed; when
+        we do so, these :class:`.TypeEngine` objects become the effective
+        return type of the column, so that SQLAlchemy's result-set-processing
+        systems may be used on the return values. This is often needed for
+        types such as date or boolean types, as well as for unicode processing
+        on some dialect configurations::
+
+            stmt = text("SELECT id, name, timestamp FROM some_table")
+            stmt = stmt.columns(
+                column("id", Integer),
+                column("name", Unicode),
+                column("timestamp", DateTime),
+            )
+
+            for id, name, timestamp in connection.execute(stmt):
+                print(id, name, timestamp)
+
+        As a shortcut to the above syntax, keyword arguments referring to
+        types alone may be used, if only type conversion is needed::
+
+            stmt = text("SELECT id, name, timestamp FROM some_table")
+            stmt = stmt.columns(id=Integer, name=Unicode, timestamp=DateTime)
+
+            for id, name, timestamp in connection.execute(stmt):
+                print(id, name, timestamp)
+
+        The positional form of :meth:`_expression.AbstractTextClause.columns`
+        also provides the unique feature of **positional column targeting**,
+        which is particularly useful when using the ORM with complex textual
+        queries. If we specify the columns from our model to
+        :meth:`_expression.AbstractTextClause.columns`, the result set will
+        match to those columns positionally, meaning the name or origin of the
+        column in the textual SQL doesn't matter::
+
+            stmt = text(
+                "SELECT users.id, addresses.id, users.id, "
+                "users.name, addresses.email_address AS email "
+                "FROM users JOIN addresses ON users.id=addresses.user_id "
+                "WHERE users.id = 1"
+            ).columns(
+                User.id,
+                Address.id,
+                Address.user_id,
+                User.name,
+                Address.email_address,
+            )
+
+            query = (
+                session.query(User)
+                .from_statement(stmt)
+                .options(contains_eager(User.addresses))
+            )
+
+        The :meth:`_expression.AbstractTextClause.columns` method provides a
+        direct route to calling :meth:`_expression.FromClause.subquery` as well
+        as :meth:`_expression.SelectBase.cte` against a textual SELECT
+        statement::
+
+            stmt = stmt.columns(id=Integer, name=String).cte("st")
+
+            stmt = select(sometable).where(sometable.c.id == stmt.c.id)
+
+        :param \*cols: A series of :class:`_expression.ColumnElement` objects,
+         typically
+         :class:`_schema.Column` objects from a :class:`_schema.Table`
+         or ORM level
+         column-mapped attributes, representing a set of columns that this
+         textual string will SELECT from.
+
+        :param \**types: A mapping of string names to :class:`.TypeEngine`
+         type objects indicating the datatypes to use for names that are
+         SELECTed from the textual string.  Prefer to use the ``*cols``
+         argument as it also indicates positional ordering.
+
+        """
+        selectable = util.preloaded.sql_selectable
+
+        input_cols: List[NamedColumn[Any]] = [
+            coercions.expect(roles.LabeledColumnExprRole, col) for col in cols
+        ]
+
+        positional_input_cols = [
+            (
+                ColumnClause(col.key, types.pop(col.key))
+                if col.key in types
+                else col
+            )
+            for col in input_cols
+        ]
+        keyed_input_cols: List[NamedColumn[Any]] = [
+            ColumnClause(key, type_) for key, type_ in types.items()
+        ]
+
+        elem = selectable.TextualSelect.__new__(selectable.TextualSelect)
+        elem._init(
+            self,
+            positional_input_cols + keyed_input_cols,
+            positional=bool(positional_input_cols) and not keyed_input_cols,
+        )
+        return elem
+
+
+class TextClause(AbstractTextClause, inspection.Inspectable["TextClause"]):
     """Represent a literal SQL text fragment.
 
     E.g.::
@@ -2364,40 +2568,10 @@ class TextClause(
         ("text", InternalTraversal.dp_string),
     ] + ExecutableStatement._executable_traverse_internals
 
-    _is_text_clause = True
-
-    _is_textual = True
-
     _bind_params_regex = re.compile(r"(?<![:\w\x5c]):(\w+)(?!:)", re.UNICODE)
-    _is_implicitly_boolean = False
-
-    _render_label_in_columns_clause = False
-
-    _omit_from_statements = False
-
-    _is_collection_aggregate = False
-
-    @property
-    def _hide_froms(self) -> Iterable[FromClause]:
-        return ()
-
-    def __and__(self, other):
-        # support use in select.where(), query.filter()
-        return and_(self, other)
-
-    @property
-    def _select_iterable(self) -> _SelectIterable:
-        return (self,)
-
-    # help in those cases where text() is
-    # interpreted in a column expression situation
-    key: Optional[str] = None
-    _label: Optional[str] = None
-
-    _allow_label_resolve = False
 
     @property
-    def _is_star(self):  # type: ignore[override]
+    def _is_star(self) -> bool:  # type: ignore[override]
         return self.text == "*"
 
     def __init__(self, text: str):
@@ -2542,168 +2716,101 @@ class TextClause(
                 new_params[key] = existing._with_value(value, required=False)
         return self
 
-    @util.preload_module("sqlalchemy.sql.selectable")
-    def columns(
-        self,
-        *cols: _ColumnExpressionArgument[Any],
-        **types: _TypeEngineArgument[Any],
-    ) -> TextualSelect:
-        r"""Turn this :class:`_expression.TextClause` object into a
-        :class:`_expression.TextualSelect`
-        object that serves the same role as a SELECT
-        statement.
-
-        The :class:`_expression.TextualSelect` is part of the
-        :class:`_expression.SelectBase`
-        hierarchy and can be embedded into another statement by using the
-        :meth:`_expression.TextualSelect.subquery` method to produce a
-        :class:`.Subquery`
-        object, which can then be SELECTed from.
-
-        This function essentially bridges the gap between an entirely
-        textual SELECT statement and the SQL expression language concept
-        of a "selectable"::
-
-            from sqlalchemy.sql import column, text
+    @property
+    def type(self) -> TypeEngine[Any]:
+        return type_api.NULLTYPE
 
-            stmt = text("SELECT id, name FROM some_table")
-            stmt = stmt.columns(column("id"), column("name")).subquery("st")
+    @property
+    def comparator(self):
+        # TODO: this seems wrong, it seems like we might not
+        # be using this method.
+        return self.type.comparator_factory(self)  # type: ignore
 
-            stmt = (
-                select(mytable)
-                .select_from(mytable.join(stmt, mytable.c.name == stmt.c.name))
-                .where(stmt.c.id > 5)
-            )
+    def self_group(
+        self, against: Optional[OperatorType] = None
+    ) -> Union[Self, Grouping[Any]]:
+        if against is operators.in_op:
+            return Grouping(self)
+        else:
+            return self
 
-        Above, we pass a series of :func:`_expression.column` elements to the
-        :meth:`_expression.TextClause.columns` method positionally.  These
-        :func:`_expression.column`
-        elements now become first class elements upon the
-        :attr:`_expression.TextualSelect.selected_columns` column collection,
-        which then
-        become part of the :attr:`.Subquery.c` collection after
-        :meth:`_expression.TextualSelect.subquery` is invoked.
 
-        The column expressions we pass to
-        :meth:`_expression.TextClause.columns` may
-        also be typed; when we do so, these :class:`.TypeEngine` objects become
-        the effective return type of the column, so that SQLAlchemy's
-        result-set-processing systems may be used on the return values.
-        This is often needed for types such as date or boolean types, as well
-        as for unicode processing on some dialect configurations::
+class TString(AbstractTextClause, inspection.Inspectable["TString"]):
+    """Represent a SQL template string using Python 3.14+ t-strings.
 
-            stmt = text("SELECT id, name, timestamp FROM some_table")
-            stmt = stmt.columns(
-                column("id", Integer),
-                column("name", Unicode),
-                column("timestamp", DateTime),
-            )
+    E.g.::
 
-            for id, name, timestamp in connection.execute(stmt):
-                print(id, name, timestamp)
+        from sqlalchemy import tstring, column
 
-        As a shortcut to the above syntax, keyword arguments referring to
-        types alone may be used, if only type conversion is needed::
+        a = 5
+        b = 10
+        stmt = tstring(t"select {a}, {b}")
+        result = connection.execute(stmt)
 
-            stmt = text("SELECT id, name, timestamp FROM some_table")
-            stmt = stmt.columns(id=Integer, name=Unicode, timestamp=DateTime)
+    The :class:`_expression.TString` construct is produced using the
+    :func:`_expression.tstring` function; see that function for full
+    documentation.
 
-            for id, name, timestamp in connection.execute(stmt):
-                print(id, name, timestamp)
+    .. versionadded:: 2.1
 
-        The positional form of :meth:`_expression.TextClause.columns`
-        also provides the
-        unique feature of **positional column targeting**, which is
-        particularly useful when using the ORM with complex textual queries. If
-        we specify the columns from our model to
-        :meth:`_expression.TextClause.columns`,
-        the result set will match to those columns positionally, meaning the
-        name or origin of the column in the textual SQL doesn't matter::
+    .. seealso::
 
-            stmt = text(
-                "SELECT users.id, addresses.id, users.id, "
-                "users.name, addresses.email_address AS email "
-                "FROM users JOIN addresses ON users.id=addresses.user_id "
-                "WHERE users.id = 1"
-            ).columns(
-                User.id,
-                Address.id,
-                Address.user_id,
-                User.name,
-                Address.email_address,
-            )
+        :func:`_expression.tstring`
 
-            query = (
-                session.query(User)
-                .from_statement(stmt)
-                .options(contains_eager(User.addresses))
-            )
+    """
 
-        The :meth:`_expression.TextClause.columns` method provides a direct
-        route to calling :meth:`_expression.FromClause.subquery` as well as
-        :meth:`_expression.SelectBase.cte`
-        against a textual SELECT statement::
+    __visit_name__ = "tstring"
 
-            stmt = stmt.columns(id=Integer, name=String).cte("st")
+    _traverse_internals: _TraverseInternalsType = [
+        ("parts", InternalTraversal.dp_clauseelement_list)
+    ] + ExecutableStatement._executable_traverse_internals
 
-            stmt = select(sometable).where(sometable.c.id == stmt.c.id)
+    @property
+    def _is_star(self) -> bool:  # type: ignore[override]
+        return (
+            len(self.parts) == 1
+            and isinstance(self.parts[0], TextClause)
+            and self.parts[0]._is_star
+        )
 
-        :param \*cols: A series of :class:`_expression.ColumnElement` objects,
-         typically
-         :class:`_schema.Column` objects from a :class:`_schema.Table`
-         or ORM level
-         column-mapped attributes, representing a set of columns that this
-         textual string will SELECT from.
+    def __init__(self, template: Template):
+        """Construct a :class:`_expression.TString` from a Python 3.14+
+        template string.
 
-        :param \**types: A mapping of string names to :class:`.TypeEngine`
-         type objects indicating the datatypes to use for names that are
-         SELECTed from the textual string.  Prefer to use the ``*cols``
-         argument as it also indicates positional ordering.
+        :param template: a Python 3.14+ template string (t-string) that
+         contains SQL fragments and Python expressions to be interpolated.
 
         """
-        selectable = util.preloaded.sql_selectable
+        self.parts: List[ClauseElement] = []
 
-        input_cols: List[NamedColumn[Any]] = [
-            coercions.expect(roles.LabeledColumnExprRole, col) for col in cols
-        ]
+        if not isinstance(template, Template):
+            raise exc.ArgumentError("pep-750 Tstring (e.g. t'...') expected")
 
-        positional_input_cols = [
-            (
-                ColumnClause(col.key, types.pop(col.key))
-                if col.key in types
-                else col
-            )
-            for col in input_cols
-        ]
-        keyed_input_cols: List[NamedColumn[Any]] = [
-            ColumnClause(key, type_) for key, type_ in types.items()
-        ]
-
-        elem = selectable.TextualSelect.__new__(selectable.TextualSelect)
-        elem._init(
-            self,
-            positional_input_cols + keyed_input_cols,
-            positional=bool(positional_input_cols) and not keyed_input_cols,
-        )
-        return elem
+        for part in template:
+            if isinstance(part, str):
+                self.parts.append(TextClause(part))
+            else:
+                assert hasattr(part, "value")
+                self.parts.append(
+                    coercions.expect(roles.TStringElementRole, part.value)
+                )
 
-    @property
-    def type(self) -> TypeEngine[Any]:
-        return type_api.NULLTYPE
+    def bindparams(
+        self,
+        *binds: BindParameter[Any],
+        **names_to_values: Any,
+    ) -> Self:
+        """Not supported for TString constructs.
 
-    @property
-    def comparator(self):
-        # TODO: this seems wrong, it seems like we might not
-        # be using this method.
-        return self.type.comparator_factory(self)  # type: ignore
+        TString constructs do not support .bindparams(). Bind parameters
+        are automatically created from interpolated values.
 
-    def self_group(
-        self, against: Optional[OperatorType] = None
-    ) -> Union[Self, Grouping[Any]]:
-        if against is operators.in_op:
-            return Grouping(self)
-        else:
-            return self
+        """
+        raise NotImplementedError(
+            "TString constructs do not support .bindparams(). "
+            "Bind parameters are automatically created "
+            "from interpolated values."
+        )
 
 
 class Null(SingletonConstant, roles.ConstExprRole[None], ColumnElement[None]):
@@ -4282,13 +4389,19 @@ class Grouping(GroupedElement, ColumnElement[_T]):
     ]
 
     element: Union[
-        TextClause, ClauseList, ColumnElement[_T], CompilerColumnElement
+        AbstractTextClause,
+        ClauseList,
+        ColumnElement[_T],
+        CompilerColumnElement,
     ]
 
     def __init__(
         self,
         element: Union[
-            TextClause, ClauseList, ColumnElement[_T], CompilerColumnElement
+            AbstractTextClause,
+            ClauseList,
+            ColumnElement[_T],
+            CompilerColumnElement,
         ],
     ):
         self.element = element
index 60029dbf4fc791c586b0b0e5b70c5ddb4b6613e3..8f788b2584409d7de0f68c351ee430a6f2896758 100644 (file)
@@ -42,6 +42,7 @@ from ._elements_constructors import over as over
 from ._elements_constructors import text as text
 from ._elements_constructors import true as true
 from ._elements_constructors import try_cast as try_cast
+from ._elements_constructors import tstring as tstring
 from ._elements_constructors import tuple_ as tuple_
 from ._elements_constructors import type_coerce as type_coerce
 from ._elements_constructors import within_group as within_group
@@ -107,6 +108,7 @@ from .elements import SQLColumnExpression as SQLColumnExpression
 from .elements import TextClause as TextClause
 from .elements import True_ as True_
 from .elements import TryCast as TryCast
+from .elements import TString as TString
 from .elements import Tuple as Tuple
 from .elements import TypeClause as TypeClause
 from .elements import TypeCoerce as TypeCoerce
index 5a23d0d4d9d9acf64dbab1a55cb0a5f504a4e772..0f328c1f82b394b9efbb4b944f6021867107087d 100644 (file)
@@ -105,7 +105,16 @@ class TruncatedLabelRole(StringRole, SQLRole):
     _role_name = "String SQL identifier"
 
 
-class ColumnsClauseRole(AllowsLambdaRole, UsesInspection, ColumnListRole):
+class TStringElementRole(UsesInspection, SQLRole):
+    """Role for elements that can be interpolated into a TString."""
+
+    __slots__ = ()
+    _role_name = "TString interpolatable element"
+
+
+class ColumnsClauseRole(
+    TStringElementRole, AllowsLambdaRole, UsesInspection, ColumnListRole
+):
     __slots__ = ()
     _role_name = (
         "Column expression, FROM clause, or other columns clause element"
index 48fc75e3a11b8ea0a7a963cfd93ff67cca381a48..5342cd012add630bf173436b9bea4876a4e33661 100644 (file)
@@ -95,6 +95,7 @@ from .elements import DQLDMLClauseElement
 from .elements import GroupedElement
 from .elements import literal_column
 from .elements import TableValuedColumn
+from .elements import TextClause
 from .elements import UnaryExpression
 from .operators import OperatorType
 from .sqltypes import NULLTYPE
@@ -142,11 +143,11 @@ if TYPE_CHECKING:
     from .ddl import CreateTableAs
     from .dml import Delete
     from .dml import Update
+    from .elements import AbstractTextClause
     from .elements import BinaryExpression
     from .elements import KeyedColumnElement
     from .elements import Label
     from .elements import NamedColumn
-    from .elements import TextClause
     from .functions import Function
     from .schema import ForeignKey
     from .schema import ForeignKeyConstraint
@@ -158,7 +159,7 @@ if TYPE_CHECKING:
 
 _ColumnsClauseElement = Union["FromClause", ColumnElement[Any], "TextClause"]
 _LabelConventionCallable = Callable[
-    [Union["ColumnElement[Any]", "TextClause"]], Optional[str]
+    [Union["ColumnElement[Any]", "AbstractTextClause"]], Optional[str]
 ]
 
 
@@ -197,7 +198,7 @@ _SetupJoinsElement = Tuple[
 ]
 
 
-_SelectIterable = Iterable[Union["ColumnElement[Any]", "TextClause"]]
+_SelectIterable = Iterable[Union["ColumnElement[Any]", "AbstractTextClause"]]
 
 
 class _OffsetLimitParam(BindParameter[int]):
@@ -2345,7 +2346,7 @@ class _ColumnsPlusNames(NamedTuple):
     required_label_name was not given
     """
 
-    column: Union[ColumnElement[Any], TextClause]
+    column: Union[ColumnElement[Any], AbstractTextClause]
     """
     the ColumnElement itself
     """
@@ -4879,7 +4880,7 @@ class SelectState(util.MemoizedSlots, CompileState):
         names = set()
 
         def go(
-            c: Union[ColumnElement[Any], TextClause],
+            c: Union[ColumnElement[Any], AbstractTextClause],
             col_name: Optional[str] = None,
         ) -> Optional[str]:
             if is_text_clause(c):
@@ -7286,7 +7287,7 @@ class TextualSelect(SelectBase, ExecutableReturnsRows, Generative):
 
     def _init(
         self,
-        text: TextClause,
+        text: AbstractTextClause,
         columns: List[NamedColumn[Any]],
         positional: bool = False,
     ) -> None:
index 906ca5e7d7f491c04c61446645f12a0d90a9c769..9736b1ce70946e43cd220492e2ce2b029460a32a 100644 (file)
@@ -74,8 +74,8 @@ if typing.TYPE_CHECKING:
     from ._typing import _EquivalentColumnMap
     from ._typing import _LimitOffsetType
     from ._typing import _TypeEngineArgument
+    from .elements import AbstractTextClause
     from .elements import BinaryExpression
-    from .elements import TextClause
     from .selectable import _JoinTargetElement
     from .selectable import _SelectIterable
     from .selectable import Selectable
@@ -884,14 +884,14 @@ def reduce_columns(
     columns: _SelectIterable,
     *clauses: Optional[ClauseElement],
     **kw: bool,
-) -> Sequence[Union[ColumnElement[Any], TextClause]]: ...
+) -> Sequence[Union[ColumnElement[Any], AbstractTextClause]]: ...
 
 
 def reduce_columns(
     columns: _SelectIterable,
     *clauses: Optional[ClauseElement],
     **kw: bool,
-) -> Collection[Union[ColumnElement[Any], TextClause]]:
+) -> Collection[Union[ColumnElement[Any], AbstractTextClause]]:
     r"""given a list of columns, return a 'reduced' set based on natural
     equivalents.
 
index 26e2210d6408cf6563abf94f62e8596d250a43ce..a1c6504e4723966fad0d59624b7e2a0af480a538 100644 (file)
@@ -31,6 +31,7 @@ from typing import Sequence
 from typing import Set
 from typing import Tuple
 from typing import Type
+from typing import TYPE_CHECKING
 
 py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
 py314 = sys.version_info >= (3, 14)
@@ -50,6 +51,27 @@ has_refcount_gc = bool(cpython)
 
 dottedgetter = operator.attrgetter
 
+if py314 or TYPE_CHECKING:
+    from string.templatelib import Template as Template
+else:
+
+    class Template:  # type: ignore[no-redef]
+        """Minimal Template for Python < 3.14 (test usage only)."""
+
+        def __init__(self, *parts: Any):
+            self._parts = parts
+
+        @property
+        def strings(self) -> Tuple[str, ...]:
+            return tuple(p for p in self._parts if isinstance(p, str))
+
+        @property
+        def interpolations(self) -> Tuple[Any, ...]:
+            return tuple(p for p in self._parts if not isinstance(p, str))
+
+        def __iter__(self) -> Any:
+            return iter(self._parts)
+
 
 class FullArgSpec(typing.NamedTuple):
     args: List[str]
index 9afd989673c9f3c35d322327393c815268ea6183..d320710ff2e7cdafa1d2b926261ec4841835b566 100644 (file)
@@ -332,7 +332,7 @@ def test_pep484(session: nox.Session) -> None:
     )
 
 
-@nox.session(name="mypy")
+@nox.session(name="mypy", python="3.14")
 def test_mypy(session: nox.Session) -> None:
     """run the typing integration test suite"""
 
@@ -351,7 +351,7 @@ def test_mypy(session: nox.Session) -> None:
     session.run(*cmd, *posargs)
 
 
-@nox.session(name="pep8")
+@nox.session(name="pep8", python="3.14")
 def test_pep8(session: nox.Session) -> None:
     """Run linting and formatting checks."""
 
index b29f168e3164ae7837cc6413dbf55093247aaef2..c06c26fb2913c75ddb842f1a39d05c495169c391 100644 (file)
@@ -245,6 +245,8 @@ ignore = [
     "A003","A005",
     "D",
     "E203","E305","E701","E704","E711","E712","E721","E722","E741",
+    # F542: t-string without any placeholders (used in documentation examples)
+    "F542",
     "I300",
     "N801","N802","N806",
     "RST304","RST303","RST299","RST399",
index b7f2d945cacce6b892323753dcb4cbb56f0aacb0..90597d8882cd089d637a433fd09226d0899739fe 100755 (executable)
@@ -16,6 +16,10 @@ os.environ["SQLALCHEMY_WARN_20"] = "true"
 
 collect_ignore_glob = []
 
+# omit py314.py test files on earlier versions of python
+if sys.version_info < (3, 14):
+    collect_ignore_glob.append("*_py314.py")
+
 # this requires that sqlalchemy.testing was not already
 # imported in order to work
 pytest.register_assert_rewrite("sqlalchemy.testing.assertions")
index c2ce24d278ff6f6805483cd33c84ed4fb809f909..5863b553fc175caeb2137ccd34db2645c1fb39f2 100644 (file)
@@ -31,7 +31,6 @@ from sqlalchemy import null
 from sqlalchemy import or_
 from sqlalchemy import select
 from sqlalchemy import String
-from sqlalchemy import table
 from sqlalchemy import testing
 from sqlalchemy import text
 from sqlalchemy import true
@@ -55,7 +54,6 @@ from sqlalchemy.orm import joinedload
 from sqlalchemy.orm import lazyload
 from sqlalchemy.orm import Query
 from sqlalchemy.orm import relationship
-from sqlalchemy.orm import selectinload
 from sqlalchemy.orm import Session
 from sqlalchemy.orm import subqueryload
 from sqlalchemy.orm import synonym
@@ -5812,662 +5810,6 @@ class HintsTest(QueryTest, AssertsCompiledSQL):
         )
 
 
-class TextTest(QueryTest, AssertsCompiledSQL):
-    __dialect__ = "default"
-
-    def test_needs_text(self):
-        User = self.classes.User
-
-        assert_raises_message(
-            sa_exc.ArgumentError,
-            "Textual SQL expression",
-            fixture_session().query(User).from_statement,
-            "select * from users order by id",
-        )
-
-    def test_select_star(self):
-        User = self.classes.User
-
-        eq_(
-            fixture_session()
-            .query(User)
-            .from_statement(text("select * from users order by id"))
-            .first(),
-            User(id=7),
-        )
-        eq_(
-            fixture_session()
-            .query(User)
-            .from_statement(
-                text("select * from users where name='nonexistent'")
-            )
-            .first(),
-            None,
-        )
-
-    def test_select_star_future(self):
-        User = self.classes.User
-
-        sess = fixture_session()
-        eq_(
-            sess.execute(
-                select(User).from_statement(
-                    text("select * from users order by id")
-                )
-            )
-            .scalars()
-            .first(),
-            User(id=7),
-        )
-        eq_(
-            sess.execute(
-                select(User).from_statement(
-                    text("select * from users where name='nonexistent'")
-                )
-            )
-            .scalars()
-            .first(),
-            None,
-        )
-
-    def test_columns_mismatched(self):
-        # test that columns using column._label match, as well as that
-        # ordering doesn't matter
-        User = self.classes.User
-
-        s = fixture_session()
-        q = s.query(User).from_statement(
-            text(
-                "select name, 27 as foo, id as users_id from users order by id"
-            )
-        )
-        eq_(
-            q.all(),
-            [
-                User(id=7, name="jack"),
-                User(id=8, name="ed"),
-                User(id=9, name="fred"),
-                User(id=10, name="chuck"),
-            ],
-        )
-
-    def test_columns_mismatched_future(self):
-        # test that columns using column._label match, as well as that
-        # ordering doesn't matter
-        User = self.classes.User
-
-        s = fixture_session()
-        q = select(User).from_statement(
-            text(
-                "select name, 27 as foo, id as users_id from users order by id"
-            )
-        )
-        eq_(
-            s.execute(q).scalars().all(),
-            [
-                User(id=7, name="jack"),
-                User(id=8, name="ed"),
-                User(id=9, name="fred"),
-                User(id=10, name="chuck"),
-            ],
-        )
-
-    def test_columns_multi_table_uselabels(self):
-        # test that columns using column._label match, as well as that
-        # ordering doesn't matter.
-        User = self.classes.User
-        Address = self.classes.Address
-
-        s = fixture_session()
-        q = s.query(User, Address).from_statement(
-            text(
-                "select users.name AS users_name, users.id AS users_id, "
-                "addresses.id AS addresses_id FROM users JOIN addresses "
-                "ON users.id = addresses.user_id WHERE users.id=8 "
-                "ORDER BY addresses.id"
-            )
-        )
-
-        eq_(
-            q.all(),
-            [
-                (User(id=8), Address(id=2)),
-                (User(id=8), Address(id=3)),
-                (User(id=8), Address(id=4)),
-            ],
-        )
-
-    def test_columns_multi_table_uselabels_future(self):
-        # test that columns using column._label match, as well as that
-        # ordering doesn't matter.
-        User = self.classes.User
-        Address = self.classes.Address
-
-        s = fixture_session()
-        q = select(User, Address).from_statement(
-            text(
-                "select users.name AS users_name, users.id AS users_id, "
-                "addresses.id AS addresses_id FROM users JOIN addresses "
-                "ON users.id = addresses.user_id WHERE users.id=8 "
-                "ORDER BY addresses.id"
-            )
-        )
-
-        eq_(
-            s.execute(q).all(),
-            [
-                (User(id=8), Address(id=2)),
-                (User(id=8), Address(id=3)),
-                (User(id=8), Address(id=4)),
-            ],
-        )
-
-    def test_columns_multi_table_uselabels_contains_eager(self):
-        # test that columns using column._label match, as well as that
-        # ordering doesn't matter.
-        User = self.classes.User
-        Address = self.classes.Address
-
-        s = fixture_session()
-        q = (
-            s.query(User)
-            .from_statement(
-                text(
-                    "select users.name AS users_name, users.id AS users_id, "
-                    "addresses.id AS addresses_id FROM users JOIN addresses "
-                    "ON users.id = addresses.user_id WHERE users.id=8 "
-                    "ORDER BY addresses.id"
-                )
-            )
-            .options(contains_eager(User.addresses))
-        )
-
-        def go():
-            r = q.all()
-            eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
-
-        self.assert_sql_count(testing.db, go, 1)
-
-    def test_columns_multi_table_uselabels_contains_eager_future(self):
-        # test that columns using column._label match, as well as that
-        # ordering doesn't matter.
-        User = self.classes.User
-        Address = self.classes.Address
-
-        s = fixture_session()
-        q = (
-            select(User)
-            .from_statement(
-                text(
-                    "select users.name AS users_name, users.id AS users_id, "
-                    "addresses.id AS addresses_id FROM users JOIN addresses "
-                    "ON users.id = addresses.user_id WHERE users.id=8 "
-                    "ORDER BY addresses.id"
-                )
-            )
-            .options(contains_eager(User.addresses))
-        )
-
-        def go():
-            r = s.execute(q).unique().scalars().all()
-            eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
-
-        self.assert_sql_count(testing.db, go, 1)
-
-    def test_columns_multi_table_uselabels_cols_contains_eager(self):
-        # test that columns using column._label match, as well as that
-        # ordering doesn't matter.
-        User = self.classes.User
-        Address = self.classes.Address
-
-        s = fixture_session()
-        q = (
-            s.query(User)
-            .from_statement(
-                text(
-                    "select users.name AS users_name, users.id AS users_id, "
-                    "addresses.id AS addresses_id FROM users JOIN addresses "
-                    "ON users.id = addresses.user_id WHERE users.id=8 "
-                    "ORDER BY addresses.id"
-                ).columns(User.name, User.id, Address.id)
-            )
-            .options(contains_eager(User.addresses))
-        )
-
-        def go():
-            r = q.all()
-            eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
-
-        self.assert_sql_count(testing.db, go, 1)
-
-    def test_columns_multi_table_uselabels_cols_contains_eager_future(self):
-        # test that columns using column._label match, as well as that
-        # ordering doesn't matter.
-        User = self.classes.User
-        Address = self.classes.Address
-
-        s = fixture_session()
-        q = (
-            select(User)
-            .from_statement(
-                text(
-                    "select users.name AS users_name, users.id AS users_id, "
-                    "addresses.id AS addresses_id FROM users JOIN addresses "
-                    "ON users.id = addresses.user_id WHERE users.id=8 "
-                    "ORDER BY addresses.id"
-                ).columns(User.name, User.id, Address.id)
-            )
-            .options(contains_eager(User.addresses))
-        )
-
-        def go():
-            r = s.execute(q).unique().scalars().all()
-            eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
-
-        self.assert_sql_count(testing.db, go, 1)
-
-    def test_textual_select_orm_columns(self):
-        # test that columns using column._label match, as well as that
-        # ordering doesn't matter.
-        User = self.classes.User
-        Address = self.classes.Address
-        users = self.tables.users
-        addresses = self.tables.addresses
-
-        s = fixture_session()
-        q = s.query(User.name, User.id, Address.id).from_statement(
-            text(
-                "select users.name AS users_name, users.id AS users_id, "
-                "addresses.id AS addresses_id FROM users JOIN addresses "
-                "ON users.id = addresses.user_id WHERE users.id=8 "
-                "ORDER BY addresses.id"
-            ).columns(users.c.name, users.c.id, addresses.c.id)
-        )
-
-        eq_(q.all(), [("ed", 8, 2), ("ed", 8, 3), ("ed", 8, 4)])
-
-    @testing.combinations(
-        (
-            False,
-            subqueryload,
-        ),
-        (
-            True,
-            subqueryload,
-        ),
-        (False, selectinload),
-        (True, selectinload),
-    )
-    def test_related_eagerload_against_text(self, add_columns, loader_option):
-        # new in 1.4.   textual selects have columns so subqueryloaders
-        # and selectinloaders can join onto them.   we add columns
-        # automatiacally to TextClause as well, however subqueryloader
-        # is not working at the moment due to execution model refactor,
-        # it creates a subquery w/ adapter before those columns are
-        # available.  this is a super edge case and as we want to rewrite
-        # the loaders to use select(), maybe we can get it then.
-        User = self.classes.User
-
-        text_clause = text("select * from users")
-        if add_columns:
-            text_clause = text_clause.columns(User.id, User.name)
-
-        s = fixture_session()
-        q = (
-            s.query(User)
-            .from_statement(text_clause)
-            .options(loader_option(User.addresses))
-        )
-
-        def go():
-            eq_(set(q.all()), set(self.static.user_address_result))
-
-        if loader_option is subqueryload:
-            # subqueryload necessarily degrades to lazy loads for a text
-            # statement.
-            self.assert_sql_count(testing.db, go, 5)
-        else:
-            self.assert_sql_count(testing.db, go, 2)
-
-    def test_whereclause(self):
-        User = self.classes.User
-
-        eq_(
-            fixture_session().query(User).filter(text("id in (8, 9)")).all(),
-            [User(id=8), User(id=9)],
-        )
-
-        eq_(
-            fixture_session()
-            .query(User)
-            .filter(text("name='fred'"))
-            .filter(text("id=9"))
-            .all(),
-            [User(id=9)],
-        )
-        eq_(
-            fixture_session()
-            .query(User)
-            .filter(text("name='fred'"))
-            .filter(User.id == 9)
-            .all(),
-            [User(id=9)],
-        )
-
-    def test_whereclause_future(self):
-        User = self.classes.User
-
-        s = fixture_session()
-        eq_(
-            s.execute(select(User).filter(text("id in (8, 9)")))
-            .scalars()
-            .all(),
-            [User(id=8), User(id=9)],
-        )
-
-        eq_(
-            s.execute(
-                select(User).filter(text("name='fred'")).filter(text("id=9"))
-            )
-            .scalars()
-            .all(),
-            [User(id=9)],
-        )
-        eq_(
-            s.execute(
-                select(User).filter(text("name='fred'")).filter(User.id == 9)
-            )
-            .scalars()
-            .all(),
-            [User(id=9)],
-        )
-
-    def test_binds_coerce(self):
-        User = self.classes.User
-
-        assert_raises_message(
-            sa_exc.ArgumentError,
-            r"Textual SQL expression 'id in \(:id1, :id2\)' "
-            "should be explicitly declared",
-            fixture_session().query(User).filter,
-            "id in (:id1, :id2)",
-        )
-
-    def test_plain_textual_column(self):
-        User = self.classes.User
-
-        s = fixture_session()
-
-        self.assert_compile(
-            s.query(User.id, text("users.name")),
-            "SELECT users.id AS users_id, users.name FROM users",
-        )
-
-        eq_(
-            s.query(User.id, text("users.name")).all(),
-            [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
-        )
-
-        eq_(
-            s.query(User.id, literal_column("name")).order_by(User.id).all(),
-            [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
-        )
-
-    def test_via_select(self):
-        User = self.classes.User
-        s = fixture_session()
-        eq_(
-            s.query(User)
-            .from_statement(
-                select(column("id"), column("name"))
-                .select_from(table("users"))
-                .order_by("id")
-            )
-            .all(),
-            [User(id=7), User(id=8), User(id=9), User(id=10)],
-        )
-
-    def test_via_textasfrom_from_statement(self):
-        User = self.classes.User
-        s = fixture_session()
-
-        eq_(
-            s.query(User)
-            .from_statement(
-                text("select * from users order by id").columns(
-                    id=Integer, name=String
-                )
-            )
-            .all(),
-            [User(id=7), User(id=8), User(id=9), User(id=10)],
-        )
-
-    def test_columns_via_textasfrom_from_statement(self):
-        User = self.classes.User
-        s = fixture_session()
-
-        eq_(
-            s.query(User.id, User.name)
-            .from_statement(
-                text("select * from users order by id").columns(
-                    id=Integer, name=String
-                )
-            )
-            .all(),
-            [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
-        )
-
-    def test_via_textasfrom_use_mapped_columns(self):
-        User = self.classes.User
-        s = fixture_session()
-
-        eq_(
-            s.query(User)
-            .from_statement(
-                text("select * from users order by id").columns(
-                    User.id, User.name
-                )
-            )
-            .all(),
-            [User(id=7), User(id=8), User(id=9), User(id=10)],
-        )
-
-    def test_via_textasfrom_aliased(self):
-        User = self.classes.User
-        s = fixture_session()
-
-        ua = aliased(
-            User,
-            text("select * from users").columns(User.id, User.name).subquery(),
-        )
-
-        eq_(
-            s.query(ua).order_by(ua.id).all(),
-            [User(id=7), User(id=8), User(id=9), User(id=10)],
-        )
-
-    def test_group_by_accepts_text(self):
-        User = self.classes.User
-        s = fixture_session()
-
-        q = s.query(User).group_by(text("name"))
-        self.assert_compile(
-            q,
-            "SELECT users.id AS users_id, users.name AS users_name "
-            "FROM users GROUP BY name",
-        )
-
-    def test_order_by_w_eager_one(self):
-        User = self.classes.User
-        s = fixture_session()
-
-        # from 1.0.0 thru 1.0.2, the "name" symbol here was considered
-        # to be part of the things we need to ORDER BY and it was being
-        # placed into the inner query's columns clause, as part of
-        # query._compound_eager_statement where we add unwrap_order_by()
-        # to the columns clause.  However, as #3392 illustrates, unlocatable
-        # string expressions like "name desc" will only fail in this scenario,
-        # so in general the changing of the query structure with string labels
-        # is dangerous.
-        #
-        # the queries here are again "invalid" from a SQL perspective, as the
-        # "name" field isn't matched up to anything.
-        #
-
-        q = (
-            s.query(User)
-            .options(joinedload(User.addresses))
-            .order_by(desc("name"))
-            .limit(1)
-        )
-        assert_raises_message(
-            sa_exc.CompileError,
-            "Can't resolve label reference for ORDER BY / GROUP BY.",
-            q.set_label_style(
-                LABEL_STYLE_TABLENAME_PLUS_COL
-            ).statement.compile,
-        )
-
-    def test_order_by_w_eager_two(self):
-        User = self.classes.User
-        s = fixture_session()
-
-        q = (
-            s.query(User)
-            .options(joinedload(User.addresses))
-            .order_by("name")
-            .limit(1)
-        )
-        assert_raises_message(
-            sa_exc.CompileError,
-            "Can't resolve label reference for ORDER BY / GROUP BY.",
-            q.set_label_style(
-                LABEL_STYLE_TABLENAME_PLUS_COL
-            ).statement.compile,
-        )
-
-    def test_order_by_w_eager_three(self):
-        User = self.classes.User
-        s = fixture_session()
-
-        self.assert_compile(
-            s.query(User)
-            .options(joinedload(User.addresses))
-            .order_by("users_name")
-            .limit(1),
-            "SELECT anon_1.users_id AS anon_1_users_id, "
-            "anon_1.users_name AS anon_1_users_name, "
-            "addresses_1.id AS addresses_1_id, "
-            "addresses_1.user_id AS addresses_1_user_id, "
-            "addresses_1.email_address AS addresses_1_email_address "
-            "FROM (SELECT users.id AS users_id, users.name AS users_name "
-            "FROM users ORDER BY users.name "
-            "LIMIT :param_1) AS anon_1 "
-            "LEFT OUTER JOIN addresses AS addresses_1 "
-            "ON anon_1.users_id = addresses_1.user_id "
-            "ORDER BY anon_1.users_name, addresses_1.id",
-        )
-
-        # however! this works (again?)
-        eq_(
-            s.query(User)
-            .options(joinedload(User.addresses))
-            .order_by("users_name")
-            .first(),
-            User(name="chuck", addresses=[]),
-        )
-
-    def test_order_by_w_eager_four(self):
-        User = self.classes.User
-        Address = self.classes.Address
-        s = fixture_session()
-
-        self.assert_compile(
-            s.query(User)
-            .options(joinedload(User.addresses))
-            .order_by(desc("users_name"))
-            .limit(1),
-            "SELECT anon_1.users_id AS anon_1_users_id, "
-            "anon_1.users_name AS anon_1_users_name, "
-            "addresses_1.id AS addresses_1_id, "
-            "addresses_1.user_id AS addresses_1_user_id, "
-            "addresses_1.email_address AS addresses_1_email_address "
-            "FROM (SELECT users.id AS users_id, users.name AS users_name "
-            "FROM users ORDER BY users.name DESC "
-            "LIMIT :param_1) AS anon_1 "
-            "LEFT OUTER JOIN addresses AS addresses_1 "
-            "ON anon_1.users_id = addresses_1.user_id "
-            "ORDER BY anon_1.users_name DESC, addresses_1.id",
-        )
-
-        # however! this works (again?)
-        eq_(
-            s.query(User)
-            .options(joinedload(User.addresses))
-            .order_by(desc("users_name"))
-            .first(),
-            User(name="jack", addresses=[Address()]),
-        )
-
-    def test_order_by_w_eager_five(self):
-        """essentially the same as test_eager_relations -> test_limit_3,
-        but test for textual label elements that are freeform.
-        this is again #3392."""
-
-        User = self.classes.User
-        Address = self.classes.Address
-
-        sess = fixture_session()
-
-        q = sess.query(User, Address.email_address.label("email_address"))
-
-        result = (
-            q.join(User.addresses)
-            .options(joinedload(User.orders))
-            .order_by("email_address desc")
-            .limit(1)
-            .offset(0)
-        )
-
-        assert_raises_message(
-            sa_exc.CompileError,
-            "Can't resolve label reference for ORDER BY / GROUP BY",
-            result.all,
-        )
-
-
-class TextErrorTest(QueryTest, AssertsCompiledSQL):
-    def _test(self, fn, arg, offending_clause):
-        assert_raises_message(
-            sa.exc.ArgumentError,
-            r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
-            r"explicitly declared (?:with|as) text\(%(stmt)r\)"
-            % {"stmt": util.ellipses_string(offending_clause)},
-            fn,
-            arg,
-        )
-
-    def test_filter(self):
-        User = self.classes.User
-        self._test(
-            fixture_session().query(User.id).filter, "myid == 5", "myid == 5"
-        )
-
-    def test_having(self):
-        User = self.classes.User
-        self._test(
-            fixture_session().query(User.id).having, "myid == 5", "myid == 5"
-        )
-
-    def test_from_statement(self):
-        User = self.classes.User
-        self._test(
-            fixture_session().query(User.id).from_statement,
-            "select id from user",
-            "select id from user",
-        )
-
-
 class ParentTest(QueryTest, AssertsCompiledSQL):
     __dialect__ = "default"
 
diff --git a/test/orm/test_text.py b/test/orm/test_text.py
new file mode 100644 (file)
index 0000000..e7f54da
--- /dev/null
@@ -0,0 +1,689 @@
+import sqlalchemy as sa
+from sqlalchemy import column
+from sqlalchemy import desc
+from sqlalchemy import exc as sa_exc
+from sqlalchemy import Integer
+from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
+from sqlalchemy import literal_column
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import table
+from sqlalchemy import testing
+from sqlalchemy import text
+from sqlalchemy import util
+from sqlalchemy.orm import aliased
+from sqlalchemy.orm import contains_eager
+from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import selectinload
+from sqlalchemy.orm import subqueryload
+from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing.assertions import assert_raises_message
+from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.fixtures import fixture_session
+from test.orm import _fixtures
+
+
+class QueryTest(_fixtures.FixtureTest):
+    run_setup_mappers = "once"
+    run_inserts = "once"
+    run_deletes = None
+
+    @classmethod
+    def setup_mappers(cls):
+        cls._setup_stock_mapping()
+
+
+class TextTest(QueryTest, AssertsCompiledSQL):
+    __dialect__ = "default"
+
+    def test_needs_text(self):
+        User = self.classes.User
+
+        assert_raises_message(
+            sa_exc.ArgumentError,
+            "Textual SQL expression",
+            fixture_session().query(User).from_statement,
+            "select * from users order by id",
+        )
+
+    def test_select_star(self):
+        User = self.classes.User
+
+        eq_(
+            fixture_session()
+            .query(User)
+            .from_statement(text("select * from users order by id"))
+            .first(),
+            User(id=7),
+        )
+        eq_(
+            fixture_session()
+            .query(User)
+            .from_statement(
+                text("select * from users where name='nonexistent'")
+            )
+            .first(),
+            None,
+        )
+
+    def test_select_star_future(self):
+        User = self.classes.User
+
+        sess = fixture_session()
+        eq_(
+            sess.execute(
+                select(User).from_statement(
+                    text("select * from users order by id")
+                )
+            )
+            .scalars()
+            .first(),
+            User(id=7),
+        )
+        eq_(
+            sess.execute(
+                select(User).from_statement(
+                    text("select * from users where name='nonexistent'")
+                )
+            )
+            .scalars()
+            .first(),
+            None,
+        )
+
+    def test_columns_mismatched(self):
+        # test that columns using column._label match, as well as that
+        # ordering doesn't matter
+        User = self.classes.User
+
+        s = fixture_session()
+        q = s.query(User).from_statement(
+            text(
+                "select name, 27 as foo, id as users_id from users order by id"
+            )
+        )
+        eq_(
+            q.all(),
+            [
+                User(id=7, name="jack"),
+                User(id=8, name="ed"),
+                User(id=9, name="fred"),
+                User(id=10, name="chuck"),
+            ],
+        )
+
+    def test_columns_mismatched_future(self):
+        # test that columns using column._label match, as well as that
+        # ordering doesn't matter
+        User = self.classes.User
+
+        s = fixture_session()
+        q = select(User).from_statement(
+            text(
+                "select name, 27 as foo, id as users_id from users order by id"
+            )
+        )
+        eq_(
+            s.execute(q).scalars().all(),
+            [
+                User(id=7, name="jack"),
+                User(id=8, name="ed"),
+                User(id=9, name="fred"),
+                User(id=10, name="chuck"),
+            ],
+        )
+
+    def test_columns_multi_table_uselabels(self):
+        # test that columns using column._label match, as well as that
+        # ordering doesn't matter.
+        User = self.classes.User
+        Address = self.classes.Address
+
+        s = fixture_session()
+        q = s.query(User, Address).from_statement(
+            text(
+                "select users.name AS users_name, users.id AS users_id, "
+                "addresses.id AS addresses_id FROM users JOIN addresses "
+                "ON users.id = addresses.user_id WHERE users.id=8 "
+                "ORDER BY addresses.id"
+            )
+        )
+
+        eq_(
+            q.all(),
+            [
+                (User(id=8), Address(id=2)),
+                (User(id=8), Address(id=3)),
+                (User(id=8), Address(id=4)),
+            ],
+        )
+
+    def test_columns_multi_table_uselabels_future(self):
+        # test that columns using column._label match, as well as that
+        # ordering doesn't matter.
+        User = self.classes.User
+        Address = self.classes.Address
+
+        s = fixture_session()
+        q = select(User, Address).from_statement(
+            text(
+                "select users.name AS users_name, users.id AS users_id, "
+                "addresses.id AS addresses_id FROM users JOIN addresses "
+                "ON users.id = addresses.user_id WHERE users.id=8 "
+                "ORDER BY addresses.id"
+            )
+        )
+
+        eq_(
+            s.execute(q).all(),
+            [
+                (User(id=8), Address(id=2)),
+                (User(id=8), Address(id=3)),
+                (User(id=8), Address(id=4)),
+            ],
+        )
+
+    def test_columns_multi_table_uselabels_contains_eager(self):
+        # test that columns using column._label match, as well as that
+        # ordering doesn't matter.
+        User = self.classes.User
+        Address = self.classes.Address
+
+        s = fixture_session()
+        q = (
+            s.query(User)
+            .from_statement(
+                text(
+                    "select users.name AS users_name, users.id AS users_id, "
+                    "addresses.id AS addresses_id FROM users JOIN addresses "
+                    "ON users.id = addresses.user_id WHERE users.id=8 "
+                    "ORDER BY addresses.id"
+                )
+            )
+            .options(contains_eager(User.addresses))
+        )
+
+        def go():
+            r = q.all()
+            eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
+
+        self.assert_sql_count(testing.db, go, 1)
+
+    def test_columns_multi_table_uselabels_contains_eager_future(self):
+        # test that columns using column._label match, as well as that
+        # ordering doesn't matter.
+        User = self.classes.User
+        Address = self.classes.Address
+
+        s = fixture_session()
+        q = (
+            select(User)
+            .from_statement(
+                text(
+                    "select users.name AS users_name, users.id AS users_id, "
+                    "addresses.id AS addresses_id FROM users JOIN addresses "
+                    "ON users.id = addresses.user_id WHERE users.id=8 "
+                    "ORDER BY addresses.id"
+                )
+            )
+            .options(contains_eager(User.addresses))
+        )
+
+        def go():
+            r = s.execute(q).unique().scalars().all()
+            eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
+
+        self.assert_sql_count(testing.db, go, 1)
+
+    def test_columns_multi_table_uselabels_cols_contains_eager(self):
+        # test that columns using column._label match, as well as that
+        # ordering doesn't matter.
+        User = self.classes.User
+        Address = self.classes.Address
+
+        s = fixture_session()
+        q = (
+            s.query(User)
+            .from_statement(
+                text(
+                    "select users.name AS users_name, users.id AS users_id, "
+                    "addresses.id AS addresses_id FROM users JOIN addresses "
+                    "ON users.id = addresses.user_id WHERE users.id=8 "
+                    "ORDER BY addresses.id"
+                ).columns(User.name, User.id, Address.id)
+            )
+            .options(contains_eager(User.addresses))
+        )
+
+        def go():
+            r = q.all()
+            eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
+
+        self.assert_sql_count(testing.db, go, 1)
+
+    def test_columns_multi_table_uselabels_cols_contains_eager_future(self):
+        # test that columns using column._label match, as well as that
+        # ordering doesn't matter.
+        User = self.classes.User
+        Address = self.classes.Address
+
+        s = fixture_session()
+        q = (
+            select(User)
+            .from_statement(
+                text(
+                    "select users.name AS users_name, users.id AS users_id, "
+                    "addresses.id AS addresses_id FROM users JOIN addresses "
+                    "ON users.id = addresses.user_id WHERE users.id=8 "
+                    "ORDER BY addresses.id"
+                ).columns(User.name, User.id, Address.id)
+            )
+            .options(contains_eager(User.addresses))
+        )
+
+        def go():
+            r = s.execute(q).unique().scalars().all()
+            eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
+
+        self.assert_sql_count(testing.db, go, 1)
+
+    def test_textual_select_orm_columns(self):
+        # test that columns using column._label match, as well as that
+        # ordering doesn't matter.
+        User = self.classes.User
+        Address = self.classes.Address
+        users = self.tables.users
+        addresses = self.tables.addresses
+
+        s = fixture_session()
+        q = s.query(User.name, User.id, Address.id).from_statement(
+            text(
+                "select users.name AS users_name, users.id AS users_id, "
+                "addresses.id AS addresses_id FROM users JOIN addresses "
+                "ON users.id = addresses.user_id WHERE users.id=8 "
+                "ORDER BY addresses.id"
+            ).columns(users.c.name, users.c.id, addresses.c.id)
+        )
+
+        eq_(q.all(), [("ed", 8, 2), ("ed", 8, 3), ("ed", 8, 4)])
+
+    @testing.combinations(
+        (
+            False,
+            subqueryload,
+        ),
+        (
+            True,
+            subqueryload,
+        ),
+        (False, selectinload),
+        (True, selectinload),
+    )
+    def test_related_eagerload_against_text(self, add_columns, loader_option):
+        # new in 1.4.   textual selects have columns so subqueryloaders
+        # and selectinloaders can join onto them.   we add columns
+        # automatiacally to TextClause as well, however subqueryloader
+        # is not working at the moment due to execution model refactor,
+        # it creates a subquery w/ adapter before those columns are
+        # available.  this is a super edge case and as we want to rewrite
+        # the loaders to use select(), maybe we can get it then.
+        User = self.classes.User
+
+        text_clause = text("select * from users")
+        if add_columns:
+            text_clause = text_clause.columns(User.id, User.name)
+
+        s = fixture_session()
+        q = (
+            s.query(User)
+            .from_statement(text_clause)
+            .options(loader_option(User.addresses))
+        )
+
+        def go():
+            eq_(set(q.all()), set(self.static.user_address_result))
+
+        if loader_option is subqueryload:
+            # subqueryload necessarily degrades to lazy loads for a text
+            # statement.
+            self.assert_sql_count(testing.db, go, 5)
+        else:
+            self.assert_sql_count(testing.db, go, 2)
+
+    def test_whereclause(self):
+        User = self.classes.User
+
+        eq_(
+            fixture_session().query(User).filter(text("id in (8, 9)")).all(),
+            [User(id=8), User(id=9)],
+        )
+
+        eq_(
+            fixture_session()
+            .query(User)
+            .filter(text("name='fred'"))
+            .filter(text("id=9"))
+            .all(),
+            [User(id=9)],
+        )
+        eq_(
+            fixture_session()
+            .query(User)
+            .filter(text("name='fred'"))
+            .filter(User.id == 9)
+            .all(),
+            [User(id=9)],
+        )
+
+    def test_whereclause_future(self):
+        User = self.classes.User
+
+        s = fixture_session()
+        eq_(
+            s.execute(select(User).filter(text("id in (8, 9)")))
+            .scalars()
+            .all(),
+            [User(id=8), User(id=9)],
+        )
+
+        eq_(
+            s.execute(
+                select(User).filter(text("name='fred'")).filter(text("id=9"))
+            )
+            .scalars()
+            .all(),
+            [User(id=9)],
+        )
+        eq_(
+            s.execute(
+                select(User).filter(text("name='fred'")).filter(User.id == 9)
+            )
+            .scalars()
+            .all(),
+            [User(id=9)],
+        )
+
+    def test_binds_coerce(self):
+        User = self.classes.User
+
+        assert_raises_message(
+            sa_exc.ArgumentError,
+            r"Textual SQL expression 'id in \(:id1, :id2\)' "
+            "should be explicitly declared",
+            fixture_session().query(User).filter,
+            "id in (:id1, :id2)",
+        )
+
+    def test_plain_textual_column(self):
+        User = self.classes.User
+
+        s = fixture_session()
+
+        self.assert_compile(
+            s.query(User.id, text("users.name")),
+            "SELECT users.id AS users_id, users.name FROM users",
+        )
+
+        eq_(
+            s.query(User.id, text("users.name")).all(),
+            [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
+        )
+
+        eq_(
+            s.query(User.id, literal_column("name")).order_by(User.id).all(),
+            [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
+        )
+
+    def test_via_select(self):
+        User = self.classes.User
+        s = fixture_session()
+        eq_(
+            s.query(User)
+            .from_statement(
+                select(column("id"), column("name"))
+                .select_from(table("users"))
+                .order_by("id")
+            )
+            .all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)],
+        )
+
+    def test_via_textasfrom_from_statement(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        eq_(
+            s.query(User)
+            .from_statement(
+                text("select * from users order by id").columns(
+                    id=Integer, name=String
+                )
+            )
+            .all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)],
+        )
+
+    def test_columns_via_textasfrom_from_statement(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        eq_(
+            s.query(User.id, User.name)
+            .from_statement(
+                text("select * from users order by id").columns(
+                    id=Integer, name=String
+                )
+            )
+            .all(),
+            [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
+        )
+
+    def test_via_textasfrom_use_mapped_columns(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        eq_(
+            s.query(User)
+            .from_statement(
+                text("select * from users order by id").columns(
+                    User.id, User.name
+                )
+            )
+            .all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)],
+        )
+
+    def test_via_textasfrom_aliased(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        ua = aliased(
+            User,
+            text("select * from users").columns(User.id, User.name).subquery(),
+        )
+
+        eq_(
+            s.query(ua).order_by(ua.id).all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)],
+        )
+
+    def test_group_by_accepts_text(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        q = s.query(User).group_by(text("name"))
+        self.assert_compile(
+            q,
+            "SELECT users.id AS users_id, users.name AS users_name "
+            "FROM users GROUP BY name",
+        )
+
+    def test_order_by_w_eager_one(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        # from 1.0.0 thru 1.0.2, the "name" symbol here was considered
+        # to be part of the things we need to ORDER BY and it was being
+        # placed into the inner query's columns clause, as part of
+        # query._compound_eager_statement where we add unwrap_order_by()
+        # to the columns clause.  However, as #3392 illustrates, unlocatable
+        # string expressions like "name desc" will only fail in this scenario,
+        # so in general the changing of the query structure with string labels
+        # is dangerous.
+        #
+        # the queries here are again "invalid" from a SQL perspective, as the
+        # "name" field isn't matched up to anything.
+        #
+
+        q = (
+            s.query(User)
+            .options(joinedload(User.addresses))
+            .order_by(desc("name"))
+            .limit(1)
+        )
+        assert_raises_message(
+            sa_exc.CompileError,
+            "Can't resolve label reference for ORDER BY / GROUP BY.",
+            q.set_label_style(
+                LABEL_STYLE_TABLENAME_PLUS_COL
+            ).statement.compile,
+        )
+
+    def test_order_by_w_eager_two(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        q = (
+            s.query(User)
+            .options(joinedload(User.addresses))
+            .order_by("name")
+            .limit(1)
+        )
+        assert_raises_message(
+            sa_exc.CompileError,
+            "Can't resolve label reference for ORDER BY / GROUP BY.",
+            q.set_label_style(
+                LABEL_STYLE_TABLENAME_PLUS_COL
+            ).statement.compile,
+        )
+
+    def test_order_by_w_eager_three(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        self.assert_compile(
+            s.query(User)
+            .options(joinedload(User.addresses))
+            .order_by("users_name")
+            .limit(1),
+            "SELECT anon_1.users_id AS anon_1_users_id, "
+            "anon_1.users_name AS anon_1_users_name, "
+            "addresses_1.id AS addresses_1_id, "
+            "addresses_1.user_id AS addresses_1_user_id, "
+            "addresses_1.email_address AS addresses_1_email_address "
+            "FROM (SELECT users.id AS users_id, users.name AS users_name "
+            "FROM users ORDER BY users.name "
+            "LIMIT :param_1) AS anon_1 "
+            "LEFT OUTER JOIN addresses AS addresses_1 "
+            "ON anon_1.users_id = addresses_1.user_id "
+            "ORDER BY anon_1.users_name, addresses_1.id",
+        )
+
+        # however! this works (again?)
+        eq_(
+            s.query(User)
+            .options(joinedload(User.addresses))
+            .order_by("users_name")
+            .first(),
+            User(name="chuck", addresses=[]),
+        )
+
+    def test_order_by_w_eager_four(self):
+        User = self.classes.User
+        Address = self.classes.Address
+        s = fixture_session()
+
+        self.assert_compile(
+            s.query(User)
+            .options(joinedload(User.addresses))
+            .order_by(desc("users_name"))
+            .limit(1),
+            "SELECT anon_1.users_id AS anon_1_users_id, "
+            "anon_1.users_name AS anon_1_users_name, "
+            "addresses_1.id AS addresses_1_id, "
+            "addresses_1.user_id AS addresses_1_user_id, "
+            "addresses_1.email_address AS addresses_1_email_address "
+            "FROM (SELECT users.id AS users_id, users.name AS users_name "
+            "FROM users ORDER BY users.name DESC "
+            "LIMIT :param_1) AS anon_1 "
+            "LEFT OUTER JOIN addresses AS addresses_1 "
+            "ON anon_1.users_id = addresses_1.user_id "
+            "ORDER BY anon_1.users_name DESC, addresses_1.id",
+        )
+
+        # however! this works (again?)
+        eq_(
+            s.query(User)
+            .options(joinedload(User.addresses))
+            .order_by(desc("users_name"))
+            .first(),
+            User(name="jack", addresses=[Address()]),
+        )
+
+    def test_order_by_w_eager_five(self):
+        """essentially the same as test_eager_relations -> test_limit_3,
+        but test for textual label elements that are freeform.
+        this is again #3392."""
+
+        User = self.classes.User
+        Address = self.classes.Address
+
+        sess = fixture_session()
+
+        q = sess.query(User, Address.email_address.label("email_address"))
+
+        result = (
+            q.join(User.addresses)
+            .options(joinedload(User.orders))
+            .order_by("email_address desc")
+            .limit(1)
+            .offset(0)
+        )
+
+        assert_raises_message(
+            sa_exc.CompileError,
+            "Can't resolve label reference for ORDER BY / GROUP BY",
+            result.all,
+        )
+
+
+class TextErrorTest(QueryTest, AssertsCompiledSQL):
+    def _test(self, fn, arg, offending_clause):
+        assert_raises_message(
+            sa.exc.ArgumentError,
+            r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
+            r"explicitly declared (?:with|as) text\(%(stmt)r\)"
+            % {"stmt": util.ellipses_string(offending_clause)},
+            fn,
+            arg,
+        )
+
+    def test_filter(self):
+        User = self.classes.User
+        self._test(
+            fixture_session().query(User.id).filter, "myid == 5", "myid == 5"
+        )
+
+    def test_having(self):
+        User = self.classes.User
+        self._test(
+            fixture_session().query(User.id).having, "myid == 5", "myid == 5"
+        )
+
+    def test_from_statement(self):
+        User = self.classes.User
+        self._test(
+            fixture_session().query(User.id).from_statement,
+            "select id from user",
+            "select id from user",
+        )
diff --git a/test/orm/test_tstring_py314.py b/test/orm/test_tstring_py314.py
new file mode 100644 (file)
index 0000000..305e55b
--- /dev/null
@@ -0,0 +1,207 @@
+"""Test the TString construct in ORM context for Python 3.14+
+template strings."""
+
+from sqlalchemy import Integer
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import tstring
+from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.fixtures import fixture_session
+from test.orm import _fixtures
+
+
+class QueryTest(_fixtures.FixtureTest):
+    run_setup_mappers = "once"
+    run_inserts = "once"
+    run_deletes = None
+
+    @classmethod
+    def setup_mappers(cls):
+        cls._setup_stock_mapping()
+
+
+class TStringTest(QueryTest, AssertsCompiledSQL):
+    __dialect__ = "default"
+
+    def test_select_star(self):
+        User = self.classes.User
+
+        eq_(
+            fixture_session()
+            .query(User)
+            .from_statement(
+                tstring(t"select * from users where users.id={7} order by id")
+            )
+            .first(),
+            User(id=7),
+        )
+        eq_(
+            fixture_session()
+            .query(User)
+            .from_statement(
+                tstring(t"select * from users where name={'nonexistent'}")
+            )
+            .first(),
+            None,
+        )
+
+    def test_select_star_future(self):
+        User = self.classes.User
+
+        sess = fixture_session()
+        eq_(
+            sess.execute(
+                select(User).from_statement(
+                    tstring(
+                        t"select * from users where users.id={7} order by id"
+                    )
+                )
+            )
+            .scalars()
+            .first(),
+            User(id=7),
+        )
+        eq_(
+            sess.execute(
+                select(User).from_statement(
+                    tstring(t"select * from users where name={'nonexistent'}")
+                )
+            )
+            .scalars()
+            .first(),
+            None,
+        )
+
+    def test_entity_interpolation(self):
+        User = self.classes.User
+
+        # Test interpolating entity columns and table in select and from clause
+        sess = fixture_session()
+        result = (
+            sess.execute(
+                select(User).from_statement(
+                    tstring(t"select * from {User} order by {User.id}")
+                )
+            )
+            .scalars()
+            .all()
+        )
+
+        eq_([u.name for u in result], ["jack", "ed", "fred", "chuck"])
+
+    def test_whereclause(self):
+        User = self.classes.User
+
+        eq_(
+            fixture_session()
+            .query(User)
+            .filter(tstring(t"id in (8, 9)"))
+            .all(),
+            [User(id=8), User(id=9)],
+        )
+
+        eq_(
+            fixture_session()
+            .query(User)
+            .filter(tstring(t"name='fred'"))
+            .filter(tstring(t"id=9"))
+            .all(),
+            [User(id=9)],
+        )
+        eq_(
+            fixture_session()
+            .query(User)
+            .filter(tstring(t"name='fred'"))
+            .filter(User.id == 9)
+            .all(),
+            [User(id=9)],
+        )
+
+    def test_whereclause_future(self):
+        User = self.classes.User
+
+        s = fixture_session()
+        eq_(
+            s.execute(select(User).filter(tstring(t"id in (8, 9)")))
+            .scalars()
+            .all(),
+            [User(id=8), User(id=9)],
+        )
+
+        eq_(
+            s.execute(
+                select(User)
+                .filter(tstring(t"name='fred'"))
+                .filter(tstring(t"id=9"))
+            )
+            .scalars()
+            .all(),
+            [User(id=9)],
+        )
+        eq_(
+            s.execute(
+                select(User)
+                .filter(tstring(t"name='fred'"))
+                .filter(User.id == 9)
+            )
+            .scalars()
+            .all(),
+            [User(id=9)],
+        )
+
+    def test_via_textasfrom_from_statement(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        eq_(
+            s.query(User)
+            .from_statement(
+                tstring(t"select * from users order by id").columns(
+                    id=Integer, name=String
+                )
+            )
+            .all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)],
+        )
+
+    def test_columns_via_textasfrom_from_statement(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        eq_(
+            s.query(User.id, User.name)
+            .from_statement(
+                tstring(t"select * from users order by id").columns(
+                    id=Integer, name=String
+                )
+            )
+            .all(),
+            [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
+        )
+
+    def test_via_textasfrom_use_mapped_columns(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        eq_(
+            s.query(User)
+            .from_statement(
+                tstring(t"select * from users order by id").columns(
+                    User.id, User.name
+                )
+            )
+            .all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)],
+        )
+
+    def test_group_by_accepts_tstring(self):
+        User = self.classes.User
+        s = fixture_session()
+
+        q = s.query(User).group_by(tstring(t"name"))
+        self.assert_compile(
+            q,
+            "SELECT users.id AS users_id, users.name AS users_name "
+            "FROM users GROUP BY name",
+        )
index a40db06e4efb6351ada9a37fd8731abbc244a368..36c1a204d865e9d5c246afa68bf5b7dbc7773747 100644 (file)
@@ -205,9 +205,9 @@ test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_cached x86_64_li
 
 # TEST: test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached
 
-test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 4003
+test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 4403
 test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 7603
-test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 4003
+test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 4403
 test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 7203
 
 # TEST: test.aaa_profiling.test_misc.EnumTest.test_create_enum_from_pep_435_w_expensive_members
index fbc6db81c5eb3622d23b528478dc926715a04592..cb38540e45d17a3a17b9ba39d49e4e525170d828 100644 (file)
@@ -66,6 +66,7 @@ from sqlalchemy.sql.elements import Immutable
 from sqlalchemy.sql.elements import Null
 from sqlalchemy.sql.elements import OrderByList
 from sqlalchemy.sql.elements import Slice
+from sqlalchemy.sql.elements import TString
 from sqlalchemy.sql.elements import TypeClause
 from sqlalchemy.sql.elements import UnaryExpression
 from sqlalchemy.sql.functions import FunctionElement
@@ -1752,6 +1753,7 @@ class HasCacheKeySubclass(fixtures.TestBase):
                 SyntaxExtension,
                 DialectKWArgs,
                 Executable,
+                TString,
             ]
         )
     )
@@ -1794,7 +1796,12 @@ class CompareAndCopyTest(CoreFixtures, fixtures.TestBase):
         need = set(
             cls
             for cls in all_hascachekey_subclasses(
-                ignore_subclasses=[Annotated, NoInit, SingletonConstant]
+                ignore_subclasses=[
+                    Annotated,
+                    NoInit,
+                    SingletonConstant,
+                    TString,
+                ]
             )
             if "orm" not in cls.__module__
             and "compiler" not in cls.__module__
index 4c648d20bf630f2feed6de9f3cc38365cc90ddbc..e14b74e2084ae385d72b5503c37d75247da60057 100644 (file)
@@ -8,6 +8,7 @@ from sqlalchemy.sql import dml
 from sqlalchemy.sql import func
 from sqlalchemy.sql import select
 from sqlalchemy.sql import text
+from sqlalchemy.sql import tstring
 from sqlalchemy.sql.base import ExecutableStatement
 from sqlalchemy.sql.elements import literal
 from sqlalchemy.testing import eq_
@@ -18,6 +19,7 @@ from sqlalchemy.testing import ne_
 from sqlalchemy.testing.schema import Table
 from sqlalchemy.types import Integer
 from sqlalchemy.types import Text
+from sqlalchemy.util.compat import Template
 from sqlalchemy.util.langhelpers import class_hierarchy
 
 
@@ -35,6 +37,7 @@ class BasicTests(fixtures.TestBase):
     def _relevant_impls():
         return (
             text("select 1 + 2"),
+            tstring(Template("select 1 + 2")),
             text("select 42 as q").columns(column("q", Integer)),
             func.max(42),
             select(1, 2).union(select(3, 4)),
diff --git a/test/sql/test_tstrings_py314.py b/test/sql/test_tstrings_py314.py
new file mode 100644 (file)
index 0000000..e3d8cb3
--- /dev/null
@@ -0,0 +1,438 @@
+"""Test the TString construct for Python 3.14+ template strings."""
+
+from itertools import zip_longest
+
+from sqlalchemy import column
+from sqlalchemy import exc
+from sqlalchemy import Integer
+from sqlalchemy import JSON
+from sqlalchemy import literal
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import tstring
+from sqlalchemy.engine.interfaces import CacheStats
+from sqlalchemy.sql import table
+from sqlalchemy.sql.elements import ColumnClause
+from sqlalchemy.sql.sqltypes import TypeEngine
+from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing import eq_
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.assertions import expect_raises_message
+
+table1 = table(
+    "mytable",
+    column("myid", Integer),
+    column("name", String),
+    column("description", String),
+)
+
+table2 = table(
+    "myothertable", column("otherid", Integer), column("othername", String)
+)
+
+
+class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
+    __dialect__ = "default"
+
+    def test_basic_literal_interpolation(self):
+        a = 5
+        b = 10
+        stmt = tstring(t"select {a}, {b}")
+        self.assert_compile(
+            stmt,
+            "select :param_1, :param_2",
+            checkparams={"param_1": 5, "param_2": 10},
+        )
+
+    def test_no_strings(self):
+        with expect_raises_message(
+            exc.ArgumentError, r"pep-750 Tstring \(e.g. t'...'\) expected"
+        ):
+            tstring("select * from table")  # type: ignore
+
+    def test_tstring_literal_passthrough(self):
+        stmt = tstring(t"select * from foo where lala = bar")
+        self.assert_compile(stmt, "select * from foo where lala = bar")
+
+    def test_sqlalchemy_expression_interpolation(self):
+        subq = select(literal(1)).scalar_subquery()
+        stmt = tstring(t"SELECT {subq}")
+        self.assert_compile(
+            stmt,
+            "SELECT (SELECT :param_1 AS anon_1)",
+            checkparams={"param_1": 1},
+        )
+
+    def test_column_interpolation(self):
+        stmt = tstring(t"SELECT {table1.c.myid}, {table1.c.name} FROM mytable")
+        self.assert_compile(
+            stmt, "SELECT mytable.myid, mytable.name FROM mytable"
+        )
+
+    def test_column_interpolation_labeled(self):
+        # Labels are not supported inside tstring as they're ambiguous
+        # (should they render with AS in all contexts?)
+        label1 = table1.c.myid.label("label1")
+        label2 = table1.c.name.label("label2")
+
+        with expect_raises_message(
+            exc.CompileError,
+            "Using label\\(\\) directly inside tstring is not supported",
+        ):
+            tstring(t"SELECT {label1}, {label2} FROM mytable").compile()
+
+    def test_arithmetic_expression(self):
+        # Python arithmetic is evaluated before being passed to tstring
+        a = 1
+        stmt = tstring(t"SELECT {a + 7}")
+        self.assert_compile(
+            stmt, "SELECT :param_1", checkparams={"param_1": 8}
+        )
+
+    def test_embed_tstring_as_select_criteria(self):
+        user_id = 123
+        stmt = select(table1).where(tstring(t"{table1.c.myid} = {user_id}"))
+        self.assert_compile(
+            stmt,
+            "SELECT mytable.myid, mytable.name, mytable.description FROM "
+            "mytable WHERE mytable.myid = :param_1",
+            checkparams={"param_1": 123},
+        )
+
+    def test_embed_tstring_as_fromclause(self):
+        status = "some status"
+        stmt = select(column("x")).select_from(
+            tstring(
+                t"foobar left outer join lala on foobar.foo = lala.foo "
+                t"AND foobar.status = {status}"
+            )
+        )
+        self.assert_compile(
+            stmt,
+            "SELECT x FROM foobar left outer join "
+            "lala on foobar.foo = lala.foo AND foobar.status = :param_1",
+            checkparams={"param_1": "some status"},
+        )
+
+    def test_and_operator(self):
+        stmt = tstring(t"1 = 1") & tstring(t"2 = 2")
+        self.assert_compile(stmt, "1 = 1 AND 2 = 2")
+
+    def test_multiple_literals(self):
+        a, b, c, d = 1, 2, 3, 4
+        stmt = tstring(t"SELECT {a}, {b}, {c}, {d}")
+        self.assert_compile(
+            stmt,
+            "SELECT :param_1, :param_2, :param_3, :param_4",
+            checkparams={
+                "param_1": 1,
+                "param_2": 2,
+                "param_3": 3,
+                "param_4": 4,
+            },
+        )
+
+    def test_nested_tstring_execution(self):
+        inner = tstring(t"(SELECT {'some value'} AS anon_1)")
+        self.assert_compile(
+            tstring(t"select {inner}"),
+            "select (SELECT :param_1 AS anon_1)",
+            checkparams={"param_1": "some value"},
+        )
+
+    def test_nested_scalar_subquery_execution(self):
+        inner = select(literal("some value")).scalar_subquery()
+        self.assert_compile(
+            tstring(t"select {inner}"),
+            "select (SELECT :param_1 AS anon_1)",
+            checkparams={"param_1": "some value"},
+        )
+
+    def test_nested_subquery_execution(self):
+        inner = select(literal("some value")).subquery()
+        self.assert_compile(
+            tstring(t"select * from {inner}"),
+            "select * from (SELECT :param_1 AS anon_2) AS anon_1",
+            checkparams={"param_1": "some value"},
+        )
+
+
+class ColumnsTest(fixtures.TestBase, AssertsCompiledSQL):
+    __dialect__ = "default"
+
+    def _assert_columns(self, stmt, columns):
+        """Assert that stmt.selected_columns matches the given columns.
+
+        Also verifies that the result map structure matches what we'd get
+        from a regular select() statement with the same columns.
+        """
+        # Check that selected_columns matches
+        eq_(
+            [c.name for c in stmt.selected_columns],
+            [c.name for c in columns],
+        )
+        for stmt_col, expected_col in zip(stmt.selected_columns, columns):
+            eq_(stmt_col.type._type_affinity, expected_col.type._type_affinity)
+
+        # Verify result map structure matches what select() would produce
+        stmt_compiled = stmt.compile()
+        select_compiled = select(*columns).compile()
+        stmt_map = stmt_compiled._create_result_map()
+        select_map = select_compiled._create_result_map()
+
+        # Compare result map structure using recursive comparison
+        eq_(list(stmt_map.keys()), list(select_map.keys()))
+        for key in stmt_map:
+            stmt_entry = stmt_map[key]
+            select_entry = select_map[key]
+            # Use recursive comparison for the entire entry tuple
+            assert self._compare_recursive(
+                stmt_entry, select_entry
+            ), f"Result map entries differ:\n  {stmt_entry}\n  {select_entry}"
+
+    def _compare_recursive(self, left, right):
+        if isinstance(left, ColumnClause) and isinstance(right, ColumnClause):
+            return (
+                left.name == right.name
+                and left.type._type_affinity == right.type._type_affinity
+            )
+        elif isinstance(left, TypeEngine) and isinstance(right, TypeEngine):
+            return left._type_affinity == right._type_affinity
+        elif isinstance(left, (tuple, list)) and isinstance(
+            right, (tuple, list)
+        ):
+            return all(
+                self._compare_recursive(l, r)
+                for l, r in zip_longest(left, right)
+            )
+        else:
+            return left == right
+
+    def test_columns_positional(self):
+        cols = [column("id", Integer), column("name", String)]
+        stmt = tstring(t"SELECT id, name FROM users").columns(*cols)
+        self.assert_compile(stmt, "SELECT id, name FROM users")
+        self._assert_columns(stmt, cols)
+
+    def test_columns_keyword(self):
+        stmt = tstring(t"SELECT id, name FROM users").columns(
+            id=Integer, name=String
+        )
+        self.assert_compile(stmt, "SELECT id, name FROM users")
+        cols = [column("id", Integer), column("name", String)]
+        self._assert_columns(stmt, cols)
+
+    def test_columns_mixed(self):
+        cols = [
+            column("id", Integer),
+            column("name", String),
+            column("age", Integer),
+        ]
+        stmt = tstring(t"SELECT id, name, age FROM users").columns(
+            cols[0], name=String, age=Integer
+        )
+        self.assert_compile(stmt, "SELECT id, name, age FROM users")
+        self._assert_columns(stmt, cols)
+
+    def test_columns_subquery(self):
+        stmt = (
+            tstring(t"SELECT id, name FROM users")
+            .columns(column("id", Integer), column("name", String))
+            .subquery("st")
+        )
+        outer = select(table1).select_from(
+            table1.join(stmt, table1.c.name == stmt.c.name)
+        )
+        self.assert_compile(
+            outer,
+            "SELECT mytable.myid, mytable.name, mytable.description FROM "
+            "mytable JOIN (SELECT id, name FROM users) AS st ON "
+            "mytable.name = st.name",
+        )
+
+
+class ExecutionTest(fixtures.TestBase):
+    __backend__ = True
+
+    def test_basic_execution(self, connection):
+        a = 1
+        b = 2
+        result = connection.execute(tstring(t"select {a + 7}, {b}"))
+        eq_(result.all(), [(8, 2)])
+
+    def test_json_literal_execution(self, connection):
+        some_json = {"foo": "bar"}
+        stmt = tstring(t"select {literal(some_json, JSON)}").columns(
+            column("jj", JSON)
+        )
+        result = connection.execute(stmt)
+        row = result.scalar()
+        eq_(row, {"foo": "bar"})
+
+    def test_statement_caching(self, connection):
+        """Test that tstring statements are properly cached."""
+        some_json = {"foo": "bar"}
+        stmt1 = tstring(t"select {literal(some_json, JSON)}").columns(
+            column("jj", JSON)
+        )
+        result1 = connection.execute(stmt1)
+        eq_(result1.scalar(), {"foo": "bar"})
+
+        # Execute same structure with different value
+        some_json = {"foo": "newbar", "bat": "hoho"}
+        stmt2 = tstring(t"select {literal(some_json, JSON)}").columns(
+            column("jj", JSON)
+        )
+        result2 = connection.execute(stmt2)
+
+        # Should hit cache
+        if hasattr(result2.context, "cache_hit"):
+            eq_(result2.context.cache_hit, CacheStats.CACHE_HIT)
+
+        eq_(result2.scalar(), {"foo": "newbar", "bat": "hoho"})
+
+    def test_nested_scalar_subquery_execution(self, connection):
+        inner = select(literal("some value")).scalar_subquery()
+        result = connection.execute(tstring(t"select {inner}"))
+        eq_(result.all(), [("some value",)])
+
+    def test_nested_subquery_execution(self, connection):
+        inner = select(literal("some value")).subquery()
+        result = connection.execute(tstring(t"select * from {inner}"))
+        eq_(result.all(), [("some value",)])
+
+    def test_multiple_values(self, connection):
+        values = [1, 2, 3, 4, 5]
+        result = connection.execute(
+            tstring(
+                t"select {values[0]}, {values[1]}, {values[2]}, "
+                t"{values[3]}, {values[4]}"
+            )
+        )
+        eq_(result.all(), [(1, 2, 3, 4, 5)])
+
+
+class IntegrationTest(fixtures.TablesTest):
+    __backend__ = True
+
+    @classmethod
+    def define_tables(cls, metadata):
+        from sqlalchemy import Column
+        from sqlalchemy import Table
+
+        Table(
+            "users",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("name", String(50)),
+        )
+
+    @classmethod
+    def insert_data(cls, connection):
+        connection.execute(
+            cls.tables.users.insert(),
+            [
+                {"id": 1, "name": "alice"},
+                {"id": 2, "name": "bob"},
+                {"id": 3, "name": "charlie"},
+            ],
+        )
+
+    def test_select_from_real_table(self, connection):
+        user_id = 2
+        stmt = tstring(t"SELECT * FROM users WHERE id = {user_id}").columns(
+            column("id", Integer), column("name", String)
+        )
+        result = connection.execute(stmt)
+        row = result.one()
+        eq_(row.id, 2)
+        eq_(row.name, "bob")
+
+    def test_where_clause_with_real_table(self, connection):
+        users = self.tables.users
+        name_filter = "alice"
+        stmt = select(users).where(
+            tstring(t"{users.c.name} = {literal(name_filter)}")
+        )
+        result = connection.execute(stmt)
+        row = result.one()
+        eq_(row.id, 1)
+        eq_(row.name, "alice")
+
+    def test_complex_query(self, connection):
+        min_id = 1
+        max_id = 2
+        stmt = tstring(
+            t"SELECT id, name FROM users WHERE id >= {min_id} "
+            t"AND id <= {max_id}"
+        ).columns(column("id", Integer), column("name", String))
+        result = connection.execute(stmt)
+        rows = result.all()
+        eq_(len(rows), 2)
+        eq_(rows[0].name, "alice")
+        eq_(rows[1].name, "bob")
+
+
+class CacheKeyTest(fixtures.CacheKeyFixture, fixtures.TestBase):
+    """Test cache key generation for tstring constructs."""
+
+    @fixtures.CacheKeySuite.run_suite_tests
+    def test_tstring_cache_key(self):
+
+        def stmt1():
+            # Basic tstring with literal
+            a = 5
+            return tstring(t"SELECT {a}")
+
+        def stmt2():
+            # Different structure - two literals
+            a = 5
+            b = 10
+            return tstring(t"SELECT {a}, {b}")
+
+        def stmt3():
+            # With column reference
+            return tstring(t"SELECT {table1.c.myid}")
+
+        def stmt4():
+            # Different column - different cache key
+            return tstring(t"SELECT {table1.c.name}")
+
+        def stmt5():
+            # With .columns()
+            a = 5
+            return tstring(t"SELECT {a}").columns(column("val", Integer))
+
+        def stmt6():
+            # String literal passthrough
+            return tstring(t"SELECT * FROM users")
+
+        def stmt7():
+            # Different string literal
+            return tstring(t"SELECT id FROM users")
+
+        def stmt8():
+            # With SQLAlchemy scalar subquery
+            return tstring(t"SELECT {select(literal(1)).scalar_subquery()}")
+
+        def stmt9():
+            # Mixed: text and literal
+            user_id = 42
+            return tstring(t"SELECT * FROM users WHERE id = {user_id}")
+
+        def stmt10():
+            # Mixed: text and column
+            return tstring(t"SELECT * FROM users WHERE id = {table1.c.myid}")
+
+        return lambda: [
+            stmt1(),
+            stmt2(),
+            stmt3(),
+            stmt4(),
+            stmt5(),
+            stmt6(),
+            stmt7(),
+            stmt8(),
+            stmt9(),
+            stmt10(),
+        ]
index d21922e8691e4e610d87afbda3dc6d927052c184..8460dafcb9a7b08f2a69e3372acad388355926cb 100644 (file)
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+from string.templatelib import Template
 from typing import Any
 from typing import assert_type
 from typing import Unpack
@@ -19,6 +20,7 @@ from sqlalchemy import select
 from sqlalchemy import String
 from sqlalchemy import Table
 from sqlalchemy import text
+from sqlalchemy import tstring
 from sqlalchemy import update
 from sqlalchemy.engine import Result
 from sqlalchemy.engine.row import Row
@@ -31,6 +33,7 @@ from sqlalchemy.orm.query import Query
 from sqlalchemy.orm.query import RowReturningQuery
 from sqlalchemy.sql.dml import ReturningInsert
 from sqlalchemy.sql.elements import KeyedColumnElement
+from sqlalchemy.sql.elements import TString
 from sqlalchemy.sql.expression import FromClause
 from sqlalchemy.sql.expression import TextClause
 from sqlalchemy.sql.selectable import ScalarSelect
@@ -431,19 +434,23 @@ def t_dml_delete() -> None:
     assert_type(r1, Result[int, str])
 
 
-def t_from_statement() -> None:
+def t_from_statement_text() -> None:
     t = text("select * from user")
 
     assert_type(t, TextClause)
 
     select(User).from_statement(t)
 
+    session.query(User).from_statement(t)
+
     ts = text("select * from user").columns(User.id, User.name)
 
     assert_type(ts, TextualSelect)
 
     select(User).from_statement(ts)
 
+    session.query(User).from_statement(ts)
+
     ts2 = text("select * from user").columns(
         user_table.c.id, user_table.c.name
     )
@@ -452,6 +459,34 @@ def t_from_statement() -> None:
 
     select(User).from_statement(ts2)
 
+    session.query(User).from_statement(ts2)
+
+
+def t_from_statement_tstring(templ: Template) -> None:
+    t = tstring(templ)
+
+    assert_type(t, TString)
+
+    select(User).from_statement(t)
+
+    session.query(User).from_statement(t)
+
+    ts = tstring(templ).columns(User.id, User.name)
+
+    assert_type(ts, TextualSelect)
+
+    select(User).from_statement(ts)
+
+    session.query(User).from_statement(ts)
+
+    ts2 = tstring(templ).columns(user_table.c.id, user_table.c.name)
+
+    assert_type(ts2, TextualSelect)
+
+    select(User).from_statement(ts2)
+
+    session.query(User).from_statement(ts2)
+
 
 def t_aliased_fromclause() -> None:
     a1 = aliased(User, user_table)