From b9e3cacb0e7025a31458f09d0cb0c72da0c5f660 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Mon, 17 Nov 2025 13:11:24 -0500 Subject: [PATCH] add TString support Added support for Python 3.14+ template strings (t-strings) via the new :func:`_sql.tstring` construct, as defined in :pep:`750`. This feature allows for ergonomic SQL statement construction by automatically interpolating Python values and SQLAlchemy expressions within template strings. Part of the challenge here is the syntax only works on py314, so we have to exclude the test file at many levels when py314 is not used. not sure yet how i want to adjust pep8 tests and rules for this. Fixes: #12548 Change-Id: Ia060d1387ff452fe4f5d924f683529a22a8e1f72 --- .pre-commit-config.yaml | 6 +- doc/build/changelog/migration_21.rst | 64 ++ doc/build/changelog/unreleased_21/12548.rst | 13 + doc/build/core/sqlelement.rst | 7 + lib/sqlalchemy/__init__.py | 2 + lib/sqlalchemy/orm/context.py | 9 +- lib/sqlalchemy/orm/query.py | 6 +- lib/sqlalchemy/sql/__init__.py | 3 + lib/sqlalchemy/sql/_elements_constructors.py | 83 +++ lib/sqlalchemy/sql/coercions.py | 4 + lib/sqlalchemy/sql/compiler.py | 33 +- lib/sqlalchemy/sql/elements.py | 459 +++++++----- lib/sqlalchemy/sql/expression.py | 2 + lib/sqlalchemy/sql/roles.py | 11 +- lib/sqlalchemy/sql/selectable.py | 13 +- lib/sqlalchemy/sql/util.py | 6 +- lib/sqlalchemy/util/compat.py | 22 + noxfile.py | 4 +- pyproject.toml | 2 + test/conftest.py | 4 + test/orm/test_query.py | 658 ------------------ test/orm/test_text.py | 689 +++++++++++++++++++ test/orm/test_tstring_py314.py | 207 ++++++ test/profiles.txt | 4 +- test/sql/test_compare.py | 9 +- test/sql/test_statement_params.py | 3 + test/sql/test_tstrings_py314.py | 438 ++++++++++++ test/typing/plain_files/orm/typed_queries.py | 37 +- 28 files changed, 1939 insertions(+), 859 deletions(-) create mode 100644 doc/build/changelog/unreleased_21/12548.rst create mode 100644 test/orm/test_text.py create mode 100644 test/orm/test_tstring_py314.py create mode 100644 test/sql/test_tstrings_py314.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dfbc82115b..07d00fbfb0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,7 @@ -# See https://pre-commit.com for more information -# See https://pre-commit.com/hooks.html for more hooks + +default_language_version: + python: python3.14 + repos: - repo: https://github.com/python/black rev: 25.11.0 diff --git a/doc/build/changelog/migration_21.rst b/doc/build/changelog/migration_21.rst index d8e7da5cef..bf0a29cdc2 100644 --- a/doc/build/changelog/migration_21.rst +++ b/doc/build/changelog/migration_21.rst @@ -497,6 +497,70 @@ E.g.:: New Features and Improvements - Core ===================================== +.. _change_12548: + +Template String (t-string) Support for Python 3.14+ +---------------------------------------------------- + +SQLAlchemy 2.1 adds support for Python 3.14+ template strings (t-strings) +via the new :func:`_sql.tstring` construct, as defined in :pep:`750`. +This feature provides a more ergonomic way to construct SQL statements by +automatically interpolating Python values and SQLAlchemy expressions within +template strings. + +The :func:`_sql.tstring` function works similarly to :func:`_sql.text`, but +automatically handles different types of interpolated values: + +* **String literals** from the template are rendered directly as SQL +* **SQLAlchemy expressions** (columns, functions, subqueries, etc.) are + embedded as clause elements +* **Plain Python values** are automatically wrapped with :func:`_sql.literal` + +Example usage:: + + from sqlalchemy import tstring, select, literal, JSON + + # Python values become bound values + user_id = 42 + stmt = tstring(t"SELECT * FROM users WHERE id = {user_id}") + # renders: SELECT * FROM users WHERE id = :param_1 + + # SQLAlchemy expressions are embedded + from sqlalchemy import table, column + + stmt = tstring(t"SELECT {column('q')} FROM {table('t')}") + # renders: SELECT q FROM t + + # Apply explicit SQL types to bound values using literal() + some_json = {"foo": "bar"} + stmt = tstring(t"SELECT {literal(some_json, JSON)}") + +Like :func:`_sql.text`, the :class:`_sql.TString` construct supports the +:meth:`_sql.TString.columns` method to specify return columns and their types:: + + from sqlalchemy import column, Integer, String + + stmt = tstring(t"SELECT id, name FROM users").columns( + column("id", Integer), column("name", String) + ) + + for id, name in connection.execute(stmt): + print(id, name) + +The :func:`_sql.tstring` construct is fully compatible with SQLAlchemy's +statement caching system. Statements with the same structure but different +literal values will share the same cache key, providing optimal performance. + +.. seealso:: + + :func:`_sql.tstring` + + :class:`_sql.TString` + + `PEP 750 `_ - Template Strings + +:ticket:`12548` + .. _change_10635: ``Row`` now represents individual column types directly without ``Tuple`` diff --git a/doc/build/changelog/unreleased_21/12548.rst b/doc/build/changelog/unreleased_21/12548.rst new file mode 100644 index 0000000000..5ca668f70a --- /dev/null +++ b/doc/build/changelog/unreleased_21/12548.rst @@ -0,0 +1,13 @@ +.. change:: + :tags: feature, sql + :tickets: 12548 + + Added support for Python 3.14+ template strings (t-strings) via the new + :func:`_sql.tstring` construct. This feature makes use of Python 3.14 + template strings as defined in :pep:`750`, allowing for ergonomic SQL + statement construction by automatically interpolating Python values and + SQLAlchemy expressions within template strings. + + .. seealso:: + + :ref:`change_12548` - in :ref:`migration_21_toplevel` diff --git a/doc/build/core/sqlelement.rst b/doc/build/core/sqlelement.rst index 88dc810efa..047d8594cd 100644 --- a/doc/build/core/sqlelement.rst +++ b/doc/build/core/sqlelement.rst @@ -65,6 +65,8 @@ used when building up SQLAlchemy Expression Language constructs. .. autofunction:: text +.. autofunction:: tstring + .. autofunction:: true .. autofunction:: try_cast @@ -221,6 +223,11 @@ The classes here are generated using the constructors listed at .. autoclass:: TextClause :members: + :inherited-members: columns + +.. autoclass:: TString + :members: + :inherited-members: columns .. autoclass:: TryCast :members: diff --git a/lib/sqlalchemy/__init__.py b/lib/sqlalchemy/__init__.py index 08035c408c..0e93c24e83 100644 --- a/lib/sqlalchemy/__init__.py +++ b/lib/sqlalchemy/__init__.py @@ -212,6 +212,8 @@ from .sql.expression import true as true from .sql.expression import True_ as True_ from .sql.expression import try_cast as try_cast from .sql.expression import TryCast as TryCast +from .sql.expression import TString as TString +from .sql.expression import tstring as tstring from .sql.expression import Tuple as Tuple from .sql.expression import tuple_ as tuple_ from .sql.expression import type_coerce as type_coerce diff --git a/lib/sqlalchemy/orm/context.py b/lib/sqlalchemy/orm/context.py index 3ac216babf..848547a9dd 100644 --- a/lib/sqlalchemy/orm/context.py +++ b/lib/sqlalchemy/orm/context.py @@ -880,10 +880,11 @@ class _ORMFromStatementCompileState(_ORMCompileState): self.order_by = None - if isinstance(self.statement, expression.TextClause): - # TextClause has no "column" objects at all. for this case, - # we generate columns from our _QueryEntity objects, then - # flip on all the "please match no matter what" parameters. + if self.statement._is_text_clause: + # AbstractTextClause (TextClause, TString) has no "column" + # objects at all. for this case, we generate columns from our + # _QueryEntity objects, then flip on all the + # "please match no matter what" parameters. self.extra_criteria_entities = {} for entity in self._entities: diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 2eb2c5e008..c28c0a45d4 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -2754,7 +2754,7 @@ class Query( @_generative @_assertions(_no_clauseelement_condition) - def from_statement(self, statement: ExecutableReturnsRows) -> Self: + def from_statement(self, statement: roles.SelectStatementRole) -> Self: """Execute the given SELECT statement and return results. This method bypasses all internal statement compilation, and the @@ -2771,10 +2771,10 @@ class Query( :meth:`_sql.Select.from_statement` - v2 comparable method. """ - statement = coercions.expect( + _statement = coercions.expect( roles.SelectStatementRole, statement, apply_propagate_attrs=self ) - self._statement = statement + self._statement = _statement return self def first(self) -> Optional[_T]: diff --git a/lib/sqlalchemy/sql/__init__.py b/lib/sqlalchemy/sql/__init__.py index c49c0f6c3a..c19fec59d0 100644 --- a/lib/sqlalchemy/sql/__init__.py +++ b/lib/sqlalchemy/sql/__init__.py @@ -98,9 +98,12 @@ from .expression import TableClause as TableClause from .expression import TableSample as TableSample from .expression import tablesample as tablesample from .expression import text as text +from .expression import TextClause as TextClause from .expression import true as true from .expression import True_ as True_ from .expression import try_cast as try_cast +from .expression import TString as TString +from .expression import tstring as tstring from .expression import tuple_ as tuple_ from .expression import type_coerce as type_coerce from .expression import union as union diff --git a/lib/sqlalchemy/sql/_elements_constructors.py b/lib/sqlalchemy/sql/_elements_constructors.py index 76669cb7ca..6fddb590b5 100644 --- a/lib/sqlalchemy/sql/_elements_constructors.py +++ b/lib/sqlalchemy/sql/_elements_constructors.py @@ -45,6 +45,7 @@ from .elements import Over from .elements import TextClause from .elements import True_ from .elements import TryCast +from .elements import TString from .elements import Tuple from .elements import TypeCoerce from .elements import UnaryExpression @@ -52,6 +53,8 @@ from .elements import WithinGroup from .functions import FunctionElement if typing.TYPE_CHECKING: + from string.templatelib import Template + from ._typing import _ByArgument from ._typing import _ColumnExpressionArgument from ._typing import _ColumnExpressionOrLiteralArgument @@ -1817,6 +1820,86 @@ def text(text: str) -> TextClause: return TextClause(text) +def tstring(template: Template) -> TString: + r"""Construct a new :class:`_expression.TString` clause, + representing a SQL template string using Python 3.14+ t-strings. + + .. versionadded:: 2.1 + + E.g.:: + + from sqlalchemy import tstring + + a = 5 + b = 10 + stmt = tstring(t"select {a}, {b}") + result = connection.execute(stmt) + + The :func:`_expression.tstring` function accepts a Python 3.14+ + template string (t-string) and processes it to create a SQL statement. + Unlike :func:`_expression.text`, which requires manual bind parameter + specification, :func:`_expression.tstring` automatically handles + interpolation of Python values and SQLAlchemy expressions. + + **Interpolation Behavior**: + + - **SQL content** expressed in the plain string portions of the template + are rendered directly as SQL + - **SQLAlchemy expressions** (columns, functions, etc.) are embedded + as clause elements + - **Plain Python values** are automatically wrapped in + :func:`_expression.literal` + + For example:: + + from sqlalchemy import tstring, select, literal, JSON, table, column + + # Python values become bound parameters + user_id = 42 + stmt = tstring(t"SELECT * FROM users WHERE id = {user_id}") + # renders: SELECT * FROM users WHERE id = :param_1 + + # SQLAlchemy expressions are embedded + stmt = tstring(t"SELECT {column('q')} FROM {table('t')}") + # renders: SELECT q FROM t + + # Apply explicit SQL types to bound values using literal() + some_json = {"foo": "bar"} + stmt = tstring(t"SELECT {literal(some_json, JSON)}") + + **Column Specification**: + + Like :func:`_expression.text`, the :func:`_expression.tstring` construct + supports the :meth:`_expression.TString.columns` method to specify + return columns and their types:: + + from sqlalchemy import tstring, column, Integer, String + + stmt = tstring(t"SELECT id, name FROM users").columns( + column("id", Integer), column("name", String) + ) + + for id, name in connection.execute(stmt): + print(id, name) + + :param template: + a Python 3.14+ template string (t-string) containing SQL fragments + and Python expressions to be interpolated. + + .. seealso:: + + :ref:`tutorial_select_arbitrary_text` - in the :ref:`unified_tutorial` + + :class:`_expression.TString` + + :func:`_expression.text` + + `PEP 750 `_ - Template Strings + + """ + return TString(template) + + def true() -> True_: """Return a constant :class:`.True_` construct. diff --git a/lib/sqlalchemy/sql/coercions.py b/lib/sqlalchemy/sql/coercions.py index c967ab0c98..7d51056422 100644 --- a/lib/sqlalchemy/sql/coercions.py +++ b/lib/sqlalchemy/sql/coercions.py @@ -795,6 +795,10 @@ class ExpressionElementImpl(_ColumnCoercions, RoleImpl): ) +class TStringElementImpl(ExpressionElementImpl, RoleImpl): + __slots__ = () + + class BinaryElementImpl(ExpressionElementImpl, RoleImpl): __slots__ = () diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index d03167dbce..bda6b81666 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -2597,8 +2597,15 @@ class SQLCompiler(Compiled): within_columns_clause=False, render_label_as_label=None, result_map_targets=(), + within_tstring=False, **kw, ): + if within_tstring: + raise exc.CompileError( + "Using label() directly inside tstring is not supported " + "as it is ambiguous how the label expression should be " + "rendered without knowledge of how it's being used in SQL" + ) # only render labels within the columns clause # or ORDER BY clause of a select. dialect-specific compilers # can modify this behavior. @@ -2760,6 +2767,23 @@ class SQLCompiler(Compiled): ), ) + def visit_tstring(self, tstring, add_to_result_map=None, **kw): + if self._collect_params: + self._add_to_params(tstring) + + if not self.stack: + self.isplaintext = True + + if add_to_result_map: + # tstring() object is present in the columns clause of a + # select(). Add a no-name entry to the result map so that + # row[tstring()] produces a result + add_to_result_map(None, None, (tstring,), sqltypes.NULLTYPE) + + # Process each part and concatenate + kw["within_tstring"] = True + return "".join(self.process(part, **kw) for part in tstring.parts) + def visit_textual_select( self, taf, compound_index=None, asfrom=False, **kw ): @@ -4394,6 +4418,7 @@ class SQLCompiler(Compiled): lateral=False, enclosing_alias=None, from_linter=None, + within_tstring=False, **kwargs, ): if lateral: @@ -4430,7 +4455,7 @@ class SQLCompiler(Compiled): else: kwargs["enclosing_alias"] = alias - if asfrom or ashint: + if asfrom or ashint or within_tstring: if isinstance(alias.name, elements._truncated_label): alias_name = self._truncated_identifier("alias", alias.name) else: @@ -4438,7 +4463,7 @@ class SQLCompiler(Compiled): if ashint: return self.preparer.format_alias(alias, alias_name) - elif asfrom: + elif asfrom or within_tstring: if from_linter: from_linter.froms[alias._de_clone()] = alias_name @@ -4825,6 +4850,7 @@ class SQLCompiler(Compiled): within_columns_clause=within_columns_clause, add_to_result_map=add_to_result_map, include_table=include_table, + within_tstring=False, ) return result_expr._compiler_dispatch(self, **column_clause_args) @@ -5483,12 +5509,13 @@ class SQLCompiler(Compiled): from_linter=None, ambiguous_table_name_map=None, enclosing_alias=None, + within_tstring=False, **kwargs, ): if from_linter: from_linter.froms[table] = table.fullname - if asfrom or ashint: + if asfrom or ashint or within_tstring: effective_schema = self.preparer.schema_for_object(table) if use_schema and effective_schema: diff --git a/lib/sqlalchemy/sql/elements.py b/lib/sqlalchemy/sql/elements.py index 7a640ccacc..8f0d7e0a28 100644 --- a/lib/sqlalchemy/sql/elements.py +++ b/lib/sqlalchemy/sql/elements.py @@ -81,6 +81,7 @@ from .. import util from ..util import deprecated from ..util import HasMemoized_ro_memoized_attribute from ..util import TypingOnly +from ..util.compat import Template from ..util.typing import Self from ..util.typing import TupleAny from ..util.typing import Unpack @@ -2323,7 +2324,7 @@ class TypeClause(DQLDMLClauseElement): self.type = type_ -class TextClause( +class AbstractTextClause( roles.DDLConstraintColumnRole, roles.DDLExpressionRole, roles.StatementOptionRole, @@ -2336,8 +2337,211 @@ class TextClause( ExecutableStatement, DQLDMLClauseElement, roles.BinaryElementRole[Any], - inspection.Inspectable["TextClause"], ): + """Base class for textual SQL constructs like TextClause and TString.""" + + __visit_name__: str + + _is_text_clause = True + _is_textual = True + _is_implicitly_boolean = False + _render_label_in_columns_clause = False + _omit_from_statements = False + _is_collection_aggregate = False + + @property + def _hide_froms(self) -> Iterable[FromClause]: + return () + + def __and__(self, other): + # support use in select.where(), query.filter() + return and_(self, other) + + @property + def _select_iterable(self) -> _SelectIterable: + return (self,) + + # help in those cases where text/tstring() is + # interpreted in a column expression situation + key: Optional[str] = None + _label: Optional[str] = None + + _allow_label_resolve = False + + @property + def type(self) -> TypeEngine[Any]: + return type_api.NULLTYPE + + @property + def comparator(self): + return self.type.comparator_factory(self) # type: ignore + + def self_group( + self, against: Optional[OperatorType] = None + ) -> Union[Self, Grouping[Any]]: + if against is operators.in_op: + return Grouping(self) + else: + return self + + def bindparams( + self, + *binds: BindParameter[Any], + **names_to_values: Any, + ) -> Self: + """Establish the values and/or types of bound parameters within + this :class:`_expression.AbstractTextClause` construct. + + This is implemented only for :class:`.TextClause` will raise + ``NotImplementedError`` for :class:`.TString`. + + """ + raise NotImplementedError() + + @util.preload_module("sqlalchemy.sql.selectable") + def columns( + self, + *cols: _ColumnExpressionArgument[Any], + **types: _TypeEngineArgument[Any], + ) -> TextualSelect: + r"""Turn this :class:`_expression.AbstractTextClause` object into a + :class:`_expression.TextualSelect` + object that serves the same role as a SELECT + statement. + + The :class:`_expression.TextualSelect` is part of the + :class:`_expression.SelectBase` + hierarchy and can be embedded into another statement by using the + :meth:`_expression.TextualSelect.subquery` method to produce a + :class:`.Subquery` + object, which can then be SELECTed from. + + This function essentially bridges the gap between an entirely + textual SELECT statement and the SQL expression language concept + of a "selectable":: + + from sqlalchemy.sql import column, text + + stmt = text("SELECT id, name FROM some_table") + stmt = stmt.columns(column("id"), column("name")).subquery("st") + + stmt = ( + select(mytable) + .select_from(mytable.join(stmt, mytable.c.name == stmt.c.name)) + .where(stmt.c.id > 5) + ) + + Above, we pass a series of :func:`_expression.column` elements to the + :meth:`_expression.AbstractTextClause.columns` method positionally. + These :func:`_expression.column` elements now become first class + elements upon the :attr:`_expression.TextualSelect.selected_columns` + column collection, which then become part of the :attr:`.Subquery.c` + collection after :meth:`_expression.TextualSelect.subquery` is invoked. + + The column expressions we pass to + :meth:`_expression.AbstractTextClause.columns` may also be typed; when + we do so, these :class:`.TypeEngine` objects become the effective + return type of the column, so that SQLAlchemy's result-set-processing + systems may be used on the return values. This is often needed for + types such as date or boolean types, as well as for unicode processing + on some dialect configurations:: + + stmt = text("SELECT id, name, timestamp FROM some_table") + stmt = stmt.columns( + column("id", Integer), + column("name", Unicode), + column("timestamp", DateTime), + ) + + for id, name, timestamp in connection.execute(stmt): + print(id, name, timestamp) + + As a shortcut to the above syntax, keyword arguments referring to + types alone may be used, if only type conversion is needed:: + + stmt = text("SELECT id, name, timestamp FROM some_table") + stmt = stmt.columns(id=Integer, name=Unicode, timestamp=DateTime) + + for id, name, timestamp in connection.execute(stmt): + print(id, name, timestamp) + + The positional form of :meth:`_expression.AbstractTextClause.columns` + also provides the unique feature of **positional column targeting**, + which is particularly useful when using the ORM with complex textual + queries. If we specify the columns from our model to + :meth:`_expression.AbstractTextClause.columns`, the result set will + match to those columns positionally, meaning the name or origin of the + column in the textual SQL doesn't matter:: + + stmt = text( + "SELECT users.id, addresses.id, users.id, " + "users.name, addresses.email_address AS email " + "FROM users JOIN addresses ON users.id=addresses.user_id " + "WHERE users.id = 1" + ).columns( + User.id, + Address.id, + Address.user_id, + User.name, + Address.email_address, + ) + + query = ( + session.query(User) + .from_statement(stmt) + .options(contains_eager(User.addresses)) + ) + + The :meth:`_expression.AbstractTextClause.columns` method provides a + direct route to calling :meth:`_expression.FromClause.subquery` as well + as :meth:`_expression.SelectBase.cte` against a textual SELECT + statement:: + + stmt = stmt.columns(id=Integer, name=String).cte("st") + + stmt = select(sometable).where(sometable.c.id == stmt.c.id) + + :param \*cols: A series of :class:`_expression.ColumnElement` objects, + typically + :class:`_schema.Column` objects from a :class:`_schema.Table` + or ORM level + column-mapped attributes, representing a set of columns that this + textual string will SELECT from. + + :param \**types: A mapping of string names to :class:`.TypeEngine` + type objects indicating the datatypes to use for names that are + SELECTed from the textual string. Prefer to use the ``*cols`` + argument as it also indicates positional ordering. + + """ + selectable = util.preloaded.sql_selectable + + input_cols: List[NamedColumn[Any]] = [ + coercions.expect(roles.LabeledColumnExprRole, col) for col in cols + ] + + positional_input_cols = [ + ( + ColumnClause(col.key, types.pop(col.key)) + if col.key in types + else col + ) + for col in input_cols + ] + keyed_input_cols: List[NamedColumn[Any]] = [ + ColumnClause(key, type_) for key, type_ in types.items() + ] + + elem = selectable.TextualSelect.__new__(selectable.TextualSelect) + elem._init( + self, + positional_input_cols + keyed_input_cols, + positional=bool(positional_input_cols) and not keyed_input_cols, + ) + return elem + + +class TextClause(AbstractTextClause, inspection.Inspectable["TextClause"]): """Represent a literal SQL text fragment. E.g.:: @@ -2364,40 +2568,10 @@ class TextClause( ("text", InternalTraversal.dp_string), ] + ExecutableStatement._executable_traverse_internals - _is_text_clause = True - - _is_textual = True - _bind_params_regex = re.compile(r"(? Iterable[FromClause]: - return () - - def __and__(self, other): - # support use in select.where(), query.filter() - return and_(self, other) - - @property - def _select_iterable(self) -> _SelectIterable: - return (self,) - - # help in those cases where text() is - # interpreted in a column expression situation - key: Optional[str] = None - _label: Optional[str] = None - - _allow_label_resolve = False @property - def _is_star(self): # type: ignore[override] + def _is_star(self) -> bool: # type: ignore[override] return self.text == "*" def __init__(self, text: str): @@ -2542,168 +2716,101 @@ class TextClause( new_params[key] = existing._with_value(value, required=False) return self - @util.preload_module("sqlalchemy.sql.selectable") - def columns( - self, - *cols: _ColumnExpressionArgument[Any], - **types: _TypeEngineArgument[Any], - ) -> TextualSelect: - r"""Turn this :class:`_expression.TextClause` object into a - :class:`_expression.TextualSelect` - object that serves the same role as a SELECT - statement. - - The :class:`_expression.TextualSelect` is part of the - :class:`_expression.SelectBase` - hierarchy and can be embedded into another statement by using the - :meth:`_expression.TextualSelect.subquery` method to produce a - :class:`.Subquery` - object, which can then be SELECTed from. - - This function essentially bridges the gap between an entirely - textual SELECT statement and the SQL expression language concept - of a "selectable":: - - from sqlalchemy.sql import column, text + @property + def type(self) -> TypeEngine[Any]: + return type_api.NULLTYPE - stmt = text("SELECT id, name FROM some_table") - stmt = stmt.columns(column("id"), column("name")).subquery("st") + @property + def comparator(self): + # TODO: this seems wrong, it seems like we might not + # be using this method. + return self.type.comparator_factory(self) # type: ignore - stmt = ( - select(mytable) - .select_from(mytable.join(stmt, mytable.c.name == stmt.c.name)) - .where(stmt.c.id > 5) - ) + def self_group( + self, against: Optional[OperatorType] = None + ) -> Union[Self, Grouping[Any]]: + if against is operators.in_op: + return Grouping(self) + else: + return self - Above, we pass a series of :func:`_expression.column` elements to the - :meth:`_expression.TextClause.columns` method positionally. These - :func:`_expression.column` - elements now become first class elements upon the - :attr:`_expression.TextualSelect.selected_columns` column collection, - which then - become part of the :attr:`.Subquery.c` collection after - :meth:`_expression.TextualSelect.subquery` is invoked. - The column expressions we pass to - :meth:`_expression.TextClause.columns` may - also be typed; when we do so, these :class:`.TypeEngine` objects become - the effective return type of the column, so that SQLAlchemy's - result-set-processing systems may be used on the return values. - This is often needed for types such as date or boolean types, as well - as for unicode processing on some dialect configurations:: +class TString(AbstractTextClause, inspection.Inspectable["TString"]): + """Represent a SQL template string using Python 3.14+ t-strings. - stmt = text("SELECT id, name, timestamp FROM some_table") - stmt = stmt.columns( - column("id", Integer), - column("name", Unicode), - column("timestamp", DateTime), - ) + E.g.:: - for id, name, timestamp in connection.execute(stmt): - print(id, name, timestamp) + from sqlalchemy import tstring, column - As a shortcut to the above syntax, keyword arguments referring to - types alone may be used, if only type conversion is needed:: + a = 5 + b = 10 + stmt = tstring(t"select {a}, {b}") + result = connection.execute(stmt) - stmt = text("SELECT id, name, timestamp FROM some_table") - stmt = stmt.columns(id=Integer, name=Unicode, timestamp=DateTime) + The :class:`_expression.TString` construct is produced using the + :func:`_expression.tstring` function; see that function for full + documentation. - for id, name, timestamp in connection.execute(stmt): - print(id, name, timestamp) + .. versionadded:: 2.1 - The positional form of :meth:`_expression.TextClause.columns` - also provides the - unique feature of **positional column targeting**, which is - particularly useful when using the ORM with complex textual queries. If - we specify the columns from our model to - :meth:`_expression.TextClause.columns`, - the result set will match to those columns positionally, meaning the - name or origin of the column in the textual SQL doesn't matter:: + .. seealso:: - stmt = text( - "SELECT users.id, addresses.id, users.id, " - "users.name, addresses.email_address AS email " - "FROM users JOIN addresses ON users.id=addresses.user_id " - "WHERE users.id = 1" - ).columns( - User.id, - Address.id, - Address.user_id, - User.name, - Address.email_address, - ) + :func:`_expression.tstring` - query = ( - session.query(User) - .from_statement(stmt) - .options(contains_eager(User.addresses)) - ) + """ - The :meth:`_expression.TextClause.columns` method provides a direct - route to calling :meth:`_expression.FromClause.subquery` as well as - :meth:`_expression.SelectBase.cte` - against a textual SELECT statement:: + __visit_name__ = "tstring" - stmt = stmt.columns(id=Integer, name=String).cte("st") + _traverse_internals: _TraverseInternalsType = [ + ("parts", InternalTraversal.dp_clauseelement_list) + ] + ExecutableStatement._executable_traverse_internals - stmt = select(sometable).where(sometable.c.id == stmt.c.id) + @property + def _is_star(self) -> bool: # type: ignore[override] + return ( + len(self.parts) == 1 + and isinstance(self.parts[0], TextClause) + and self.parts[0]._is_star + ) - :param \*cols: A series of :class:`_expression.ColumnElement` objects, - typically - :class:`_schema.Column` objects from a :class:`_schema.Table` - or ORM level - column-mapped attributes, representing a set of columns that this - textual string will SELECT from. + def __init__(self, template: Template): + """Construct a :class:`_expression.TString` from a Python 3.14+ + template string. - :param \**types: A mapping of string names to :class:`.TypeEngine` - type objects indicating the datatypes to use for names that are - SELECTed from the textual string. Prefer to use the ``*cols`` - argument as it also indicates positional ordering. + :param template: a Python 3.14+ template string (t-string) that + contains SQL fragments and Python expressions to be interpolated. """ - selectable = util.preloaded.sql_selectable + self.parts: List[ClauseElement] = [] - input_cols: List[NamedColumn[Any]] = [ - coercions.expect(roles.LabeledColumnExprRole, col) for col in cols - ] + if not isinstance(template, Template): + raise exc.ArgumentError("pep-750 Tstring (e.g. t'...') expected") - positional_input_cols = [ - ( - ColumnClause(col.key, types.pop(col.key)) - if col.key in types - else col - ) - for col in input_cols - ] - keyed_input_cols: List[NamedColumn[Any]] = [ - ColumnClause(key, type_) for key, type_ in types.items() - ] - - elem = selectable.TextualSelect.__new__(selectable.TextualSelect) - elem._init( - self, - positional_input_cols + keyed_input_cols, - positional=bool(positional_input_cols) and not keyed_input_cols, - ) - return elem + for part in template: + if isinstance(part, str): + self.parts.append(TextClause(part)) + else: + assert hasattr(part, "value") + self.parts.append( + coercions.expect(roles.TStringElementRole, part.value) + ) - @property - def type(self) -> TypeEngine[Any]: - return type_api.NULLTYPE + def bindparams( + self, + *binds: BindParameter[Any], + **names_to_values: Any, + ) -> Self: + """Not supported for TString constructs. - @property - def comparator(self): - # TODO: this seems wrong, it seems like we might not - # be using this method. - return self.type.comparator_factory(self) # type: ignore + TString constructs do not support .bindparams(). Bind parameters + are automatically created from interpolated values. - def self_group( - self, against: Optional[OperatorType] = None - ) -> Union[Self, Grouping[Any]]: - if against is operators.in_op: - return Grouping(self) - else: - return self + """ + raise NotImplementedError( + "TString constructs do not support .bindparams(). " + "Bind parameters are automatically created " + "from interpolated values." + ) class Null(SingletonConstant, roles.ConstExprRole[None], ColumnElement[None]): @@ -4282,13 +4389,19 @@ class Grouping(GroupedElement, ColumnElement[_T]): ] element: Union[ - TextClause, ClauseList, ColumnElement[_T], CompilerColumnElement + AbstractTextClause, + ClauseList, + ColumnElement[_T], + CompilerColumnElement, ] def __init__( self, element: Union[ - TextClause, ClauseList, ColumnElement[_T], CompilerColumnElement + AbstractTextClause, + ClauseList, + ColumnElement[_T], + CompilerColumnElement, ], ): self.element = element diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py index 60029dbf4f..8f788b2584 100644 --- a/lib/sqlalchemy/sql/expression.py +++ b/lib/sqlalchemy/sql/expression.py @@ -42,6 +42,7 @@ from ._elements_constructors import over as over from ._elements_constructors import text as text from ._elements_constructors import true as true from ._elements_constructors import try_cast as try_cast +from ._elements_constructors import tstring as tstring from ._elements_constructors import tuple_ as tuple_ from ._elements_constructors import type_coerce as type_coerce from ._elements_constructors import within_group as within_group @@ -107,6 +108,7 @@ from .elements import SQLColumnExpression as SQLColumnExpression from .elements import TextClause as TextClause from .elements import True_ as True_ from .elements import TryCast as TryCast +from .elements import TString as TString from .elements import Tuple as Tuple from .elements import TypeClause as TypeClause from .elements import TypeCoerce as TypeCoerce diff --git a/lib/sqlalchemy/sql/roles.py b/lib/sqlalchemy/sql/roles.py index 5a23d0d4d9..0f328c1f82 100644 --- a/lib/sqlalchemy/sql/roles.py +++ b/lib/sqlalchemy/sql/roles.py @@ -105,7 +105,16 @@ class TruncatedLabelRole(StringRole, SQLRole): _role_name = "String SQL identifier" -class ColumnsClauseRole(AllowsLambdaRole, UsesInspection, ColumnListRole): +class TStringElementRole(UsesInspection, SQLRole): + """Role for elements that can be interpolated into a TString.""" + + __slots__ = () + _role_name = "TString interpolatable element" + + +class ColumnsClauseRole( + TStringElementRole, AllowsLambdaRole, UsesInspection, ColumnListRole +): __slots__ = () _role_name = ( "Column expression, FROM clause, or other columns clause element" diff --git a/lib/sqlalchemy/sql/selectable.py b/lib/sqlalchemy/sql/selectable.py index 48fc75e3a1..5342cd012a 100644 --- a/lib/sqlalchemy/sql/selectable.py +++ b/lib/sqlalchemy/sql/selectable.py @@ -95,6 +95,7 @@ from .elements import DQLDMLClauseElement from .elements import GroupedElement from .elements import literal_column from .elements import TableValuedColumn +from .elements import TextClause from .elements import UnaryExpression from .operators import OperatorType from .sqltypes import NULLTYPE @@ -142,11 +143,11 @@ if TYPE_CHECKING: from .ddl import CreateTableAs from .dml import Delete from .dml import Update + from .elements import AbstractTextClause from .elements import BinaryExpression from .elements import KeyedColumnElement from .elements import Label from .elements import NamedColumn - from .elements import TextClause from .functions import Function from .schema import ForeignKey from .schema import ForeignKeyConstraint @@ -158,7 +159,7 @@ if TYPE_CHECKING: _ColumnsClauseElement = Union["FromClause", ColumnElement[Any], "TextClause"] _LabelConventionCallable = Callable[ - [Union["ColumnElement[Any]", "TextClause"]], Optional[str] + [Union["ColumnElement[Any]", "AbstractTextClause"]], Optional[str] ] @@ -197,7 +198,7 @@ _SetupJoinsElement = Tuple[ ] -_SelectIterable = Iterable[Union["ColumnElement[Any]", "TextClause"]] +_SelectIterable = Iterable[Union["ColumnElement[Any]", "AbstractTextClause"]] class _OffsetLimitParam(BindParameter[int]): @@ -2345,7 +2346,7 @@ class _ColumnsPlusNames(NamedTuple): required_label_name was not given """ - column: Union[ColumnElement[Any], TextClause] + column: Union[ColumnElement[Any], AbstractTextClause] """ the ColumnElement itself """ @@ -4879,7 +4880,7 @@ class SelectState(util.MemoizedSlots, CompileState): names = set() def go( - c: Union[ColumnElement[Any], TextClause], + c: Union[ColumnElement[Any], AbstractTextClause], col_name: Optional[str] = None, ) -> Optional[str]: if is_text_clause(c): @@ -7286,7 +7287,7 @@ class TextualSelect(SelectBase, ExecutableReturnsRows, Generative): def _init( self, - text: TextClause, + text: AbstractTextClause, columns: List[NamedColumn[Any]], positional: bool = False, ) -> None: diff --git a/lib/sqlalchemy/sql/util.py b/lib/sqlalchemy/sql/util.py index 906ca5e7d7..9736b1ce70 100644 --- a/lib/sqlalchemy/sql/util.py +++ b/lib/sqlalchemy/sql/util.py @@ -74,8 +74,8 @@ if typing.TYPE_CHECKING: from ._typing import _EquivalentColumnMap from ._typing import _LimitOffsetType from ._typing import _TypeEngineArgument + from .elements import AbstractTextClause from .elements import BinaryExpression - from .elements import TextClause from .selectable import _JoinTargetElement from .selectable import _SelectIterable from .selectable import Selectable @@ -884,14 +884,14 @@ def reduce_columns( columns: _SelectIterable, *clauses: Optional[ClauseElement], **kw: bool, -) -> Sequence[Union[ColumnElement[Any], TextClause]]: ... +) -> Sequence[Union[ColumnElement[Any], AbstractTextClause]]: ... def reduce_columns( columns: _SelectIterable, *clauses: Optional[ClauseElement], **kw: bool, -) -> Collection[Union[ColumnElement[Any], TextClause]]: +) -> Collection[Union[ColumnElement[Any], AbstractTextClause]]: r"""given a list of columns, return a 'reduced' set based on natural equivalents. diff --git a/lib/sqlalchemy/util/compat.py b/lib/sqlalchemy/util/compat.py index 26e2210d64..a1c6504e47 100644 --- a/lib/sqlalchemy/util/compat.py +++ b/lib/sqlalchemy/util/compat.py @@ -31,6 +31,7 @@ from typing import Sequence from typing import Set from typing import Tuple from typing import Type +from typing import TYPE_CHECKING py314b1 = sys.version_info >= (3, 14, 0, "beta", 1) py314 = sys.version_info >= (3, 14) @@ -50,6 +51,27 @@ has_refcount_gc = bool(cpython) dottedgetter = operator.attrgetter +if py314 or TYPE_CHECKING: + from string.templatelib import Template as Template +else: + + class Template: # type: ignore[no-redef] + """Minimal Template for Python < 3.14 (test usage only).""" + + def __init__(self, *parts: Any): + self._parts = parts + + @property + def strings(self) -> Tuple[str, ...]: + return tuple(p for p in self._parts if isinstance(p, str)) + + @property + def interpolations(self) -> Tuple[Any, ...]: + return tuple(p for p in self._parts if not isinstance(p, str)) + + def __iter__(self) -> Any: + return iter(self._parts) + class FullArgSpec(typing.NamedTuple): args: List[str] diff --git a/noxfile.py b/noxfile.py index 9afd989673..d320710ff2 100644 --- a/noxfile.py +++ b/noxfile.py @@ -332,7 +332,7 @@ def test_pep484(session: nox.Session) -> None: ) -@nox.session(name="mypy") +@nox.session(name="mypy", python="3.14") def test_mypy(session: nox.Session) -> None: """run the typing integration test suite""" @@ -351,7 +351,7 @@ def test_mypy(session: nox.Session) -> None: session.run(*cmd, *posargs) -@nox.session(name="pep8") +@nox.session(name="pep8", python="3.14") def test_pep8(session: nox.Session) -> None: """Run linting and formatting checks.""" diff --git a/pyproject.toml b/pyproject.toml index b29f168e31..c06c26fb29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -245,6 +245,8 @@ ignore = [ "A003","A005", "D", "E203","E305","E701","E704","E711","E712","E721","E722","E741", + # F542: t-string without any placeholders (used in documentation examples) + "F542", "I300", "N801","N802","N806", "RST304","RST303","RST299","RST399", diff --git a/test/conftest.py b/test/conftest.py index b7f2d945ca..90597d8882 100755 --- a/test/conftest.py +++ b/test/conftest.py @@ -16,6 +16,10 @@ os.environ["SQLALCHEMY_WARN_20"] = "true" collect_ignore_glob = [] +# omit py314.py test files on earlier versions of python +if sys.version_info < (3, 14): + collect_ignore_glob.append("*_py314.py") + # this requires that sqlalchemy.testing was not already # imported in order to work pytest.register_assert_rewrite("sqlalchemy.testing.assertions") diff --git a/test/orm/test_query.py b/test/orm/test_query.py index c2ce24d278..5863b553fc 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -31,7 +31,6 @@ from sqlalchemy import null from sqlalchemy import or_ from sqlalchemy import select from sqlalchemy import String -from sqlalchemy import table from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import true @@ -55,7 +54,6 @@ from sqlalchemy.orm import joinedload from sqlalchemy.orm import lazyload from sqlalchemy.orm import Query from sqlalchemy.orm import relationship -from sqlalchemy.orm import selectinload from sqlalchemy.orm import Session from sqlalchemy.orm import subqueryload from sqlalchemy.orm import synonym @@ -5812,662 +5810,6 @@ class HintsTest(QueryTest, AssertsCompiledSQL): ) -class TextTest(QueryTest, AssertsCompiledSQL): - __dialect__ = "default" - - def test_needs_text(self): - User = self.classes.User - - assert_raises_message( - sa_exc.ArgumentError, - "Textual SQL expression", - fixture_session().query(User).from_statement, - "select * from users order by id", - ) - - def test_select_star(self): - User = self.classes.User - - eq_( - fixture_session() - .query(User) - .from_statement(text("select * from users order by id")) - .first(), - User(id=7), - ) - eq_( - fixture_session() - .query(User) - .from_statement( - text("select * from users where name='nonexistent'") - ) - .first(), - None, - ) - - def test_select_star_future(self): - User = self.classes.User - - sess = fixture_session() - eq_( - sess.execute( - select(User).from_statement( - text("select * from users order by id") - ) - ) - .scalars() - .first(), - User(id=7), - ) - eq_( - sess.execute( - select(User).from_statement( - text("select * from users where name='nonexistent'") - ) - ) - .scalars() - .first(), - None, - ) - - def test_columns_mismatched(self): - # test that columns using column._label match, as well as that - # ordering doesn't matter - User = self.classes.User - - s = fixture_session() - q = s.query(User).from_statement( - text( - "select name, 27 as foo, id as users_id from users order by id" - ) - ) - eq_( - q.all(), - [ - User(id=7, name="jack"), - User(id=8, name="ed"), - User(id=9, name="fred"), - User(id=10, name="chuck"), - ], - ) - - def test_columns_mismatched_future(self): - # test that columns using column._label match, as well as that - # ordering doesn't matter - User = self.classes.User - - s = fixture_session() - q = select(User).from_statement( - text( - "select name, 27 as foo, id as users_id from users order by id" - ) - ) - eq_( - s.execute(q).scalars().all(), - [ - User(id=7, name="jack"), - User(id=8, name="ed"), - User(id=9, name="fred"), - User(id=10, name="chuck"), - ], - ) - - def test_columns_multi_table_uselabels(self): - # test that columns using column._label match, as well as that - # ordering doesn't matter. - User = self.classes.User - Address = self.classes.Address - - s = fixture_session() - q = s.query(User, Address).from_statement( - text( - "select users.name AS users_name, users.id AS users_id, " - "addresses.id AS addresses_id FROM users JOIN addresses " - "ON users.id = addresses.user_id WHERE users.id=8 " - "ORDER BY addresses.id" - ) - ) - - eq_( - q.all(), - [ - (User(id=8), Address(id=2)), - (User(id=8), Address(id=3)), - (User(id=8), Address(id=4)), - ], - ) - - def test_columns_multi_table_uselabels_future(self): - # test that columns using column._label match, as well as that - # ordering doesn't matter. - User = self.classes.User - Address = self.classes.Address - - s = fixture_session() - q = select(User, Address).from_statement( - text( - "select users.name AS users_name, users.id AS users_id, " - "addresses.id AS addresses_id FROM users JOIN addresses " - "ON users.id = addresses.user_id WHERE users.id=8 " - "ORDER BY addresses.id" - ) - ) - - eq_( - s.execute(q).all(), - [ - (User(id=8), Address(id=2)), - (User(id=8), Address(id=3)), - (User(id=8), Address(id=4)), - ], - ) - - def test_columns_multi_table_uselabels_contains_eager(self): - # test that columns using column._label match, as well as that - # ordering doesn't matter. - User = self.classes.User - Address = self.classes.Address - - s = fixture_session() - q = ( - s.query(User) - .from_statement( - text( - "select users.name AS users_name, users.id AS users_id, " - "addresses.id AS addresses_id FROM users JOIN addresses " - "ON users.id = addresses.user_id WHERE users.id=8 " - "ORDER BY addresses.id" - ) - ) - .options(contains_eager(User.addresses)) - ) - - def go(): - r = q.all() - eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)]) - - self.assert_sql_count(testing.db, go, 1) - - def test_columns_multi_table_uselabels_contains_eager_future(self): - # test that columns using column._label match, as well as that - # ordering doesn't matter. - User = self.classes.User - Address = self.classes.Address - - s = fixture_session() - q = ( - select(User) - .from_statement( - text( - "select users.name AS users_name, users.id AS users_id, " - "addresses.id AS addresses_id FROM users JOIN addresses " - "ON users.id = addresses.user_id WHERE users.id=8 " - "ORDER BY addresses.id" - ) - ) - .options(contains_eager(User.addresses)) - ) - - def go(): - r = s.execute(q).unique().scalars().all() - eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)]) - - self.assert_sql_count(testing.db, go, 1) - - def test_columns_multi_table_uselabels_cols_contains_eager(self): - # test that columns using column._label match, as well as that - # ordering doesn't matter. - User = self.classes.User - Address = self.classes.Address - - s = fixture_session() - q = ( - s.query(User) - .from_statement( - text( - "select users.name AS users_name, users.id AS users_id, " - "addresses.id AS addresses_id FROM users JOIN addresses " - "ON users.id = addresses.user_id WHERE users.id=8 " - "ORDER BY addresses.id" - ).columns(User.name, User.id, Address.id) - ) - .options(contains_eager(User.addresses)) - ) - - def go(): - r = q.all() - eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)]) - - self.assert_sql_count(testing.db, go, 1) - - def test_columns_multi_table_uselabels_cols_contains_eager_future(self): - # test that columns using column._label match, as well as that - # ordering doesn't matter. - User = self.classes.User - Address = self.classes.Address - - s = fixture_session() - q = ( - select(User) - .from_statement( - text( - "select users.name AS users_name, users.id AS users_id, " - "addresses.id AS addresses_id FROM users JOIN addresses " - "ON users.id = addresses.user_id WHERE users.id=8 " - "ORDER BY addresses.id" - ).columns(User.name, User.id, Address.id) - ) - .options(contains_eager(User.addresses)) - ) - - def go(): - r = s.execute(q).unique().scalars().all() - eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)]) - - self.assert_sql_count(testing.db, go, 1) - - def test_textual_select_orm_columns(self): - # test that columns using column._label match, as well as that - # ordering doesn't matter. - User = self.classes.User - Address = self.classes.Address - users = self.tables.users - addresses = self.tables.addresses - - s = fixture_session() - q = s.query(User.name, User.id, Address.id).from_statement( - text( - "select users.name AS users_name, users.id AS users_id, " - "addresses.id AS addresses_id FROM users JOIN addresses " - "ON users.id = addresses.user_id WHERE users.id=8 " - "ORDER BY addresses.id" - ).columns(users.c.name, users.c.id, addresses.c.id) - ) - - eq_(q.all(), [("ed", 8, 2), ("ed", 8, 3), ("ed", 8, 4)]) - - @testing.combinations( - ( - False, - subqueryload, - ), - ( - True, - subqueryload, - ), - (False, selectinload), - (True, selectinload), - ) - def test_related_eagerload_against_text(self, add_columns, loader_option): - # new in 1.4. textual selects have columns so subqueryloaders - # and selectinloaders can join onto them. we add columns - # automatiacally to TextClause as well, however subqueryloader - # is not working at the moment due to execution model refactor, - # it creates a subquery w/ adapter before those columns are - # available. this is a super edge case and as we want to rewrite - # the loaders to use select(), maybe we can get it then. - User = self.classes.User - - text_clause = text("select * from users") - if add_columns: - text_clause = text_clause.columns(User.id, User.name) - - s = fixture_session() - q = ( - s.query(User) - .from_statement(text_clause) - .options(loader_option(User.addresses)) - ) - - def go(): - eq_(set(q.all()), set(self.static.user_address_result)) - - if loader_option is subqueryload: - # subqueryload necessarily degrades to lazy loads for a text - # statement. - self.assert_sql_count(testing.db, go, 5) - else: - self.assert_sql_count(testing.db, go, 2) - - def test_whereclause(self): - User = self.classes.User - - eq_( - fixture_session().query(User).filter(text("id in (8, 9)")).all(), - [User(id=8), User(id=9)], - ) - - eq_( - fixture_session() - .query(User) - .filter(text("name='fred'")) - .filter(text("id=9")) - .all(), - [User(id=9)], - ) - eq_( - fixture_session() - .query(User) - .filter(text("name='fred'")) - .filter(User.id == 9) - .all(), - [User(id=9)], - ) - - def test_whereclause_future(self): - User = self.classes.User - - s = fixture_session() - eq_( - s.execute(select(User).filter(text("id in (8, 9)"))) - .scalars() - .all(), - [User(id=8), User(id=9)], - ) - - eq_( - s.execute( - select(User).filter(text("name='fred'")).filter(text("id=9")) - ) - .scalars() - .all(), - [User(id=9)], - ) - eq_( - s.execute( - select(User).filter(text("name='fred'")).filter(User.id == 9) - ) - .scalars() - .all(), - [User(id=9)], - ) - - def test_binds_coerce(self): - User = self.classes.User - - assert_raises_message( - sa_exc.ArgumentError, - r"Textual SQL expression 'id in \(:id1, :id2\)' " - "should be explicitly declared", - fixture_session().query(User).filter, - "id in (:id1, :id2)", - ) - - def test_plain_textual_column(self): - User = self.classes.User - - s = fixture_session() - - self.assert_compile( - s.query(User.id, text("users.name")), - "SELECT users.id AS users_id, users.name FROM users", - ) - - eq_( - s.query(User.id, text("users.name")).all(), - [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")], - ) - - eq_( - s.query(User.id, literal_column("name")).order_by(User.id).all(), - [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")], - ) - - def test_via_select(self): - User = self.classes.User - s = fixture_session() - eq_( - s.query(User) - .from_statement( - select(column("id"), column("name")) - .select_from(table("users")) - .order_by("id") - ) - .all(), - [User(id=7), User(id=8), User(id=9), User(id=10)], - ) - - def test_via_textasfrom_from_statement(self): - User = self.classes.User - s = fixture_session() - - eq_( - s.query(User) - .from_statement( - text("select * from users order by id").columns( - id=Integer, name=String - ) - ) - .all(), - [User(id=7), User(id=8), User(id=9), User(id=10)], - ) - - def test_columns_via_textasfrom_from_statement(self): - User = self.classes.User - s = fixture_session() - - eq_( - s.query(User.id, User.name) - .from_statement( - text("select * from users order by id").columns( - id=Integer, name=String - ) - ) - .all(), - [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")], - ) - - def test_via_textasfrom_use_mapped_columns(self): - User = self.classes.User - s = fixture_session() - - eq_( - s.query(User) - .from_statement( - text("select * from users order by id").columns( - User.id, User.name - ) - ) - .all(), - [User(id=7), User(id=8), User(id=9), User(id=10)], - ) - - def test_via_textasfrom_aliased(self): - User = self.classes.User - s = fixture_session() - - ua = aliased( - User, - text("select * from users").columns(User.id, User.name).subquery(), - ) - - eq_( - s.query(ua).order_by(ua.id).all(), - [User(id=7), User(id=8), User(id=9), User(id=10)], - ) - - def test_group_by_accepts_text(self): - User = self.classes.User - s = fixture_session() - - q = s.query(User).group_by(text("name")) - self.assert_compile( - q, - "SELECT users.id AS users_id, users.name AS users_name " - "FROM users GROUP BY name", - ) - - def test_order_by_w_eager_one(self): - User = self.classes.User - s = fixture_session() - - # from 1.0.0 thru 1.0.2, the "name" symbol here was considered - # to be part of the things we need to ORDER BY and it was being - # placed into the inner query's columns clause, as part of - # query._compound_eager_statement where we add unwrap_order_by() - # to the columns clause. However, as #3392 illustrates, unlocatable - # string expressions like "name desc" will only fail in this scenario, - # so in general the changing of the query structure with string labels - # is dangerous. - # - # the queries here are again "invalid" from a SQL perspective, as the - # "name" field isn't matched up to anything. - # - - q = ( - s.query(User) - .options(joinedload(User.addresses)) - .order_by(desc("name")) - .limit(1) - ) - assert_raises_message( - sa_exc.CompileError, - "Can't resolve label reference for ORDER BY / GROUP BY.", - q.set_label_style( - LABEL_STYLE_TABLENAME_PLUS_COL - ).statement.compile, - ) - - def test_order_by_w_eager_two(self): - User = self.classes.User - s = fixture_session() - - q = ( - s.query(User) - .options(joinedload(User.addresses)) - .order_by("name") - .limit(1) - ) - assert_raises_message( - sa_exc.CompileError, - "Can't resolve label reference for ORDER BY / GROUP BY.", - q.set_label_style( - LABEL_STYLE_TABLENAME_PLUS_COL - ).statement.compile, - ) - - def test_order_by_w_eager_three(self): - User = self.classes.User - s = fixture_session() - - self.assert_compile( - s.query(User) - .options(joinedload(User.addresses)) - .order_by("users_name") - .limit(1), - "SELECT anon_1.users_id AS anon_1_users_id, " - "anon_1.users_name AS anon_1_users_name, " - "addresses_1.id AS addresses_1_id, " - "addresses_1.user_id AS addresses_1_user_id, " - "addresses_1.email_address AS addresses_1_email_address " - "FROM (SELECT users.id AS users_id, users.name AS users_name " - "FROM users ORDER BY users.name " - "LIMIT :param_1) AS anon_1 " - "LEFT OUTER JOIN addresses AS addresses_1 " - "ON anon_1.users_id = addresses_1.user_id " - "ORDER BY anon_1.users_name, addresses_1.id", - ) - - # however! this works (again?) - eq_( - s.query(User) - .options(joinedload(User.addresses)) - .order_by("users_name") - .first(), - User(name="chuck", addresses=[]), - ) - - def test_order_by_w_eager_four(self): - User = self.classes.User - Address = self.classes.Address - s = fixture_session() - - self.assert_compile( - s.query(User) - .options(joinedload(User.addresses)) - .order_by(desc("users_name")) - .limit(1), - "SELECT anon_1.users_id AS anon_1_users_id, " - "anon_1.users_name AS anon_1_users_name, " - "addresses_1.id AS addresses_1_id, " - "addresses_1.user_id AS addresses_1_user_id, " - "addresses_1.email_address AS addresses_1_email_address " - "FROM (SELECT users.id AS users_id, users.name AS users_name " - "FROM users ORDER BY users.name DESC " - "LIMIT :param_1) AS anon_1 " - "LEFT OUTER JOIN addresses AS addresses_1 " - "ON anon_1.users_id = addresses_1.user_id " - "ORDER BY anon_1.users_name DESC, addresses_1.id", - ) - - # however! this works (again?) - eq_( - s.query(User) - .options(joinedload(User.addresses)) - .order_by(desc("users_name")) - .first(), - User(name="jack", addresses=[Address()]), - ) - - def test_order_by_w_eager_five(self): - """essentially the same as test_eager_relations -> test_limit_3, - but test for textual label elements that are freeform. - this is again #3392.""" - - User = self.classes.User - Address = self.classes.Address - - sess = fixture_session() - - q = sess.query(User, Address.email_address.label("email_address")) - - result = ( - q.join(User.addresses) - .options(joinedload(User.orders)) - .order_by("email_address desc") - .limit(1) - .offset(0) - ) - - assert_raises_message( - sa_exc.CompileError, - "Can't resolve label reference for ORDER BY / GROUP BY", - result.all, - ) - - -class TextErrorTest(QueryTest, AssertsCompiledSQL): - def _test(self, fn, arg, offending_clause): - assert_raises_message( - sa.exc.ArgumentError, - r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be " - r"explicitly declared (?:with|as) text\(%(stmt)r\)" - % {"stmt": util.ellipses_string(offending_clause)}, - fn, - arg, - ) - - def test_filter(self): - User = self.classes.User - self._test( - fixture_session().query(User.id).filter, "myid == 5", "myid == 5" - ) - - def test_having(self): - User = self.classes.User - self._test( - fixture_session().query(User.id).having, "myid == 5", "myid == 5" - ) - - def test_from_statement(self): - User = self.classes.User - self._test( - fixture_session().query(User.id).from_statement, - "select id from user", - "select id from user", - ) - - class ParentTest(QueryTest, AssertsCompiledSQL): __dialect__ = "default" diff --git a/test/orm/test_text.py b/test/orm/test_text.py new file mode 100644 index 0000000000..e7f54da598 --- /dev/null +++ b/test/orm/test_text.py @@ -0,0 +1,689 @@ +import sqlalchemy as sa +from sqlalchemy import column +from sqlalchemy import desc +from sqlalchemy import exc as sa_exc +from sqlalchemy import Integer +from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL +from sqlalchemy import literal_column +from sqlalchemy import select +from sqlalchemy import String +from sqlalchemy import table +from sqlalchemy import testing +from sqlalchemy import text +from sqlalchemy import util +from sqlalchemy.orm import aliased +from sqlalchemy.orm import contains_eager +from sqlalchemy.orm import joinedload +from sqlalchemy.orm import selectinload +from sqlalchemy.orm import subqueryload +from sqlalchemy.testing import AssertsCompiledSQL +from sqlalchemy.testing.assertions import assert_raises_message +from sqlalchemy.testing.assertions import eq_ +from sqlalchemy.testing.fixtures import fixture_session +from test.orm import _fixtures + + +class QueryTest(_fixtures.FixtureTest): + run_setup_mappers = "once" + run_inserts = "once" + run_deletes = None + + @classmethod + def setup_mappers(cls): + cls._setup_stock_mapping() + + +class TextTest(QueryTest, AssertsCompiledSQL): + __dialect__ = "default" + + def test_needs_text(self): + User = self.classes.User + + assert_raises_message( + sa_exc.ArgumentError, + "Textual SQL expression", + fixture_session().query(User).from_statement, + "select * from users order by id", + ) + + def test_select_star(self): + User = self.classes.User + + eq_( + fixture_session() + .query(User) + .from_statement(text("select * from users order by id")) + .first(), + User(id=7), + ) + eq_( + fixture_session() + .query(User) + .from_statement( + text("select * from users where name='nonexistent'") + ) + .first(), + None, + ) + + def test_select_star_future(self): + User = self.classes.User + + sess = fixture_session() + eq_( + sess.execute( + select(User).from_statement( + text("select * from users order by id") + ) + ) + .scalars() + .first(), + User(id=7), + ) + eq_( + sess.execute( + select(User).from_statement( + text("select * from users where name='nonexistent'") + ) + ) + .scalars() + .first(), + None, + ) + + def test_columns_mismatched(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter + User = self.classes.User + + s = fixture_session() + q = s.query(User).from_statement( + text( + "select name, 27 as foo, id as users_id from users order by id" + ) + ) + eq_( + q.all(), + [ + User(id=7, name="jack"), + User(id=8, name="ed"), + User(id=9, name="fred"), + User(id=10, name="chuck"), + ], + ) + + def test_columns_mismatched_future(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter + User = self.classes.User + + s = fixture_session() + q = select(User).from_statement( + text( + "select name, 27 as foo, id as users_id from users order by id" + ) + ) + eq_( + s.execute(q).scalars().all(), + [ + User(id=7, name="jack"), + User(id=8, name="ed"), + User(id=9, name="fred"), + User(id=10, name="chuck"), + ], + ) + + def test_columns_multi_table_uselabels(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter. + User = self.classes.User + Address = self.classes.Address + + s = fixture_session() + q = s.query(User, Address).from_statement( + text( + "select users.name AS users_name, users.id AS users_id, " + "addresses.id AS addresses_id FROM users JOIN addresses " + "ON users.id = addresses.user_id WHERE users.id=8 " + "ORDER BY addresses.id" + ) + ) + + eq_( + q.all(), + [ + (User(id=8), Address(id=2)), + (User(id=8), Address(id=3)), + (User(id=8), Address(id=4)), + ], + ) + + def test_columns_multi_table_uselabels_future(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter. + User = self.classes.User + Address = self.classes.Address + + s = fixture_session() + q = select(User, Address).from_statement( + text( + "select users.name AS users_name, users.id AS users_id, " + "addresses.id AS addresses_id FROM users JOIN addresses " + "ON users.id = addresses.user_id WHERE users.id=8 " + "ORDER BY addresses.id" + ) + ) + + eq_( + s.execute(q).all(), + [ + (User(id=8), Address(id=2)), + (User(id=8), Address(id=3)), + (User(id=8), Address(id=4)), + ], + ) + + def test_columns_multi_table_uselabels_contains_eager(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter. + User = self.classes.User + Address = self.classes.Address + + s = fixture_session() + q = ( + s.query(User) + .from_statement( + text( + "select users.name AS users_name, users.id AS users_id, " + "addresses.id AS addresses_id FROM users JOIN addresses " + "ON users.id = addresses.user_id WHERE users.id=8 " + "ORDER BY addresses.id" + ) + ) + .options(contains_eager(User.addresses)) + ) + + def go(): + r = q.all() + eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)]) + + self.assert_sql_count(testing.db, go, 1) + + def test_columns_multi_table_uselabels_contains_eager_future(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter. + User = self.classes.User + Address = self.classes.Address + + s = fixture_session() + q = ( + select(User) + .from_statement( + text( + "select users.name AS users_name, users.id AS users_id, " + "addresses.id AS addresses_id FROM users JOIN addresses " + "ON users.id = addresses.user_id WHERE users.id=8 " + "ORDER BY addresses.id" + ) + ) + .options(contains_eager(User.addresses)) + ) + + def go(): + r = s.execute(q).unique().scalars().all() + eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)]) + + self.assert_sql_count(testing.db, go, 1) + + def test_columns_multi_table_uselabels_cols_contains_eager(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter. + User = self.classes.User + Address = self.classes.Address + + s = fixture_session() + q = ( + s.query(User) + .from_statement( + text( + "select users.name AS users_name, users.id AS users_id, " + "addresses.id AS addresses_id FROM users JOIN addresses " + "ON users.id = addresses.user_id WHERE users.id=8 " + "ORDER BY addresses.id" + ).columns(User.name, User.id, Address.id) + ) + .options(contains_eager(User.addresses)) + ) + + def go(): + r = q.all() + eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)]) + + self.assert_sql_count(testing.db, go, 1) + + def test_columns_multi_table_uselabels_cols_contains_eager_future(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter. + User = self.classes.User + Address = self.classes.Address + + s = fixture_session() + q = ( + select(User) + .from_statement( + text( + "select users.name AS users_name, users.id AS users_id, " + "addresses.id AS addresses_id FROM users JOIN addresses " + "ON users.id = addresses.user_id WHERE users.id=8 " + "ORDER BY addresses.id" + ).columns(User.name, User.id, Address.id) + ) + .options(contains_eager(User.addresses)) + ) + + def go(): + r = s.execute(q).unique().scalars().all() + eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)]) + + self.assert_sql_count(testing.db, go, 1) + + def test_textual_select_orm_columns(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter. + User = self.classes.User + Address = self.classes.Address + users = self.tables.users + addresses = self.tables.addresses + + s = fixture_session() + q = s.query(User.name, User.id, Address.id).from_statement( + text( + "select users.name AS users_name, users.id AS users_id, " + "addresses.id AS addresses_id FROM users JOIN addresses " + "ON users.id = addresses.user_id WHERE users.id=8 " + "ORDER BY addresses.id" + ).columns(users.c.name, users.c.id, addresses.c.id) + ) + + eq_(q.all(), [("ed", 8, 2), ("ed", 8, 3), ("ed", 8, 4)]) + + @testing.combinations( + ( + False, + subqueryload, + ), + ( + True, + subqueryload, + ), + (False, selectinload), + (True, selectinload), + ) + def test_related_eagerload_against_text(self, add_columns, loader_option): + # new in 1.4. textual selects have columns so subqueryloaders + # and selectinloaders can join onto them. we add columns + # automatiacally to TextClause as well, however subqueryloader + # is not working at the moment due to execution model refactor, + # it creates a subquery w/ adapter before those columns are + # available. this is a super edge case and as we want to rewrite + # the loaders to use select(), maybe we can get it then. + User = self.classes.User + + text_clause = text("select * from users") + if add_columns: + text_clause = text_clause.columns(User.id, User.name) + + s = fixture_session() + q = ( + s.query(User) + .from_statement(text_clause) + .options(loader_option(User.addresses)) + ) + + def go(): + eq_(set(q.all()), set(self.static.user_address_result)) + + if loader_option is subqueryload: + # subqueryload necessarily degrades to lazy loads for a text + # statement. + self.assert_sql_count(testing.db, go, 5) + else: + self.assert_sql_count(testing.db, go, 2) + + def test_whereclause(self): + User = self.classes.User + + eq_( + fixture_session().query(User).filter(text("id in (8, 9)")).all(), + [User(id=8), User(id=9)], + ) + + eq_( + fixture_session() + .query(User) + .filter(text("name='fred'")) + .filter(text("id=9")) + .all(), + [User(id=9)], + ) + eq_( + fixture_session() + .query(User) + .filter(text("name='fred'")) + .filter(User.id == 9) + .all(), + [User(id=9)], + ) + + def test_whereclause_future(self): + User = self.classes.User + + s = fixture_session() + eq_( + s.execute(select(User).filter(text("id in (8, 9)"))) + .scalars() + .all(), + [User(id=8), User(id=9)], + ) + + eq_( + s.execute( + select(User).filter(text("name='fred'")).filter(text("id=9")) + ) + .scalars() + .all(), + [User(id=9)], + ) + eq_( + s.execute( + select(User).filter(text("name='fred'")).filter(User.id == 9) + ) + .scalars() + .all(), + [User(id=9)], + ) + + def test_binds_coerce(self): + User = self.classes.User + + assert_raises_message( + sa_exc.ArgumentError, + r"Textual SQL expression 'id in \(:id1, :id2\)' " + "should be explicitly declared", + fixture_session().query(User).filter, + "id in (:id1, :id2)", + ) + + def test_plain_textual_column(self): + User = self.classes.User + + s = fixture_session() + + self.assert_compile( + s.query(User.id, text("users.name")), + "SELECT users.id AS users_id, users.name FROM users", + ) + + eq_( + s.query(User.id, text("users.name")).all(), + [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")], + ) + + eq_( + s.query(User.id, literal_column("name")).order_by(User.id).all(), + [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")], + ) + + def test_via_select(self): + User = self.classes.User + s = fixture_session() + eq_( + s.query(User) + .from_statement( + select(column("id"), column("name")) + .select_from(table("users")) + .order_by("id") + ) + .all(), + [User(id=7), User(id=8), User(id=9), User(id=10)], + ) + + def test_via_textasfrom_from_statement(self): + User = self.classes.User + s = fixture_session() + + eq_( + s.query(User) + .from_statement( + text("select * from users order by id").columns( + id=Integer, name=String + ) + ) + .all(), + [User(id=7), User(id=8), User(id=9), User(id=10)], + ) + + def test_columns_via_textasfrom_from_statement(self): + User = self.classes.User + s = fixture_session() + + eq_( + s.query(User.id, User.name) + .from_statement( + text("select * from users order by id").columns( + id=Integer, name=String + ) + ) + .all(), + [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")], + ) + + def test_via_textasfrom_use_mapped_columns(self): + User = self.classes.User + s = fixture_session() + + eq_( + s.query(User) + .from_statement( + text("select * from users order by id").columns( + User.id, User.name + ) + ) + .all(), + [User(id=7), User(id=8), User(id=9), User(id=10)], + ) + + def test_via_textasfrom_aliased(self): + User = self.classes.User + s = fixture_session() + + ua = aliased( + User, + text("select * from users").columns(User.id, User.name).subquery(), + ) + + eq_( + s.query(ua).order_by(ua.id).all(), + [User(id=7), User(id=8), User(id=9), User(id=10)], + ) + + def test_group_by_accepts_text(self): + User = self.classes.User + s = fixture_session() + + q = s.query(User).group_by(text("name")) + self.assert_compile( + q, + "SELECT users.id AS users_id, users.name AS users_name " + "FROM users GROUP BY name", + ) + + def test_order_by_w_eager_one(self): + User = self.classes.User + s = fixture_session() + + # from 1.0.0 thru 1.0.2, the "name" symbol here was considered + # to be part of the things we need to ORDER BY and it was being + # placed into the inner query's columns clause, as part of + # query._compound_eager_statement where we add unwrap_order_by() + # to the columns clause. However, as #3392 illustrates, unlocatable + # string expressions like "name desc" will only fail in this scenario, + # so in general the changing of the query structure with string labels + # is dangerous. + # + # the queries here are again "invalid" from a SQL perspective, as the + # "name" field isn't matched up to anything. + # + + q = ( + s.query(User) + .options(joinedload(User.addresses)) + .order_by(desc("name")) + .limit(1) + ) + assert_raises_message( + sa_exc.CompileError, + "Can't resolve label reference for ORDER BY / GROUP BY.", + q.set_label_style( + LABEL_STYLE_TABLENAME_PLUS_COL + ).statement.compile, + ) + + def test_order_by_w_eager_two(self): + User = self.classes.User + s = fixture_session() + + q = ( + s.query(User) + .options(joinedload(User.addresses)) + .order_by("name") + .limit(1) + ) + assert_raises_message( + sa_exc.CompileError, + "Can't resolve label reference for ORDER BY / GROUP BY.", + q.set_label_style( + LABEL_STYLE_TABLENAME_PLUS_COL + ).statement.compile, + ) + + def test_order_by_w_eager_three(self): + User = self.classes.User + s = fixture_session() + + self.assert_compile( + s.query(User) + .options(joinedload(User.addresses)) + .order_by("users_name") + .limit(1), + "SELECT anon_1.users_id AS anon_1_users_id, " + "anon_1.users_name AS anon_1_users_name, " + "addresses_1.id AS addresses_1_id, " + "addresses_1.user_id AS addresses_1_user_id, " + "addresses_1.email_address AS addresses_1_email_address " + "FROM (SELECT users.id AS users_id, users.name AS users_name " + "FROM users ORDER BY users.name " + "LIMIT :param_1) AS anon_1 " + "LEFT OUTER JOIN addresses AS addresses_1 " + "ON anon_1.users_id = addresses_1.user_id " + "ORDER BY anon_1.users_name, addresses_1.id", + ) + + # however! this works (again?) + eq_( + s.query(User) + .options(joinedload(User.addresses)) + .order_by("users_name") + .first(), + User(name="chuck", addresses=[]), + ) + + def test_order_by_w_eager_four(self): + User = self.classes.User + Address = self.classes.Address + s = fixture_session() + + self.assert_compile( + s.query(User) + .options(joinedload(User.addresses)) + .order_by(desc("users_name")) + .limit(1), + "SELECT anon_1.users_id AS anon_1_users_id, " + "anon_1.users_name AS anon_1_users_name, " + "addresses_1.id AS addresses_1_id, " + "addresses_1.user_id AS addresses_1_user_id, " + "addresses_1.email_address AS addresses_1_email_address " + "FROM (SELECT users.id AS users_id, users.name AS users_name " + "FROM users ORDER BY users.name DESC " + "LIMIT :param_1) AS anon_1 " + "LEFT OUTER JOIN addresses AS addresses_1 " + "ON anon_1.users_id = addresses_1.user_id " + "ORDER BY anon_1.users_name DESC, addresses_1.id", + ) + + # however! this works (again?) + eq_( + s.query(User) + .options(joinedload(User.addresses)) + .order_by(desc("users_name")) + .first(), + User(name="jack", addresses=[Address()]), + ) + + def test_order_by_w_eager_five(self): + """essentially the same as test_eager_relations -> test_limit_3, + but test for textual label elements that are freeform. + this is again #3392.""" + + User = self.classes.User + Address = self.classes.Address + + sess = fixture_session() + + q = sess.query(User, Address.email_address.label("email_address")) + + result = ( + q.join(User.addresses) + .options(joinedload(User.orders)) + .order_by("email_address desc") + .limit(1) + .offset(0) + ) + + assert_raises_message( + sa_exc.CompileError, + "Can't resolve label reference for ORDER BY / GROUP BY", + result.all, + ) + + +class TextErrorTest(QueryTest, AssertsCompiledSQL): + def _test(self, fn, arg, offending_clause): + assert_raises_message( + sa.exc.ArgumentError, + r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be " + r"explicitly declared (?:with|as) text\(%(stmt)r\)" + % {"stmt": util.ellipses_string(offending_clause)}, + fn, + arg, + ) + + def test_filter(self): + User = self.classes.User + self._test( + fixture_session().query(User.id).filter, "myid == 5", "myid == 5" + ) + + def test_having(self): + User = self.classes.User + self._test( + fixture_session().query(User.id).having, "myid == 5", "myid == 5" + ) + + def test_from_statement(self): + User = self.classes.User + self._test( + fixture_session().query(User.id).from_statement, + "select id from user", + "select id from user", + ) diff --git a/test/orm/test_tstring_py314.py b/test/orm/test_tstring_py314.py new file mode 100644 index 0000000000..305e55bcea --- /dev/null +++ b/test/orm/test_tstring_py314.py @@ -0,0 +1,207 @@ +"""Test the TString construct in ORM context for Python 3.14+ +template strings.""" + +from sqlalchemy import Integer +from sqlalchemy import select +from sqlalchemy import String +from sqlalchemy import tstring +from sqlalchemy.testing import AssertsCompiledSQL +from sqlalchemy.testing.assertions import eq_ +from sqlalchemy.testing.fixtures import fixture_session +from test.orm import _fixtures + + +class QueryTest(_fixtures.FixtureTest): + run_setup_mappers = "once" + run_inserts = "once" + run_deletes = None + + @classmethod + def setup_mappers(cls): + cls._setup_stock_mapping() + + +class TStringTest(QueryTest, AssertsCompiledSQL): + __dialect__ = "default" + + def test_select_star(self): + User = self.classes.User + + eq_( + fixture_session() + .query(User) + .from_statement( + tstring(t"select * from users where users.id={7} order by id") + ) + .first(), + User(id=7), + ) + eq_( + fixture_session() + .query(User) + .from_statement( + tstring(t"select * from users where name={'nonexistent'}") + ) + .first(), + None, + ) + + def test_select_star_future(self): + User = self.classes.User + + sess = fixture_session() + eq_( + sess.execute( + select(User).from_statement( + tstring( + t"select * from users where users.id={7} order by id" + ) + ) + ) + .scalars() + .first(), + User(id=7), + ) + eq_( + sess.execute( + select(User).from_statement( + tstring(t"select * from users where name={'nonexistent'}") + ) + ) + .scalars() + .first(), + None, + ) + + def test_entity_interpolation(self): + User = self.classes.User + + # Test interpolating entity columns and table in select and from clause + sess = fixture_session() + result = ( + sess.execute( + select(User).from_statement( + tstring(t"select * from {User} order by {User.id}") + ) + ) + .scalars() + .all() + ) + + eq_([u.name for u in result], ["jack", "ed", "fred", "chuck"]) + + def test_whereclause(self): + User = self.classes.User + + eq_( + fixture_session() + .query(User) + .filter(tstring(t"id in (8, 9)")) + .all(), + [User(id=8), User(id=9)], + ) + + eq_( + fixture_session() + .query(User) + .filter(tstring(t"name='fred'")) + .filter(tstring(t"id=9")) + .all(), + [User(id=9)], + ) + eq_( + fixture_session() + .query(User) + .filter(tstring(t"name='fred'")) + .filter(User.id == 9) + .all(), + [User(id=9)], + ) + + def test_whereclause_future(self): + User = self.classes.User + + s = fixture_session() + eq_( + s.execute(select(User).filter(tstring(t"id in (8, 9)"))) + .scalars() + .all(), + [User(id=8), User(id=9)], + ) + + eq_( + s.execute( + select(User) + .filter(tstring(t"name='fred'")) + .filter(tstring(t"id=9")) + ) + .scalars() + .all(), + [User(id=9)], + ) + eq_( + s.execute( + select(User) + .filter(tstring(t"name='fred'")) + .filter(User.id == 9) + ) + .scalars() + .all(), + [User(id=9)], + ) + + def test_via_textasfrom_from_statement(self): + User = self.classes.User + s = fixture_session() + + eq_( + s.query(User) + .from_statement( + tstring(t"select * from users order by id").columns( + id=Integer, name=String + ) + ) + .all(), + [User(id=7), User(id=8), User(id=9), User(id=10)], + ) + + def test_columns_via_textasfrom_from_statement(self): + User = self.classes.User + s = fixture_session() + + eq_( + s.query(User.id, User.name) + .from_statement( + tstring(t"select * from users order by id").columns( + id=Integer, name=String + ) + ) + .all(), + [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")], + ) + + def test_via_textasfrom_use_mapped_columns(self): + User = self.classes.User + s = fixture_session() + + eq_( + s.query(User) + .from_statement( + tstring(t"select * from users order by id").columns( + User.id, User.name + ) + ) + .all(), + [User(id=7), User(id=8), User(id=9), User(id=10)], + ) + + def test_group_by_accepts_tstring(self): + User = self.classes.User + s = fixture_session() + + q = s.query(User).group_by(tstring(t"name")) + self.assert_compile( + q, + "SELECT users.id AS users_id, users.name AS users_name " + "FROM users GROUP BY name", + ) diff --git a/test/profiles.txt b/test/profiles.txt index a40db06e4e..36c1a204d8 100644 --- a/test/profiles.txt +++ b/test/profiles.txt @@ -205,9 +205,9 @@ test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_cached x86_64_li # TEST: test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached -test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 4003 +test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 4403 test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 7603 -test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 4003 +test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 4403 test.aaa_profiling.test_misc.CacheKeyTest.test_statement_key_is_not_cached x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 7203 # TEST: test.aaa_profiling.test_misc.EnumTest.test_create_enum_from_pep_435_w_expensive_members diff --git a/test/sql/test_compare.py b/test/sql/test_compare.py index fbc6db81c5..cb38540e45 100644 --- a/test/sql/test_compare.py +++ b/test/sql/test_compare.py @@ -66,6 +66,7 @@ from sqlalchemy.sql.elements import Immutable from sqlalchemy.sql.elements import Null from sqlalchemy.sql.elements import OrderByList from sqlalchemy.sql.elements import Slice +from sqlalchemy.sql.elements import TString from sqlalchemy.sql.elements import TypeClause from sqlalchemy.sql.elements import UnaryExpression from sqlalchemy.sql.functions import FunctionElement @@ -1752,6 +1753,7 @@ class HasCacheKeySubclass(fixtures.TestBase): SyntaxExtension, DialectKWArgs, Executable, + TString, ] ) ) @@ -1794,7 +1796,12 @@ class CompareAndCopyTest(CoreFixtures, fixtures.TestBase): need = set( cls for cls in all_hascachekey_subclasses( - ignore_subclasses=[Annotated, NoInit, SingletonConstant] + ignore_subclasses=[ + Annotated, + NoInit, + SingletonConstant, + TString, + ] ) if "orm" not in cls.__module__ and "compiler" not in cls.__module__ diff --git a/test/sql/test_statement_params.py b/test/sql/test_statement_params.py index 4c648d20bf..e14b74e208 100644 --- a/test/sql/test_statement_params.py +++ b/test/sql/test_statement_params.py @@ -8,6 +8,7 @@ from sqlalchemy.sql import dml from sqlalchemy.sql import func from sqlalchemy.sql import select from sqlalchemy.sql import text +from sqlalchemy.sql import tstring from sqlalchemy.sql.base import ExecutableStatement from sqlalchemy.sql.elements import literal from sqlalchemy.testing import eq_ @@ -18,6 +19,7 @@ from sqlalchemy.testing import ne_ from sqlalchemy.testing.schema import Table from sqlalchemy.types import Integer from sqlalchemy.types import Text +from sqlalchemy.util.compat import Template from sqlalchemy.util.langhelpers import class_hierarchy @@ -35,6 +37,7 @@ class BasicTests(fixtures.TestBase): def _relevant_impls(): return ( text("select 1 + 2"), + tstring(Template("select 1 + 2")), text("select 42 as q").columns(column("q", Integer)), func.max(42), select(1, 2).union(select(3, 4)), diff --git a/test/sql/test_tstrings_py314.py b/test/sql/test_tstrings_py314.py new file mode 100644 index 0000000000..e3d8cb37bc --- /dev/null +++ b/test/sql/test_tstrings_py314.py @@ -0,0 +1,438 @@ +"""Test the TString construct for Python 3.14+ template strings.""" + +from itertools import zip_longest + +from sqlalchemy import column +from sqlalchemy import exc +from sqlalchemy import Integer +from sqlalchemy import JSON +from sqlalchemy import literal +from sqlalchemy import select +from sqlalchemy import String +from sqlalchemy import tstring +from sqlalchemy.engine.interfaces import CacheStats +from sqlalchemy.sql import table +from sqlalchemy.sql.elements import ColumnClause +from sqlalchemy.sql.sqltypes import TypeEngine +from sqlalchemy.testing import AssertsCompiledSQL +from sqlalchemy.testing import eq_ +from sqlalchemy.testing import fixtures +from sqlalchemy.testing.assertions import expect_raises_message + +table1 = table( + "mytable", + column("myid", Integer), + column("name", String), + column("description", String), +) + +table2 = table( + "myothertable", column("otherid", Integer), column("othername", String) +) + + +class CompileTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = "default" + + def test_basic_literal_interpolation(self): + a = 5 + b = 10 + stmt = tstring(t"select {a}, {b}") + self.assert_compile( + stmt, + "select :param_1, :param_2", + checkparams={"param_1": 5, "param_2": 10}, + ) + + def test_no_strings(self): + with expect_raises_message( + exc.ArgumentError, r"pep-750 Tstring \(e.g. t'...'\) expected" + ): + tstring("select * from table") # type: ignore + + def test_tstring_literal_passthrough(self): + stmt = tstring(t"select * from foo where lala = bar") + self.assert_compile(stmt, "select * from foo where lala = bar") + + def test_sqlalchemy_expression_interpolation(self): + subq = select(literal(1)).scalar_subquery() + stmt = tstring(t"SELECT {subq}") + self.assert_compile( + stmt, + "SELECT (SELECT :param_1 AS anon_1)", + checkparams={"param_1": 1}, + ) + + def test_column_interpolation(self): + stmt = tstring(t"SELECT {table1.c.myid}, {table1.c.name} FROM mytable") + self.assert_compile( + stmt, "SELECT mytable.myid, mytable.name FROM mytable" + ) + + def test_column_interpolation_labeled(self): + # Labels are not supported inside tstring as they're ambiguous + # (should they render with AS in all contexts?) + label1 = table1.c.myid.label("label1") + label2 = table1.c.name.label("label2") + + with expect_raises_message( + exc.CompileError, + "Using label\\(\\) directly inside tstring is not supported", + ): + tstring(t"SELECT {label1}, {label2} FROM mytable").compile() + + def test_arithmetic_expression(self): + # Python arithmetic is evaluated before being passed to tstring + a = 1 + stmt = tstring(t"SELECT {a + 7}") + self.assert_compile( + stmt, "SELECT :param_1", checkparams={"param_1": 8} + ) + + def test_embed_tstring_as_select_criteria(self): + user_id = 123 + stmt = select(table1).where(tstring(t"{table1.c.myid} = {user_id}")) + self.assert_compile( + stmt, + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable WHERE mytable.myid = :param_1", + checkparams={"param_1": 123}, + ) + + def test_embed_tstring_as_fromclause(self): + status = "some status" + stmt = select(column("x")).select_from( + tstring( + t"foobar left outer join lala on foobar.foo = lala.foo " + t"AND foobar.status = {status}" + ) + ) + self.assert_compile( + stmt, + "SELECT x FROM foobar left outer join " + "lala on foobar.foo = lala.foo AND foobar.status = :param_1", + checkparams={"param_1": "some status"}, + ) + + def test_and_operator(self): + stmt = tstring(t"1 = 1") & tstring(t"2 = 2") + self.assert_compile(stmt, "1 = 1 AND 2 = 2") + + def test_multiple_literals(self): + a, b, c, d = 1, 2, 3, 4 + stmt = tstring(t"SELECT {a}, {b}, {c}, {d}") + self.assert_compile( + stmt, + "SELECT :param_1, :param_2, :param_3, :param_4", + checkparams={ + "param_1": 1, + "param_2": 2, + "param_3": 3, + "param_4": 4, + }, + ) + + def test_nested_tstring_execution(self): + inner = tstring(t"(SELECT {'some value'} AS anon_1)") + self.assert_compile( + tstring(t"select {inner}"), + "select (SELECT :param_1 AS anon_1)", + checkparams={"param_1": "some value"}, + ) + + def test_nested_scalar_subquery_execution(self): + inner = select(literal("some value")).scalar_subquery() + self.assert_compile( + tstring(t"select {inner}"), + "select (SELECT :param_1 AS anon_1)", + checkparams={"param_1": "some value"}, + ) + + def test_nested_subquery_execution(self): + inner = select(literal("some value")).subquery() + self.assert_compile( + tstring(t"select * from {inner}"), + "select * from (SELECT :param_1 AS anon_2) AS anon_1", + checkparams={"param_1": "some value"}, + ) + + +class ColumnsTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = "default" + + def _assert_columns(self, stmt, columns): + """Assert that stmt.selected_columns matches the given columns. + + Also verifies that the result map structure matches what we'd get + from a regular select() statement with the same columns. + """ + # Check that selected_columns matches + eq_( + [c.name for c in stmt.selected_columns], + [c.name for c in columns], + ) + for stmt_col, expected_col in zip(stmt.selected_columns, columns): + eq_(stmt_col.type._type_affinity, expected_col.type._type_affinity) + + # Verify result map structure matches what select() would produce + stmt_compiled = stmt.compile() + select_compiled = select(*columns).compile() + stmt_map = stmt_compiled._create_result_map() + select_map = select_compiled._create_result_map() + + # Compare result map structure using recursive comparison + eq_(list(stmt_map.keys()), list(select_map.keys())) + for key in stmt_map: + stmt_entry = stmt_map[key] + select_entry = select_map[key] + # Use recursive comparison for the entire entry tuple + assert self._compare_recursive( + stmt_entry, select_entry + ), f"Result map entries differ:\n {stmt_entry}\n {select_entry}" + + def _compare_recursive(self, left, right): + if isinstance(left, ColumnClause) and isinstance(right, ColumnClause): + return ( + left.name == right.name + and left.type._type_affinity == right.type._type_affinity + ) + elif isinstance(left, TypeEngine) and isinstance(right, TypeEngine): + return left._type_affinity == right._type_affinity + elif isinstance(left, (tuple, list)) and isinstance( + right, (tuple, list) + ): + return all( + self._compare_recursive(l, r) + for l, r in zip_longest(left, right) + ) + else: + return left == right + + def test_columns_positional(self): + cols = [column("id", Integer), column("name", String)] + stmt = tstring(t"SELECT id, name FROM users").columns(*cols) + self.assert_compile(stmt, "SELECT id, name FROM users") + self._assert_columns(stmt, cols) + + def test_columns_keyword(self): + stmt = tstring(t"SELECT id, name FROM users").columns( + id=Integer, name=String + ) + self.assert_compile(stmt, "SELECT id, name FROM users") + cols = [column("id", Integer), column("name", String)] + self._assert_columns(stmt, cols) + + def test_columns_mixed(self): + cols = [ + column("id", Integer), + column("name", String), + column("age", Integer), + ] + stmt = tstring(t"SELECT id, name, age FROM users").columns( + cols[0], name=String, age=Integer + ) + self.assert_compile(stmt, "SELECT id, name, age FROM users") + self._assert_columns(stmt, cols) + + def test_columns_subquery(self): + stmt = ( + tstring(t"SELECT id, name FROM users") + .columns(column("id", Integer), column("name", String)) + .subquery("st") + ) + outer = select(table1).select_from( + table1.join(stmt, table1.c.name == stmt.c.name) + ) + self.assert_compile( + outer, + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable JOIN (SELECT id, name FROM users) AS st ON " + "mytable.name = st.name", + ) + + +class ExecutionTest(fixtures.TestBase): + __backend__ = True + + def test_basic_execution(self, connection): + a = 1 + b = 2 + result = connection.execute(tstring(t"select {a + 7}, {b}")) + eq_(result.all(), [(8, 2)]) + + def test_json_literal_execution(self, connection): + some_json = {"foo": "bar"} + stmt = tstring(t"select {literal(some_json, JSON)}").columns( + column("jj", JSON) + ) + result = connection.execute(stmt) + row = result.scalar() + eq_(row, {"foo": "bar"}) + + def test_statement_caching(self, connection): + """Test that tstring statements are properly cached.""" + some_json = {"foo": "bar"} + stmt1 = tstring(t"select {literal(some_json, JSON)}").columns( + column("jj", JSON) + ) + result1 = connection.execute(stmt1) + eq_(result1.scalar(), {"foo": "bar"}) + + # Execute same structure with different value + some_json = {"foo": "newbar", "bat": "hoho"} + stmt2 = tstring(t"select {literal(some_json, JSON)}").columns( + column("jj", JSON) + ) + result2 = connection.execute(stmt2) + + # Should hit cache + if hasattr(result2.context, "cache_hit"): + eq_(result2.context.cache_hit, CacheStats.CACHE_HIT) + + eq_(result2.scalar(), {"foo": "newbar", "bat": "hoho"}) + + def test_nested_scalar_subquery_execution(self, connection): + inner = select(literal("some value")).scalar_subquery() + result = connection.execute(tstring(t"select {inner}")) + eq_(result.all(), [("some value",)]) + + def test_nested_subquery_execution(self, connection): + inner = select(literal("some value")).subquery() + result = connection.execute(tstring(t"select * from {inner}")) + eq_(result.all(), [("some value",)]) + + def test_multiple_values(self, connection): + values = [1, 2, 3, 4, 5] + result = connection.execute( + tstring( + t"select {values[0]}, {values[1]}, {values[2]}, " + t"{values[3]}, {values[4]}" + ) + ) + eq_(result.all(), [(1, 2, 3, 4, 5)]) + + +class IntegrationTest(fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + from sqlalchemy import Column + from sqlalchemy import Table + + Table( + "users", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(50)), + ) + + @classmethod + def insert_data(cls, connection): + connection.execute( + cls.tables.users.insert(), + [ + {"id": 1, "name": "alice"}, + {"id": 2, "name": "bob"}, + {"id": 3, "name": "charlie"}, + ], + ) + + def test_select_from_real_table(self, connection): + user_id = 2 + stmt = tstring(t"SELECT * FROM users WHERE id = {user_id}").columns( + column("id", Integer), column("name", String) + ) + result = connection.execute(stmt) + row = result.one() + eq_(row.id, 2) + eq_(row.name, "bob") + + def test_where_clause_with_real_table(self, connection): + users = self.tables.users + name_filter = "alice" + stmt = select(users).where( + tstring(t"{users.c.name} = {literal(name_filter)}") + ) + result = connection.execute(stmt) + row = result.one() + eq_(row.id, 1) + eq_(row.name, "alice") + + def test_complex_query(self, connection): + min_id = 1 + max_id = 2 + stmt = tstring( + t"SELECT id, name FROM users WHERE id >= {min_id} " + t"AND id <= {max_id}" + ).columns(column("id", Integer), column("name", String)) + result = connection.execute(stmt) + rows = result.all() + eq_(len(rows), 2) + eq_(rows[0].name, "alice") + eq_(rows[1].name, "bob") + + +class CacheKeyTest(fixtures.CacheKeyFixture, fixtures.TestBase): + """Test cache key generation for tstring constructs.""" + + @fixtures.CacheKeySuite.run_suite_tests + def test_tstring_cache_key(self): + + def stmt1(): + # Basic tstring with literal + a = 5 + return tstring(t"SELECT {a}") + + def stmt2(): + # Different structure - two literals + a = 5 + b = 10 + return tstring(t"SELECT {a}, {b}") + + def stmt3(): + # With column reference + return tstring(t"SELECT {table1.c.myid}") + + def stmt4(): + # Different column - different cache key + return tstring(t"SELECT {table1.c.name}") + + def stmt5(): + # With .columns() + a = 5 + return tstring(t"SELECT {a}").columns(column("val", Integer)) + + def stmt6(): + # String literal passthrough + return tstring(t"SELECT * FROM users") + + def stmt7(): + # Different string literal + return tstring(t"SELECT id FROM users") + + def stmt8(): + # With SQLAlchemy scalar subquery + return tstring(t"SELECT {select(literal(1)).scalar_subquery()}") + + def stmt9(): + # Mixed: text and literal + user_id = 42 + return tstring(t"SELECT * FROM users WHERE id = {user_id}") + + def stmt10(): + # Mixed: text and column + return tstring(t"SELECT * FROM users WHERE id = {table1.c.myid}") + + return lambda: [ + stmt1(), + stmt2(), + stmt3(), + stmt4(), + stmt5(), + stmt6(), + stmt7(), + stmt8(), + stmt9(), + stmt10(), + ] diff --git a/test/typing/plain_files/orm/typed_queries.py b/test/typing/plain_files/orm/typed_queries.py index d21922e869..8460dafcb9 100644 --- a/test/typing/plain_files/orm/typed_queries.py +++ b/test/typing/plain_files/orm/typed_queries.py @@ -1,5 +1,6 @@ from __future__ import annotations +from string.templatelib import Template from typing import Any from typing import assert_type from typing import Unpack @@ -19,6 +20,7 @@ from sqlalchemy import select from sqlalchemy import String from sqlalchemy import Table from sqlalchemy import text +from sqlalchemy import tstring from sqlalchemy import update from sqlalchemy.engine import Result from sqlalchemy.engine.row import Row @@ -31,6 +33,7 @@ from sqlalchemy.orm.query import Query from sqlalchemy.orm.query import RowReturningQuery from sqlalchemy.sql.dml import ReturningInsert from sqlalchemy.sql.elements import KeyedColumnElement +from sqlalchemy.sql.elements import TString from sqlalchemy.sql.expression import FromClause from sqlalchemy.sql.expression import TextClause from sqlalchemy.sql.selectable import ScalarSelect @@ -431,19 +434,23 @@ def t_dml_delete() -> None: assert_type(r1, Result[int, str]) -def t_from_statement() -> None: +def t_from_statement_text() -> None: t = text("select * from user") assert_type(t, TextClause) select(User).from_statement(t) + session.query(User).from_statement(t) + ts = text("select * from user").columns(User.id, User.name) assert_type(ts, TextualSelect) select(User).from_statement(ts) + session.query(User).from_statement(ts) + ts2 = text("select * from user").columns( user_table.c.id, user_table.c.name ) @@ -452,6 +459,34 @@ def t_from_statement() -> None: select(User).from_statement(ts2) + session.query(User).from_statement(ts2) + + +def t_from_statement_tstring(templ: Template) -> None: + t = tstring(templ) + + assert_type(t, TString) + + select(User).from_statement(t) + + session.query(User).from_statement(t) + + ts = tstring(templ).columns(User.id, User.name) + + assert_type(ts, TextualSelect) + + select(User).from_statement(ts) + + session.query(User).from_statement(ts) + + ts2 = tstring(templ).columns(user_table.c.id, user_table.c.name) + + assert_type(ts2, TextualSelect) + + select(User).from_statement(ts2) + + session.query(User).from_statement(ts2) + def t_aliased_fromclause() -> None: a1 = aliased(User, user_table) -- 2.47.3