From: Maksim Latysh Date: Thu, 23 Mar 2023 12:37:40 +0000 (-0400) Subject: Type annotations for sqlalchemy.orm.dynamic X-Git-Tag: rel_2_0_22~24^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=0a12d22d6d63bae1c695f0afc90f9cb6d825b82e;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Type annotations for sqlalchemy.orm.dynamic Closes: #9039 Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/9039 Pull-request-sha: dae9138f8019bf7e2b305ec869734373a997350d Change-Id: I89a96213558928877abffc450aeeecbbd196aa92 --- diff --git a/lib/sqlalchemy/orm/attributes.py b/lib/sqlalchemy/orm/attributes.py index 00d3f50792..1098359eca 100644 --- a/lib/sqlalchemy/orm/attributes.py +++ b/lib/sqlalchemy/orm/attributes.py @@ -791,7 +791,7 @@ class AttributeEventToken: __slots__ = "impl", "op", "parent_token" - def __init__(self, attribute_impl, op): + def __init__(self, attribute_impl: AttributeImpl, op: util.symbol): self.impl = attribute_impl self.op = op self.parent_token = self.impl.parent_token @@ -834,7 +834,7 @@ class AttributeImpl: self, class_: _ExternalEntityType[_O], key: str, - callable_: _LoaderCallable, + callable_: Optional[_LoaderCallable], dispatch: _Dispatch[QueryableAttribute[Any]], trackparent: bool = False, compare_function: Optional[Callable[..., bool]] = None, @@ -2618,7 +2618,7 @@ def register_attribute_impl( # TODO: this appears to be the WriteOnlyAttributeImpl / # DynamicAttributeImpl constructor which is hardcoded impl = cast("Type[WriteOnlyAttributeImpl]", impl_class)( - class_, key, typecallable, dispatch, **kw + class_, key, dispatch, **kw ) elif uselist: impl = CollectionAttributeImpl( diff --git a/lib/sqlalchemy/orm/dynamic.py b/lib/sqlalchemy/orm/dynamic.py index 7514d86cd7..1d0c03606c 100644 --- a/lib/sqlalchemy/orm/dynamic.py +++ b/lib/sqlalchemy/orm/dynamic.py @@ -4,7 +4,6 @@ # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors """Dynamic collection API. @@ -23,13 +22,19 @@ from __future__ import annotations from typing import Any from typing import Iterable from typing import Iterator +from typing import List +from typing import Optional +from typing import Tuple +from typing import Type from typing import TYPE_CHECKING from typing import TypeVar +from typing import Union from . import attributes from . import exc as orm_exc from . import relationships from . import util as orm_util +from .base import PassiveFlag from .query import Query from .session import object_session from .writeonly import AbstractCollectionWriter @@ -39,15 +44,28 @@ from .writeonly import WriteOnlyLoader from .. import util from ..engine import result + if TYPE_CHECKING: + from . import QueryableAttribute + from .mapper import Mapper + from .relationships import _RelationshipOrderByArg from .session import Session - + from .state import InstanceState + from .util import AliasedClass + from ..event import _Dispatch + from ..sql.elements import ColumnElement _T = TypeVar("_T", bound=Any) -class DynamicCollectionHistory(WriteOnlyHistory): - def __init__(self, attr, state, passive, apply_to=None): +class DynamicCollectionHistory(WriteOnlyHistory[_T]): + def __init__( + self, + attr: DynamicAttributeImpl, + state: InstanceState[_T], + passive: PassiveFlag, + apply_to: Optional[DynamicCollectionHistory[_T]] = None, + ) -> None: if apply_to: coll = AppenderQuery(attr, state).autoflush(False) self.unchanged_items = util.OrderedIdentitySet(coll) @@ -63,21 +81,21 @@ class DynamicCollectionHistory(WriteOnlyHistory): class DynamicAttributeImpl(WriteOnlyAttributeImpl): _supports_dynamic_iteration = True - collection_history_cls = DynamicCollectionHistory + collection_history_cls = DynamicCollectionHistory[Any] + query_class: Type[AppenderMixin[Any]] # type: ignore[assignment] def __init__( self, - class_, - key, - typecallable, - dispatch, - target_mapper, - order_by, - query_class=None, - **kw, - ): + class_: Union[Type[Any], AliasedClass[Any]], + key: str, + dispatch: _Dispatch[QueryableAttribute[Any]], + target_mapper: Mapper[_T], + order_by: _RelationshipOrderByArg, + query_class: Optional[Type[AppenderMixin[_T]]] = None, + **kw: Any, + ) -> None: attributes.AttributeImpl.__init__( - self, class_, key, typecallable, dispatch, **kw + self, class_, key, None, dispatch, **kw ) self.target_mapper = target_mapper if order_by: @@ -102,21 +120,23 @@ class AppenderMixin(AbstractCollectionWriter[_T]): """ - query_class = None + query_class: Optional[Type[Query[_T]]] = None + _order_by_clauses: Tuple[ColumnElement[Any], ...] - def __init__(self, attr, state): - Query.__init__(self, attr.target_mapper, None) + def __init__( + self, attr: DynamicAttributeImpl, state: InstanceState[_T] + ) -> None: + Query.__init__( + self, # type: ignore[arg-type] + attr.target_mapper, + None, + ) super().__init__(attr, state) @property - def session(self) -> Session: + def session(self) -> Optional[Session]: sess = object_session(self.instance) - if ( - sess is not None - and self.autoflush - and sess.autoflush - and self.instance in sess - ): + if sess is not None and sess.autoflush and self.instance in sess: sess.flush() if not orm_util.has_identity(self.instance): return None @@ -127,7 +147,7 @@ class AppenderMixin(AbstractCollectionWriter[_T]): def session(self, session: Session) -> None: self.sess = session - def _iter(self): + def _iter(self) -> Union[result.ScalarResult[_T], result.Result[_T]]: sess = self.session if sess is None: state = attributes.instance_state(self.instance) @@ -141,9 +161,9 @@ class AppenderMixin(AbstractCollectionWriter[_T]): return result.IteratorResult( result.SimpleResultMetaData([self.attr.class_.__name__]), - self.attr._get_collection_history( + self.attr._get_collection_history( # type: ignore[arg-type] attributes.instance_state(self.instance), - attributes.PASSIVE_NO_INITIALIZE, + PassiveFlag.PASSIVE_NO_INITIALIZE, ).added_items, _source_supports_scalars=True, ).scalars() @@ -155,15 +175,15 @@ class AppenderMixin(AbstractCollectionWriter[_T]): def __iter__(self) -> Iterator[_T]: ... - def __getitem__(self, index: Any) -> _T: + def __getitem__(self, index: Any) -> Union[_T, List[_T]]: sess = self.session if sess is None: return self.attr._get_collection_history( attributes.instance_state(self.instance), - attributes.PASSIVE_NO_INITIALIZE, + PassiveFlag.PASSIVE_NO_INITIALIZE, ).indexed(index) else: - return self._generate(sess).__getitem__(index) + return self._generate(sess).__getitem__(index) # type: ignore[no-any-return] # noqa: E501 def count(self) -> int: sess = self.session @@ -171,13 +191,16 @@ class AppenderMixin(AbstractCollectionWriter[_T]): return len( self.attr._get_collection_history( attributes.instance_state(self.instance), - attributes.PASSIVE_NO_INITIALIZE, + PassiveFlag.PASSIVE_NO_INITIALIZE, ).added_items ) else: return self._generate(sess).count() - def _generate(self, sess=None): + def _generate( + self, + sess: Optional[Session] = None, + ) -> Query[_T]: # note we're returning an entirely new Query class instance # here without any assignment capabilities; the class of this # query is determined by the session. @@ -259,7 +282,7 @@ class AppenderMixin(AbstractCollectionWriter[_T]): self._remove_impl(item) -class AppenderQuery(AppenderMixin[_T], Query[_T]): +class AppenderQuery(AppenderMixin[_T], Query[_T]): # type: ignore[misc] """A dynamic query that supports basic collection storage operations. Methods on :class:`.AppenderQuery` include all methods of @@ -270,7 +293,7 @@ class AppenderQuery(AppenderMixin[_T], Query[_T]): """ -def mixin_user_query(cls): +def mixin_user_query(cls: Any) -> type[AppenderMixin[Any]]: """Return a new class with AppenderQuery functionality layered over.""" name = "Appender" + cls.__name__ return type(name, (AppenderMixin, cls), {"query_class": cls}) diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index b1678bce18..5da7ee9b22 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -235,7 +235,9 @@ class Query( def __init__( self, - entities: Sequence[_ColumnsClauseArgument[Any]], + entities: Union[ + _ColumnsClauseArgument[Any], Sequence[_ColumnsClauseArgument[Any]] + ], session: Optional[Session] = None, ): """Construct a :class:`_query.Query` directly. @@ -278,7 +280,10 @@ class Query( return self def _set_entities( - self, entities: Iterable[_ColumnsClauseArgument[Any]] + self, + entities: Union[ + _ColumnsClauseArgument[Any], Iterable[_ColumnsClauseArgument[Any]] + ], ) -> None: self._raw_columns = [ coercions.expect( diff --git a/lib/sqlalchemy/orm/relationships.py b/lib/sqlalchemy/orm/relationships.py index bb5ddb8725..7ea30d7b18 100644 --- a/lib/sqlalchemy/orm/relationships.py +++ b/lib/sqlalchemy/orm/relationships.py @@ -271,6 +271,9 @@ class _RelationshipArg(Generic[_T1, _T2]): self.resolved = attr_value +_RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]] + + class _RelationshipArgs(NamedTuple): """stores user-passed parameters that are resolved at mapper configuration time. @@ -289,10 +292,7 @@ class _RelationshipArgs(NamedTuple): Optional[_RelationshipJoinConditionArgument], Optional[ColumnElement[Any]], ] - order_by: _RelationshipArg[ - _ORMOrderByArgument, - Union[Literal[None, False], Tuple[ColumnElement[Any], ...]], - ] + order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg] foreign_keys: _RelationshipArg[ Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]] ] @@ -341,7 +341,7 @@ class RelationshipProperty( secondaryjoin: Optional[ColumnElement[bool]] secondary: Optional[FromClause] _join_condition: JoinCondition - order_by: Union[Literal[False], Tuple[ColumnElement[Any], ...]] + order_by: _RelationshipOrderByArg _user_defined_foreign_keys: Set[ColumnElement[Any]] _calculated_foreign_keys: Set[ColumnElement[Any]] diff --git a/lib/sqlalchemy/orm/writeonly.py b/lib/sqlalchemy/orm/writeonly.py index 0f245835b0..416a0399f9 100644 --- a/lib/sqlalchemy/orm/writeonly.py +++ b/lib/sqlalchemy/orm/writeonly.py @@ -4,8 +4,6 @@ # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - """Write-only collection API. @@ -21,12 +19,17 @@ object must be executed each time. from __future__ import annotations from typing import Any +from typing import Collection +from typing import Dict from typing import Generic from typing import Iterable +from typing import Iterator +from typing import List from typing import NoReturn from typing import Optional from typing import overload from typing import Tuple +from typing import Type from typing import TYPE_CHECKING from typing import TypeVar from typing import Union @@ -36,9 +39,10 @@ from . import attributes from . import interfaces from . import relationships from . import strategies +from .base import NEVER_SET from .base import object_mapper from .base import PassiveFlag -from .relationships import RelationshipDirection +from .base import RelationshipDirection from .. import exc from .. import inspect from .. import log @@ -53,22 +57,38 @@ from ..sql.dml import Update from ..util.typing import Literal if TYPE_CHECKING: + from . import QueryableAttribute from ._typing import _InstanceDict - from .attributes import _AdaptedCollectionProtocol from .attributes import AttributeEventToken - from .attributes import CollectionAdapter from .base import LoaderCallableStatus + from .collections import _AdaptedCollectionProtocol + from .collections import CollectionAdapter + from .mapper import Mapper + from .relationships import _RelationshipOrderByArg from .state import InstanceState + from .util import AliasedClass + from ..event import _Dispatch + from ..sql.selectable import FromClause from ..sql.selectable import Select - _T = TypeVar("_T", bound=Any) -class WriteOnlyHistory: +class WriteOnlyHistory(Generic[_T]): """Overrides AttributeHistory to receive append/remove events directly.""" - def __init__(self, attr, state, passive, apply_to=None): + unchanged_items: util.OrderedIdentitySet + added_items: util.OrderedIdentitySet + deleted_items: util.OrderedIdentitySet + _reconcile_collection: bool + + def __init__( + self, + attr: WriteOnlyAttributeImpl, + state: InstanceState[_T], + passive: PassiveFlag, + apply_to: Optional[WriteOnlyHistory[_T]] = None, + ) -> None: if apply_to: if passive & PassiveFlag.SQL_OK: raise exc.InvalidRequestError( @@ -90,18 +110,18 @@ class WriteOnlyHistory: self._reconcile_collection = False @property - def added_plus_unchanged(self): + def added_plus_unchanged(self) -> List[_T]: return list(self.added_items.union(self.unchanged_items)) @property - def all_items(self): + def all_items(self) -> List[_T]: return list( self.added_items.union(self.unchanged_items).union( self.deleted_items ) ) - def as_history(self): + def as_history(self) -> attributes.History: if self._reconcile_collection: added = self.added_items.difference(self.unchanged_items) deleted = self.deleted_items.intersection(self.unchanged_items) @@ -114,13 +134,13 @@ class WriteOnlyHistory: ) return attributes.History(list(added), list(unchanged), list(deleted)) - def indexed(self, index): + def indexed(self, index: Union[int, slice]) -> Union[List[_T], _T]: return list(self.added_items)[index] - def add_added(self, value): + def add_added(self, value: _T) -> None: self.added_items.add(value) - def add_removed(self, value): + def add_removed(self, value: _T) -> None: if value in self.added_items: self.added_items.remove(value) else: @@ -130,35 +150,41 @@ class WriteOnlyHistory: class WriteOnlyAttributeImpl( attributes.HasCollectionAdapter, attributes.AttributeImpl ): - uses_objects = True - default_accepts_scalar_loader = False - supports_population = False - _supports_dynamic_iteration = False - collection = False - dynamic = True - order_by = () - collection_history_cls = WriteOnlyHistory + uses_objects: bool = True + default_accepts_scalar_loader: bool = False + supports_population: bool = False + _supports_dynamic_iteration: bool = False + collection: bool = False + dynamic: bool = True + order_by: _RelationshipOrderByArg = () + collection_history_cls: Type[WriteOnlyHistory[Any]] = WriteOnlyHistory + + query_class: Type[WriteOnlyCollection[Any]] def __init__( self, - class_, - key, - typecallable, - dispatch, - target_mapper, - order_by, - **kw, + class_: Union[Type[Any], AliasedClass[Any]], + key: str, + dispatch: _Dispatch[QueryableAttribute[Any]], + target_mapper: Mapper[_T], + order_by: _RelationshipOrderByArg, + **kw: Any, ): - super().__init__(class_, key, typecallable, dispatch, **kw) + super().__init__(class_, key, None, dispatch, **kw) self.target_mapper = target_mapper self.query_class = WriteOnlyCollection if order_by: self.order_by = tuple(order_by) - def get(self, state, dict_, passive=attributes.PASSIVE_OFF): - if not passive & attributes.SQL_OK: + def get( + self, + state: InstanceState[Any], + dict_: _InstanceDict, + passive: PassiveFlag = PassiveFlag.PASSIVE_OFF, + ) -> Union[util.OrderedIdentitySet, WriteOnlyCollection[Any]]: + if not passive & PassiveFlag.SQL_OK: return self._get_collection_history( - state, attributes.PASSIVE_NO_INITIALIZE + state, PassiveFlag.PASSIVE_NO_INITIALIZE ).added_items else: return self.query_class(self, state) @@ -204,24 +230,34 @@ class WriteOnlyAttributeImpl( ) -> Union[ Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter ]: - if not passive & attributes.SQL_OK: + data: Collection[Any] + if not passive & PassiveFlag.SQL_OK: data = self._get_collection_history(state, passive).added_items else: history = self._get_collection_history(state, passive) data = history.added_plus_unchanged - return DynamicCollectionAdapter(data) # type: ignore + return DynamicCollectionAdapter(data) # type: ignore[return-value] @util.memoized_property - def _append_token(self): + def _append_token( # type:ignore[override] + self, + ) -> attributes.AttributeEventToken: return attributes.AttributeEventToken(self, attributes.OP_APPEND) @util.memoized_property - def _remove_token(self): + def _remove_token( # type:ignore[override] + self, + ) -> attributes.AttributeEventToken: return attributes.AttributeEventToken(self, attributes.OP_REMOVE) def fire_append_event( - self, state, dict_, value, initiator, collection_history=None - ): + self, + state: InstanceState[Any], + dict_: _InstanceDict, + value: Any, + initiator: Optional[AttributeEventToken], + collection_history: Optional[WriteOnlyHistory[Any]] = None, + ) -> None: if collection_history is None: collection_history = self._modified_event(state, dict_) @@ -234,8 +270,13 @@ class WriteOnlyAttributeImpl( self.sethasparent(attributes.instance_state(value), state, True) def fire_remove_event( - self, state, dict_, value, initiator, collection_history=None - ): + self, + state: InstanceState[Any], + dict_: _InstanceDict, + value: Any, + initiator: Optional[AttributeEventToken], + collection_history: Optional[WriteOnlyHistory[Any]] = None, + ) -> None: if collection_history is None: collection_history = self._modified_event(state, dict_) @@ -247,18 +288,20 @@ class WriteOnlyAttributeImpl( for fn in self.dispatch.remove: fn(state, value, initiator or self._remove_token) - def _modified_event(self, state, dict_): + def _modified_event( + self, state: InstanceState[Any], dict_: _InstanceDict + ) -> WriteOnlyHistory[Any]: if self.key not in state.committed_state: state.committed_state[self.key] = self.collection_history_cls( self, state, PassiveFlag.PASSIVE_NO_FETCH ) - state._modified_event(dict_, self, attributes.NEVER_SET) + state._modified_event(dict_, self, NEVER_SET) # this is a hack to allow the entities.ComparableEntity fixture # to work dict_[self.key] = True - return state.committed_state[self.key] + return state.committed_state[self.key] # type: ignore[no-any-return] def set( self, @@ -321,25 +364,38 @@ class WriteOnlyAttributeImpl( collection_history=collection_history, ) - def delete(self, *args, **kwargs): + def delete(self, *args: Any, **kwargs: Any) -> NoReturn: raise NotImplementedError() - def set_committed_value(self, state, dict_, value): + def set_committed_value( + self, state: InstanceState[Any], dict_: _InstanceDict, value: Any + ) -> NoReturn: raise NotImplementedError( "Dynamic attributes don't support collection population." ) - def get_history(self, state, dict_, passive=attributes.PASSIVE_NO_FETCH): + def get_history( + self, + state: InstanceState[Any], + dict_: _InstanceDict, + passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH, + ) -> attributes.History: c = self._get_collection_history(state, passive) return c.as_history() def get_all_pending( - self, state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE - ): + self, + state: InstanceState[Any], + dict_: _InstanceDict, + passive: PassiveFlag = PassiveFlag.PASSIVE_NO_INITIALIZE, + ) -> List[Tuple[InstanceState[Any], Any]]: c = self._get_collection_history(state, passive) return [(attributes.instance_state(x), x) for x in c.all_items] - def _get_collection_history(self, state, passive): + def _get_collection_history( + self, state: InstanceState[Any], passive: PassiveFlag + ) -> WriteOnlyHistory[Any]: + c: WriteOnlyHistory[Any] if self.key in state.committed_state: c = state.committed_state[self.key] else: @@ -347,7 +403,7 @@ class WriteOnlyAttributeImpl( self, state, PassiveFlag.PASSIVE_NO_FETCH ) - if state.has_identity and (passive & attributes.INIT_OK): + if state.has_identity and (passive & PassiveFlag.INIT_OK): return self.collection_history_cls( self, state, passive, apply_to=c ) @@ -356,34 +412,34 @@ class WriteOnlyAttributeImpl( def append( self, - state, - dict_, - value, - initiator, - passive=attributes.PASSIVE_NO_FETCH, - ): + state: InstanceState[Any], + dict_: _InstanceDict, + value: Any, + initiator: Optional[AttributeEventToken], + passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH, + ) -> None: if initiator is not self: self.fire_append_event(state, dict_, value, initiator) def remove( self, - state, - dict_, - value, - initiator, - passive=attributes.PASSIVE_NO_FETCH, - ): + state: InstanceState[Any], + dict_: _InstanceDict, + value: Any, + initiator: Optional[AttributeEventToken], + passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH, + ) -> None: if initiator is not self: self.fire_remove_event(state, dict_, value, initiator) def pop( self, - state, - dict_, - value, - initiator, - passive=attributes.PASSIVE_NO_FETCH, - ): + state: InstanceState[Any], + dict_: _InstanceDict, + value: Any, + initiator: Optional[AttributeEventToken], + passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH, + ) -> None: self.remove(state, dict_, value, initiator, passive=passive) @@ -392,7 +448,7 @@ class WriteOnlyAttributeImpl( class WriteOnlyLoader(strategies.AbstractRelationshipLoader, log.Identified): impl_class = WriteOnlyAttributeImpl - def init_class_attribute(self, mapper): + def init_class_attribute(self, mapper: Mapper[Any]) -> None: self.is_class_level = True if not self.uselist or self.parent_property.direction not in ( interfaces.ONETOMANY, @@ -404,7 +460,7 @@ class WriteOnlyLoader(strategies.AbstractRelationshipLoader, log.Identified): "uselist=False." % self.parent_property ) - strategies._register_attribute( + strategies._register_attribute( # type: ignore[no-untyped-call] self.parent_property, mapper, useobject=True, @@ -418,19 +474,21 @@ class WriteOnlyLoader(strategies.AbstractRelationshipLoader, log.Identified): class DynamicCollectionAdapter: """simplified CollectionAdapter for internal API consistency""" - def __init__(self, data): + data: Collection[Any] + + def __init__(self, data: Collection[Any]): self.data = data - def __iter__(self): + def __iter__(self) -> Iterator[Any]: return iter(self.data) - def _reset_empty(self): + def _reset_empty(self) -> None: pass - def __len__(self): + def __len__(self) -> int: return len(self.data) - def __bool__(self): + def __bool__(self) -> bool: return True @@ -443,8 +501,14 @@ class AbstractCollectionWriter(Generic[_T]): if not TYPE_CHECKING: __slots__ = () - def __init__(self, attr, state): - self.instance = instance = state.obj() + instance: _T + _from_obj: Tuple[FromClause, ...] + + def __init__(self, attr: WriteOnlyAttributeImpl, state: InstanceState[_T]): + instance = state.obj() + if TYPE_CHECKING: + assert instance + self.instance = instance self.attr = attr mapper = object_mapper(instance) @@ -535,7 +599,7 @@ class WriteOnlyCollection(AbstractCollectionWriter[_T]): stmt = stmt.order_by(*self._order_by_clauses) return stmt - def insert(self) -> Insert[_T]: + def insert(self) -> Insert: """For one-to-many collections, produce a :class:`_dml.Insert` which will insert new rows in terms of this this instance-local :class:`_orm.WriteOnlyCollection`. @@ -561,7 +625,7 @@ class WriteOnlyCollection(AbstractCollectionWriter[_T]): "INSERT along with add_all()." ) - dict_ = {} + dict_: Dict[str, Any] = {} for l, r in prop.synchronize_pairs: fn = prop._get_attr_w_warn_on_none( @@ -575,14 +639,14 @@ class WriteOnlyCollection(AbstractCollectionWriter[_T]): return insert(self.attr.target_mapper).values(**dict_) - def update(self) -> Update[_T]: + def update(self) -> Update: """Produce a :class:`_dml.Update` which will refer to rows in terms of this instance-local :class:`_orm.WriteOnlyCollection`. """ return update(self.attr.target_mapper).where(*self._where_criteria) - def delete(self) -> Delete[_T]: + def delete(self) -> Delete: """Produce a :class:`_dml.Delete` which will refer to rows in terms of this instance-local :class:`_orm.WriteOnlyCollection`.