#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: allow-untyped-defs
"""ORM event interfaces.
from typing import Any
from typing import Callable
+from typing import Collection
+from typing import Dict
from typing import Generic
+from typing import Iterable
from typing import Optional
+from typing import Sequence
from typing import Set
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
import weakref
+from sqlalchemy.orm.unitofwork import UOWTransaction
from . import instrumentation
from . import interfaces
from . import mapperlib
+from ._typing import _InstanceDict
+from .attributes import Event
from .attributes import QueryableAttribute
from .base import _mapper_or_none
+from .base import EventConstants
from .base import NO_KEY
from .query import Query
from .scoping import scoped_session
+from .session import ORMExecuteState
from .session import Session
from .session import sessionmaker
+from .session import SessionTransaction
from .. import event
from .. import exc
from .. import util
+from ..engine import Connection
from ..event import EventTarget
+from ..event.base import _Dispatch
+from ..event.base import _HasEventsDispatch
from ..event.registry import _ET
+from ..orm.collections import CollectionAdapter
+from ..orm.context import QueryContext
from ..util.compat import inspect_getfullargspec
if TYPE_CHECKING:
from ..orm.mapper import Mapper
from ..orm.state import InstanceState
+_KT = TypeVar("_KT", bound=Any)
_ET2 = TypeVar("_ET2", bound=EventTarget)
super()._clear()
instrumentation._instrumentation_factory.dispatch._clear()
- def class_instrument(self, cls):
+ def class_instrument(self, cls: ClassManager[_O]) -> None:
"""Called after the given class is instrumented.
To get at the :class:`.ClassManager`, use
"""
- def class_uninstrument(self, cls):
+ def class_uninstrument(self, cls: ClassManager[_O]) -> None:
"""Called before the given class is uninstrumented.
To get at the :class:`.ClassManager`, use
"""
- def attribute_instrument(self, cls, key, inst):
+ def attribute_instrument(
+ self, cls: ClassManager[_O], key: _KT, inst: _O
+ ) -> None:
"""Called when an attribute is instrumented."""
super()._clear()
_InstanceEventsHold._clear()
- def first_init(self, manager, cls):
+ def first_init(self, manager: ClassManager[_O], cls: Type[_O]) -> None:
"""Called when the first instance of a particular mapping is called.
This event is called when the ``__init__`` method of a class
"""
- def init(self, target, args, kwargs):
+ def init(self, target: _O, args: Any, kwargs: Any) -> None:
"""Receive an instance when its constructor is called.
This method is only called during a userland construction of
"""
- def init_failure(self, target, args, kwargs):
+ def init_failure(self, target: _O, args: Any, kwargs: Any) -> None:
"""Receive an instance when its constructor has been called,
and raised an exception.
"""
- def _sa_event_merge_wo_load(self, target, context):
+ def _sa_event_merge_wo_load(
+ self, target: _O, context: QueryContext
+ ) -> None:
"""receive an object instance after it was the subject of a merge()
call, when load=False was passed.
"""
- def load(self, target, context):
+ def load(self, target: _O, context: QueryContext) -> None:
"""Receive an object instance after it has been created via
``__new__``, and after initial attribute population has
occurred.
"""
- def refresh(self, target, context, attrs):
+ def refresh(
+ self, target: _O, context: QueryContext, attrs: Optional[Iterable[str]]
+ ) -> None:
"""Receive an object instance after one or more attributes have
been refreshed from a query.
"""
- def refresh_flush(self, target, flush_context, attrs):
+ def refresh_flush(
+ self,
+ target: _O,
+ flush_context: UOWTransaction,
+ attrs: Optional[Iterable[str]],
+ ) -> None:
"""Receive an object instance after one or more attributes that
contain a column-level default or onupdate handler have been refreshed
during persistence of the object's state.
"""
- def expire(self, target, attrs):
+ def expire(self, target: _O, attrs: Optional[Iterable[str]]) -> None:
"""Receive an object instance after its attributes or some subset
have been expired.
"""
- def pickle(self, target, state_dict):
+ def pickle(self, target: _O, state_dict: _InstanceDict) -> None:
"""Receive an object instance when its associated state is
being pickled.
"""
- def unpickle(self, target, state_dict):
+ def unpickle(self, target: _O, state_dict: _InstanceDict) -> None:
"""Receive an object instance after its associated state has
been unpickled.
super()._clear()
_MapperEventsHold._clear()
- def instrument_class(self, mapper, class_):
+ def instrument_class(self, mapper: Mapper[_O], class_: Type[_O]) -> None:
r"""Receive a class when the mapper is first constructed,
before instrumentation is applied to the mapped class.
"""
- def before_mapper_configured(self, mapper, class_):
+ def before_mapper_configured(
+ self, mapper: Mapper[_O], class_: Type[_O]
+ ) -> None:
"""Called right before a specific mapper is to be configured.
This event is intended to allow a specific mapper to be skipped during
"""
- def mapper_configured(self, mapper, class_):
+ def mapper_configured(self, mapper: Mapper[_O], class_: Type[_O]) -> None:
r"""Called when a specific mapper has completed its own configuration
within the scope of the :func:`.configure_mappers` call.
"""
# TODO: need coverage for this event
- def before_configured(self):
+ def before_configured(self) -> None:
"""Called before a series of mappers have been configured.
The :meth:`.MapperEvents.before_configured` event is invoked
"""
- def after_configured(self):
+ def after_configured(self) -> None:
"""Called after a series of mappers have been configured.
The :meth:`.MapperEvents.after_configured` event is invoked
"""
- def before_insert(self, mapper, connection, target):
+ def before_insert(
+ self, mapper: Mapper[_O], connection: Connection, target: _O
+ ) -> None:
"""Receive an object instance before an INSERT statement
is emitted corresponding to that instance.
"""
- def after_insert(self, mapper, connection, target):
+ def after_insert(
+ self, mapper: Mapper[_O], connection: Connection, target: _O
+ ) -> None:
"""Receive an object instance after an INSERT statement
is emitted corresponding to that instance.
"""
- def before_update(self, mapper, connection, target):
+ def before_update(
+ self, mapper: Mapper[_O], connection: Connection, target: _O
+ ) -> None:
"""Receive an object instance before an UPDATE statement
is emitted corresponding to that instance.
"""
- def after_update(self, mapper, connection, target):
+ def after_update(
+ self, mapper: Mapper[_O], connection: Connection, target: _O
+ ) -> None:
"""Receive an object instance after an UPDATE statement
is emitted corresponding to that instance.
"""
- def before_delete(self, mapper, connection, target):
+ def before_delete(
+ self, mapper: Mapper[_O], connection: Connection, target: _O
+ ) -> None:
"""Receive an object instance before a DELETE statement
is emitted corresponding to that instance.
"""
- def after_delete(self, mapper, connection, target):
+ def after_delete(
+ self, mapper: Mapper[_O], connection: Connection, target: _O
+ ) -> None:
"""Receive an object instance after a DELETE statement
has been emitted corresponding to that instance.
_dispatch_target = Session
- def _lifecycle_event( # type: ignore [misc]
+ @staticmethod
+ def _lifecycle_event(
fn: Callable[[SessionEvents, Session, Any], None]
- ) -> Callable[[Session, Any], None]:
+ ) -> Callable[[SessionEvents, Session, Any], None]:
_sessionevents_lifecycle_event_names.add(fn.__name__)
- return fn # type: ignore [return-value]
+ return fn
@classmethod
def _accept_with( # type: ignore [return]
event_key.base_listen(**kw)
- def do_orm_execute(self, orm_execute_state):
+ def do_orm_execute(self, orm_execute_state: ORMExecuteState) -> None:
"""Intercept statement executions that occur on behalf of an
ORM :class:`.Session` object.
"""
- def after_transaction_create(self, session, transaction):
+ def after_transaction_create(
+ self, session: Session, transaction: SessionTransaction
+ ) -> None:
"""Execute when a new :class:`.SessionTransaction` is created.
This event differs from :meth:`~.SessionEvents.after_begin`
"""
- def after_transaction_end(self, session, transaction):
+ def after_transaction_end(
+ self, session: Session, transaction: SessionTransaction
+ ) -> None:
"""Execute when the span of a :class:`.SessionTransaction` ends.
This event differs from :meth:`~.SessionEvents.after_commit`
"""
- def before_commit(self, session):
+ def before_commit(self, session: Session) -> None:
"""Execute before commit is called.
.. note::
"""
- def after_commit(self, session):
+ def after_commit(self, session: Session) -> None:
"""Execute after a commit has occurred.
.. note::
"""
- def after_rollback(self, session):
+ def after_rollback(self, session: Session) -> None:
"""Execute after a real DBAPI rollback has occurred.
Note that this event only fires when the *actual* rollback against
"""
- def after_soft_rollback(self, session, previous_transaction):
+ def after_soft_rollback(
+ self, session: Session, previous_transaction: SessionTransaction
+ ) -> None:
"""Execute after any rollback has occurred, including "soft"
rollbacks that don't actually emit at the DBAPI level.
"""
- def before_flush(self, session, flush_context, instances):
+ def before_flush(
+ self,
+ session: Session,
+ flush_context: UOWTransaction,
+ instances: Optional[Sequence[Any]],
+ ) -> None:
"""Execute before flush process has started.
:param session: The target :class:`.Session`.
"""
- def after_flush(self, session, flush_context):
+ def after_flush(
+ self, session: Session, flush_context: UOWTransaction
+ ) -> None:
"""Execute after flush has completed, but before commit has been
called.
"""
- def after_flush_postexec(self, session, flush_context):
+ def after_flush_postexec(
+ self, session: Session, flush_context: UOWTransaction
+ ) -> None:
"""Execute after flush has completed, and after the post-exec
state occurs.
"""
- def after_begin(self, session, transaction, connection):
+ def after_begin(
+ self,
+ session: Session,
+ transaction: SessionTransaction,
+ connection: Connection,
+ ) -> None:
"""Execute after a transaction is begun on a connection
:param session: The target :class:`.Session`.
"""
@_lifecycle_event
- def before_attach(self, session, instance):
+ def before_attach(self, session: Session, instance: _O) -> None:
"""Execute before an instance is attached to a session.
This is called before an add, delete or merge causes
"""
@_lifecycle_event
- def after_attach(self, session, instance):
+ def after_attach(self, session: Session, instance: _O) -> None:
"""Execute after an instance is attached to a session.
This is called after an add, delete or merge.
update_context.result,
),
)
- def after_bulk_update(self, update_context):
+ def after_bulk_update(self, update_context: _O) -> None:
"""Event for after the legacy :meth:`_orm.Query.update` method
has been called.
delete_context.result,
),
)
- def after_bulk_delete(self, delete_context):
+ def after_bulk_delete(self, delete_context: _O) -> None:
"""Event for after the legacy :meth:`_orm.Query.delete` method
has been called.
"""
@_lifecycle_event
- def transient_to_pending(self, session, instance):
+ def transient_to_pending(self, session: Session, instance: _O) -> None:
"""Intercept the "transient to pending" transition for a specific
object.
"""
@_lifecycle_event
- def pending_to_transient(self, session, instance):
+ def pending_to_transient(self, session: Session, instance: _O) -> None:
"""Intercept the "pending to transient" transition for a specific
object.
"""
@_lifecycle_event
- def persistent_to_transient(self, session, instance):
+ def persistent_to_transient(self, session: Session, instance: _O) -> None:
"""Intercept the "persistent to transient" transition for a specific
object.
"""
@_lifecycle_event
- def pending_to_persistent(self, session, instance):
+ def pending_to_persistent(self, session: Session, instance: _O) -> None:
"""Intercept the "pending to persistent"" transition for a specific
object.
"""
@_lifecycle_event
- def detached_to_persistent(self, session, instance):
+ def detached_to_persistent(self, session: Session, instance: _O) -> None:
"""Intercept the "detached to persistent" transition for a specific
object.
"""
@_lifecycle_event
- def loaded_as_persistent(self, session, instance):
+ def loaded_as_persistent(self, session: Session, instance: _O) -> None:
"""Intercept the "loaded as persistent" transition for a specific
object.
"""
@_lifecycle_event
- def persistent_to_deleted(self, session, instance):
+ def persistent_to_deleted(self, session: Session, instance: _O) -> None:
"""Intercept the "persistent to deleted" transition for a specific
object.
"""
@_lifecycle_event
- def deleted_to_persistent(self, session, instance):
+ def deleted_to_persistent(self, session: Session, instance: _O) -> None:
"""Intercept the "deleted to persistent" transition for a specific
object.
"""
@_lifecycle_event
- def deleted_to_detached(self, session, instance):
+ def deleted_to_detached(self, session: Session, instance: _O) -> None:
"""Intercept the "deleted to detached" transition for a specific
object.
"""
@_lifecycle_event
- def persistent_to_detached(self, session, instance):
+ def persistent_to_detached(self, session: Session, instance: _O) -> None:
"""Intercept the "persistent to detached" transition for a specific
object.
_dispatch_target = QueryableAttribute
@staticmethod
- def _set_dispatch(cls, dispatch_cls):
+ def _set_dispatch(
+ cls: Type[_HasEventsDispatch[Any]], dispatch_cls: Type[_Dispatch[Any]]
+ ) -> _Dispatch[Any]:
dispatch = event.Events._set_dispatch(cls, dispatch_cls)
dispatch_cls._active_history = False
return dispatch
if active_history:
mgr[target.key].dispatch._active_history = True
- def append(self, target, value, initiator, *, key=NO_KEY):
+ def append(
+ self,
+ target: _O,
+ value: _T,
+ initiator: Event,
+ *,
+ key: EventConstants = NO_KEY,
+ ) -> Optional[_T]:
"""Receive a collection append event.
The append event is invoked for each element as it is appended
"""
- def append_wo_mutation(self, target, value, initiator, *, key=NO_KEY):
+ def append_wo_mutation(
+ self,
+ target: _O,
+ value: _T,
+ initiator: Event,
+ *,
+ key: EventConstants = NO_KEY,
+ ) -> None:
"""Receive a collection append event where the collection was not
actually mutated.
"""
- def bulk_replace(self, target, values, initiator, *, keys=None):
+ def bulk_replace(
+ self,
+ target: _O,
+ values: Iterable[_T],
+ initiator: Event,
+ *,
+ keys: Optional[Iterable[EventConstants]] = None,
+ ) -> None:
"""Receive a collection 'bulk replace' event.
This event is invoked for a sequence of values as they are incoming
"""
- def remove(self, target, value, initiator, *, key=NO_KEY):
+ def remove(
+ self,
+ target: _O,
+ value: _T,
+ initiator: Event,
+ *,
+ key: EventConstants = NO_KEY,
+ ) -> None:
"""Receive a collection remove event.
:param target: the object instance receiving the event.
"""
- def set(self, target, value, oldvalue, initiator):
+ def set(
+ self, target: _O, value: _T, oldvalue: _T, initiator: Event
+ ) -> None:
"""Receive a scalar set event.
:param target: the object instance receiving the event.
"""
- def init_scalar(self, target, value, dict_):
+ def init_scalar(
+ self, target: _O, value: _T, dict_: Dict[Any, Any]
+ ) -> None:
r"""Receive a scalar "init" event.
This event is invoked when an uninitialized, unpersisted scalar
"""
- def init_collection(self, target, collection, collection_adapter):
+ def init_collection(
+ self,
+ target: _O,
+ collection: Type[Collection[Any]],
+ collection_adapter: CollectionAdapter,
+ ) -> None:
"""Receive a 'collection init' event.
This event is triggered for a collection-based attribute, when
"""
- def dispose_collection(self, target, collection, collection_adapter):
+ def dispose_collection(
+ self,
+ target: _O,
+ collection: Collection[Any],
+ collection_adapter: CollectionAdapter,
+ ) -> None:
"""Receive a 'collection dispose' event.
This event is triggered for a collection-based attribute when
"""
- def modified(self, target, initiator):
+ def modified(self, target: _O, initiator: Event) -> None:
"""Receive a 'modified' event.
This event is triggered when the :func:`.attributes.flag_modified`
_target_class_doc = "SomeQuery"
_dispatch_target = Query
- def before_compile(self, query):
+ def before_compile(self, query: Query[Any]) -> None:
"""Receive the :class:`_query.Query`
object before it is composed into a
core :class:`_expression.Select` object.
"""
- def before_compile_update(self, query, update_context):
+ def before_compile_update(
+ self, query: Query[Any], update_context: _O
+ ) -> None:
"""Allow modifications to the :class:`_query.Query` object within
:meth:`_query.Query.update`.
"""
- def before_compile_delete(self, query, delete_context):
+ def before_compile_delete(
+ self, query: Query[Any], delete_context: _O
+ ) -> None:
"""Allow modifications to the :class:`_query.Query` object within
:meth:`_query.Query.delete`.