]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Type annotations for sqlalchemy.orm.dynamic
authorMaksim Latysh <m.latysh@godeltech.com>
Thu, 23 Mar 2023 12:37:40 +0000 (08:37 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 28 Sep 2023 18:56:27 +0000 (14:56 -0400)
Closes: #9039
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/9039
Pull-request-sha: dae9138f8019bf7e2b305ec869734373a997350d

Change-Id: I89a96213558928877abffc450aeeecbbd196aa92

lib/sqlalchemy/orm/attributes.py
lib/sqlalchemy/orm/dynamic.py
lib/sqlalchemy/orm/query.py
lib/sqlalchemy/orm/relationships.py
lib/sqlalchemy/orm/writeonly.py

index 00d3f50792b108ea8766b1ca20c0695227c88c97..1098359ecaa229ec4ad352209ca49a00b40dcc84 100644 (file)
@@ -791,7 +791,7 @@ class AttributeEventToken:
 
     __slots__ = "impl", "op", "parent_token"
 
-    def __init__(self, attribute_impl, op):
+    def __init__(self, attribute_impl: AttributeImpl, op: util.symbol):
         self.impl = attribute_impl
         self.op = op
         self.parent_token = self.impl.parent_token
@@ -834,7 +834,7 @@ class AttributeImpl:
         self,
         class_: _ExternalEntityType[_O],
         key: str,
-        callable_: _LoaderCallable,
+        callable_: Optional[_LoaderCallable],
         dispatch: _Dispatch[QueryableAttribute[Any]],
         trackparent: bool = False,
         compare_function: Optional[Callable[..., bool]] = None,
@@ -2618,7 +2618,7 @@ def register_attribute_impl(
         # TODO: this appears to be the WriteOnlyAttributeImpl /
         # DynamicAttributeImpl constructor which is hardcoded
         impl = cast("Type[WriteOnlyAttributeImpl]", impl_class)(
-            class_, key, typecallable, dispatch, **kw
+            class_, key, dispatch, **kw
         )
     elif uselist:
         impl = CollectionAttributeImpl(
index 7514d86cd791077893f00379311cf4c7ca0798b0..1d0c03606c8038f8d4da17cc21add6dfb2897136 100644 (file)
@@ -4,7 +4,6 @@
 #
 # This module is part of SQLAlchemy and is released under
 # the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
 
 
 """Dynamic collection API.
@@ -23,13 +22,19 @@ from __future__ import annotations
 from typing import Any
 from typing import Iterable
 from typing import Iterator
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Type
 from typing import TYPE_CHECKING
 from typing import TypeVar
+from typing import Union
 
 from . import attributes
 from . import exc as orm_exc
 from . import relationships
 from . import util as orm_util
+from .base import PassiveFlag
 from .query import Query
 from .session import object_session
 from .writeonly import AbstractCollectionWriter
@@ -39,15 +44,28 @@ from .writeonly import WriteOnlyLoader
 from .. import util
 from ..engine import result
 
+
 if TYPE_CHECKING:
+    from . import QueryableAttribute
+    from .mapper import Mapper
+    from .relationships import _RelationshipOrderByArg
     from .session import Session
-
+    from .state import InstanceState
+    from .util import AliasedClass
+    from ..event import _Dispatch
+    from ..sql.elements import ColumnElement
 
 _T = TypeVar("_T", bound=Any)
 
 
-class DynamicCollectionHistory(WriteOnlyHistory):
-    def __init__(self, attr, state, passive, apply_to=None):
+class DynamicCollectionHistory(WriteOnlyHistory[_T]):
+    def __init__(
+        self,
+        attr: DynamicAttributeImpl,
+        state: InstanceState[_T],
+        passive: PassiveFlag,
+        apply_to: Optional[DynamicCollectionHistory[_T]] = None,
+    ) -> None:
         if apply_to:
             coll = AppenderQuery(attr, state).autoflush(False)
             self.unchanged_items = util.OrderedIdentitySet(coll)
@@ -63,21 +81,21 @@ class DynamicCollectionHistory(WriteOnlyHistory):
 
 class DynamicAttributeImpl(WriteOnlyAttributeImpl):
     _supports_dynamic_iteration = True
-    collection_history_cls = DynamicCollectionHistory
+    collection_history_cls = DynamicCollectionHistory[Any]
+    query_class: Type[AppenderMixin[Any]]  # type: ignore[assignment]
 
     def __init__(
         self,
-        class_,
-        key,
-        typecallable,
-        dispatch,
-        target_mapper,
-        order_by,
-        query_class=None,
-        **kw,
-    ):
+        class_: Union[Type[Any], AliasedClass[Any]],
+        key: str,
+        dispatch: _Dispatch[QueryableAttribute[Any]],
+        target_mapper: Mapper[_T],
+        order_by: _RelationshipOrderByArg,
+        query_class: Optional[Type[AppenderMixin[_T]]] = None,
+        **kw: Any,
+    ) -> None:
         attributes.AttributeImpl.__init__(
-            self, class_, key, typecallable, dispatch, **kw
+            self, class_, key, None, dispatch, **kw
         )
         self.target_mapper = target_mapper
         if order_by:
@@ -102,21 +120,23 @@ class AppenderMixin(AbstractCollectionWriter[_T]):
 
     """
 
-    query_class = None
+    query_class: Optional[Type[Query[_T]]] = None
+    _order_by_clauses: Tuple[ColumnElement[Any], ...]
 
-    def __init__(self, attr, state):
-        Query.__init__(self, attr.target_mapper, None)
+    def __init__(
+        self, attr: DynamicAttributeImpl, state: InstanceState[_T]
+    ) -> None:
+        Query.__init__(
+            self,  # type: ignore[arg-type]
+            attr.target_mapper,
+            None,
+        )
         super().__init__(attr, state)
 
     @property
-    def session(self) -> Session:
+    def session(self) -> Optional[Session]:
         sess = object_session(self.instance)
-        if (
-            sess is not None
-            and self.autoflush
-            and sess.autoflush
-            and self.instance in sess
-        ):
+        if sess is not None and sess.autoflush and self.instance in sess:
             sess.flush()
         if not orm_util.has_identity(self.instance):
             return None
@@ -127,7 +147,7 @@ class AppenderMixin(AbstractCollectionWriter[_T]):
     def session(self, session: Session) -> None:
         self.sess = session
 
-    def _iter(self):
+    def _iter(self) -> Union[result.ScalarResult[_T], result.Result[_T]]:
         sess = self.session
         if sess is None:
             state = attributes.instance_state(self.instance)
@@ -141,9 +161,9 @@ class AppenderMixin(AbstractCollectionWriter[_T]):
 
             return result.IteratorResult(
                 result.SimpleResultMetaData([self.attr.class_.__name__]),
-                self.attr._get_collection_history(
+                self.attr._get_collection_history(  # type: ignore[arg-type]
                     attributes.instance_state(self.instance),
-                    attributes.PASSIVE_NO_INITIALIZE,
+                    PassiveFlag.PASSIVE_NO_INITIALIZE,
                 ).added_items,
                 _source_supports_scalars=True,
             ).scalars()
@@ -155,15 +175,15 @@ class AppenderMixin(AbstractCollectionWriter[_T]):
         def __iter__(self) -> Iterator[_T]:
             ...
 
-    def __getitem__(self, index: Any) -> _T:
+    def __getitem__(self, index: Any) -> Union[_T, List[_T]]:
         sess = self.session
         if sess is None:
             return self.attr._get_collection_history(
                 attributes.instance_state(self.instance),
-                attributes.PASSIVE_NO_INITIALIZE,
+                PassiveFlag.PASSIVE_NO_INITIALIZE,
             ).indexed(index)
         else:
-            return self._generate(sess).__getitem__(index)
+            return self._generate(sess).__getitem__(index)  # type: ignore[no-any-return] # noqa: E501
 
     def count(self) -> int:
         sess = self.session
@@ -171,13 +191,16 @@ class AppenderMixin(AbstractCollectionWriter[_T]):
             return len(
                 self.attr._get_collection_history(
                     attributes.instance_state(self.instance),
-                    attributes.PASSIVE_NO_INITIALIZE,
+                    PassiveFlag.PASSIVE_NO_INITIALIZE,
                 ).added_items
             )
         else:
             return self._generate(sess).count()
 
-    def _generate(self, sess=None):
+    def _generate(
+        self,
+        sess: Optional[Session] = None,
+    ) -> Query[_T]:
         # note we're returning an entirely new Query class instance
         # here without any assignment capabilities; the class of this
         # query is determined by the session.
@@ -259,7 +282,7 @@ class AppenderMixin(AbstractCollectionWriter[_T]):
         self._remove_impl(item)
 
 
-class AppenderQuery(AppenderMixin[_T], Query[_T]):
+class AppenderQuery(AppenderMixin[_T], Query[_T]):  # type: ignore[misc]
     """A dynamic query that supports basic collection storage operations.
 
     Methods on :class:`.AppenderQuery` include all methods of
@@ -270,7 +293,7 @@ class AppenderQuery(AppenderMixin[_T], Query[_T]):
     """
 
 
-def mixin_user_query(cls):
+def mixin_user_query(cls: Any) -> type[AppenderMixin[Any]]:
     """Return a new class with AppenderQuery functionality layered over."""
     name = "Appender" + cls.__name__
     return type(name, (AppenderMixin, cls), {"query_class": cls})
index b1678bce18e513102391253b342efe2255c5a5bb..5da7ee9b22840465a03ac69067eb6f34ac091c51 100644 (file)
@@ -235,7 +235,9 @@ class Query(
 
     def __init__(
         self,
-        entities: Sequence[_ColumnsClauseArgument[Any]],
+        entities: Union[
+            _ColumnsClauseArgument[Any], Sequence[_ColumnsClauseArgument[Any]]
+        ],
         session: Optional[Session] = None,
     ):
         """Construct a :class:`_query.Query` directly.
@@ -278,7 +280,10 @@ class Query(
         return self
 
     def _set_entities(
-        self, entities: Iterable[_ColumnsClauseArgument[Any]]
+        self,
+        entities: Union[
+            _ColumnsClauseArgument[Any], Iterable[_ColumnsClauseArgument[Any]]
+        ],
     ) -> None:
         self._raw_columns = [
             coercions.expect(
index bb5ddb872588bf9249232b9099cab9c3a05e5534..7ea30d7b180ae0c46dc089dab4e3ac93afc2b455 100644 (file)
@@ -271,6 +271,9 @@ class _RelationshipArg(Generic[_T1, _T2]):
             self.resolved = attr_value
 
 
+_RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]]
+
+
 class _RelationshipArgs(NamedTuple):
     """stores user-passed parameters that are resolved at mapper configuration
     time.
@@ -289,10 +292,7 @@ class _RelationshipArgs(NamedTuple):
         Optional[_RelationshipJoinConditionArgument],
         Optional[ColumnElement[Any]],
     ]
-    order_by: _RelationshipArg[
-        _ORMOrderByArgument,
-        Union[Literal[None, False], Tuple[ColumnElement[Any], ...]],
-    ]
+    order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg]
     foreign_keys: _RelationshipArg[
         Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
     ]
@@ -341,7 +341,7 @@ class RelationshipProperty(
     secondaryjoin: Optional[ColumnElement[bool]]
     secondary: Optional[FromClause]
     _join_condition: JoinCondition
-    order_by: Union[Literal[False], Tuple[ColumnElement[Any], ...]]
+    order_by: _RelationshipOrderByArg
 
     _user_defined_foreign_keys: Set[ColumnElement[Any]]
     _calculated_foreign_keys: Set[ColumnElement[Any]]
index 0f245835b095fa9816027e286e428a4599a1ded0..416a0399f93ea3bbcc6b97d1f0e4fd5dc6b2448d 100644 (file)
@@ -4,8 +4,6 @@
 #
 # This module is part of SQLAlchemy and is released under
 # the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
 
 """Write-only collection API.
 
@@ -21,12 +19,17 @@ object must be executed each time.
 from __future__ import annotations
 
 from typing import Any
+from typing import Collection
+from typing import Dict
 from typing import Generic
 from typing import Iterable
+from typing import Iterator
+from typing import List
 from typing import NoReturn
 from typing import Optional
 from typing import overload
 from typing import Tuple
+from typing import Type
 from typing import TYPE_CHECKING
 from typing import TypeVar
 from typing import Union
@@ -36,9 +39,10 @@ from . import attributes
 from . import interfaces
 from . import relationships
 from . import strategies
+from .base import NEVER_SET
 from .base import object_mapper
 from .base import PassiveFlag
-from .relationships import RelationshipDirection
+from .base import RelationshipDirection
 from .. import exc
 from .. import inspect
 from .. import log
@@ -53,22 +57,38 @@ from ..sql.dml import Update
 from ..util.typing import Literal
 
 if TYPE_CHECKING:
+    from . import QueryableAttribute
     from ._typing import _InstanceDict
-    from .attributes import _AdaptedCollectionProtocol
     from .attributes import AttributeEventToken
-    from .attributes import CollectionAdapter
     from .base import LoaderCallableStatus
+    from .collections import _AdaptedCollectionProtocol
+    from .collections import CollectionAdapter
+    from .mapper import Mapper
+    from .relationships import _RelationshipOrderByArg
     from .state import InstanceState
+    from .util import AliasedClass
+    from ..event import _Dispatch
+    from ..sql.selectable import FromClause
     from ..sql.selectable import Select
 
-
 _T = TypeVar("_T", bound=Any)
 
 
-class WriteOnlyHistory:
+class WriteOnlyHistory(Generic[_T]):
     """Overrides AttributeHistory to receive append/remove events directly."""
 
-    def __init__(self, attr, state, passive, apply_to=None):
+    unchanged_items: util.OrderedIdentitySet
+    added_items: util.OrderedIdentitySet
+    deleted_items: util.OrderedIdentitySet
+    _reconcile_collection: bool
+
+    def __init__(
+        self,
+        attr: WriteOnlyAttributeImpl,
+        state: InstanceState[_T],
+        passive: PassiveFlag,
+        apply_to: Optional[WriteOnlyHistory[_T]] = None,
+    ) -> None:
         if apply_to:
             if passive & PassiveFlag.SQL_OK:
                 raise exc.InvalidRequestError(
@@ -90,18 +110,18 @@ class WriteOnlyHistory:
             self._reconcile_collection = False
 
     @property
-    def added_plus_unchanged(self):
+    def added_plus_unchanged(self) -> List[_T]:
         return list(self.added_items.union(self.unchanged_items))
 
     @property
-    def all_items(self):
+    def all_items(self) -> List[_T]:
         return list(
             self.added_items.union(self.unchanged_items).union(
                 self.deleted_items
             )
         )
 
-    def as_history(self):
+    def as_history(self) -> attributes.History:
         if self._reconcile_collection:
             added = self.added_items.difference(self.unchanged_items)
             deleted = self.deleted_items.intersection(self.unchanged_items)
@@ -114,13 +134,13 @@ class WriteOnlyHistory:
             )
         return attributes.History(list(added), list(unchanged), list(deleted))
 
-    def indexed(self, index):
+    def indexed(self, index: Union[int, slice]) -> Union[List[_T], _T]:
         return list(self.added_items)[index]
 
-    def add_added(self, value):
+    def add_added(self, value: _T) -> None:
         self.added_items.add(value)
 
-    def add_removed(self, value):
+    def add_removed(self, value: _T) -> None:
         if value in self.added_items:
             self.added_items.remove(value)
         else:
@@ -130,35 +150,41 @@ class WriteOnlyHistory:
 class WriteOnlyAttributeImpl(
     attributes.HasCollectionAdapter, attributes.AttributeImpl
 ):
-    uses_objects = True
-    default_accepts_scalar_loader = False
-    supports_population = False
-    _supports_dynamic_iteration = False
-    collection = False
-    dynamic = True
-    order_by = ()
-    collection_history_cls = WriteOnlyHistory
+    uses_objects: bool = True
+    default_accepts_scalar_loader: bool = False
+    supports_population: bool = False
+    _supports_dynamic_iteration: bool = False
+    collection: bool = False
+    dynamic: bool = True
+    order_by: _RelationshipOrderByArg = ()
+    collection_history_cls: Type[WriteOnlyHistory[Any]] = WriteOnlyHistory
+
+    query_class: Type[WriteOnlyCollection[Any]]
 
     def __init__(
         self,
-        class_,
-        key,
-        typecallable,
-        dispatch,
-        target_mapper,
-        order_by,
-        **kw,
+        class_: Union[Type[Any], AliasedClass[Any]],
+        key: str,
+        dispatch: _Dispatch[QueryableAttribute[Any]],
+        target_mapper: Mapper[_T],
+        order_by: _RelationshipOrderByArg,
+        **kw: Any,
     ):
-        super().__init__(class_, key, typecallable, dispatch, **kw)
+        super().__init__(class_, key, None, dispatch, **kw)
         self.target_mapper = target_mapper
         self.query_class = WriteOnlyCollection
         if order_by:
             self.order_by = tuple(order_by)
 
-    def get(self, state, dict_, passive=attributes.PASSIVE_OFF):
-        if not passive & attributes.SQL_OK:
+    def get(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
+    ) -> Union[util.OrderedIdentitySet, WriteOnlyCollection[Any]]:
+        if not passive & PassiveFlag.SQL_OK:
             return self._get_collection_history(
-                state, attributes.PASSIVE_NO_INITIALIZE
+                state, PassiveFlag.PASSIVE_NO_INITIALIZE
             ).added_items
         else:
             return self.query_class(self, state)
@@ -204,24 +230,34 @@ class WriteOnlyAttributeImpl(
     ) -> Union[
         Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
     ]:
-        if not passive & attributes.SQL_OK:
+        data: Collection[Any]
+        if not passive & PassiveFlag.SQL_OK:
             data = self._get_collection_history(state, passive).added_items
         else:
             history = self._get_collection_history(state, passive)
             data = history.added_plus_unchanged
-        return DynamicCollectionAdapter(data)  # type: ignore
+        return DynamicCollectionAdapter(data)  # type: ignore[return-value]
 
     @util.memoized_property
-    def _append_token(self):
+    def _append_token(  # type:ignore[override]
+        self,
+    ) -> attributes.AttributeEventToken:
         return attributes.AttributeEventToken(self, attributes.OP_APPEND)
 
     @util.memoized_property
-    def _remove_token(self):
+    def _remove_token(  # type:ignore[override]
+        self,
+    ) -> attributes.AttributeEventToken:
         return attributes.AttributeEventToken(self, attributes.OP_REMOVE)
 
     def fire_append_event(
-        self, state, dict_, value, initiator, collection_history=None
-    ):
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        collection_history: Optional[WriteOnlyHistory[Any]] = None,
+    ) -> None:
         if collection_history is None:
             collection_history = self._modified_event(state, dict_)
 
@@ -234,8 +270,13 @@ class WriteOnlyAttributeImpl(
             self.sethasparent(attributes.instance_state(value), state, True)
 
     def fire_remove_event(
-        self, state, dict_, value, initiator, collection_history=None
-    ):
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        collection_history: Optional[WriteOnlyHistory[Any]] = None,
+    ) -> None:
         if collection_history is None:
             collection_history = self._modified_event(state, dict_)
 
@@ -247,18 +288,20 @@ class WriteOnlyAttributeImpl(
         for fn in self.dispatch.remove:
             fn(state, value, initiator or self._remove_token)
 
-    def _modified_event(self, state, dict_):
+    def _modified_event(
+        self, state: InstanceState[Any], dict_: _InstanceDict
+    ) -> WriteOnlyHistory[Any]:
         if self.key not in state.committed_state:
             state.committed_state[self.key] = self.collection_history_cls(
                 self, state, PassiveFlag.PASSIVE_NO_FETCH
             )
 
-        state._modified_event(dict_, self, attributes.NEVER_SET)
+        state._modified_event(dict_, self, NEVER_SET)
 
         # this is a hack to allow the entities.ComparableEntity fixture
         # to work
         dict_[self.key] = True
-        return state.committed_state[self.key]
+        return state.committed_state[self.key]  # type: ignore[no-any-return]
 
     def set(
         self,
@@ -321,25 +364,38 @@ class WriteOnlyAttributeImpl(
                 collection_history=collection_history,
             )
 
-    def delete(self, *args, **kwargs):
+    def delete(self, *args: Any, **kwargs: Any) -> NoReturn:
         raise NotImplementedError()
 
-    def set_committed_value(self, state, dict_, value):
+    def set_committed_value(
+        self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
+    ) -> NoReturn:
         raise NotImplementedError(
             "Dynamic attributes don't support collection population."
         )
 
-    def get_history(self, state, dict_, passive=attributes.PASSIVE_NO_FETCH):
+    def get_history(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+    ) -> attributes.History:
         c = self._get_collection_history(state, passive)
         return c.as_history()
 
     def get_all_pending(
-        self, state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE
-    ):
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_INITIALIZE,
+    ) -> List[Tuple[InstanceState[Any], Any]]:
         c = self._get_collection_history(state, passive)
         return [(attributes.instance_state(x), x) for x in c.all_items]
 
-    def _get_collection_history(self, state, passive):
+    def _get_collection_history(
+        self, state: InstanceState[Any], passive: PassiveFlag
+    ) -> WriteOnlyHistory[Any]:
+        c: WriteOnlyHistory[Any]
         if self.key in state.committed_state:
             c = state.committed_state[self.key]
         else:
@@ -347,7 +403,7 @@ class WriteOnlyAttributeImpl(
                 self, state, PassiveFlag.PASSIVE_NO_FETCH
             )
 
-        if state.has_identity and (passive & attributes.INIT_OK):
+        if state.has_identity and (passive & PassiveFlag.INIT_OK):
             return self.collection_history_cls(
                 self, state, passive, apply_to=c
             )
@@ -356,34 +412,34 @@ class WriteOnlyAttributeImpl(
 
     def append(
         self,
-        state,
-        dict_,
-        value,
-        initiator,
-        passive=attributes.PASSIVE_NO_FETCH,
-    ):
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+    ) -> None:
         if initiator is not self:
             self.fire_append_event(state, dict_, value, initiator)
 
     def remove(
         self,
-        state,
-        dict_,
-        value,
-        initiator,
-        passive=attributes.PASSIVE_NO_FETCH,
-    ):
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+    ) -> None:
         if initiator is not self:
             self.fire_remove_event(state, dict_, value, initiator)
 
     def pop(
         self,
-        state,
-        dict_,
-        value,
-        initiator,
-        passive=attributes.PASSIVE_NO_FETCH,
-    ):
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+    ) -> None:
         self.remove(state, dict_, value, initiator, passive=passive)
 
 
@@ -392,7 +448,7 @@ class WriteOnlyAttributeImpl(
 class WriteOnlyLoader(strategies.AbstractRelationshipLoader, log.Identified):
     impl_class = WriteOnlyAttributeImpl
 
-    def init_class_attribute(self, mapper):
+    def init_class_attribute(self, mapper: Mapper[Any]) -> None:
         self.is_class_level = True
         if not self.uselist or self.parent_property.direction not in (
             interfaces.ONETOMANY,
@@ -404,7 +460,7 @@ class WriteOnlyLoader(strategies.AbstractRelationshipLoader, log.Identified):
                 "uselist=False." % self.parent_property
             )
 
-        strategies._register_attribute(
+        strategies._register_attribute(  # type: ignore[no-untyped-call]
             self.parent_property,
             mapper,
             useobject=True,
@@ -418,19 +474,21 @@ class WriteOnlyLoader(strategies.AbstractRelationshipLoader, log.Identified):
 class DynamicCollectionAdapter:
     """simplified CollectionAdapter for internal API consistency"""
 
-    def __init__(self, data):
+    data: Collection[Any]
+
+    def __init__(self, data: Collection[Any]):
         self.data = data
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[Any]:
         return iter(self.data)
 
-    def _reset_empty(self):
+    def _reset_empty(self) -> None:
         pass
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self.data)
 
-    def __bool__(self):
+    def __bool__(self) -> bool:
         return True
 
 
@@ -443,8 +501,14 @@ class AbstractCollectionWriter(Generic[_T]):
     if not TYPE_CHECKING:
         __slots__ = ()
 
-    def __init__(self, attr, state):
-        self.instance = instance = state.obj()
+    instance: _T
+    _from_obj: Tuple[FromClause, ...]
+
+    def __init__(self, attr: WriteOnlyAttributeImpl, state: InstanceState[_T]):
+        instance = state.obj()
+        if TYPE_CHECKING:
+            assert instance
+        self.instance = instance
         self.attr = attr
 
         mapper = object_mapper(instance)
@@ -535,7 +599,7 @@ class WriteOnlyCollection(AbstractCollectionWriter[_T]):
             stmt = stmt.order_by(*self._order_by_clauses)
         return stmt
 
-    def insert(self) -> Insert[_T]:
+    def insert(self) -> Insert:
         """For one-to-many collections, produce a :class:`_dml.Insert` which
         will insert new rows in terms of this this instance-local
         :class:`_orm.WriteOnlyCollection`.
@@ -561,7 +625,7 @@ class WriteOnlyCollection(AbstractCollectionWriter[_T]):
                 "INSERT along with add_all()."
             )
 
-        dict_ = {}
+        dict_: Dict[str, Any] = {}
 
         for l, r in prop.synchronize_pairs:
             fn = prop._get_attr_w_warn_on_none(
@@ -575,14 +639,14 @@ class WriteOnlyCollection(AbstractCollectionWriter[_T]):
 
         return insert(self.attr.target_mapper).values(**dict_)
 
-    def update(self) -> Update[_T]:
+    def update(self) -> Update:
         """Produce a :class:`_dml.Update` which will refer to rows in terms
         of this instance-local :class:`_orm.WriteOnlyCollection`.
 
         """
         return update(self.attr.target_mapper).where(*self._where_criteria)
 
-    def delete(self) -> Delete[_T]:
+    def delete(self) -> Delete:
         """Produce a :class:`_dml.Delete` which will refer to rows in terms
         of this instance-local :class:`_orm.WriteOnlyCollection`.