)
@overload
- def scalar_one(self: Result[Tuple[_T]]) -> _T:
+ def scalar_one(self: Result[_T]) -> _T:
...
@overload
)
@overload
- def scalar_one_or_none(self: Result[Tuple[_T]]) -> Optional[_T]:
+ def scalar_one_or_none(self: Result[_T]) -> Optional[_T]:
...
@overload
)
@overload
- def scalar(self: Result[Tuple[_T]]) -> Optional[_T]:
+ def scalar(self: Result[_T]) -> Optional[_T]:
...
@overload
from ...engine.base import Transaction
from ...exc import ArgumentError
from ...util.concurrency import greenlet_spawn
+from ...util.typing import TypeVarTuple
+from ...util.typing import Unpack
if TYPE_CHECKING:
from ...engine.cursor import CursorResult
from ...sql.selectable import TypedReturnsRows
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
def create_async_engine(url: Union[str, URL], **kw: Any) -> AsyncEngine:
@overload
def stream(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> GeneratorStartableContext[AsyncResult[_T]]:
+ ) -> GeneratorStartableContext[AsyncResult[Unpack[_Ts]]]:
...
@overload
@overload
async def execute(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[_T]:
+ ) -> CursorResult[Unpack[_Ts]]:
...
@overload
@overload
async def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
parameters: Optional[_CoreSingleExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
@overload
async def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
@overload
def stream_scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
parameters: Optional[_CoreSingleExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
return await greenlet_spawn(self._only_one_row, True, False, False)
@overload
- async def scalar_one(self: AsyncResult[Tuple[_T]]) -> _T:
+ async def scalar_one(self: AsyncResult[_T]) -> _T:
...
@overload
@overload
async def scalar_one_or_none(
- self: AsyncResult[Tuple[_T]],
+ self: AsyncResult[_T],
) -> Optional[_T]:
...
return await greenlet_spawn(self._only_one_row, True, True, False)
@overload
- async def scalar(self: AsyncResult[Tuple[_T]]) -> Optional[_T]:
+ async def scalar(self: AsyncResult[_T]) -> Optional[_T]:
...
@overload
"""
return await greenlet_spawn(self._only_one_row, False, False, True)
- async def freeze(self) -> FrozenResult[Tuple[Unpack[_Ts]]]:
+ async def freeze(self) -> FrozenResult[Unpack[_Ts]]:
"""Return a callable object that will produce copies of this
:class:`_asyncio.AsyncResult` when invoked.
@overload
async def execute(
self,
- statement: TypedReturnsRows[Tuple[Unpack[_Ts]]],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
async def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
async def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
async def stream(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[_T]:
+ ) -> AsyncResult[Unpack[_Ts]]:
...
@overload
@overload
async def stream_scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
def execute(
self,
- statement: TypedReturnsRows[Tuple[Unpack[_Ts]]],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreSingleExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
def execute(
self,
- statement: TypedReturnsRows[Tuple[Unpack[_Ts]]],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreSingleExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
"produce a SQL statement and execute it with session.scalars()."
)
- def select(self) -> Select[Tuple[_T]]:
+ def select(self) -> Select[_T]:
"""Produce a :class:`_sql.Select` construct that represents the
rows within this instance-local :class:`_orm.WriteOnlyCollection`.
# statically generated** by tools/generate_sel_v1_overloads.py
@overload
- def with_only_columns(self, __ent0: _TCCA[_T0]) -> Select[Tuple[_T0]]:
+ def with_only_columns(self, __ent0: _TCCA[_T0]) -> Select[_T0]:
...
@overload
def with_only_columns(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1]
- ) -> Select[Tuple[_T0, _T1]]:
+ ) -> Select[_T0, _T1]:
...
@overload
def with_only_columns(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2]
- ) -> Select[Tuple[_T0, _T1, _T2]]:
+ ) -> Select[_T0, _T1, _T2]:
...
@overload
__ent1: _TCCA[_T1],
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3]]:
+ ) -> Select[_T0, _T1, _T2, _T3]:
...
@overload
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4]]:
+ ) -> Select[_T0, _T1, _T2, _T3, _T4]:
...
@overload
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]:
+ ) -> Select[_T0, _T1, _T2, _T3, _T4, _T5]:
...
@overload
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]:
+ ) -> Select[_T0, _T1, _T2, _T3, _T4, _T5, _T6]:
...
@overload
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]:
+ ) -> Select[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]:
...
# END OVERLOADED FUNCTIONS self.with_only_columns
result = conn.execute(text("select * from table"))
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
with e.begin() as conn:
result = conn.execute(text("select * from table"))
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
engine = create_engine("postgresql://scott:tiger@localhost/test")
# EXPECTED_TYPE: SQLCoreOperations[bool]
reveal_type(expr4)
- # EXPECTED_TYPE: Select[Tuple[bool]]
+ # EXPECTED_TYPE: Select[bool]
reveal_type(stmt2)
stmt = select(Vertex).where(Vertex.start.in_([Point(3, 4)]))
-# EXPECTED_TYPE: Select[Tuple[Vertex]]
+# EXPECTED_TYPE: Select[Vertex]
reveal_type(stmt)
# EXPECTED_TYPE: composite.Point
stmt = select(Vertex).where(Vertex.start.in_([Point(3, 4)]))
-# EXPECTED_TYPE: Select[Tuple[Vertex]]
+# EXPECTED_TYPE: Select[Vertex]
reveal_type(stmt)
# EXPECTED_TYPE: composite.Point
def do_something_with_mapped_class(
cls_: MappedClassProtocol[Employee],
) -> None:
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(cls_.__table__.select())
# EXPECTED_TYPE: Mapper[Employee]
message_query = select(Message)
if TYPE_CHECKING:
- # EXPECTED_TYPE: Select[Tuple[Message]]
+ # EXPECTED_TYPE: Select[Message]
reveal_type(message_query)
return session.scalars(message_query).all()
poly_query = select(PolymorphicMessage)
if TYPE_CHECKING:
- # EXPECTED_TYPE: Select[Tuple[Message]]
+ # EXPECTED_TYPE: Select[Message]
reveal_type(poly_query)
return session.scalars(poly_query).all()
q2 = sess.query(User.id).filter_by(id=7)
rows2 = q2.all()
- # EXPECTED_TYPE: List[Row[Tuple[int]]]
+ # EXPECTED_TYPE: List[.*Row[.*int].*]
reveal_type(rows2)
# test #8280
# test #9125
for row in sess.query(User.id, User.name):
- # EXPECTED_TYPE: Row[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Row[int, str].*
reveal_type(row)
for uobj1 in sess.query(User):
def t_select_1() -> None:
stmt = select(User.id, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: Select[Tuple[int, str]]
+ # EXPECTED_TYPE: Select[int, str]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[int, str]
+ # EXPECTED_TYPE: .*Result[int, str].*
reveal_type(result)
.fetch(User.id)
)
- # EXPECTED_TYPE: Select[Tuple[User]]
+ # EXPECTED_TYPE: Select[User]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[User]
+ # EXPECTED_TYPE: .*Result[User].*
reveal_type(result)
stmt = select(ua.id, ua.name).filter(User.id == 5)
- # EXPECTED_TYPE: Select[Tuple[int, str]]
+ # EXPECTED_TYPE: Select[int, str]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[int, str]
+ # EXPECTED_TYPE: .*Result[int, str].*
reveal_type(result)
ua = aliased(User)
stmt = select(ua, User).filter(User.id == 5)
- # EXPECTED_TYPE: Select[Tuple[User, User]]
+ # EXPECTED_TYPE: Select[User, User]
reveal_type(stmt)
result = session.execute(stmt)
reveal_type(q1.all())
# mypy switches to builtins.list for some reason here
- # EXPECTED_RE_TYPE: .*\.[Ll]ist\[.*Row\*?\[Tuple\[.*User\]\]\]
+ # EXPECTED_RE_TYPE: .*\.[Ll]ist\[.*Row\*?\[.*User\].*\]
reveal_type(q1.only_return_tuples(True).all())
# EXPECTED_TYPE: List[Tuple[User]]
def t_legacy_query_cols_1() -> None:
q1 = session.query(User.id, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[int, str]]
+ # EXPECTED_TYPE: RowReturningQuery[int, str]
reveal_type(q1)
- # EXPECTED_TYPE: Row[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Row[int, str].*
reveal_type(q1.one())
r1 = q1.one()
- x, y = r1.t
+ x, y = r1
# EXPECTED_TYPE: int
reveal_type(x)
def t_legacy_query_cols_tupleq_1() -> None:
q1 = session.query(User.id, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[int, str]]
+ # EXPECTED_TYPE: RowReturningQuery[int, str]
reveal_type(q1)
q2 = q1.tuples()
q2 = q1.with_entities(User.id, User.name)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[int, str]]
+ # EXPECTED_TYPE: RowReturningQuery[int, str]
reveal_type(q2)
- # EXPECTED_TYPE: Row[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Row[int, str].*
reveal_type(q2.one())
r1 = q2.one()
- x, y = r1.t
+ x, y = r1
# EXPECTED_TYPE: int
reveal_type(x)
def t_select_with_only_cols() -> None:
q1 = select(User).where(User.id == 5)
- # EXPECTED_TYPE: Select[Tuple[User]]
+ # EXPECTED_TYPE: Select[User]
reveal_type(q1)
q2 = q1.with_only_columns(User.id, User.name)
- # EXPECTED_TYPE: Select[Tuple[int, str]]
+ # EXPECTED_TYPE: Select[int, str]
reveal_type(q2)
row = connection.execute(q2).one()
- # EXPECTED_TYPE: Row[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Row[int, str].*
reveal_type(row)
- x, y = row.t
+ x, y = row
# EXPECTED_TYPE: int
reveal_type(x)
a1 = aliased(User)
q1 = session.query(User, a1, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[User, User, str]]
+ # EXPECTED_TYPE: RowReturningQuery[User, User, str]
reveal_type(q1)
- # EXPECTED_TYPE: Row[Tuple[User, User, str]]
+ # EXPECTED_TYPE: .*Row[User, User, str].*
reveal_type(q1.one())
r1 = q1.one()
- x, y, z = r1.t
+ x, y, z = r1
# EXPECTED_TYPE: User
reveal_type(x)
a1 = aliased(User)
q2 = q1.with_entities(User, a1, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[User, User, str]]
+ # EXPECTED_TYPE: RowReturningQuery[User, User, str]
reveal_type(q2)
- # EXPECTED_TYPE: Row[Tuple[User, User, str]]
+ # EXPECTED_TYPE: .*Row[User, User, str].*
reveal_type(q2.one())
r1 = q2.one()
- x, y, z = r1.t
+ x, y, z = r1
# EXPECTED_TYPE: User
reveal_type(x)
q2 = q1.add_columns(User.data)
# note this should not match Select
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(q2)
# mypy would downgrade to Any rather than picking the basemost type.
# with typing integrated into Select etc. we can at least get a Select
# object back.
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(s2)
# so a fully explicit type may be given
# plain FromClause etc we at least get Select
s3 = select(s1)
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(s3)
t1 = User.__table__
s4 = select(t1)
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(s4)
r1 = session.execute(s1)
- # EXPECTED_TYPE: Result[Tuple[int, str]]
+ # EXPECTED_TYPE: Result[int, str]
reveal_type(r1)
s2 = insert(User).returning(User)
r2 = session.execute(s2)
- # EXPECTED_TYPE: Result[Tuple[User]]
+ # EXPECTED_TYPE: Result[User]
reveal_type(r2)
s3 = insert(User).returning(func.foo(), column("q"))
- # EXPECTED_TYPE: ReturningInsert[Any]
+ # EXPECTED_TYPE: ReturningInsert[Unpack[.*tuple[Any, ...]]]
reveal_type(s3)
r3 = session.execute(s3)
- # EXPECTED_TYPE: Result[Any]
+ # EXPECTED_TYPE: Result[Unpack[.*tuple[Any, ...]]]
reveal_type(r3)
def t_dml_bare_insert() -> None:
s1 = insert(User)
r1 = session.execute(s1)
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(r1)
# EXPECTED_TYPE: int
reveal_type(r1.rowcount)
def t_dml_bare_update() -> None:
s1 = update(User)
r1 = session.execute(s1)
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(r1)
# EXPECTED_TYPE: int
reveal_type(r1.rowcount)
def t_dml_update_with_values() -> None:
s1 = update(User).values({User.id: 123, User.data: "value"})
r1 = session.execute(s1)
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(r1)
# EXPECTED_TYPE: int
reveal_type(r1.rowcount)
def t_dml_bare_delete() -> None:
s1 = delete(User)
r1 = session.execute(s1)
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(r1)
# EXPECTED_TYPE: int
reveal_type(r1.rowcount)
r1 = session.execute(s1)
- # EXPECTED_TYPE: Result[Tuple[int, str]]
+ # EXPECTED_TYPE: Result[int, str]
reveal_type(r1)
r1 = session.execute(s1)
- # EXPECTED_TYPE: Result[Tuple[int, str]]
+ # EXPECTED_TYPE: Result[int, str]
reveal_type(r1)
stmt = select(e1)
-# EXPECTED_TYPE: Select[Tuple[bool]]
+# EXPECTED_TYPE: Select[bool]
reveal_type(stmt)
stmt = stmt.where(e1)
stmt = select(e2)
-# EXPECTED_TYPE: Select[Tuple[bool]]
+# EXPECTED_TYPE: Select[bool]
reveal_type(stmt)
stmt = stmt.where(e2)
stmt2 = (
select(User.id).order_by(asc("id"), desc("email")).group_by("email", "id")
)
-# EXPECTED_TYPE: Select[Tuple[int]]
+# EXPECTED_TYPE: Select[int]
reveal_type(stmt2)
stmt2 = select(User.id).order_by(User.id).group_by(User.email)
stmt2 = (
select(User.id).order_by(User.id, User.email).group_by(User.email, User.id)
)
-# EXPECTED_TYPE: Select[Tuple[int]]
+# EXPECTED_TYPE: Select[int]
reveal_type(stmt2)
q1 = Session().query(User.id).order_by("email").group_by("email")
q1 = Session().query(User.id).order_by("id", "email").group_by("email", "id")
-# EXPECTED_TYPE: RowReturningQuery[Tuple[int]]
+# EXPECTED_TYPE: RowReturningQuery[int]
reveal_type(q1)
q1 = Session().query(User.id).order_by(User.id).group_by(User.email)
.order_by(User.id, User.email)
.group_by(User.email, User.id)
)
-# EXPECTED_TYPE: RowReturningQuery[Tuple[int]]
+# EXPECTED_TYPE: RowReturningQuery[int]
reveal_type(q1)
# test 9174
stmt1 = select(func.aggregate_strings(column("x"), column("x")))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt1)
stmt2 = select(func.char_length(column("x")))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt2)
stmt3 = select(func.concat())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt3)
stmt4 = select(func.count(column("x")))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt4)
stmt5 = select(func.cume_dist())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*Decimal\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*Decimal\]
reveal_type(stmt5)
stmt6 = select(func.current_date())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*date\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*date\]
reveal_type(stmt6)
stmt7 = select(func.current_time())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*time\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*time\]
reveal_type(stmt7)
stmt8 = select(func.current_timestamp())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt8)
stmt9 = select(func.current_user())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt9)
stmt10 = select(func.dense_rank())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt10)
stmt11 = select(func.localtime())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt11)
stmt12 = select(func.localtimestamp())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt12)
stmt13 = select(func.next_value(column("x")))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt13)
stmt14 = select(func.now())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt14)
stmt15 = select(func.percent_rank())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*Decimal\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*Decimal\]
reveal_type(stmt15)
stmt16 = select(func.rank())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt16)
stmt17 = select(func.session_user())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt17)
stmt18 = select(func.sysdate())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt18)
stmt19 = select(func.user())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt19)
# END GENERATED FUNCTION TYPING TESTS
from __future__ import annotations
-from typing import Tuple
from typing import TYPE_CHECKING
from sqlalchemy import Column
result = conn.execute(s6)
if TYPE_CHECKING:
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
# we can type these like this
- my_result: Result[Tuple[User]] = conn.execute(s6)
+ my_result: Result[User] = conn.execute(s6)
if TYPE_CHECKING:
# pyright and mypy disagree on the specific type here,
# mypy sees Result as we said, pyright seems to upgrade it to
# CursorResult
- # EXPECTED_RE_TYPE: .*(?:Cursor)?Result\[Tuple\[.*User\]\]
+ # EXPECTED_RE_TYPE: .*(?:Cursor)?Result\[.*User\]
reveal_type(my_result)
single_stmt = select(User.name).where(User.name == "foo")
-# EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[Tuple\[builtins.str\*?\]\]
+# EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[builtins.str\*?\]
reveal_type(single_stmt)
multi_stmt = select(User.id, User.name).where(User.name == "foo")
-# EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+# EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[builtins.int\*?, builtins.str\*?\]
reveal_type(multi_stmt)
def t_result_ctxmanager() -> None:
with connection.execute(select(column("q", Integer))) as r1:
- # EXPECTED_TYPE: CursorResult[Tuple[int]]
+ # EXPECTED_TYPE: CursorResult[int]
reveal_type(r1)
with r1.mappings() as r1m:
reveal_type(r2)
with session.execute(select(User.id)) as r3:
- # EXPECTED_TYPE: Result[Tuple[int]]
+ # EXPECTED_TYPE: Result[int]
reveal_type(r3)
with session.scalars(select(User.id)) as r4:
r1 = session.execute(s1)
- # EXPECTED_RE_TYPE: sqlalchemy..*.Result\[Tuple\[builtins.int\*?, typed_results.User\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy..*.Result\[builtins.int\*?, typed_results.User\*?, builtins.str\*?\]
reveal_type(r1)
s2 = select(User, a1).where(User.name == "foo")
r2 = session.execute(s2)
- # EXPECTED_RE_TYPE: sqlalchemy.*Result\[Tuple\[typed_results.User\*?, typed_results.User\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*Result\[typed_results.User\*?, typed_results.User\*?\]
reveal_type(r2)
row = r2.t.one()
# automatically typed since they are dynamically generated
a1_id = cast(Mapped[int], a1.id)
s3 = select(User.id, a1_id, a1, User).where(User.name == "foo")
- # EXPECTED_RE_TYPE: sqlalchemy.*Select\*?\[Tuple\[builtins.int\*?, builtins.int\*?, typed_results.User\*?, typed_results.User\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*Select\*?\[builtins.int\*?, builtins.int\*?, typed_results.User\*?, typed_results.User\*?\]
reveal_type(s3)
# testing Mapped[entity]
some_mp = cast(Mapped[User], object())
s4 = select(some_mp, a1, User).where(User.name == "foo")
- # NOTEXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[Tuple\[typed_results.User\*?, typed_results.User\*?, typed_results.User\*?\]\]
+ # NOTEXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[typed_results.User\*?, typed_results.User\*?, typed_results.User\*?\]
- # sqlalchemy.sql._gen_overloads.Select[Tuple[typed_results.User, typed_results.User, typed_results.User]]
+ # sqlalchemy.sql._gen_overloads.Select[typed_results.User, typed_results.User, typed_results.User]
- # EXPECTED_TYPE: Select[Tuple[User, User, User]]
+ # EXPECTED_TYPE: Select[User, User, User]
reveal_type(s4)
# test plain core expressions
s5 = select(x, y, User.name + "hi")
- # EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[Tuple\[builtins.int\*?, builtins.int\*?\, builtins.str\*?]\]
+ # EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[builtins.int\*?, builtins.int\*?\, builtins.str\*?]
reveal_type(s5)
def t_ambiguous_result_type_one() -> None:
stmt = select(column("q", Integer), table("x", column("y")))
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[Any]
+ # EXPECTED_TYPE: Result[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
def t_ambiguous_result_type_two() -> None:
stmt = select(column("q"))
- # EXPECTED_TYPE: Select[Tuple[Any]]
+ # EXPECTED_TYPE: Select[Any]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[Any]
+ # EXPECTED_TYPE: Result[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
a1 = aliased(User)
s1 = select(a1)
- # EXPECTED_TYPE: Select[Tuple[User]]
+ # EXPECTED_TYPE: Select[User]
reveal_type(s1)
s4 = select(a1.name, a1, a1, User).where(User.name == "foo")
- # EXPECTED_TYPE: Select[Tuple[str, User, User, User]]
+ # EXPECTED_TYPE: Select[str, User, User, User]
reveal_type(s4)
def t_connection_execute_multi_row_t() -> None:
result = connection.execute(multi_stmt)
- # EXPECTED_RE_TYPE: sqlalchemy.*CursorResult\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*CursorResult\[builtins.int\*?, builtins.str\*?\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: sqlalchemy.*Row\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: .*sqlalchemy.*Row\[builtins.int\*?, builtins.str\*?\].*
reveal_type(row)
x, y = row.t
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
- stmt: Select[Tuple[User, Other]] = select(User, Other).outerjoin(
+ stmt: Select[User, Other] = select(User, Other).outerjoin(
Other, User.id == Other.id
)
- stmt2: Select[Tuple[User, Optional[Other]]] = select(
+ stmt2: Select[User, Optional[Other]] = select(
User, Nullable(Other)
).outerjoin(Other, User.id == Other.id)
- stmt3: Select[Tuple[int, Optional[str]]] = select(
+ stmt3: Select[int, Optional[str]] = select(
User.id, Nullable(Other.name)
).outerjoin(Other, User.id == Other.id)
def go(W: Optional[Type[Other]]) -> None:
- stmt4: Select[Tuple[str, Other]] = select(
+ stmt4: Select[str, Other] = select(
NotNullable(User.value), NotNullable(W)
).where(User.value.is_not(None))
print(stmt4)