matrix:
# emulated wheels on linux take too much time, split wheels into multiple runs
python:
- - "cp38-* cp39-*"
+ - "cp39-*"
- "cp310-* cp311-*"
- "cp312-* cp313-*"
wheel_mode:
- "macos-latest"
- "macos-13"
python-version:
- - "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
- - "3.13.0-alpha - 3.13"
+ - "3.13"
- "pypy-3.10"
build-type:
- "cext"
architecture: x86
- os: "macos-latest"
architecture: x64
- - os: "macos-latest"
- python-version: "3.8"
- os: "macos-latest"
python-version: "3.9"
# macos 13: uses intel macs. no arm64, x86
strategy:
matrix:
python-version:
- - cp38-cp38
- cp39-cp39
- cp310-cp310
- cp311-cp311
os:
- "ubuntu-latest"
python-version:
- - "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
- - "3.13.0-alpha - 3.13"
+ - "3.13"
tox-env:
- mypy
- pep484
os: "ubuntu-latest"
exclude:
# run pep484 only on 3.10+
- - tox-env: pep484
- python-version: "3.8"
- tox-env: pep484
python-version: "3.9"
.. change::
:tags: change, installation
- :tickets: 10357
+ :tickets: 10357, 12029
- Python 3.8 or above is now required; support for Python 3.7 is dropped as
- this version is EOL.
+ Python 3.9 or above is now required; support for Python 3.8 and 3.7 is
+ dropped as these versions are EOL.
SQLAlchemy 2.1 supports the following platforms:
-* cPython 3.8 and higher
+* CPython 3.9 and higher
* Python-3 compatible versions of `PyPy <http://pypy.org/>`_
.. versionchanged:: 2.1
- SQLAlchemy now targets Python 3.8 and above.
+ SQLAlchemy now targets Python 3.9 and above.
Supported Installation Methods
parent_id: Mapped[int] = mapped_column(primary_key=True)
# use a list
- children: Mapped[List["Child"]] = relationship()
+ children: Mapped[list["Child"]] = relationship()
class Child(Base):
Or for a ``set``, illustrated in the same
``Parent.children`` collection::
- from typing import Set
from sqlalchemy import ForeignKey
from sqlalchemy.orm import DeclarativeBase
parent_id: Mapped[int] = mapped_column(primary_key=True)
# use a set
- children: Mapped[Set["Child"]] = relationship()
+ children: Mapped[set["Child"]] = relationship()
class Child(Base):
child_id: Mapped[int] = mapped_column(primary_key=True)
parent_id: Mapped[int] = mapped_column(ForeignKey("parent.id"))
-.. note:: If using Python 3.8, annotations for collections need
- to use ``typing.List`` or ``typing.Set``, e.g. ``Mapped[List["Child"]]`` or
- ``Mapped[Set["Child"]]``; the ``list`` and ``set`` Python built-ins
- don't yet support generic annotation in these Python versions, such as::
-
- from typing import List
-
-
- class Parent(Base):
- __tablename__ = "parent"
-
- parent_id: Mapped[int] = mapped_column(primary_key=True)
-
- # use a List, Python 3.8 and earlier
- children: Mapped[List["Child"]] = relationship()
-
When using mappings without the :class:`_orm.Mapped` annotation, such as when
using :ref:`imperative mappings <orm_imperative_mapping>` or untyped
Python code, as well as in a few special cases, the collection class for a
from ..sql.compiler import RM_RENDERED_NAME
from ..sql.compiler import RM_TYPE
from ..sql.type_api import TypeEngine
-from ..util import compat
from ..util.typing import Literal
from ..util.typing import Self
from ..util.typing import TupleAny
assert not self._tuplefilter
return self._make_new_metadata(
- keymap=compat.dict_union(
- self._keymap,
- {
- new: keymap_by_position[idx]
- for idx, new in enumerate(
- invoked_statement._all_selected_columns
- )
- if idx in keymap_by_position
- },
- ),
+ keymap=self._keymap
+ | {
+ new: keymap_by_position[idx]
+ for idx, new in enumerate(
+ invoked_statement._all_selected_columns
+ )
+ if idx in keymap_by_position
+ },
unpickled=self._unpickled,
processors=self._processors,
tuplefilter=None,
is_mypy = is_re = True
expected_msg = f'Revealed type is "{expected_msg}"'
- if mypy_14 and util.py39:
- # use_lowercase_names, py39 and above
- # https://github.com/python/mypy/blob/304997bfb85200fb521ac727ee0ce3e6085e5278/mypy/options.py#L363 # noqa: E501
-
- # skip first character which could be capitalized
- # "List item x not found" type of message
- expected_msg = expected_msg[0] + re.sub(
- (
- r"\b(List|Tuple|Dict|Set)\b"
- if is_type
- else r"\b(List|Tuple|Dict|Set|Type)\b"
- ),
- lambda m: m.group(1).lower(),
- expected_msg[1:],
- )
-
if mypy_14 and util.py310:
# use_or_syntax, py310 and above
# https://github.com/python/mypy/blob/304997bfb85200fb521ac727ee0ce3e6085e5278/mypy/options.py#L368 # noqa: E501
return exclusions.skip_if(check)
- @property
- def python39(self):
- return exclusions.only_if(
- lambda: util.py39, "Python 3.9 or above required"
- )
-
@property
def python310(self):
return exclusions.only_if(
from .compat import py311 as py311
from .compat import py312 as py312
from .compat import py313 as py313
-from .compat import py39 as py39
from .compat import pypy as pypy
from .compat import win32 as win32
from .concurrency import await_ as await_
py312 = sys.version_info >= (3, 12)
py311 = sys.version_info >= (3, 11)
py310 = sys.version_info >= (3, 10)
-py39 = sys.version_info >= (3, 9)
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"
)
-if py39:
- # python stubs don't have a public type for this. not worth
- # making a protocol
- def md5_not_for_security() -> Any:
- return hashlib.md5(usedforsecurity=False)
-
-else:
-
- def md5_not_for_security() -> Any:
- return hashlib.md5()
-
-
-if typing.TYPE_CHECKING or py39:
- # pep 584 dict union
- dict_union = operator.or_ # noqa
-else:
-
- def dict_union(a: dict, b: dict) -> dict:
- a = a.copy()
- a.update(b)
- return a
+# python stubs don't have a public type for this. not worth
+# making a protocol
+def md5_not_for_security() -> Any:
+ return hashlib.md5(usedforsecurity=False)
if py310:
else:
def get_annotations(obj: Any) -> Mapping[str, Any]:
- # it's been observed that cls.__annotations__ can be non present.
- # it's not clear what causes this, running under tox py38 it
- # happens, running straight pytest it doesnt
-
# https://docs.python.org/3/howto/annotations.html#annotations-howto
if isinstance(obj, type):
ann = obj.__dict__.get("__annotations__", None)
else:
- ann = getattr(obj, "__annotations__", None)
+ ann = obj.__annotations__
if ann is None:
return _collections.EMPTY_DICT
def is_newtype(type_: Optional[_AnnotationScanType]) -> TypeGuard[NewType]:
return hasattr(type_, "__supertype__")
-
- # doesn't work in 3.8, 3.7 as it passes a closure, not an
+ # doesn't work in 3.9, 3.8, 3.7 as it passes a closure, not an
# object instance
# return isinstance(type_, NewType)
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Database :: Front-Ends",
]
-requires-python = ">=3.8"
+requires-python = ">=3.9"
dependencies = [
"typing-extensions >= 4.6.0",
]
[tool.black]
line-length = 79
-target-version = ['py38']
+target-version = ['py39']
[tool.zimports]
i2 = util.immutabledict({"a": 42, 42: "a"})
eq_(str(i2), "immutabledict({'a': 42, 42: 'a'})")
- @testing.requires.python39
def test_pep584(self):
i = util.immutabledict({"a": 2})
with expect_raises_message(TypeError, "object is immutable"):
import setuptools # noqa: F401
except ImportError:
testing.skip_test("setuptools is required")
- with mock.patch("setuptools.setup", mock.MagicMock()), mock.patch.dict(
- "os.environ",
- {"DISABLE_SQLALCHEMY_CEXT": "", "REQUIRE_SQLALCHEMY_CEXT": ""},
+ with (
+ mock.patch("setuptools.setup", mock.MagicMock()),
+ mock.patch.dict(
+ "os.environ",
+ {"DISABLE_SQLALCHEMY_CEXT": "", "REQUIRE_SQLALCHEMY_CEXT": ""},
+ ),
):
import setup
"""
engine = testing_engine()
_server_version = [None]
- with mock.patch.object(
- engine.dialect,
- "_get_server_version_info",
- lambda conn: engine.dialect._parse_server_version(
- _server_version[0]
+ with (
+ mock.patch.object(
+ engine.dialect,
+ "_get_server_version_info",
+ lambda conn: engine.dialect._parse_server_version(
+ _server_version[0]
+ ),
+ ),
+ mock.patch.object(
+ engine.dialect, "_set_mariadb", lambda *arg: None
+ ),
+ mock.patch.object(
+ engine.dialect,
+ "get_isolation_level",
+ lambda *arg: "REPEATABLE READ",
),
- ), mock.patch.object(
- engine.dialect, "_set_mariadb", lambda *arg: None
- ), mock.patch.object(
- engine.dialect,
- "get_isolation_level",
- lambda *arg: "REPEATABLE READ",
):
def go(server_version):
# TODO: this test is assuming too much of arbitrary dialects and would
# be better suited tested against a single mock dialect that does not
# have any special behaviors
- with patch.object(
- testing.db.dialect, "dbapi", Mock(Error=DBAPIError)
- ), patch.object(
- testing.db.dialect, "loaded_dbapi", Mock(Error=DBAPIError)
- ), patch.object(
- testing.db.dialect, "is_disconnect", lambda *arg: False
- ), patch.object(
- testing.db.dialect,
- "do_execute",
- Mock(side_effect=NonStandardException),
- ), patch.object(
- testing.db.dialect.execution_ctx_cls,
- "handle_dbapi_exception",
- Mock(),
+ with (
+ patch.object(testing.db.dialect, "dbapi", Mock(Error=DBAPIError)),
+ patch.object(
+ testing.db.dialect, "loaded_dbapi", Mock(Error=DBAPIError)
+ ),
+ patch.object(
+ testing.db.dialect, "is_disconnect", lambda *arg: False
+ ),
+ patch.object(
+ testing.db.dialect,
+ "do_execute",
+ Mock(side_effect=NonStandardException),
+ ),
+ patch.object(
+ testing.db.dialect.execution_ctx_cls,
+ "handle_dbapi_exception",
+ Mock(),
+ ),
):
with testing.db.connect() as conn:
assert_raises(
engine = engines.testing_engine()
close_mock = Mock()
- with mock.patch.object(
- engine._connection_cls,
- "begin",
- Mock(side_effect=Exception("boom")),
- ), mock.patch.object(engine._connection_cls, "close", close_mock):
+ with (
+ mock.patch.object(
+ engine._connection_cls,
+ "begin",
+ Mock(side_effect=Exception("boom")),
+ ),
+ mock.patch.object(engine._connection_cls, "close", close_mock),
+ ):
with expect_raises_message(Exception, "boom"):
with engine.begin():
pass
# as part of create
# note we can't use an event to ensure begin() is not called
# because create also blocks events from happening
- with mock.patch.object(
- e1.dialect, "initialize", side_effect=init
- ) as m1, mock.patch.object(
- e1._connection_cls, "begin"
- ) as begin_mock:
+ with (
+ mock.patch.object(
+ e1.dialect, "initialize", side_effect=init
+ ) as m1,
+ mock.patch.object(e1._connection_cls, "begin") as begin_mock,
+ ):
@event.listens_for(e1, "connect", insert=True)
def go1(dbapi_conn, xyz):
def conn_tracker(conn, opt):
opt["conn_tracked"] = True
- with mock.patch.object(
- engine.dialect, "set_connection_execution_options"
- ) as conn_opt, mock.patch.object(
- engine.dialect, "set_engine_execution_options"
- ) as engine_opt:
+ with (
+ mock.patch.object(
+ engine.dialect, "set_connection_execution_options"
+ ) as conn_opt,
+ mock.patch.object(
+ engine.dialect, "set_engine_execution_options"
+ ) as engine_opt,
+ ):
e2 = engine.execution_options(e1="opt_e1")
c1 = engine.connect()
c2 = c1.execution_options(c1="opt_c1")
nonlocal init_connection
init_connection = connection
- with mock.patch.object(
- e._connection_cls, "begin"
- ) as mock_begin, mock.patch.object(
- e.dialect, "initialize", Mock(side_effect=mock_initialize)
- ) as mock_init:
+ with (
+ mock.patch.object(e._connection_cls, "begin") as mock_begin,
+ mock.patch.object(
+ e.dialect, "initialize", Mock(side_effect=mock_initialize)
+ ) as mock_init,
+ ):
conn = e.connect()
eq_(mock_begin.mock_calls, [])
# "safe" datatypes so that the DBAPI does not actually need
# setinputsizes() called in order to work.
- with mock.patch.object(
- engine.dialect, "bind_typing", BindTyping.SETINPUTSIZES
- ), mock.patch.object(
- engine.dialect, "do_set_input_sizes", do_set_input_sizes
- ), mock.patch.object(
- engine.dialect.execution_ctx_cls, "pre_exec", pre_exec
+ with (
+ mock.patch.object(
+ engine.dialect, "bind_typing", BindTyping.SETINPUTSIZES
+ ),
+ mock.patch.object(
+ engine.dialect, "do_set_input_sizes", do_set_input_sizes
+ ),
+ mock.patch.object(
+ engine.dialect.execution_ctx_cls, "pre_exec", pre_exec
+ ),
):
yield engine, canary
def test_underscore_replacement(self, connection_no_trans):
conn = connection_no_trans
- with mock.patch.object(
- conn.dialect, "set_isolation_level"
- ) as mock_sil, mock.patch.object(
- conn.dialect,
- "_gen_allowed_isolation_levels",
- mock.Mock(return_value=["READ COMMITTED", "REPEATABLE READ"]),
+ with (
+ mock.patch.object(conn.dialect, "set_isolation_level") as mock_sil,
+ mock.patch.object(
+ conn.dialect,
+ "_gen_allowed_isolation_levels",
+ mock.Mock(return_value=["READ COMMITTED", "REPEATABLE READ"]),
+ ),
):
conn.execution_options(isolation_level="REPEATABLE_READ")
dbapi_conn = conn.connection.dbapi_connection
def test_casing_replacement(self, connection_no_trans):
conn = connection_no_trans
- with mock.patch.object(
- conn.dialect, "set_isolation_level"
- ) as mock_sil, mock.patch.object(
- conn.dialect,
- "_gen_allowed_isolation_levels",
- mock.Mock(return_value=["READ COMMITTED", "REPEATABLE READ"]),
+ with (
+ mock.patch.object(conn.dialect, "set_isolation_level") as mock_sil,
+ mock.patch.object(
+ conn.dialect,
+ "_gen_allowed_isolation_levels",
+ mock.Mock(return_value=["READ COMMITTED", "REPEATABLE READ"]),
+ ),
):
conn.execution_options(isolation_level="repeatable_read")
dbapi_conn = conn.connection.dbapi_connection
event.listen(engine, "rollback_twophase", harness.rollback_twophase)
event.listen(engine, "commit_twophase", harness.commit_twophase)
- with mock.patch.object(
- engine.dialect, "do_rollback", harness.do_rollback
- ), mock.patch.object(engine.dialect, "do_commit", harness.do_commit):
+ with (
+ mock.patch.object(
+ engine.dialect, "do_rollback", harness.do_rollback
+ ),
+ mock.patch.object(engine.dialect, "do_commit", harness.do_commit),
+ ):
yield harness
event.remove(engine, "rollback", harness.rollback)
# the thing here that emits the warning is the correct path
from sqlalchemy.pool.base import _finalize_fairy
- with mock.patch.object(
- pool._dialect,
- "do_rollback",
- mock.Mock(side_effect=Exception("can't run rollback")),
- ), mock.patch("sqlalchemy.util.warn") as m:
+ with (
+ mock.patch.object(
+ pool._dialect,
+ "do_rollback",
+ mock.Mock(side_effect=Exception("can't run rollback")),
+ ),
+ mock.patch("sqlalchemy.util.warn") as m,
+ ):
_finalize_fairy(
None, rec, pool, ref, echo, transaction_was_reset=False
)
Bar.__mapper__
-# EXPECTED_MYPY: "Type[HasUpdatedAt]" has no attribute "__mapper__"
+# EXPECTED_MYPY: "type[HasUpdatedAt]" has no attribute "__mapper__"
HasUpdatedAt.__mapper__
-# EXPECTED_MYPY: "Type[SomeAbstract]" has no attribute "__mapper__"
+# EXPECTED_MYPY: "type[SomeAbstract]" has no attribute "__mapper__"
SomeAbstract.__mapper__
a1 = A(id=5, ordering=10)
-# EXPECTED_MYPY: Argument "parents" to "A" has incompatible type "List[A]"; expected "Mapped[Any]" # noqa
+# EXPECTED_MYPY: Argument "parents" to "A" has incompatible type "list[A]"; expected "Mapped[Any]" # noqa
a2 = A(parents=[a1])
B, collection_class=ordering_list("ordering")
)
- # EXPECTED: Left hand assignment 'cs: "List[B]"' not compatible with ORM mapped expression of type "Mapped[List[C]]" # noqa
+ # EXPECTED: Left hand assignment 'cs: "list[B]"' not compatible with ORM mapped expression of type "Mapped[list[C]]" # noqa
cs: List[B] = relationship(C, uselist=True)
- # EXPECTED: Left hand assignment 'cs_2: "B"' not compatible with ORM mapped expression of type "Mapped[List[C]]" # noqa
+ # EXPECTED: Left hand assignment 'cs_2: "B"' not compatible with ORM mapped expression of type "Mapped[list[C]]" # noqa
cs_2: B = relationship(C, uselist=True)
# EXPECTED_MYPY: List item 1 has incompatible type "A"; expected "B"
a1 = A(bs=[B(data="b"), A()])
-# EXPECTED_MYPY: Incompatible types in assignment (expression has type "List[B]", variable has type "Set[B]") # noqa
+# EXPECTED_MYPY: Incompatible types in assignment (expression has type "list[B]", variable has type "set[B]") # noqa
x: Set[B] = a1.bs
bs: Set[B] = relationship(B, uselist=True, back_populates="a")
- # EXPECTED: Left hand assignment 'another_bs: "Set[B]"' not compatible with ORM mapped expression of type "Mapped[B]" # noqa
+ # EXPECTED: Left hand assignment 'another_bs: "set[B]"' not compatible with ORM mapped expression of type "Mapped[B]" # noqa
another_bs: Set[B] = relationship(B, viewonly=True)
attr_type.fail()
def test_column_named_twice(self):
- with expect_warnings(
- "On class 'Foo', Column object 'x' named directly multiple "
- "times, only one will be used: x, y. Consider using "
- "orm.synonym instead"
- ), expect_raises(exc.DuplicateColumnError):
+ with (
+ expect_warnings(
+ "On class 'Foo', Column object 'x' named directly multiple "
+ "times, only one will be used: x, y. Consider using "
+ "orm.synonym instead"
+ ),
+ expect_raises(exc.DuplicateColumnError),
+ ):
class Foo(Base):
__tablename__ = "foo"
@testing.variation("style", ["old", "new"])
def test_column_repeated_under_prop(self, style):
- with expect_warnings(
- "On class 'Foo', Column object 'x' named directly multiple "
- "times, only one will be used: x, y, z. Consider using "
- "orm.synonym instead"
- ), expect_raises(exc.DuplicateColumnError):
+ with (
+ expect_warnings(
+ "On class 'Foo', Column object 'x' named directly multiple "
+ "times, only one will be used: x, y, z. Consider using "
+ "orm.synonym instead"
+ ),
+ expect_raises(exc.DuplicateColumnError),
+ ):
if style.old:
class Foo(Base):
foo: Mapped[str]
bar: Mapped[str] = mapped_column()
- with _dataclass_mixin_warning(
- "_BaseMixin", "'create_user', 'update_user'"
- ), _dataclass_mixin_warning("SubMixin", "'foo', 'bar'"):
+ with (
+ _dataclass_mixin_warning(
+ "_BaseMixin", "'create_user', 'update_user'"
+ ),
+ _dataclass_mixin_warning("SubMixin", "'foo', 'bar'"),
+ ):
class User(SubMixin, Base):
__tablename__ = "sys_user"
def test_two_joins_adaption(self):
a, c, d = self.tables.a, self.tables.c, self.tables.d
- with _aliased_join_warning(r"C\(c\)"), _aliased_join_warning(
- r"D\(d\)"
+ with (
+ _aliased_join_warning(r"C\(c\)"),
+ _aliased_join_warning(r"D\(d\)"),
):
q = self._two_join_fixture()._compile_state()
def test_two_joins_sql(self):
q = self._two_join_fixture()
- with _aliased_join_warning(r"C\(c\)"), _aliased_join_warning(
- r"D\(d\)"
+ with (
+ _aliased_join_warning(r"C\(c\)"),
+ _aliased_join_warning(r"D\(d\)"),
):
self.assert_compile(
q,
engine = {"e1": e1, "e2": e2, "e3": e3}[expected_engine_name]
- with mock.patch(
- "sqlalchemy.orm.context.ORMCompileState.orm_setup_cursor_result"
- ), mock.patch(
- "sqlalchemy.orm.context.ORMCompileState.orm_execute_statement"
- ), mock.patch(
- "sqlalchemy.orm.bulk_persistence."
- "BulkORMInsert.orm_execute_statement"
- ), mock.patch(
- "sqlalchemy.orm.bulk_persistence."
- "BulkUDCompileState.orm_setup_cursor_result"
+ with (
+ mock.patch(
+ "sqlalchemy.orm.context.ORMCompileState."
+ "orm_setup_cursor_result"
+ ),
+ mock.patch(
+ "sqlalchemy.orm.context.ORMCompileState.orm_execute_statement"
+ ),
+ mock.patch(
+ "sqlalchemy.orm.bulk_persistence."
+ "BulkORMInsert.orm_execute_statement"
+ ),
+ mock.patch(
+ "sqlalchemy.orm.bulk_persistence."
+ "BulkUDCompileState.orm_setup_cursor_result"
+ ),
):
sess.execute(statement)
u2 = User(name="u1", id=1)
sess.add(u2)
- with expect_raises(sa.exc.IntegrityError), expect_warnings(
- "New instance"
+ with (
+ expect_raises(sa.exc.IntegrityError),
+ expect_warnings("New instance"),
):
sess.commit()
u2 = User(name="u1", id=1)
sess.add(u2)
- with expect_raises(sa.exc.IntegrityError), expect_warnings(
- "New instance"
+ with (
+ expect_raises(sa.exc.IntegrityError),
+ expect_warnings("New instance"),
):
sess.commit()
def fail(*arg, **kw):
raise BaseException("some base exception")
- with mock.patch.object(
- testing.db.dialect, "do_rollback", side_effect=fail
- ) as fail_mock, mock.patch.object(
- testing.db.dialect,
- "do_commit",
- side_effect=testing.db.dialect.do_commit,
- ) as succeed_mock:
+ with (
+ mock.patch.object(
+ testing.db.dialect, "do_rollback", side_effect=fail
+ ) as fail_mock,
+ mock.patch.object(
+ testing.db.dialect,
+ "do_commit",
+ side_effect=testing.db.dialect.do_commit,
+ ) as succeed_mock,
+ ):
# sess.begin() -> commit(). why would do_rollback() be called?
# because of connection pool finalize_fairy *after* the commit.
# this will cause the conn.close() in session.commit() to fail,
else:
return self.context.rowcount
- with patch.object(
- config.db.dialect, "supports_sane_multi_rowcount", False
- ), patch("sqlalchemy.engine.cursor.CursorResult.rowcount", rowcount):
+ with (
+ patch.object(
+ config.db.dialect, "supports_sane_multi_rowcount", False
+ ),
+ patch("sqlalchemy.engine.cursor.CursorResult.rowcount", rowcount),
+ ):
Foo = self.classes.Foo
s1 = self._fixture()
f1s1 = Foo(value="f1 value")
eq_(f1s1.version_id, 2)
def test_update_delete_no_plain_rowcount(self):
- with patch.object(
- config.db.dialect, "supports_sane_rowcount", False
- ), patch.object(
- config.db.dialect, "supports_sane_multi_rowcount", False
+ with (
+ patch.object(config.db.dialect, "supports_sane_rowcount", False),
+ patch.object(
+ config.db.dialect, "supports_sane_multi_rowcount", False
+ ),
):
Foo = self.classes.Foo
s1 = self._fixture()
n1.related.append(n2)
- with patch.object(
- config.db.dialect, "supports_sane_rowcount", False
- ), patch.object(
- config.db.dialect, "supports_sane_multi_rowcount", False
+ with (
+ patch.object(config.db.dialect, "supports_sane_rowcount", False),
+ patch.object(
+ config.db.dialect, "supports_sane_multi_rowcount", False
+ ),
):
s2 = Session(bind=s.connection(bind_arguments=dict(mapper=Node)))
s2.query(Node).filter(Node.id == n2.id).update({"version_id": 3})
r = conn.execute(select(self.table).limit(1))
r.fetchone()
- with mock.patch.object(
- r, "_soft_close", raise_
- ), testing.expect_raises_message(IOError, "random non-DBAPI"):
+ with (
+ mock.patch.object(r, "_soft_close", raise_),
+ testing.expect_raises_message(IOError, "random non-DBAPI"),
+ ):
r.first()
r.close()
t1 = Test()
-# EXPECTED_RE_TYPE: .*[dD]ict\[.*str, Any\]
+# EXPECTED_RE_TYPE: .*dict\[.*str, Any\]
reveal_type(t1.data)
# EXPECTED_TYPE: UUID
rows1 = q.all()
- # EXPECTED_RE_TYPE: builtins.[Ll]ist\[.*User\*?\]
+ # EXPECTED_RE_TYPE: builtins.list\[.*User\*?\]
reveal_type(rows1)
q2 = sess.query(User.id).filter_by(id=7)
rows2 = q2.all()
- # EXPECTED_TYPE: List[.*Row[.*int].*]
+ # EXPECTED_TYPE: list[.*Row[.*int].*]
reveal_type(rows2)
# test #8280
from __future__ import annotations
-from typing import Tuple
-
from sqlalchemy import Column
from sqlalchemy import column
from sqlalchemy import create_engine
# EXPECTED_TYPE: User
reveal_type(q1.one())
- # EXPECTED_TYPE: List[User]
+ # EXPECTED_TYPE: list[User]
reveal_type(q1.all())
# mypy switches to builtins.list for some reason here
- # EXPECTED_RE_TYPE: .*\.[Ll]ist\[.*Row\*?\[.*User\].*\]
+ # EXPECTED_RE_TYPE: .*\.list\[.*Row\*?\[.*User\].*\]
reveal_type(q1.only_return_tuples(True).all())
- # EXPECTED_TYPE: List[Tuple[User]]
+ # EXPECTED_TYPE: list[tuple[User]]
reveal_type(q1.tuples().all())
q2 = q1.tuples()
- # EXPECTED_TYPE: Tuple[int, str]
+ # EXPECTED_TYPE: tuple[int, str]
reveal_type(q2.one())
r1 = q2.one()
# this one unfortunately is not working in mypy.
# pylance gets the correct type
- # EXPECTED_TYPE: Select[Tuple[int, Any]]
+ # EXPECTED_TYPE: Select[tuple[int, Any]]
# when experimenting with having a separate TypedSelect class for typing,
# mypy would downgrade to Any rather than picking the basemost type.
# with typing integrated into Select etc. we can at least get a Select
reveal_type(s2)
# so a fully explicit type may be given
- s2_typed: Select[Tuple[int, str]] = select(User.id, s1.c.name)
+ s2_typed: Select[tuple[int, str]] = select(User.id, s1.c.name)
- # EXPECTED_TYPE: Select[Tuple[int, str]]
+ # EXPECTED_TYPE: Select[tuple[int, str]]
reveal_type(s2_typed)
# plain FromClause etc we at least get Select
def t_connection_execute_multi() -> None:
result = connection.execute(multi_stmt).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.int\*?, builtins.str\*?\]\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.int\*?, builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.int\*?, builtins.str\*?\]
reveal_type(row)
x, y = row
def t_connection_execute_single() -> None:
result = connection.execute(single_stmt).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.str\*?\]\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.str\*?\]
reveal_type(row)
(x,) = row
def t_connection_execute_single_row_scalar() -> None:
result = connection.execute(single_stmt).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.str\*?\]\]
reveal_type(result)
x = result.scalar()
def t_session_execute_multi() -> None:
result = session.execute(multi_stmt).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.int\*?, builtins.str\*?\]\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.int\*?, builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.int\*?, builtins.str\*?\]
reveal_type(row)
x, y = row
def t_session_execute_single() -> None:
result = session.execute(single_stmt).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.str\*?\]\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.str\*?\]
reveal_type(row)
(x,) = row
async def t_async_connection_execute_multi() -> None:
result = (await async_connection.execute(multi_stmt)).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.int\*?, builtins.str\*?\]\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.int\*?, builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.int\*?, builtins.str\*?\]
reveal_type(row)
x, y = row
async def t_async_connection_execute_single() -> None:
result = (await async_connection.execute(single_stmt)).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.str\*?\]\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.str\*?\]
reveal_type(row)
(x,) = row
async def t_async_session_execute_multi() -> None:
result = (await async_session.execute(multi_stmt)).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.int\*?, builtins.str\*?\]\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.int\*?, builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.int\*?, builtins.str\*?\]
reveal_type(row)
x, y = row
async def t_async_session_execute_single() -> None:
result = (await async_session.execute(single_stmt)).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.str\*?\]\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.str\*?\]
reveal_type(row)
(x,) = row
async def t_async_connection_stream_multi() -> None:
result = (await async_connection.stream(multi_stmt)).t
- # EXPECTED_RE_TYPE: sqlalchemy.*AsyncTupleResult\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*AsyncTupleResult\[tuple\[builtins.int\*?, builtins.str\*?\]\]
reveal_type(result)
row = await result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.int\*?, builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.int\*?, builtins.str\*?\]
reveal_type(row)
x, y = row
async def t_async_connection_stream_single() -> None:
result = (await async_connection.stream(single_stmt)).t
- # EXPECTED_RE_TYPE: sqlalchemy.*AsyncTupleResult\[Tuple\[builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*AsyncTupleResult\[tuple\[builtins.str\*?\]\]
reveal_type(result)
row = await result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.str\*?\]
reveal_type(row)
(x,) = row
async def t_async_session_stream_multi() -> None:
result = (await async_session.stream(multi_stmt)).t
- # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*TupleResult\[tuple\[builtins.int\*?, builtins.str\*?\]\]
reveal_type(result)
row = await result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.int\*?, builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.int\*?, builtins.str\*?\]
reveal_type(row)
x, y = row
async def t_async_session_stream_single() -> None:
result = (await async_session.stream(single_stmt)).t
- # EXPECTED_RE_TYPE: sqlalchemy.*AsyncTupleResult\[Tuple\[builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*AsyncTupleResult\[tuple\[builtins.str\*?\]\]
reveal_type(result)
row = await result.one()
- # EXPECTED_RE_TYPE: Tuple\[builtins.str\*?\]
+ # EXPECTED_RE_TYPE: tuple\[builtins.str\*?\]
reveal_type(row)
(x,) = row
cov: True
extras=
- py{3,38,39,310,311,312,313}: {[greenletextras]extras}
+ py{3,39,310,311,312,313}: {[greenletextras]extras}
- py{38,39,310}-sqlite_file: sqlcipher
+ py{39,310}-sqlite_file: sqlcipher
postgresql: postgresql
postgresql: postgresql_pg8000
postgresql: postgresql_psycopg
sqlite-nogreenlet: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric}
- py{37,38,39}-sqlite_file: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver aiosqlite --dbdriver pysqlcipher}
+ py{39}-sqlite_file: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver aiosqlite --dbdriver pysqlcipher}
# omit pysqlcipher for Python 3.10
py{3,310,311,312}-sqlite_file: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver aiosqlite}