#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
"""Dynamic collection API.
from typing import Any
from typing import Iterable
from typing import Iterator
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
+from typing import Union
from . import attributes
from . import exc as orm_exc
from . import relationships
from . import util as orm_util
+from .base import PassiveFlag
from .query import Query
from .session import object_session
from .writeonly import AbstractCollectionWriter
from .. import util
from ..engine import result
+
if TYPE_CHECKING:
+ from . import QueryableAttribute
+ from .mapper import Mapper
+ from .relationships import _RelationshipOrderByArg
from .session import Session
-
+ from .state import InstanceState
+ from .util import AliasedClass
+ from ..event import _Dispatch
+ from ..sql.elements import ColumnElement
_T = TypeVar("_T", bound=Any)
-class DynamicCollectionHistory(WriteOnlyHistory):
- def __init__(self, attr, state, passive, apply_to=None):
+class DynamicCollectionHistory(WriteOnlyHistory[_T]):
+ def __init__(
+ self,
+ attr: DynamicAttributeImpl,
+ state: InstanceState[_T],
+ passive: PassiveFlag,
+ apply_to: Optional[DynamicCollectionHistory[_T]] = None,
+ ) -> None:
if apply_to:
coll = AppenderQuery(attr, state).autoflush(False)
self.unchanged_items = util.OrderedIdentitySet(coll)
class DynamicAttributeImpl(WriteOnlyAttributeImpl):
_supports_dynamic_iteration = True
- collection_history_cls = DynamicCollectionHistory
+ collection_history_cls = DynamicCollectionHistory[Any]
+ query_class: Type[AppenderMixin[Any]] # type: ignore[assignment]
def __init__(
self,
- class_,
- key,
- typecallable,
- dispatch,
- target_mapper,
- order_by,
- query_class=None,
- **kw,
- ):
+ class_: Union[Type[Any], AliasedClass[Any]],
+ key: str,
+ dispatch: _Dispatch[QueryableAttribute[Any]],
+ target_mapper: Mapper[_T],
+ order_by: _RelationshipOrderByArg,
+ query_class: Optional[Type[AppenderMixin[_T]]] = None,
+ **kw: Any,
+ ) -> None:
attributes.AttributeImpl.__init__(
- self, class_, key, typecallable, dispatch, **kw
+ self, class_, key, None, dispatch, **kw
)
self.target_mapper = target_mapper
if order_by:
"""
- query_class = None
+ query_class: Optional[Type[Query[_T]]] = None
+ _order_by_clauses: Tuple[ColumnElement[Any], ...]
- def __init__(self, attr, state):
- Query.__init__(self, attr.target_mapper, None)
+ def __init__(
+ self, attr: DynamicAttributeImpl, state: InstanceState[_T]
+ ) -> None:
+ Query.__init__(
+ self, # type: ignore[arg-type]
+ attr.target_mapper,
+ None,
+ )
super().__init__(attr, state)
@property
- def session(self) -> Session:
+ def session(self) -> Optional[Session]:
sess = object_session(self.instance)
- if (
- sess is not None
- and self.autoflush
- and sess.autoflush
- and self.instance in sess
- ):
+ if sess is not None and sess.autoflush and self.instance in sess:
sess.flush()
if not orm_util.has_identity(self.instance):
return None
def session(self, session: Session) -> None:
self.sess = session
- def _iter(self):
+ def _iter(self) -> Union[result.ScalarResult[_T], result.Result[_T]]:
sess = self.session
if sess is None:
state = attributes.instance_state(self.instance)
return result.IteratorResult(
result.SimpleResultMetaData([self.attr.class_.__name__]),
- self.attr._get_collection_history(
+ self.attr._get_collection_history( # type: ignore[arg-type]
attributes.instance_state(self.instance),
- attributes.PASSIVE_NO_INITIALIZE,
+ PassiveFlag.PASSIVE_NO_INITIALIZE,
).added_items,
_source_supports_scalars=True,
).scalars()
def __iter__(self) -> Iterator[_T]:
...
- def __getitem__(self, index: Any) -> _T:
+ def __getitem__(self, index: Any) -> Union[_T, List[_T]]:
sess = self.session
if sess is None:
return self.attr._get_collection_history(
attributes.instance_state(self.instance),
- attributes.PASSIVE_NO_INITIALIZE,
+ PassiveFlag.PASSIVE_NO_INITIALIZE,
).indexed(index)
else:
- return self._generate(sess).__getitem__(index)
+ return self._generate(sess).__getitem__(index) # type: ignore[no-any-return] # noqa: E501
def count(self) -> int:
sess = self.session
return len(
self.attr._get_collection_history(
attributes.instance_state(self.instance),
- attributes.PASSIVE_NO_INITIALIZE,
+ PassiveFlag.PASSIVE_NO_INITIALIZE,
).added_items
)
else:
return self._generate(sess).count()
- def _generate(self, sess=None):
+ def _generate(
+ self,
+ sess: Optional[Session] = None,
+ ) -> Query[_T]:
# note we're returning an entirely new Query class instance
# here without any assignment capabilities; the class of this
# query is determined by the session.
self._remove_impl(item)
-class AppenderQuery(AppenderMixin[_T], Query[_T]):
+class AppenderQuery(AppenderMixin[_T], Query[_T]): # type: ignore[misc]
"""A dynamic query that supports basic collection storage operations.
Methods on :class:`.AppenderQuery` include all methods of
"""
-def mixin_user_query(cls):
+def mixin_user_query(cls: Any) -> type[AppenderMixin[Any]]:
"""Return a new class with AppenderQuery functionality layered over."""
name = "Appender" + cls.__name__
return type(name, (AppenderMixin, cls), {"query_class": cls})
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
"""Write-only collection API.
from __future__ import annotations
from typing import Any
+from typing import Collection
+from typing import Dict
from typing import Generic
from typing import Iterable
+from typing import Iterator
+from typing import List
from typing import NoReturn
from typing import Optional
from typing import overload
from typing import Tuple
+from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
from . import interfaces
from . import relationships
from . import strategies
+from .base import NEVER_SET
from .base import object_mapper
from .base import PassiveFlag
-from .relationships import RelationshipDirection
+from .base import RelationshipDirection
from .. import exc
from .. import inspect
from .. import log
from ..util.typing import Literal
if TYPE_CHECKING:
+ from . import QueryableAttribute
from ._typing import _InstanceDict
- from .attributes import _AdaptedCollectionProtocol
from .attributes import AttributeEventToken
- from .attributes import CollectionAdapter
from .base import LoaderCallableStatus
+ from .collections import _AdaptedCollectionProtocol
+ from .collections import CollectionAdapter
+ from .mapper import Mapper
+ from .relationships import _RelationshipOrderByArg
from .state import InstanceState
+ from .util import AliasedClass
+ from ..event import _Dispatch
+ from ..sql.selectable import FromClause
from ..sql.selectable import Select
-
_T = TypeVar("_T", bound=Any)
-class WriteOnlyHistory:
+class WriteOnlyHistory(Generic[_T]):
"""Overrides AttributeHistory to receive append/remove events directly."""
- def __init__(self, attr, state, passive, apply_to=None):
+ unchanged_items: util.OrderedIdentitySet
+ added_items: util.OrderedIdentitySet
+ deleted_items: util.OrderedIdentitySet
+ _reconcile_collection: bool
+
+ def __init__(
+ self,
+ attr: WriteOnlyAttributeImpl,
+ state: InstanceState[_T],
+ passive: PassiveFlag,
+ apply_to: Optional[WriteOnlyHistory[_T]] = None,
+ ) -> None:
if apply_to:
if passive & PassiveFlag.SQL_OK:
raise exc.InvalidRequestError(
self._reconcile_collection = False
@property
- def added_plus_unchanged(self):
+ def added_plus_unchanged(self) -> List[_T]:
return list(self.added_items.union(self.unchanged_items))
@property
- def all_items(self):
+ def all_items(self) -> List[_T]:
return list(
self.added_items.union(self.unchanged_items).union(
self.deleted_items
)
)
- def as_history(self):
+ def as_history(self) -> attributes.History:
if self._reconcile_collection:
added = self.added_items.difference(self.unchanged_items)
deleted = self.deleted_items.intersection(self.unchanged_items)
)
return attributes.History(list(added), list(unchanged), list(deleted))
- def indexed(self, index):
+ def indexed(self, index: Union[int, slice]) -> Union[List[_T], _T]:
return list(self.added_items)[index]
- def add_added(self, value):
+ def add_added(self, value: _T) -> None:
self.added_items.add(value)
- def add_removed(self, value):
+ def add_removed(self, value: _T) -> None:
if value in self.added_items:
self.added_items.remove(value)
else:
class WriteOnlyAttributeImpl(
attributes.HasCollectionAdapter, attributes.AttributeImpl
):
- uses_objects = True
- default_accepts_scalar_loader = False
- supports_population = False
- _supports_dynamic_iteration = False
- collection = False
- dynamic = True
- order_by = ()
- collection_history_cls = WriteOnlyHistory
+ uses_objects: bool = True
+ default_accepts_scalar_loader: bool = False
+ supports_population: bool = False
+ _supports_dynamic_iteration: bool = False
+ collection: bool = False
+ dynamic: bool = True
+ order_by: _RelationshipOrderByArg = ()
+ collection_history_cls: Type[WriteOnlyHistory[Any]] = WriteOnlyHistory
+
+ query_class: Type[WriteOnlyCollection[Any]]
def __init__(
self,
- class_,
- key,
- typecallable,
- dispatch,
- target_mapper,
- order_by,
- **kw,
+ class_: Union[Type[Any], AliasedClass[Any]],
+ key: str,
+ dispatch: _Dispatch[QueryableAttribute[Any]],
+ target_mapper: Mapper[_T],
+ order_by: _RelationshipOrderByArg,
+ **kw: Any,
):
- super().__init__(class_, key, typecallable, dispatch, **kw)
+ super().__init__(class_, key, None, dispatch, **kw)
self.target_mapper = target_mapper
self.query_class = WriteOnlyCollection
if order_by:
self.order_by = tuple(order_by)
- def get(self, state, dict_, passive=attributes.PASSIVE_OFF):
- if not passive & attributes.SQL_OK:
+ def get(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
+ ) -> Union[util.OrderedIdentitySet, WriteOnlyCollection[Any]]:
+ if not passive & PassiveFlag.SQL_OK:
return self._get_collection_history(
- state, attributes.PASSIVE_NO_INITIALIZE
+ state, PassiveFlag.PASSIVE_NO_INITIALIZE
).added_items
else:
return self.query_class(self, state)
) -> Union[
Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
]:
- if not passive & attributes.SQL_OK:
+ data: Collection[Any]
+ if not passive & PassiveFlag.SQL_OK:
data = self._get_collection_history(state, passive).added_items
else:
history = self._get_collection_history(state, passive)
data = history.added_plus_unchanged
- return DynamicCollectionAdapter(data) # type: ignore
+ return DynamicCollectionAdapter(data) # type: ignore[return-value]
@util.memoized_property
- def _append_token(self):
+ def _append_token( # type:ignore[override]
+ self,
+ ) -> attributes.AttributeEventToken:
return attributes.AttributeEventToken(self, attributes.OP_APPEND)
@util.memoized_property
- def _remove_token(self):
+ def _remove_token( # type:ignore[override]
+ self,
+ ) -> attributes.AttributeEventToken:
return attributes.AttributeEventToken(self, attributes.OP_REMOVE)
def fire_append_event(
- self, state, dict_, value, initiator, collection_history=None
- ):
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ collection_history: Optional[WriteOnlyHistory[Any]] = None,
+ ) -> None:
if collection_history is None:
collection_history = self._modified_event(state, dict_)
self.sethasparent(attributes.instance_state(value), state, True)
def fire_remove_event(
- self, state, dict_, value, initiator, collection_history=None
- ):
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ collection_history: Optional[WriteOnlyHistory[Any]] = None,
+ ) -> None:
if collection_history is None:
collection_history = self._modified_event(state, dict_)
for fn in self.dispatch.remove:
fn(state, value, initiator or self._remove_token)
- def _modified_event(self, state, dict_):
+ def _modified_event(
+ self, state: InstanceState[Any], dict_: _InstanceDict
+ ) -> WriteOnlyHistory[Any]:
if self.key not in state.committed_state:
state.committed_state[self.key] = self.collection_history_cls(
self, state, PassiveFlag.PASSIVE_NO_FETCH
)
- state._modified_event(dict_, self, attributes.NEVER_SET)
+ state._modified_event(dict_, self, NEVER_SET)
# this is a hack to allow the entities.ComparableEntity fixture
# to work
dict_[self.key] = True
- return state.committed_state[self.key]
+ return state.committed_state[self.key] # type: ignore[no-any-return]
def set(
self,
collection_history=collection_history,
)
- def delete(self, *args, **kwargs):
+ def delete(self, *args: Any, **kwargs: Any) -> NoReturn:
raise NotImplementedError()
- def set_committed_value(self, state, dict_, value):
+ def set_committed_value(
+ self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
+ ) -> NoReturn:
raise NotImplementedError(
"Dynamic attributes don't support collection population."
)
- def get_history(self, state, dict_, passive=attributes.PASSIVE_NO_FETCH):
+ def get_history(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+ ) -> attributes.History:
c = self._get_collection_history(state, passive)
return c.as_history()
def get_all_pending(
- self, state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE
- ):
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_INITIALIZE,
+ ) -> List[Tuple[InstanceState[Any], Any]]:
c = self._get_collection_history(state, passive)
return [(attributes.instance_state(x), x) for x in c.all_items]
- def _get_collection_history(self, state, passive):
+ def _get_collection_history(
+ self, state: InstanceState[Any], passive: PassiveFlag
+ ) -> WriteOnlyHistory[Any]:
+ c: WriteOnlyHistory[Any]
if self.key in state.committed_state:
c = state.committed_state[self.key]
else:
self, state, PassiveFlag.PASSIVE_NO_FETCH
)
- if state.has_identity and (passive & attributes.INIT_OK):
+ if state.has_identity and (passive & PassiveFlag.INIT_OK):
return self.collection_history_cls(
self, state, passive, apply_to=c
)
def append(
self,
- state,
- dict_,
- value,
- initiator,
- passive=attributes.PASSIVE_NO_FETCH,
- ):
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+ ) -> None:
if initiator is not self:
self.fire_append_event(state, dict_, value, initiator)
def remove(
self,
- state,
- dict_,
- value,
- initiator,
- passive=attributes.PASSIVE_NO_FETCH,
- ):
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+ ) -> None:
if initiator is not self:
self.fire_remove_event(state, dict_, value, initiator)
def pop(
self,
- state,
- dict_,
- value,
- initiator,
- passive=attributes.PASSIVE_NO_FETCH,
- ):
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+ ) -> None:
self.remove(state, dict_, value, initiator, passive=passive)
class WriteOnlyLoader(strategies.AbstractRelationshipLoader, log.Identified):
impl_class = WriteOnlyAttributeImpl
- def init_class_attribute(self, mapper):
+ def init_class_attribute(self, mapper: Mapper[Any]) -> None:
self.is_class_level = True
if not self.uselist or self.parent_property.direction not in (
interfaces.ONETOMANY,
"uselist=False." % self.parent_property
)
- strategies._register_attribute(
+ strategies._register_attribute( # type: ignore[no-untyped-call]
self.parent_property,
mapper,
useobject=True,
class DynamicCollectionAdapter:
"""simplified CollectionAdapter for internal API consistency"""
- def __init__(self, data):
+ data: Collection[Any]
+
+ def __init__(self, data: Collection[Any]):
self.data = data
- def __iter__(self):
+ def __iter__(self) -> Iterator[Any]:
return iter(self.data)
- def _reset_empty(self):
+ def _reset_empty(self) -> None:
pass
- def __len__(self):
+ def __len__(self) -> int:
return len(self.data)
- def __bool__(self):
+ def __bool__(self) -> bool:
return True
if not TYPE_CHECKING:
__slots__ = ()
- def __init__(self, attr, state):
- self.instance = instance = state.obj()
+ instance: _T
+ _from_obj: Tuple[FromClause, ...]
+
+ def __init__(self, attr: WriteOnlyAttributeImpl, state: InstanceState[_T]):
+ instance = state.obj()
+ if TYPE_CHECKING:
+ assert instance
+ self.instance = instance
self.attr = attr
mapper = object_mapper(instance)
stmt = stmt.order_by(*self._order_by_clauses)
return stmt
- def insert(self) -> Insert[_T]:
+ def insert(self) -> Insert:
"""For one-to-many collections, produce a :class:`_dml.Insert` which
will insert new rows in terms of this this instance-local
:class:`_orm.WriteOnlyCollection`.
"INSERT along with add_all()."
)
- dict_ = {}
+ dict_: Dict[str, Any] = {}
for l, r in prop.synchronize_pairs:
fn = prop._get_attr_w_warn_on_none(
return insert(self.attr.target_mapper).values(**dict_)
- def update(self) -> Update[_T]:
+ def update(self) -> Update:
"""Produce a :class:`_dml.Update` which will refer to rows in terms
of this instance-local :class:`_orm.WriteOnlyCollection`.
"""
return update(self.attr.target_mapper).where(*self._where_criteria)
- def delete(self) -> Delete[_T]:
+ def delete(self) -> Delete:
"""Produce a :class:`_dml.Delete` which will refer to rows in terms
of this instance-local :class:`_orm.WriteOnlyCollection`.