# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/python/black
- rev: 25.11.0
+ rev: 26.3.1
hooks:
- id: black
types: [rst]
exclude: README.*
additional_dependencies:
- - black==25.9.0
+ - black==26.3.1
from sqlalchemy import select
from sqlalchemy import table
-
engine = create_engine("sqlite://")
engine.execute("CREATE TABLE foo (id integer)")
from sqlalchemy import table
from sqlalchemy import text
-
engine = create_engine("sqlite://")
# don't rely on autocommit for DML and DDL
from sqlalchemy.orm import mapper
-
mapper(SomeClass, some_table, properties={"related": relationship(SomeRelatedClass)})
To work from a central :class:`_orm.registry` object::
Setting Isolation Level or DBAPI Autocommit for a Connection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+..format: off
For an individual :class:`_engine.Connection` object that's acquired from
:meth:`.Engine.connect`, the isolation level can be set for the duration of
Not every DBAPI supports every value; if an unsupported value is used for a
certain backend, an error is raised.
+..format: on
For example, to force REPEATABLE READ on a specific connection, then
begin a transaction::
from sqlalchemy.dialects import registry
-
registry.register("mysql.foodialect", "myapp.dialect", "MyMySQLDialect")
The above will respond to ``create_engine("mysql+foodialect://")`` and load the
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base
-
Base = declarative_base()
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import registry
-
reg = registry()
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import registry
-
reg = registry()
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
-
reg = registry()
from sqlalchemy import String
from sqlalchemy.orm import mapped_column
-
intpk = Annotated[int, mapped_column(primary_key=True)]
timestamp = Annotated[
datetime.datetime,
from sqlalchemy import UniqueConstraint
from sqlalchemy.orm import DeclarativeBase
-
metadata = MetaData()
group_users = Table(
"group_users",
from sqlalchemy import Table
from sqlalchemy.ext.asyncio import create_async_engine
-
meta = MetaData()
t1 = Table(
from sqlalchemy.orm.attributes import set_attribute
from sqlalchemy.orm.instrumentation import is_instrumented
-
registry = _reg()
from sqlalchemy.orm import sessionmaker
from . import caching_query
-
# dogpile cache regions. A home base for cache configurations.
regions = {}
from .environment import Session
from .model import Person
-
# load Person objects. cache the result in the "default" cache region
print("loading people....")
people = Session.scalars(select(Person).options(FromCache("default"))).all()
from .model import cache_address_bits
from .model import Person
-
for p in Session.scalars(
select(Person).options(joinedload(Person.addresses), cache_address_bits)
).unique():
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker
-
Base = declarative_base()
from sqlalchemy.orm import Session
from sqlalchemy.orm import with_polymorphic
-
intpk = Annotated[int, mapped_column(primary_key=True)]
str50 = Annotated[str, mapped_column(String(50))]
from sqlalchemy.orm import Session
from sqlalchemy.orm import with_polymorphic
-
intpk = Annotated[int, mapped_column(primary_key=True)]
str50 = Annotated[str, mapped_column(String(50))]
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import cast
-
Base = declarative_base()
from sqlalchemy.orm import aliased
from sqlalchemy.orm import Session
-
Base = declarative_base()
from . import Profiler
-
if __name__ == "__main__":
Profiler.main()
from sqlalchemy.orm import Session
from . import Profiler
-
Base = declarative_base()
engine = None
from sqlalchemy.orm import Session
from . import Profiler
-
Base = declarative_base()
engine = None
from sqlalchemy.orm import Session
from . import Profiler
-
Base = declarative_base()
engine = None
from sqlalchemy.orm import Session
from . import Profiler
-
Base = declarative_base()
engine = None
from sqlalchemy.sql import operators
from sqlalchemy.sql import visitors
-
echo = True
db1 = create_async_engine("sqlite+aiosqlite://", echo=echo)
db2 = create_async_engine("sqlite+aiosqlite://", echo=echo)
from sqlalchemy.sql import operators
from sqlalchemy.sql import visitors
-
echo = True
db1 = create_engine("sqlite://", echo=echo)
db2 = create_engine("sqlite://", echo=echo)
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker
-
echo = True
engine = create_engine("sqlite://", echo=echo)
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
-
logging.basicConfig(
filename="space_invaders.log",
format="%(asctime)s,%(msecs)03d %(levelname)-5.5s %(message)s",
from sqlalchemy.orm import Session
from sqlalchemy.orm import with_loader_criteria
-
Base = declarative_base()
# this will be the current time as the test runs
from ..util.concurrency import await_fallback
from ..util.concurrency import await_only
-
if TYPE_CHECKING:
from ..engine.interfaces import ConnectArgsType
from ..engine.url import URL
from ..sql.schema import ColumnCollectionConstraint
from ..sql.schema import Index
-
_OnConflictConstraintT = Union[str, ColumnCollectionConstraint, Index, None]
_OnConflictIndexElementsT = Optional[
Iterable[Union[Column[Any], str, roles.DDLConstraintColumnRole]]
from .base import XML
from ...sql import try_cast
-
base.dialect = dialect = pyodbc.dialect
view_name = f"sys.{row[0]}"
- cursor.execute(
- """
+ cursor.execute("""
SELECT CASE transaction_isolation_level
WHEN 0 THEN NULL
WHEN 1 THEN 'READ UNCOMMITTED'
AS TRANSACTION_ISOLATION_LEVEL
FROM {}
where session_id = @@SPID
- """.format(
- view_name
- )
- )
+ """.format(view_name))
except self.dbapi.Error as err:
raise NotImplementedError(
"Can't fetch isolation level; encountered error {} when "
else "NULL as filter_definition"
)
rp = connection.execution_options(future_result=True).execute(
- sql.text(
- f"""
+ sql.text(f"""
select
ind.index_id,
ind.is_unique,
and ind.type != 0
order by
ind.name
- """
- )
+ """)
.bindparams(
sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
sql.bindparam("schname", owner, ischema.CoerceUnicode()),
do["mssql_where"] = row["filter_definition"]
rp = connection.execution_options(future_result=True).execute(
- sql.text(
- """
+ sql.text("""
select
ind_col.index_id,
col.name,
order by
ind_col.index_id,
ind_col.key_ordinal
- """
- )
+ """)
.bindparams(
sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
sql.bindparam("schname", owner, ischema.CoerceUnicode()),
):
# Foreign key constraints
s = (
- text(
- """\
+ text("""\
WITH fk_info AS (
SELECT
ischema_ref_con.constraint_schema,
ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
fk_info.ordinal_position
-"""
- )
+""")
.bindparams(
sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
sql.bindparam("owner", owner, ischema.CoerceUnicode()),
from ...types import TypeDecorator
from ...types import Unicode
-
ischema = MetaData()
""" # noqa
+
import re
from .base import MSDialect
""" # noqa
-
import datetime
import decimal
import re
)
""" # noqa
+
from __future__ import annotations
from types import ModuleType
)
""" # noqa
+
from __future__ import annotations
from types import ModuleType
)
""" # noqa
+
from __future__ import annotations
from collections import defaultdict
def _is_mariadb_102(self) -> bool:
return (
self.is_mariadb
- and self._mariadb_normalized_version_info # type:ignore[operator]
+ and self._mariadb_normalized_version_info # type: ignore[operator]
> (
10,
2,
dialects are mysqlclient and PyMySQL.
""" # noqa
+
from __future__ import annotations
from typing import Any
from ...sql.selectable import NamedFromClause
from ...util.typing import Self
-
__all__ = ("Insert", "insert")
.. mariadb: https://github.com/mariadb-corporation/mariadb-connector-python
""" # noqa
+
from __future__ import annotations
import re
""" # noqa
+
from __future__ import annotations
import re
def _escape_identifier(self, value: str) -> str:
value = value.replace(
- self.escape_quote, # type:ignore[attr-defined]
- self.escape_to_quote, # type:ignore[attr-defined]
+ self.escape_quote, # type: ignore[attr-defined]
+ self.escape_to_quote, # type: ignore[attr-defined]
)
return value
The mysqldb dialect supports server-side cursors. See :ref:`mysql_ss_cursors`.
"""
+
from __future__ import annotations
import re
to the pymysql driver as well.
""" # noqa
+
from __future__ import annotations
from typing import Any
connection_uri = "mysql+pyodbc:///?odbc_connect=%s" % params
""" # noqa
+
from __future__ import annotations
import datetime
buffer = []
for row in columns:
- (name, col_type, nullable, default, extra) = (
+ name, col_type, nullable, default, extra = (
row[i] for i in (0, 1, 2, 4, 5)
)
as better integration of outputtypehandlers.
""" # noqa
+
from __future__ import annotations
import decimal
.. versionadded:: 2.0.0 added support for the python-oracledb driver.
""" # noqa
+
from __future__ import annotations
import collections
)
else:
- raise TypeError(
- """
+ raise TypeError("""
Invalid input for VECTOR: expected a list, an array.array,
or a SparseVector object.
- """
- )
+ """)
return process
from .types import TSQUERY
from .types import TSVECTOR
-
# Alias psycopg also as psycopg_async
psycopg_async = type(
"psycopg_async", (ModuleType,), {"dialect": psycopg.dialect_async}
from ...sql.expression import alias
from ...util.typing import Self
-
__all__ = ("Insert", "insert")
from ... import types as sqltypes
from ...sql import functions as sqlfunc
-
__all__ = ("HSTORE", "hstore")
# mypy: ignore-errors
from ...sql import operators
-
_getitem_precedence = operators._PRECEDENCE[operators.json_getitem_op]
_eq_precedence = operators._PRECEDENCE[operators.eq]
""" # noqa
+
import decimal
import re
def _set_client_encoding(self, dbapi_connection, client_encoding):
cursor = dbapi_connection.cursor()
- cursor.execute(
- f"""SET CLIENT_ENCODING TO '{
- client_encoding.replace("'", "''")
- }'"""
- )
+ cursor.execute(f"""
+ SET CLIENT_ENCODING TO '{client_encoding.replace("'", "''")}'""")
cursor.execute("COMMIT")
cursor.close()
`Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_
""" # noqa
+
from __future__ import annotations
from collections import deque
which may be more performant.
""" # noqa
+
from __future__ import annotations
import collections.abc as collections_abc
:mod:`sqlalchemy.dialects.postgresql.psycopg2`
""" # noqa
+
from .psycopg2 import PGDialect_psycopg2
from ... import util
:paramref:`_sa.create_engine.poolclass` parameter.
""" # noqa
+
from __future__ import annotations
import asyncio
cursor.execute("select x.a, x.b from x")
assert [c[0] for c in cursor.description] == ["a", "b"]
- cursor.execute(
- """
+ cursor.execute("""
select x.a, x.b from x where a=1
union
select x.a, x.b from x where a=2
- """
- )
+ """)
assert [c[0] for c in cursor.description] == ["a", "b"], [
c[0] for c in cursor.description
]
result = conn.exec_driver_sql("select x.a, x.b from x")
assert result.keys() == ["a", "b"]
- result = conn.exec_driver_sql(
- """
+ result = conn.exec_driver_sql("""
select x.a, x.b from x where a=1
union
select x.a, x.b from x where a=2
- """
- )
+ """)
assert result.keys() == ["a", "b"]
Note that above, even though SQLAlchemy filters out the dots, *both
documentation.
''' # noqa
+
from __future__ import annotations
import datetime
fks = {}
for row in pragma_fks:
- (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
+ numerical_id, rtbl, lcol, rcol = (row[0], row[2], row[3], row[4])
if not rcol:
# no referred column, which means it was not named in the
print(conn.scalar(text("SELECT UDF()")))
""" # noqa
+
from __future__ import annotations
import math
from typing import TypeVar
from typing import Union
-
_DT = TypeVar(
"_DT", bound=Union[datetime.datetime, datetime.time, datetime.date]
)
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`."""
+
from __future__ import annotations
import contextlib
dialect = dialect_cls(**dialect_args)
# assemble connection arguments
- (cargs_tup, _cparams) = dialect.create_connect_args(u)
+ cargs_tup, _cparams = dialect.create_connect_args(u)
cparams = util.immutabledict(_cparams).union(pop_kwarg("connect_args", {}))
if "async_fallback" in cparams and util.asbool(cparams["async_fallback"]):
"""Define cursor-specific result set constructs including
:class:`.CursorResult`."""
-
from __future__ import annotations
import collections
from ..util.typing import Literal
from ..util.typing import Self
-
if typing.TYPE_CHECKING:
from .base import Connection
from .default import DefaultExecutionContext
from . import url as _url
from .. import util
-
if typing.TYPE_CHECKING:
from .base import Engine
from .interfaces import _CoreAnyExecuteParams
They all share one common characteristic: None is passed through unchanged.
"""
+
from __future__ import annotations
import typing
use the key 'name'. So for most return values, each record will have a
'name' attribute..
"""
+
from __future__ import annotations
import contextlib
# the MIT License: https://www.opensource.org/licenses/mit-license.php
"""Public API functions for the event system."""
+
from __future__ import annotations
from typing import Any
from .. import exc
from .. import util
-
CANCEL = util.symbol("CANCEL")
NO_RETVAL = util.symbol("NO_RETVAL")
``Pool`` vs. ``QueuePool``) are all implemented here.
"""
+
from __future__ import annotations
import collections
instances of ``_Dispatch``.
"""
+
from __future__ import annotations
import typing
generation of deprecation notes and docstrings.
"""
+
from __future__ import annotations
import typing
an equivalent :class:`._EventKey`.
"""
+
from __future__ import annotations
import collections
:exc:`.DBAPIError`.
"""
+
from __future__ import annotations
import typing
from .. import util as _sa_util
-
_sa_util.preloaded.import_prefix("sqlalchemy.ext")
See the example ``examples/association/proxied_association.py``.
"""
+
from __future__ import annotations
import operator
],
)
# "Class has incompatible disjoint bases" - no idea
-class AsyncConnection( # type:ignore[misc]
+class AsyncConnection( # type: ignore[misc]
ProxyComparable[Connection],
StartableContext["AsyncConnection"],
AsyncConnectable,
from ...util.typing import Concatenate
from ...util.typing import ParamSpec
-
if TYPE_CHECKING:
from .engine import AsyncConnection
from .engine import AsyncEngine
""" # noqa
+
from __future__ import annotations
import dataclasses
from ..sql import literal_column
from ..sql import util as sql_util
-
log = logging.getLogger(__name__)
)
"""
+
from __future__ import annotations
from typing import Any
"""Public API functions and helpers for declarative."""
+
from __future__ import annotations
import collections
"""
+
from __future__ import annotations
from typing import Any
see the example :ref:`examples_instrumentation`.
"""
+
import weakref
from .. import util
from ..orm.instrumentation import ClassManager
from ..orm.instrumentation import InstrumentationFactory
-
INSTRUMENTATION_MANAGER = "__sa_instrumentation_manager__"
"""Attribute, elects custom instrumentation when present on a mapped class.
Mypy plugin for SQLAlchemy ORM.
"""
+
from __future__ import annotations
from typing import Callable
"""
+
from __future__ import annotations
from typing import Any
from ..util import b64decode
from ..util import b64encode
-
__all__ = ["Serializer", "Deserializer", "dumps", "loads"]
this module is legacy as 2.0 APIs are now standard.
"""
+
from .engine import Connection as Connection
from .engine import create_engine as create_engine
from .engine import Engine as Engine
in a forwards-compatible way.
"""
+
from __future__ import annotations
from typing import Any
instance only.
"""
+
from __future__ import annotations
import logging
from .util import py38
from .util.typing import Literal
-
if py38:
STACKLEVEL = True
# needed as of py3.11.0b1
through the adapter, allowing for some very sophisticated behavior.
"""
+
from __future__ import annotations
import operator
if not setup_joins:
return None
- (target, onclause, from_, flags) = setup_joins[-1]
+ target, onclause, from_, flags = setup_joins[-1]
if isinstance(
target,
if child_state not in uow.states:
child_action = (None, None)
else:
- (deleted, listonly) = uow.states[child_state]
+ deleted, listonly = uow.states[child_state]
if deleted:
child_action = (
unitofwork.DeleteState(uow, child_state),
as actively in the load/persist ORM loop.
"""
+
from __future__ import annotations
from dataclasses import is_dataclass
from ..util.typing import is_fwd_ref
from ..util.typing import is_pep593
-
if typing.TYPE_CHECKING:
from ._typing import _InstanceDict
from ._typing import _RegistryType
from .. import util
from ..engine import result
-
if TYPE_CHECKING:
from . import QueryableAttribute
from .mapper import Mapper
"""
-
from __future__ import annotations
from typing import Type
# the MIT License: https://www.opensource.org/licenses/mit-license.php
"""ORM event interfaces."""
+
from __future__ import annotations
from typing import Any
"""
-
from __future__ import annotations
from typing import Any
)
try:
- (process, labels, extra) = list(
+ process, labels, extra = list(
zip(
*[
query_entity.row_processor(context, cursor)
if primary_key_identity is not None:
mapper = query._propagate_attrs["plugin_subject"]
- (_get_clause, _get_params) = mapper._get_clause
+ _get_clause, _get_params = mapper._get_clause
# None present in ident - turn those comparisons
# into "IS NULL"
available in :class:`~sqlalchemy.orm.`.
"""
+
from __future__ import annotations
from collections import deque
in unitofwork.py.
"""
+
from __future__ import annotations
from itertools import chain
)
else:
detail = (
- f"""none of {
- ", ".join(f"'{t}'" for t in checks)
- } """
- "are resolvable by the registry"
+ "none of "
+ + ", ".join(f"'{t}'" for t in checks)
+ + " are resolvable by the registry"
)
raise orm_exc.MappedAnnotationError(
"Could not locate SQLAlchemy Core type when resolving "
database to return iterable result sets.
"""
+
from __future__ import annotations
import collections.abc as collections_abc
from ..util.typing import Self
from ..util.typing import SupportsIndex
-
if TYPE_CHECKING:
from ._typing import _EntityType
from ._typing import _ExternalEntityType
and `secondaryjoin` aspects of :func:`_orm.relationship`.
"""
+
from __future__ import annotations
import collections
) or not hasattr(attr.impl, "get_history"):
continue
- (added, unchanged, deleted) = attr.impl.get_history(
+ added, unchanged, deleted = attr.impl.get_history(
state, dict_, passive=PassiveFlag.NO_CHANGE
)
from .. import util
from ..util import topological
-
if TYPE_CHECKING:
from .dependency import DependencyProcessor
from .interfaces import MapperProperty
for mapper in self._mappers(uow):
for state in uow.mappers[mapper].difference(self.processed):
- (isdelete, listonly) = uow.states[state]
+ isdelete, listonly = uow.states[state]
if not listonly:
if isdelete:
delete_states.add(state)
def _elements(self, uow):
for mapper in self._mappers(uow):
for state in uow.mappers[mapper]:
- (isdelete, listonly) = uow.states[state]
+ isdelete, listonly = uow.states[state]
if isdelete == self.isdelete and not listonly:
yield state
"""Pool implementation classes."""
+
from __future__ import annotations
import threading
"""
-
from __future__ import annotations
from ..util.typing import Literal
"""
def __missing__(self, key: str) -> str:
- (ident, derived) = key.split(" ", 1)
+ ident, derived = key.split(" ", 1)
anonymous_counter = self.get(derived, 1)
self[derived] = anonymous_counter + 1 # type: ignore
value = f"{derived}_{anonymous_counter}"
"""Foundational utilities common to many sql modules."""
-
from __future__ import annotations
import collections
:doc:`/ext/compiler`.
"""
+
from __future__ import annotations
import collections
leep_res = self._literal_execute_expanding_parameter(
escaped_name, parameter, values
)
- (to_update, replacement_expr) = leep_res
+ to_update, replacement_expr = leep_res
to_update_sets[escaped_name] = to_update
replacement_expressions[escaped_name] = replacement_expr
within INSERT and UPDATE statements.
"""
+
from __future__ import annotations
import functools
toplevel,
kw,
):
- (_, _, implicit_return_defaults, *_) = _get_returning_modifiers(
+ _, _, implicit_return_defaults, *_ = _get_returning_modifiers(
compiler, stmt, compile_state, toplevel
)
to invoke them for a create/drop call.
"""
+
from __future__ import annotations
import contextlib
:class:`_expression.Delete`.
"""
+
from __future__ import annotations
import collections.abc as collections_abc
from ..util.typing import ParamSpec
from ..util.typing import Self
-
if typing.TYPE_CHECKING:
from ._typing import _ByArgument
from ._typing import _ColumnExpressionArgument
"""Defines the public namespace for SQL expression constructs."""
-
from __future__ import annotations
from ._dml_constructors import delete as delete
from .. import util
from ..util.typing import Literal
-
if TYPE_CHECKING:
from .elements import BindParameter
from .elements import ClauseElement
rec = lambda_element._rec
if rec.bindparam_trackers:
tracker_instrumented_fn = (
- rec.tracker_instrumented_fn # type:ignore [union-attr] # noqa: E501
+ rec.tracker_instrumented_fn # type: ignore [union-attr] # noqa: E501
)
for tracker in rec.bindparam_trackers:
tracker(
as components in SQL expressions.
"""
+
from __future__ import annotations
from abc import ABC
if existing_col is not self:
if not allow_replacements:
raise exc.DuplicateColumnError(
- f"A column with {conflicts_on} "
- f"""'{
+ f"A column with {conflicts_on} " f"""'{
self.key if conflicts_on == 'key' else self.name
- }' """
- f"is already present in table '{table.name}'."
+ }' """ f"is already present in table '{table.name}'."
)
for fk in existing_col.foreign_keys:
table.foreign_keys.remove(fk)
from ..util.typing import Protocol
from ..util.typing import Self
-
and_ = BooleanClauseList.and_
raw_columns, left, right, onclause
)
else:
- (replace_from_obj_index) = self._join_place_explicit_left_side(
+ replace_from_obj_index = self._join_place_explicit_left_side(
left
)
# mypy: allow-untyped-defs, allow-untyped-calls
"""SQL specific types."""
+
from __future__ import annotations
import collections.abc as collections_abc
from sqlalchemy import Uuid
from sqlalchemy import Table, Column, MetaData, String
-
metadata_obj = MetaData()
t = Table(
from ..util import langhelpers
from ..util.typing import Self
-
SKIP_TRAVERSE = util.symbol("skip_traverse")
COMPARE_FAILED = False
COMPARE_SUCCEEDED = True
# mypy: allow-untyped-defs, allow-untyped-calls
"""High level utilities which build upon other modules here."""
+
from __future__ import annotations
from collections import deque
adapter = ClauseAdapter(left)
ret = None
while stack:
- (right, prevright) = stack.pop()
+ right, prevright = stack.pop()
if isinstance(right, Join) and right is not stop_on:
right = right._clone()
right.onclause = adapter.traverse(right.onclause)
from ..util import await_only
from ..util.typing import Literal
-
if typing.TYPE_CHECKING:
from ..engine import Engine
from ..engine.url import URL
with open(
Path(cachedir) / "sqla_mypy_config.cfg", "w"
) as config_file:
- config_file.write(
- f"""
+ config_file.write(f"""
[mypy]\n
plugins = sqlalchemy.ext.mypy.plugin\n
show_error_codes = True\n
[mypy-sqlalchemy.*]
ignore_errors = True
- """
- )
+ """)
with open(
Path(cachedir) / "plain_mypy_config.cfg", "w"
) as config_file:
- config_file.write(
- f"""
+ config_file.write(f"""
[mypy]\n
show_error_codes = True\n
{mypy_path}
[mypy-sqlalchemy.*]
ignore_errors = True
- """
- )
+ """)
yield cachedir
@config.fixture()
import os
import sys
-
bootstrap_file = locals()["bootstrap_file"]
to_bootstrap = locals()["to_bootstrap"]
__target_fn="__target_fn", __orig_fn="__orig_fn", name=fn.__name__
)
metadata.update(format_argspec_plus(spec, grouped=False))
- code = (
- """\
+ code = """\
def %(name)s%(grouped_args)s:
return %(__target_fn)s(%(__orig_fn)s, %(apply_kw)s)
-"""
- % metadata
- )
+""" % metadata
decorated = _exec_code_in_env(
code, {"__target_fn": target, "__orig_fn": fn}, fn.__name__
)
from ..util import freethreading
from ..util import has_compiled_ext
-
try:
import cProfile
except ImportError:
from ..sql import schema
from ..util import decorator
-
log = logging.getLogger(__name__)
FOLLOWER_IDENT = None
from ...testing import is_true
from ...testing import mock
-
metadata, users = None, None
from ..util import has_refcount_gc
from ..util import inspect_getfullargspec
-
if not has_refcount_gc:
def non_refcount_gc_collect(*args):
"""Compatibility namespace for sqlalchemy.sql.types."""
-
from __future__ import annotations
from .sql.sqltypes import _Binary as _Binary
# mypy: allow-untyped-defs, allow-untyped-calls
"""Collection classes and helpers."""
+
from __future__ import annotations
import operator
unique_list as unique_list,
)
-
_T = TypeVar("_T", bound=Any)
_KT = TypeVar("_KT", bound=Any)
_VT = TypeVar("_VT", bound=Any)
from typing import Type
from typing import TypeVar
-
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
py314 = sys.version_info >= (3, 14)
py313 = sys.version_info >= (3, 13)
modules, classes, hierarchies, attributes, functions, and methods.
"""
+
from __future__ import annotations
import collections
# more kinds of methods that use @decorator, things may have to
# be further improved in this area
if "__" in repr(spec[0]):
- code = (
- """\
+ code = """\
%(prefix)sdef %(name)s%(grouped_args)s:
return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
-"""
- % metadata
- )
+""" % metadata
else:
- code = (
- """\
+ code = """\
%(prefix)sdef %(name)s%(grouped_args)s:
return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
-"""
- % metadata
- )
+""" % metadata
env: Dict[str, Any] = {
targ_name: target,
def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
if instance is None:
- return self.clslevel.__get__(owner, owner.__class__) # type:ignore
+ return self.clslevel.__get__( # type: ignore[no-any-return]
+ owner, owner.__class__
+ )
else:
- return self.func.__get__(instance, owner) # type:ignore
+ return self.func.__get__( # type: ignore[no-any-return]
+ instance, owner
+ )
def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
self.clslevel = func
if frame.f_code in _warning_tags:
warning_tag_found = True
- (_suffix, _category) = _warning_tags[frame.f_code]
+ _suffix, _category = _warning_tags[frame.f_code]
category = category or _category
message = f"{message} ({_suffix})"
runtime.
"""
+
from __future__ import annotations
import sys
condition.
"""
+
from __future__ import annotations
import asyncio
from .concurrency import await_only
from .langhelpers import memoized_property
-
_T = TypeVar("_T", bound=Any)
__all__ = ["Empty", "Full", "Queue"]
needed for normal library use.
"""
+
from __future__ import annotations
from argparse import ArgumentParser
"flake8-rst-docstrings",
"pydocstyle<4.0.0",
"pygments",
- "black==25.11.0",
+ "black==26.3.1",
"slotscheck>=0.17.0",
"zimports>=0.7.0", # required by generate_tuple_map_overloads
]
from sqlalchemy.testing import provision
-
logging.basicConfig()
logging.getLogger(provision.__name__).setLevel(logging.INFO)
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import profiling
-
t1 = t2 = None
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import profiling
-
NUM_FIELDS = 10
NUM_RECORDS = 1000
from sqlalchemy.testing import fixtures
-
here = os.path.dirname(__file__)
sqla_base = os.path.normpath(os.path.join(here, "..", ".."))
installs SQLAlchemy's testing plugin into the local environment.
"""
+
import os
import sys
import pytest
-
os.environ["SQLALCHEMY_WARN_20"] = "true"
collect_ignore_glob = []
event.listen(
metadata,
"after_create",
- DDL(
- """CREATE FULLTEXT INDEX
+ DDL("""CREATE FULLTEXT INDEX
ON cattable (description)
- KEY INDEX PK_cattable"""
- ),
+ KEY INDEX PK_cattable"""),
)
event.listen(
metadata,
"after_create",
- DDL(
- """CREATE FULLTEXT INDEX
+ DDL("""CREATE FULLTEXT INDEX
ON matchtable (title)
- KEY INDEX PK_matchtable"""
- ),
+ KEY INDEX PK_matchtable"""),
)
event.listen(
@testing.fixture
def scalar_strings(self, connection):
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
CREATE FUNCTION scalar_strings (
)
FROM (
VALUES ('some string'), ('some string'), ('some string')
) AS my_tab(my_string)
- """
- )
+ """)
yield
connection.exec_driver_sql("DROP FUNCTION scalar_strings")
@testing.fixture
def two_strings(self, connection):
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
CREATE FUNCTION three_pairs (
)
RETURNS TABLE
FROM (
VALUES ('a', 'b'), ('c', 'd'), ('e', 'f')
) AS my_tab(s1, s2)
-"""
- )
+""")
yield
connection.exec_driver_sql("DROP FUNCTION three_pairs")
@classmethod
def setup_test_class(cls):
with testing.db.begin() as c:
- c.exec_driver_sql(
- """
+ c.exec_driver_sql("""
create or replace procedure foo(x_in IN number, x_out OUT number,
y_out OUT number, z_out OUT varchar) IS
retval number;
y_out := x_in * 15;
z_out := NULL;
end;
- """
- )
+ """)
def test_out_params(self, connection):
result = connection.execute(
connection.exec_driver_sql(
"CREATE OR REPLACE TYPE strings_t IS TABLE OF VARCHAR2 (100)"
)
- connection.exec_driver_sql(
- r"""
+ connection.exec_driver_sql(r"""
CREATE OR REPLACE FUNCTION scalar_strings (
count_in IN INTEGER, string_in IN VARCHAR2)
RETURN strings_t
RETURN l_strings;
END;
- """
- )
+ """)
yield
connection.exec_driver_sql("DROP FUNCTION scalar_strings")
connection.exec_driver_sql("DROP TYPE strings_t")
@testing.fixture
def two_strings(self, connection):
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
CREATE OR REPLACE TYPE two_strings_ot
AUTHID DEFINER IS OBJECT
(
string1 VARCHAR2 (10),
string2 VARCHAR2 (10)
-)"""
- )
- connection.exec_driver_sql(
- """
+)""")
+ connection.exec_driver_sql("""
CREATE OR REPLACE TYPE two_strings_nt IS TABLE OF two_strings_ot
-"""
- )
+""")
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
CREATE OR REPLACE FUNCTION three_pairs
RETURN two_strings_nt
AUTHID DEFINER
two_strings_ot ('c', 'd'),
two_strings_ot ('e', 'f'));
END;
-"""
- )
+""")
yield
connection.exec_driver_sql("DROP FUNCTION three_pairs")
connection.exec_driver_sql("DROP TYPE two_strings_nt")
# we connect as the other user.
with testing.db.begin() as conn:
- for stmt in (
- """
+ for stmt in ("""
create table %(test_schema)s.parent(
id integer primary key,
data varchar2(50)
-- so we give it to public. ideas welcome.
grant references on %(test_schema)s.parent to public;
grant references on %(test_schema)s.child to public;
- """
- % {"test_schema": testing.config.test_schema}
- ).split(";"):
+ """ % {"test_schema": testing.config.test_schema}).split(";"):
if stmt.strip():
conn.exec_driver_sql(stmt)
@classmethod
def teardown_test_class(cls):
with testing.db.begin() as conn:
- for stmt in (
- """
+ for stmt in ("""
drop table %(test_schema)s.child;
drop table %(test_schema)s.parent;
drop table local_table;
drop synonym %(test_schema)s_pt;
drop synonym %(test_schema)s.local_table;
- """
- % {"test_schema": testing.config.test_schema}
- ).split(";"):
+ """ % {"test_schema": testing.config.test_schema}).split(";"):
if stmt.strip():
conn.exec_driver_sql(stmt)
)
sql += """
CREATE SYNONYM syn_link FOR tbl_plain_v@%(link)s;
- """ % {
- "link": cls.dblink
- }
+ """ % {"link": cls.dblink}
with testing.db.begin() as conn:
for stmt in (
sql % {"test_schema": testing.config.test_schema}
Column("other", Unicode(255), index=True),
)
metadata.create_all(connection)
- connection.exec_driver_sql(
- """create index idx3 on sometable(
- lower("group"), other, upper(other))"""
- )
- connection.exec_driver_sql(
- """create index idx1 on sometable
- (("group" || col), col || other desc)"""
- )
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""create index idx3 on sometable(
+ lower("group"), other, upper(other))""")
+ connection.exec_driver_sql("""create index idx1 on sometable
+ (("group" || col), col || other desc)""")
+ connection.exec_driver_sql("""
create unique index idx2 on sometable
(col desc, lower(other), "group" asc)
- """
- )
+ """)
expected = [
{
# find what the primary k constraint name should be
primaryconsname = connection.scalar(
- text(
- """SELECT constraint_name
+ text("""SELECT constraint_name
FROM all_constraints
WHERE table_name = :table_name
AND owner = :owner
- AND constraint_type = 'P' """
- ),
+ AND constraint_type = 'P' """),
dict(
table_name=s_table.name.upper(),
owner=testing.db.dialect.default_schema_name.upper(),
event.listen(
metadata,
"after_create",
- DDL(
- """
+ DDL("""
CREATE OR REPLACE FUNCTION %s.version() RETURNS integer AS $$
BEGIN
return 0;
END;
-$$ LANGUAGE plpgsql;"""
- % (default_schema_name,)
- ),
+$$ LANGUAGE plpgsql;""" % (default_schema_name,)),
)
event.listen(
metadata,
conn = testing.db.connect()
trans = conn.begin()
try:
- conn.exec_driver_sql(
- """
+ conn.exec_driver_sql("""
CREATE OR REPLACE FUNCTION note(message varchar) RETURNS integer AS $$
BEGIN
RAISE NOTICE 'notice: %%', message;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-"""
- )
+""")
conn.exec_driver_sql("SELECT note('hi there')")
conn.exec_driver_sql("SELECT note('another note')")
finally:
execute that DefaultClause upon insert."""
meta = self.metadata
- connection.execute(
- text(
- """
+ connection.execute(text("""
CREATE TABLE speedy_users
(
speedy_user_id SERIAL PRIMARY KEY,
user_name VARCHAR NOT NULL,
user_password VARCHAR NOT NULL
);
- """
- )
- )
+ """))
t = Table("speedy_users", meta, autoload_with=connection)
r = connection.execute(
t.insert(), dict(user_name="user", user_password="lala")
Column("other", String(20)),
)
metadata.create_all(connection)
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
create index idx3 on party
(lower(name::text), other, lower(aname::text) desc)
- """
- )
+ """)
connection.exec_driver_sql(
"create index idx1 on party ((id || name), (other || id::text))"
)
connection.exec_driver_sql(
"create unique index idx2 on party (id) where name = 'test'"
)
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
create index idx4 on party using btree
(name nulls first, lower(other), aname desc)
where name != 'foo'
- """
- )
+ """)
version = connection.dialect.server_version_info
if version >= (15,):
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
create unique index zz_idx5 on party
(name desc, upper(other))
nulls not distinct
- """
- )
+ """)
expected = [
{
t1.create(connection)
# check ASC, DESC options alone
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
create index idx1 on party
(id, name ASC, aname DESC)
- """
- )
+ """)
# check DESC w/ NULLS options
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
create index idx2 on party
(name DESC NULLS FIRST, aname DESC NULLS LAST)
- """
- )
+ """)
# check ASC w/ NULLS options
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
create index idx3 on party
(name ASC NULLS FIRST, aname ASC NULLS LAST)
- """
- )
+ """)
# reflect data
m2 = MetaData()
)
metadata.create_all(connection)
connection.exec_driver_sql("CREATE INDEX idx1 ON t (x) INCLUDE (name)")
- connection.exec_driver_sql(
- """
+ connection.exec_driver_sql("""
create index idx3 on t
(lower(name::text), other desc nulls last, lower(aname::text))
include (id, x)
- """
- )
- connection.exec_driver_sql(
- """
+ """)
+ connection.exec_driver_sql("""
create unique index idx2 on t using btree
(lower(other), (id * id)) include (id)
- """
- )
+ """)
ind = connection.dialect.get_indexes(connection, "t", None)
eq_(
from sqlalchemy.testing import ne_
from sqlalchemy.testing.assertions import expect_raises_message
-
dialect = None
@event.listens_for(p, "reset")
def reset(conn, rec, state):
canary.append(
- f"""reset_{
- 'rollback_ok'
- if state.asyncio_safe else 'no_rollback'
- }"""
+ "reset_"
+ f"{'rollback_ok' if state.asyncio_safe else 'no_rollback'}"
)
@event.listens_for(p, "checkin")
indexes"""
conn = connection
- conn.exec_driver_sql(
- """
+ conn.exec_driver_sql("""
CREATE TABLE book (
id INTEGER NOT NULL,
title VARCHAR(100) NOT NULL,
series_id INTEGER,
UNIQUE(series, series_id),
PRIMARY KEY(id)
- )"""
- )
+ )""")
book = Table("book", metadata, autoload_with=connection)
assert book.primary_key.contains_column(book.c.id)
"""test reflection of a composite primary key"""
conn = connection
- conn.exec_driver_sql(
- """
+ conn.exec_driver_sql("""
CREATE TABLE book (
id INTEGER NOT NULL,
isbn VARCHAR(50) NOT NULL,
series_id INTEGER NOT NULL,
UNIQUE(series, series_id),
PRIMARY KEY(id, isbn)
- )"""
- )
+ )""")
book = Table("book", metadata, autoload_with=connection)
assert book.primary_key.contains_column(book.c.id)
assert book.primary_key.contains_column(book.c.isbn)
@testing.requires.denormalized_names
def setup_test(self):
with testing.db.begin() as conn:
- conn.exec_driver_sql(
- """
+ conn.exec_driver_sql("""
CREATE TABLE weird_casing(
col1 char(20),
"Col2" char(20),
"col3" char(20)
)
- """
- )
+ """)
@testing.requires.denormalized_names
def teardown_test(self):
from sqlalchemy import String
from sqlalchemy.orm import declarative_base
-
Base = declarative_base()
from sqlalchemy import String
from sqlalchemy.orm import registry
-
reg: registry = registry()
from sqlalchemy.sql.schema import MetaData
from sqlalchemy.sql.schema import Table
-
reg: registry = registry()
from sqlalchemy.sql.schema import MetaData
from sqlalchemy.sql.schema import Table
-
Base = declarative_base()
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import Mapped
-
Base = declarative_base()
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import declared_attr
-
Base = declarative_base()
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import declared_attr
-
Base = declarative_base()
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import registry
-
reg: registry = registry()
Base = declarative_base()
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import registry
-
reg: registry = registry()
# TODO: also reg.as_declarative_base()
from sqlalchemy.orm.interfaces import MapperProperty
from sqlalchemy.sql.schema import ForeignKey
-
reg: registry = registry()
from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import Mapped
-
Base = declarative_base()
import sqlalchemy as sa
from sqlalchemy import orm as saorm
-
Base = saorm.declarative_base()
from sqlalchemy.orm import relationship
from sqlalchemy.orm.decl_api import declared_attr
-
Base = declarative_base()
useobject=True,
)
b = Blog()
- (p1, p2, p3) = (Post(), Post(), Post())
+ p1, p2, p3 = (Post(), Post(), Post())
b.posts.append(p1)
b.posts.append(p2)
b.posts.append(p3)
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.util import picklers
-
metadata = None
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-
__all__ = ()
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-
Base = None
User = Address = None
from .test_typed_mapping import MappedColumnTest as _MappedColumnTest
from .test_typed_mapping import RelationshipLHSTest as _RelationshipLHSTest
-
_R = TypeVar("_R")
M = Mapped
from sqlalchemy.util import compat
from sqlalchemy.util.typing import Annotated
-
# try to differentiate between typing_extensions.TypeAliasType
# and typing.TypeAliasType
TypingTypeAliasType = getattr(typing, "TypeAliasType", TypeAliasType)
from sqlalchemy.util import compat
from sqlalchemy.util.typing import Annotated
-
# try to differentiate between typing_extensions.TypeAliasType
# and typing.TypeAliasType
TypingTypeAliasType = getattr(typing, "TypeAliasType", TypeAliasType)
s.courses.remove(c)
self.assert_(c.students == [])
- (s1, s2, s3) = (Student(), Student(), Student())
+ s1, s2, s3 = (Student(), Student(), Student())
c.students = [s1, s2, s3]
self.assert_(s2.courses == [c])
useobject=True,
)
b = Blog()
- (p1, p2, p3) = (Post(), Post(), Post())
+ p1, p2, p3 = (Post(), Post(), Post())
b.posts.append(p1)
b.posts.append(p2)
b.posts.append(p3)
test_session = fixture_session()
- (user7, user8, user9, user10) = test_session.query(User).all()
+ user7, user8, user9, user10 = test_session.query(User).all()
(
address1,
address2,
sess = fixture_session()
- (user7, user8, user9, user10) = sess.query(User).all()
- (address1, address2, address3, address4, address5) = sess.query(
+ user7, user8, user9, user10 = sess.query(User).all()
+ address1, address2, address3, address4, address5 = sess.query(
Address
).all()
sess = fixture_session()
- (user7, user8, user9, user10) = sess.query(User).all()
+ user7, user8, user9, user10 = sess.query(User).all()
expected = [(user7, 1), (user8, 3), (user9, 1), (user10, 0)]
q = sess.query(User)
)
sess = fixture_session()
- (user7, user8, user9, user10) = sess.query(User).all()
+ user7, user8, user9, user10 = sess.query(User).all()
expected = [
(user7, 1, "Name:jack"),
(user8, 3, "Name:ed"),
sess = fixture_session()
- (user7, user8, user9, user10) = sess.query(User).all()
- (address1, address2, address3, address4, address5) = sess.query(
+ user7, user8, user9, user10 = sess.query(User).all()
+ address1, address2, address3, address4, address5 = sess.query(
Address
).all()
expected = [
sess = fixture_session()
- (order1, order2, order3, order4, order5) = sess.query(Order).all()
- (item1, item2, item3, item4, item5) = sess.query(Item).all()
+ order1, order2, order3, order4, order5 = sess.query(Order).all()
+ item1, item2, item3, item4, item5 = sess.query(Item).all()
expected = [
(order1, item1),
(order1, item2),
sess.flush()
u1.username = "ed"
- (ad1, ad2) = sess.query(Address).all()
+ ad1, ad2 = sess.query(Address).all()
eq_([Address(username="jack"), Address(username="jack")], [ad1, ad2])
def go():
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-
# TODO: more tests mapping to selects
session.expunge_all()
- (h1, h2, h3, h4, h5) = session.query(Hoho).order_by(Hoho.id).all()
+ h1, h2, h3, h4, h5 = session.query(Hoho).order_by(Hoho.id).all()
eq_(h1.hoho, althohoval)
eq_(h3.hoho, althohoval)
from sqlalchemy import create_engine
from sqlalchemy import event
-
gevent.monkey.patch_all() # noqa
logging.basicConfig() # noqa
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
-
Base = declarative_base()
from sqlalchemy.testing.schema import pep435_enum
from sqlalchemy.types import UserDefinedType
-
if TYPE_CHECKING:
from sqlalchemy import Select
from sqlalchemy.testing.engines import all_dialects
from sqlalchemy.testing.provision import normalize_sequence
-
table1 = table(
"mytable",
column("myid", Integer),
@testing.requires.subqueries
def test_union(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
- (s1, s2) = (
+ s1, s2 = (
select(t1.c.col3.label("col3"), t1.c.col4.label("col4")).where(
t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
),
def test_union_ordered(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
- (s1, s2) = (
+ s1, s2 = (
select(t1.c.col3.label("col3"), t1.c.col4.label("col4")).where(
t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
),
def test_union_ordered_alias(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
- (s1, s2) = (
+ s1, s2 = (
select(t1.c.col3.label("col3"), t1.c.col4.label("col4")).where(
t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
),
from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.provision import normalize_sequence
-
metadata = MetaData()
table1 = Table(
"table1",
from sqlalchemy import create_engine
from sqlalchemy import inspect
-
e = create_engine("sqlite://")
insp = inspect(e)
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
-
engine = create_async_engine("")
SM = async_sessionmaker(engine, class_=AsyncSession)
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
-
async_engine = create_async_engine("...")
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
-
if TYPE_CHECKING:
from sqlalchemy.orm import InstrumentedAttribute
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import Session
-
if TYPE_CHECKING:
from sqlalchemy import Row
from sqlalchemy.orm.query import RowReturningQuery
from sqlalchemy import true
from sqlalchemy import UUID
-
m = MetaData()
from sqlalchemy import String
from sqlalchemy import true
-
# builtin.pyi stubs define object.__eq__() as returning bool, which
# can't be overridden (it's final). So for us to type `__eq__()` and
# `__ne__()`, we have to use type: ignore[override]. Test if does this mean
from black.mode import Mode
from black.mode import TargetVersion
-
home = Path(__file__).parent.parent
include_paths = (
re.compile(r"lib/sqlalchemy"),
metadata["after_line"] = ""
if clslevel:
- code = (
- '''\
+ code = '''\
@classmethod
%(async)sdef %(name)s%(grouped_args)s:
r"""%(doc)s\n """ # noqa: E501
%(line_prefix)s %(await)s%(target_cls_name)s.%(name)s(%(apply_kw_proxied)s)
%(after_line)s
-'''
- % metadata
- )
+''' % metadata
else:
- code = (
- '''\
+ code = '''\
%(async)sdef %(name)s%(grouped_args)s:
r"""%(doc)s\n """ # noqa: E501
%(line_prefix)s %(await)s%(self_arg)s._proxied.%(name)s(%(apply_kw_proxied)s)
%(after_line)s
-''' # noqa: E501
- % metadata
- )
+''' % metadata # noqa: E501
buf.write(textwrap.indent(code, " "))
from sqlalchemy.orm import util
-
if TYPE_CHECKING:
from typing import Any
from typing import List
# in case it requires a version pin
pydocstyle
pygments
- black==25.11.0
+ black==26.3.1
slotscheck>=0.17.0
# required by generate_tuple_map_overloads