--- /dev/null
+.. change::
+ :tags: change, orm
+ :tickets: 10497
+
+ A sweep through class and function names in the ORM renames many classes
+ and functions that were never intended to be public so that they now have
+ a leading underscore. This reduces ambiguity as to which APIs are intended
+ to be used by third party applications and extensions. Third parties are
+ encouraged to propose new public APIs in GitHub Discussions to the extent
+ they are needed to replace those that have now been clarified as private.
from ..engine.result import Result
from ..orm import LoaderCallableStatus
from ..orm._typing import _O
- from ..orm.bulk_persistence import BulkUDCompileState
+ from ..orm.bulk_persistence import _BulkUDCompileState
from ..orm.context import QueryContext
from ..orm.session import _EntityBindKey
from ..orm.session import _SessionBind
None,
QueryContext.default_load_options,
Type[QueryContext.default_load_options],
- BulkUDCompileState.default_update_options,
- Type[BulkUDCompileState.default_update_options],
+ _BulkUDCompileState.default_update_options,
+ Type[_BulkUDCompileState.default_update_options],
]
if orm_context.is_select:
def _get_comparator(
self, comparator: Any
) -> Callable[[Any], _HybridClassLevelAccessor[_T]]:
- proxy_attr = attributes.create_proxied_attribute(self)
+ proxy_attr = attributes._create_proxied_attribute(self)
def expr_comparator(
owner: Type[object],
delattr(class_, key)
def instrument_collection_class(self, class_, key, collection_class):
- return collections.prepare_instrumentation(collection_class)
+ return collections._prepare_instrumentation(collection_class)
def get_instance_dict(self, class_, instance):
return instance.__dict__
from ..util.typing import TypeGuard
if TYPE_CHECKING:
- from .attributes import AttributeImpl
- from .attributes import CollectionAttributeImpl
- from .attributes import HasCollectionAdapter
+ from .attributes import _AttributeImpl
+ from .attributes import _CollectionAttributeImpl
+ from .attributes import _HasCollectionAdapter
from .attributes import QueryableAttribute
from .base import PassiveFlag
from .decl_api import registry as _registry_type
) -> TypeGuard[RelationshipProperty[Any]]: ...
def is_collection_impl(
- impl: AttributeImpl,
- ) -> TypeGuard[CollectionAttributeImpl]: ...
+ impl: _AttributeImpl,
+ ) -> TypeGuard[_CollectionAttributeImpl]: ...
def is_has_collection_adapter(
- impl: AttributeImpl,
- ) -> TypeGuard[HasCollectionAdapter]: ...
+ impl: _AttributeImpl,
+ ) -> TypeGuard[_HasCollectionAdapter]: ...
else:
insp_is_mapper_property = operator.attrgetter("is_property")
from .relationships import RelationshipProperty
from .state import InstanceState
from .util import AliasedInsp
- from .writeonly import WriteOnlyAttributeImpl
+ from .writeonly import _WriteOnlyAttributeImpl
from ..event.base import _Dispatch
from ..sql._typing import _ColumnExpressionArgument
from ..sql._typing import _DMLColumnArgument
class_: _ExternalEntityType[Any]
key: str
parententity: _InternalEntityType[Any]
- impl: AttributeImpl
+ impl: _AttributeImpl
comparator: interfaces.PropComparator[_T_co]
_of_type: Optional[_InternalEntityType[Any]]
_extra_criteria: Tuple[ColumnElement[bool], ...]
key: str,
parententity: _InternalEntityType[_O],
comparator: interfaces.PropComparator[_T_co],
- impl: Optional[AttributeImpl] = None,
+ impl: Optional[_AttributeImpl] = None,
of_type: Optional[_InternalEntityType[Any]] = None,
extra_criteria: Tuple[ColumnElement[bool], ...] = (),
):
@dataclasses.dataclass(frozen=True)
-class AdHocHasEntityNamespace(HasCacheKey):
+class _AdHocHasEntityNamespace(HasCacheKey):
_traverse_internals: ClassVar[_TraverseInternalsType] = [
("_entity_namespace", InternalTraversal.dp_has_cache_key),
]
return self._entity_namespace.entity_namespace
-def create_proxied_attribute(
+def _create_proxied_attribute(
descriptor: Any,
) -> Callable[..., QueryableAttribute[Any]]:
"""Create an QueryableAttribute / user descriptor hybrid.
else:
# used by hybrid attributes which try to remain
# agnostic of any ORM concepts like mappers
- return AdHocHasEntityNamespace(self._parententity)
+ return _AdHocHasEntityNamespace(self._parententity)
@property
def property(self):
__slots__ = "impl", "op", "parent_token"
- def __init__(self, attribute_impl: AttributeImpl, op: util.symbol):
+ def __init__(self, attribute_impl: _AttributeImpl, op: util.symbol):
self.impl = attribute_impl
self.op = op
self.parent_token = self.impl.parent_token
Event = AttributeEventToken # legacy
-class AttributeImpl:
+class _AttributeImpl:
"""internal implementation for instrumented attributes."""
collection: bool
return value
-class ScalarAttributeImpl(AttributeImpl):
+class _ScalarAttributeImpl(_AttributeImpl):
"""represents a scalar value-holding InstrumentedAttribute."""
default_accepts_scalar_loader = True
fn(state, value, initiator or self._remove_token)
-class ScalarObjectAttributeImpl(ScalarAttributeImpl):
+class _ScalarObjectAttributeImpl(_ScalarAttributeImpl):
"""represents a scalar-holding InstrumentedAttribute,
where the target object is also instrumented.
return value
-class HasCollectionAdapter:
+class _HasCollectionAdapter:
__slots__ = ()
collection: bool
if TYPE_CHECKING:
def _is_collection_attribute_impl(
- impl: AttributeImpl,
- ) -> TypeGuard[CollectionAttributeImpl]: ...
+ impl: _AttributeImpl,
+ ) -> TypeGuard[_CollectionAttributeImpl]: ...
else:
_is_collection_attribute_impl = operator.attrgetter("collection")
-class CollectionAttributeImpl(HasCollectionAdapter, AttributeImpl):
+class _CollectionAttributeImpl(_HasCollectionAdapter, _AttributeImpl):
"""A collection-holding attribute that instruments changes in membership.
Only handles collections of instrumented objects.
return user_data._sa_adapter
-def backref_listeners(
+def _backref_listeners(
attribute: QueryableAttribute[Any], key: str, uselist: bool
) -> None:
"""Apply listeners to synchronize a two-way relationship."""
@classmethod
def from_scalar_attribute(
cls,
- attribute: ScalarAttributeImpl,
+ attribute: _ScalarAttributeImpl,
state: InstanceState[Any],
current: Any,
) -> History:
@classmethod
def from_object_attribute(
cls,
- attribute: ScalarObjectAttributeImpl,
+ attribute: _ScalarObjectAttributeImpl,
state: InstanceState[Any],
current: Any,
original: Any = _NO_HISTORY,
@classmethod
def from_collection(
cls,
- attribute: CollectionAttributeImpl,
+ attribute: _CollectionAttributeImpl,
state: InstanceState[Any],
current: Any,
) -> History:
return manager.has_parent(state, key, optimistic)
-def register_attribute(
+def _register_attribute(
class_: Type[_O],
key: str,
*,
doc: Optional[str] = None,
**kw: Any,
) -> InstrumentedAttribute[_T]:
- desc = register_descriptor(
+ desc = _register_descriptor(
class_, key, comparator=comparator, parententity=parententity, doc=doc
)
- register_attribute_impl(class_, key, **kw)
+ _register_attribute_impl(class_, key, **kw)
return desc
-def register_attribute_impl(
+def _register_attribute_impl(
class_: Type[_O],
key: str,
uselist: bool = False,
callable_: Optional[_LoaderCallable] = None,
useobject: bool = False,
- impl_class: Optional[Type[AttributeImpl]] = None,
+ impl_class: Optional[Type[_AttributeImpl]] = None,
backref: Optional[str] = None,
**kw: Any,
) -> QueryableAttribute[Any]:
"_Dispatch[QueryableAttribute[Any]]", manager[key].dispatch
) # noqa: E501
- impl: AttributeImpl
+ impl: _AttributeImpl
if impl_class:
# TODO: this appears to be the WriteOnlyAttributeImpl /
# DynamicAttributeImpl constructor which is hardcoded
- impl = cast("Type[WriteOnlyAttributeImpl]", impl_class)(
+ impl = cast("Type[_WriteOnlyAttributeImpl]", impl_class)(
class_, key, dispatch, **kw
)
elif uselist:
- impl = CollectionAttributeImpl(
+ impl = _CollectionAttributeImpl(
class_, key, callable_, dispatch, typecallable=typecallable, **kw
)
elif useobject:
- impl = ScalarObjectAttributeImpl(
+ impl = _ScalarObjectAttributeImpl(
class_, key, callable_, dispatch, **kw
)
else:
- impl = ScalarAttributeImpl(class_, key, callable_, dispatch, **kw)
+ impl = _ScalarAttributeImpl(class_, key, callable_, dispatch, **kw)
manager[key].impl = impl
if backref:
- backref_listeners(manager[key], backref, uselist)
+ _backref_listeners(manager[key], backref, uselist)
manager.post_configure_attribute(key)
return manager[key]
-def register_descriptor(
+def _register_descriptor(
class_: Type[Any],
key: str,
*,
return descriptor
-def unregister_attribute(class_: Type[Any], key: str) -> None:
+def _unregister_attribute(class_: Type[Any], key: str) -> None:
manager_of_class(class_).uninstrument_attribute(key)
attr = state.manager[key].impl
if TYPE_CHECKING:
- assert isinstance(attr, HasCollectionAdapter)
+ assert isinstance(attr, _HasCollectionAdapter)
old = dict_.pop(key, None) # discard old collection
if old is not None:
from . import loading
from . import persistence
from .base import NO_VALUE
-from .context import AbstractORMCompileState
+from .context import _AbstractORMCompileState
+from .context import _ORMFromStatementCompileState
from .context import FromStatement
-from .context import ORMFromStatementCompileState
from .context import QueryContext
from .. import exc as sa_exc
from .. import util
populators[key](mapping)
-class ORMDMLState(AbstractORMCompileState):
+class _ORMDMLState(_AbstractORMCompileState):
is_dml_returning = True
- from_statement_ctx: Optional[ORMFromStatementCompileState] = None
+ from_statement_ctx: Optional[_ORMFromStatementCompileState] = None
@classmethod
def _get_orm_crud_kv_pairs(
fs = fs.options(*orm_level_statement._with_options)
self.select_statement = fs
self.from_statement_ctx = fsc = (
- ORMFromStatementCompileState.create_for_statement(fs, compiler)
+ _ORMFromStatementCompileState.create_for_statement(
+ fs, compiler
+ )
)
fsc.setup_dml_returning_compile_state(dml_mapper)
return result
-class BulkUDCompileState(ORMDMLState):
+class _BulkUDCompileState(_ORMDMLState):
class default_update_options(Options):
_dml_strategy: DMLStrategyArgument = "auto"
_synchronize_session: SynchronizeSessionArgument = "auto"
(
update_options,
execution_options,
- ) = BulkUDCompileState.default_update_options.from_execution_options(
+ ) = _BulkUDCompileState.default_update_options.from_execution_options(
"_sa_orm_update_options",
{
"synchronize_session",
@CompileState.plugin_for("orm", "insert")
-class BulkORMInsert(ORMDMLState, InsertDMLState):
+class _BulkORMInsert(_ORMDMLState, InsertDMLState):
class default_insert_options(Options):
_dml_strategy: DMLStrategyArgument = "auto"
_render_nulls: bool = False
(
insert_options,
execution_options,
- ) = BulkORMInsert.default_insert_options.from_execution_options(
+ ) = _BulkORMInsert.default_insert_options.from_execution_options(
"_sa_orm_insert_options",
{"dml_strategy", "autoflush", "populate_existing", "render_nulls"},
execution_options,
)
@classmethod
- def create_for_statement(cls, statement, compiler, **kw) -> BulkORMInsert:
+ def create_for_statement(cls, statement, compiler, **kw) -> _BulkORMInsert:
self = cast(
- BulkORMInsert,
+ _BulkORMInsert,
super().create_for_statement(statement, compiler, **kw),
)
@CompileState.plugin_for("orm", "update")
-class BulkORMUpdate(BulkUDCompileState, UpdateDMLState):
+class _BulkORMUpdate(_BulkUDCompileState, UpdateDMLState):
@classmethod
def create_for_statement(cls, statement, compiler, **kw):
self = cls.__new__(cls)
@CompileState.plugin_for("orm", "delete")
-class BulkORMDelete(BulkUDCompileState, DeleteDMLState):
+class _BulkORMDelete(_BulkUDCompileState, DeleteDMLState):
@classmethod
def create_for_statement(cls, statement, compiler, **kw):
self = cls.__new__(cls)
_T = TypeVar("_T", bound=Any)
-_ClsRegistryType = MutableMapping[str, Union[type, "ClsRegistryToken"]]
+_ClsRegistryType = MutableMapping[str, Union[type, "_ClsRegistryToken"]]
# strong references to registries which we place in
# the _decl_class_registry, which is usually weak referencing.
# the internal registries here link to classes with weakrefs and remove
# themselves when all references to contained classes are removed.
-_registries: Set[ClsRegistryToken] = set()
+_registries: Set[_ClsRegistryToken] = set()
-def add_class(
+def _add_class(
classname: str, cls: Type[_T], decl_class_registry: _ClsRegistryType
) -> None:
"""Add a class to the _decl_class_registry associated with the
raise
-def remove_class(
+def _remove_class(
classname: str, cls: Type[Any], decl_class_registry: _ClsRegistryType
) -> None:
if classname in decl_class_registry:
return not test(thing)
-class ClsRegistryToken:
+class _ClsRegistryToken:
"""an object that can be in the registry._class_registry as a value."""
__slots__ = ()
-class _MultipleClassMarker(ClsRegistryToken):
+class _MultipleClassMarker(_ClsRegistryToken):
"""refers to multiple classes of the same name
within _decl_class_registry.
self.contents.add(weakref.ref(item, self._remove_item))
-class _ModuleMarker(ClsRegistryToken):
+class _ModuleMarker(_ClsRegistryToken):
"""Refers to a module name within
_decl_class_registry.
def __contains__(self, name: str) -> bool:
return name in self.contents
- def __getitem__(self, name: str) -> ClsRegistryToken:
+ def __getitem__(self, name: str) -> _ClsRegistryToken:
return self.contents[name]
def _remove_item(self, name: str) -> None:
from ..util.compat import inspect_getfullargspec
if typing.TYPE_CHECKING:
+ from .attributes import _CollectionAttributeImpl
from .attributes import AttributeEventToken
- from .attributes import CollectionAttributeImpl
from .mapped_collection import attribute_keyed_dict
from .mapped_collection import column_keyed_dict
from .mapped_collection import keyfunc_mapping
"empty",
)
- attr: CollectionAttributeImpl
+ attr: _CollectionAttributeImpl
_key: str
# this is actually a weakref; see note in constructor
def __init__(
self,
- attr: CollectionAttributeImpl,
+ attr: _CollectionAttributeImpl,
owner_state: InstanceState[Any],
data: _AdaptedCollectionProtocol,
):
existing_adapter._fire_remove_event_bulk(removals, initiator=initiator)
-def prepare_instrumentation(
+def _prepare_instrumentation(
factory: Union[Type[Collection[Any]], _CollectionFactoryType],
) -> _CollectionFactoryType:
"""Prepare a callable for future use as a collection class factory.
if TYPE_CHECKING:
from ._typing import _InternalEntityType
from ._typing import OrmExecuteOptionsParameter
- from .loading import PostLoad
+ from .loading import _PostLoad
from .mapper import Mapper
from .query import Query
from .session import _BindArguments
)
runid: int
- post_load_paths: Dict[PathRegistry, PostLoad]
- compile_state: ORMCompileState
+ post_load_paths: Dict[PathRegistry, _PostLoad]
+ compile_state: _ORMCompileState
class default_load_options(Options):
_only_return_tuples = False
)
-class AbstractORMCompileState(CompileState):
+class _AbstractORMCompileState(CompileState):
is_dml_returning = False
def _init_global_attributes(
statement: Union[Select, FromStatement],
compiler: Optional[SQLCompiler],
**kw: Any,
- ) -> AbstractORMCompileState:
+ ) -> _AbstractORMCompileState:
"""Create a context for a statement given a :class:`.Compiler`.
This method is always invoked in the context of SQLCompiler.process().
raise NotImplementedError()
-class AutoflushOnlyORMCompileState(AbstractORMCompileState):
+class _AutoflushOnlyORMCompileState(_AbstractORMCompileState):
"""ORM compile state that is a passthrough, except for autoflush."""
@classmethod
return result
-class ORMCompileState(AbstractORMCompileState):
+class _ORMCompileState(_AbstractORMCompileState):
class default_compile_options(CacheableOptions):
_cache_key_traversal = [
("_use_legacy_query_style", InternalTraversal.dp_boolean),
statement: Union[Select, FromStatement],
compiler: Optional[SQLCompiler],
**kw: Any,
- ) -> ORMCompileState: ...
+ ) -> _ORMCompileState: ...
def _append_dedupe_col_collection(self, obj, col_collection):
dedupe = self.dedupe_columns
)
-class DMLReturningColFilter:
+class _DMLReturningColFilter:
"""an adapter used for the DML RETURNING case.
Has a subset of the interface used by
@sql.base.CompileState.plugin_for("orm", "orm_from_statement")
-class ORMFromStatementCompileState(ORMCompileState):
+class _ORMFromStatementCompileState(_ORMCompileState):
_from_obj_alias = None
_has_mapper_entities = False
statement_container: Union[Select, FromStatement],
compiler: Optional[SQLCompiler],
**kw: Any,
- ) -> ORMFromStatementCompileState:
+ ) -> _ORMFromStatementCompileState:
assert isinstance(statement_container, FromStatement)
if compiler is not None and compiler.stack:
target_mapper = self.statement._propagate_attrs.get(
"plugin_subject", None
)
- adapter = DMLReturningColFilter(target_mapper, dml_mapper)
+ adapter = _DMLReturningColFilter(target_mapper, dml_mapper)
if self.compile_options._is_star and (len(self._entities) != 1):
raise sa_exc.CompileError(
__visit_name__ = "orm_from_statement"
- _compile_options = ORMFromStatementCompileState.default_compile_options
+ _compile_options = _ORMFromStatementCompileState.default_compile_options
- _compile_state_factory = ORMFromStatementCompileState.create_for_statement
+ _compile_state_factory = _ORMFromStatementCompileState.create_for_statement
_for_update_arg = None
"""
meth = cast(
- ORMSelectCompileState, SelectState.get_plugin_class(self)
+ _ORMSelectCompileState, SelectState.get_plugin_class(self)
).get_column_descriptions
return meth(self)
@sql.base.CompileState.plugin_for("orm", "compound_select")
-class CompoundSelectCompileState(
- AutoflushOnlyORMCompileState, CompoundSelectState
+class _CompoundSelectCompileState(
+ _AutoflushOnlyORMCompileState, CompoundSelectState
):
pass
@sql.base.CompileState.plugin_for("orm", "select")
-class ORMSelectCompileState(ORMCompileState, SelectState):
+class _ORMSelectCompileState(_ORMCompileState, SelectState):
_already_joined_edges = ()
_memoized_entities = _EMPTY_DICT
statement: Union[Select, FromStatement],
compiler: Optional[SQLCompiler],
**kw: Any,
- ) -> ORMSelectCompileState:
+ ) -> _ORMSelectCompileState:
"""compiler hook, we arrive here from compiler.visit_select() only."""
self = cls.__new__(cls)
def _column_descriptions(
query_or_select_stmt: Union[Query, Select, FromStatement],
- compile_state: Optional[ORMSelectCompileState] = None,
+ compile_state: Optional[_ORMSelectCompileState] = None,
legacy: bool = False,
) -> List[ORMColumnDescription]:
if compile_state is None:
- compile_state = ORMSelectCompileState._create_entities_collection(
+ compile_state = _ORMSelectCompileState._create_entities_collection(
query_or_select_stmt, legacy=legacy
)
ctx = compile_state
expr: Union[_InternalEntityType, ColumnElement[Any]]
entity_zero: Optional[_InternalEntityType]
- def setup_compile_state(self, compile_state: ORMCompileState) -> None:
+ def setup_compile_state(self, compile_state: _ORMCompileState) -> None:
raise NotImplementedError()
def setup_dml_returning_compile_state(
self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
+ compile_state: _ORMCompileState,
+ adapter: _DMLReturningColFilter,
) -> None:
raise NotImplementedError()
def setup_dml_returning_compile_state(
self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
+ compile_state: _ORMCompileState,
+ adapter: _DMLReturningColFilter,
) -> None:
loading._setup_entity_query(
compile_state,
def setup_dml_returning_compile_state(
self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
+ compile_state: _ORMCompileState,
+ adapter: _DMLReturningColFilter,
) -> None:
return self.setup_compile_state(compile_state)
def setup_dml_returning_compile_state(
self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
+ compile_state: _ORMCompileState,
+ adapter: _DMLReturningColFilter,
) -> None:
return self.setup_compile_state(compile_state)
def setup_dml_returning_compile_state(
self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
+ compile_state: _ORMCompileState,
+ adapter: _DMLReturningColFilter,
) -> None:
self._fetch_column = self.column
column = adapter(self.column, False)
self._non_primary_mappers[np_mapper] = True
def _dispose_cls(self, cls: Type[_O]) -> None:
- clsregistry.remove_class(cls.__name__, cls, self._class_registry)
+ clsregistry._remove_class(cls.__name__, cls, self._class_registry)
def _add_manager(self, manager: ClassManager[Any]) -> None:
self._managers[manager] = True
with mapperlib._CONFIGURE_MUTEX:
if not mapper_kw.get("non_primary", False):
- clsregistry.add_class(
+ clsregistry._add_class(
self.classname, self.cls, registry._class_registry
)
self._setup_dataclasses_transforms()
with mapperlib._CONFIGURE_MUTEX:
- clsregistry.add_class(
+ clsregistry._add_class(
self.classname, self.cls, registry._class_registry
)
from .. import util
-class DependencyProcessor:
+class _DependencyProcessor:
def __init__(self, prop):
self.prop = prop
self.cascade = prop.cascade
uow.register_preprocessor(self, True)
def per_property_flush_actions(self, uow):
- after_save = unitofwork.ProcessAll(uow, self, False, True)
- before_delete = unitofwork.ProcessAll(uow, self, True, True)
+ after_save = unitofwork._ProcessAll(uow, self, False, True)
+ before_delete = unitofwork._ProcessAll(uow, self, True, True)
- parent_saves = unitofwork.SaveUpdateAll(
+ parent_saves = unitofwork._SaveUpdateAll(
uow, self.parent.primary_base_mapper
)
- child_saves = unitofwork.SaveUpdateAll(
+ child_saves = unitofwork._SaveUpdateAll(
uow, self.mapper.primary_base_mapper
)
- parent_deletes = unitofwork.DeleteAll(
+ parent_deletes = unitofwork._DeleteAll(
uow, self.parent.primary_base_mapper
)
- child_deletes = unitofwork.DeleteAll(
+ child_deletes = unitofwork._DeleteAll(
uow, self.mapper.primary_base_mapper
)
"""
child_base_mapper = self.mapper.primary_base_mapper
- child_saves = unitofwork.SaveUpdateAll(uow, child_base_mapper)
- child_deletes = unitofwork.DeleteAll(uow, child_base_mapper)
+ child_saves = unitofwork._SaveUpdateAll(uow, child_base_mapper)
+ child_deletes = unitofwork._DeleteAll(uow, child_base_mapper)
# locate and disable the aggregate processors
# for this dependency
if isdelete:
- before_delete = unitofwork.ProcessAll(uow, self, True, True)
+ before_delete = unitofwork._ProcessAll(uow, self, True, True)
before_delete.disabled = True
else:
- after_save = unitofwork.ProcessAll(uow, self, False, True)
+ after_save = unitofwork._ProcessAll(uow, self, False, True)
after_save.disabled = True
# check if the "child" side is part of the cycle
# check if the "parent" side is part of the cycle
if not isdelete:
- parent_saves = unitofwork.SaveUpdateAll(
+ parent_saves = unitofwork._SaveUpdateAll(
uow, self.parent.base_mapper
)
parent_deletes = before_delete = None
if parent_saves in uow.cycles:
parent_in_cycles = True
else:
- parent_deletes = unitofwork.DeleteAll(uow, self.parent.base_mapper)
+ parent_deletes = unitofwork._DeleteAll(
+ uow, self.parent.base_mapper
+ )
parent_saves = after_save = None
if parent_deletes in uow.cycles:
parent_in_cycles = True
continue
if isdelete:
- before_delete = unitofwork.ProcessState(uow, self, True, state)
+ before_delete = unitofwork._ProcessState(
+ uow, self, True, state
+ )
if parent_in_cycles:
- parent_deletes = unitofwork.DeleteState(uow, state)
+ parent_deletes = unitofwork._DeleteState(uow, state)
else:
- after_save = unitofwork.ProcessState(uow, self, False, state)
+ after_save = unitofwork._ProcessState(uow, self, False, state)
if parent_in_cycles:
- parent_saves = unitofwork.SaveUpdateState(uow, state)
+ parent_saves = unitofwork._SaveUpdateState(uow, state)
if child_in_cycles:
child_actions = []
(deleted, listonly) = uow.states[child_state]
if deleted:
child_action = (
- unitofwork.DeleteState(uow, child_state),
+ unitofwork._DeleteState(uow, child_state),
True,
)
else:
child_action = (
- unitofwork.SaveUpdateState(uow, child_state),
+ unitofwork._SaveUpdateState(uow, child_state),
False,
)
child_actions.append(child_action)
return "%s(%s)" % (self.__class__.__name__, self.prop)
-class OneToManyDP(DependencyProcessor):
+class _OneToManyDP(_DependencyProcessor):
def per_property_dependencies(
self,
uow,
before_delete,
):
if self.post_update:
- child_post_updates = unitofwork.PostUpdateAll(
+ child_post_updates = unitofwork._PostUpdateAll(
uow, self.mapper.primary_base_mapper, False
)
- child_pre_updates = unitofwork.PostUpdateAll(
+ child_pre_updates = unitofwork._PostUpdateAll(
uow, self.mapper.primary_base_mapper, True
)
childisdelete,
):
if self.post_update:
- child_post_updates = unitofwork.PostUpdateAll(
+ child_post_updates = unitofwork._PostUpdateAll(
uow, self.mapper.primary_base_mapper, False
)
- child_pre_updates = unitofwork.PostUpdateAll(
+ child_pre_updates = unitofwork._PostUpdateAll(
uow, self.mapper.primary_base_mapper, True
)
):
return
if clearkeys:
- sync.clear(dest, self.mapper, self.prop.synchronize_pairs)
+ sync._clear(dest, self.mapper, self.prop.synchronize_pairs)
else:
- sync.populate(
+ sync._populate(
source,
self.parent,
dest,
)
def _pks_changed(self, uowcommit, state):
- return sync.source_modified(
+ return sync._source_modified(
uowcommit, state, self.parent, self.prop.synchronize_pairs
)
-class ManyToOneDP(DependencyProcessor):
+class _ManyToOneDP(_DependencyProcessor):
def __init__(self, prop):
- DependencyProcessor.__init__(self, prop)
+ _DependencyProcessor.__init__(self, prop)
for mapper in self.mapper.self_and_descendants:
- mapper._dependency_processors.append(DetectKeySwitch(prop))
+ mapper._dependency_processors.append(_DetectKeySwitch(prop))
def per_property_dependencies(
self,
before_delete,
):
if self.post_update:
- parent_post_updates = unitofwork.PostUpdateAll(
+ parent_post_updates = unitofwork._PostUpdateAll(
uow, self.parent.primary_base_mapper, False
)
- parent_pre_updates = unitofwork.PostUpdateAll(
+ parent_pre_updates = unitofwork._PostUpdateAll(
uow, self.parent.primary_base_mapper, True
)
):
if self.post_update:
if not isdelete:
- parent_post_updates = unitofwork.PostUpdateAll(
+ parent_post_updates = unitofwork._PostUpdateAll(
uow, self.parent.primary_base_mapper, False
)
if childisdelete:
]
)
else:
- parent_pre_updates = unitofwork.PostUpdateAll(
+ parent_pre_updates = unitofwork._PostUpdateAll(
uow, self.parent.primary_base_mapper, True
)
return
if clearkeys or child is None:
- sync.clear(state, self.parent, self.prop.synchronize_pairs)
+ sync._clear(state, self.parent, self.prop.synchronize_pairs)
else:
self._verify_canload(child)
- sync.populate(
+ sync._populate(
child,
self.mapper,
state,
)
-class DetectKeySwitch(DependencyProcessor):
+class _DetectKeySwitch(_DependencyProcessor):
"""For many-to-one relationships with no one-to-many backref,
searches for parents through the unit of work when a primary
key has changed and updates them.
uow.register_preprocessor(self, False)
def per_property_flush_actions(self, uow):
- parent_saves = unitofwork.SaveUpdateAll(uow, self.parent.base_mapper)
- after_save = unitofwork.ProcessAll(uow, self, False, False)
+ parent_saves = unitofwork._SaveUpdateAll(uow, self.parent.base_mapper)
+ after_save = unitofwork._ProcessAll(uow, self, False, False)
uow.dependencies.update([(parent_saves, after_save)])
def per_state_flush_actions(self, uow, states, isdelete):
uowcommit.register_object(
state, False, self.passive_updates
)
- sync.populate(
+ sync._populate(
related_state,
self.mapper,
state,
)
def _pks_changed(self, uowcommit, state):
- return bool(state.key) and sync.source_modified(
+ return bool(state.key) and sync._source_modified(
uowcommit, state, self.mapper, self.prop.synchronize_pairs
)
-class ManyToManyDP(DependencyProcessor):
+class _ManyToManyDP(_DependencyProcessor):
def per_property_dependencies(
self,
uow,
if need_cascade_pks:
for child in history.unchanged:
associationrow = {}
- sync.update(
+ sync._update(
state,
self.parent,
associationrow,
"old_",
self.prop.synchronize_pairs,
)
- sync.update(
+ sync._update(
child,
self.mapper,
associationrow,
)
return False
- sync.populate_dict(
+ sync._populate_dict(
state, self.parent, associationrow, self.prop.synchronize_pairs
)
- sync.populate_dict(
+ sync._populate_dict(
child,
self.mapper,
associationrow,
return True
def _pks_changed(self, uowcommit, state):
- return sync.source_modified(
+ return sync._source_modified(
uowcommit, state, self.parent, self.prop.synchronize_pairs
)
_direction_to_processor = {
- ONETOMANY: OneToManyDP,
- MANYTOONE: ManyToOneDP,
- MANYTOMANY: ManyToManyDP,
+ ONETOMANY: _OneToManyDP,
+ MANYTOONE: _ManyToOneDP,
+ MANYTOMANY: _ManyToManyDP,
}
from .attributes import History
from .attributes import InstrumentedAttribute
from .attributes import QueryableAttribute
- from .context import ORMCompileState
+ from .context import _ORMCompileState
from .decl_base import _ClassScanMapperConfig
from .mapper import Mapper
from .properties import ColumnProperty
def instrument_class(self, mapper: Mapper[Any]) -> None:
prop = self
- class _ProxyImpl(attributes.AttributeImpl):
+ class _ProxyImpl(attributes._AttributeImpl):
accepts_scalar_loader = False
load_on_unexpire = True
collection = False
self.descriptor = property(fget=fget, fset=fset, fdel=fdel)
- proxy_attr = attributes.create_proxied_attribute(self.descriptor)(
+ proxy_attr = attributes._create_proxied_attribute(self.descriptor)(
self.parent.class_,
self.key,
self.descriptor,
"""Establish events that populate/expire the composite attribute."""
def load_handler(
- state: InstanceState[Any], context: ORMCompileState
+ state: InstanceState[Any], context: _ORMCompileState
) -> None:
_load_refresh_handler(state, context, None, is_refresh=False)
def refresh_handler(
state: InstanceState[Any],
- context: ORMCompileState,
+ context: _ORMCompileState,
to_load: Optional[Sequence[str]],
) -> None:
# note this corresponds to sqlalchemy.ext.mutable load_attrs()
def _load_refresh_handler(
state: InstanceState[Any],
- context: ORMCompileState,
+ context: _ORMCompileState,
to_load: Optional[Sequence[str]],
is_refresh: bool,
) -> None:
from .base import PassiveFlag
from .query import Query
from .session import object_session
-from .writeonly import AbstractCollectionWriter
-from .writeonly import WriteOnlyAttributeImpl
+from .writeonly import _AbstractCollectionWriter
+from .writeonly import _WriteOnlyAttributeImpl
+from .writeonly import _WriteOnlyLoader
from .writeonly import WriteOnlyHistory
-from .writeonly import WriteOnlyLoader
from .. import util
from ..engine import result
class DynamicCollectionHistory(WriteOnlyHistory[_T]):
def __init__(
self,
- attr: DynamicAttributeImpl,
+ attr: _DynamicAttributeImpl,
state: InstanceState[_T],
passive: PassiveFlag,
apply_to: Optional[DynamicCollectionHistory[_T]] = None,
self._reconcile_collection = False
-class DynamicAttributeImpl(WriteOnlyAttributeImpl):
+class _DynamicAttributeImpl(_WriteOnlyAttributeImpl):
_supports_dynamic_iteration = True
collection_history_cls = DynamicCollectionHistory[Any]
- query_class: Type[AppenderMixin[Any]] # type: ignore[assignment]
+ query_class: Type[_AppenderMixin[Any]] # type: ignore[assignment]
def __init__(
self,
dispatch: _Dispatch[QueryableAttribute[Any]],
target_mapper: Mapper[_T],
order_by: _RelationshipOrderByArg,
- query_class: Optional[Type[AppenderMixin[_T]]] = None,
+ query_class: Optional[Type[_AppenderMixin[_T]]] = None,
**kw: Any,
) -> None:
- attributes.AttributeImpl.__init__(
+ attributes._AttributeImpl.__init__(
self, class_, key, None, dispatch, **kw
)
self.target_mapper = target_mapper
self.order_by = tuple(order_by)
if not query_class:
self.query_class = AppenderQuery
- elif AppenderMixin in query_class.mro():
+ elif _AppenderMixin in query_class.mro():
self.query_class = query_class
else:
self.query_class = mixin_user_query(query_class)
@relationships.RelationshipProperty.strategy_for(lazy="dynamic")
-class DynaLoader(WriteOnlyLoader):
- impl_class = DynamicAttributeImpl
+class _DynaLoader(_WriteOnlyLoader):
+ impl_class = _DynamicAttributeImpl
-class AppenderMixin(AbstractCollectionWriter[_T]):
+class _AppenderMixin(_AbstractCollectionWriter[_T]):
"""A mixin that expects to be mixing in a Query class with
AbstractAppender.
_order_by_clauses: Tuple[ColumnElement[Any], ...]
def __init__(
- self, attr: DynamicAttributeImpl, state: InstanceState[_T]
+ self, attr: _DynamicAttributeImpl, state: InstanceState[_T]
) -> None:
Query.__init__(
self, # type: ignore[arg-type]
self._remove_impl(item)
-class AppenderQuery(AppenderMixin[_T], Query[_T]): # type: ignore[misc]
+class AppenderQuery(_AppenderMixin[_T], Query[_T]): # type: ignore[misc]
"""A dynamic query that supports basic collection storage operations.
Methods on :class:`.AppenderQuery` include all methods of
"""
-def mixin_user_query(cls: Any) -> type[AppenderMixin[Any]]:
+def mixin_user_query(cls: Any) -> type[_AppenderMixin[Any]]:
"""Return a new class with AppenderQuery functionality layered over."""
name = "Appender" + cls.__name__
- return type(name, (AppenderMixin, cls), {"query_class": cls})
+ return type(name, (_AppenderMixin, cls), {"query_class": cls})
return len(self._dict)
-class WeakInstanceDict(IdentityMap):
+class _WeakInstanceDict(IdentityMap):
_dict: Dict[_IdentityKeyType[Any], InstanceState[Any]]
def __getitem__(self, key: _IdentityKeyType[_O]) -> _O:
if TYPE_CHECKING:
from ._typing import _RegistryType
- from .attributes import AttributeImpl
+ from .attributes import _AttributeImpl
from .attributes import QueryableAttribute
from .collections import _AdaptedCollectionProtocol
from .collections import _CollectionFactoryType
def instrument_collection_class(
self, key: str, collection_class: Type[Collection[Any]]
) -> _CollectionFactoryType:
- return collections.prepare_instrumentation(collection_class)
+ return collections._prepare_instrumentation(collection_class)
def initialize_collection(
self,
else:
return key in self.local_attrs
- def get_impl(self, key: str) -> AttributeImpl:
+ def get_impl(self, key: str) -> _AttributeImpl:
return self[key].impl
@property
from .attributes import InstrumentedAttribute
from .base import Mapped
from .context import _MapperEntity
- from .context import ORMCompileState
+ from .context import _ORMCompileState
from .context import QueryContext
from .decl_api import RegistryType
from .decl_base import _ClassScanMapperConfig
from .loading import _PopulatorDict
from .mapper import Mapper
- from .path_registry import AbstractEntityRegistry
+ from .path_registry import _AbstractEntityRegistry
from .query import Query
from .session import Session
from .state import InstanceState
def setup(
self,
- context: ORMCompileState,
+ context: _ORMCompileState,
query_entity: _MapperEntity,
- path: AbstractEntityRegistry,
+ path: _AbstractEntityRegistry,
adapter: Optional[ORMAdapter],
**kwargs: Any,
) -> None:
def create_row_processor(
self,
- context: ORMCompileState,
+ context: _ORMCompileState,
query_entity: _MapperEntity,
- path: AbstractEntityRegistry,
+ path: _AbstractEntityRegistry,
mapper: Mapper[Any],
result: Result[Unpack[TupleAny]],
adapter: Optional[ORMAdapter],
)
def _get_context_loader(
- self, context: ORMCompileState, path: AbstractEntityRegistry
+ self, context: _ORMCompileState, path: _AbstractEntityRegistry
) -> Optional[_LoadElement]:
load: Optional[_LoadElement] = None
def setup(
self,
- context: ORMCompileState,
+ context: _ORMCompileState,
query_entity: _MapperEntity,
- path: AbstractEntityRegistry,
+ path: _AbstractEntityRegistry,
adapter: Optional[ORMAdapter],
**kwargs: Any,
) -> None:
def create_row_processor(
self,
- context: ORMCompileState,
+ context: _ORMCompileState,
query_entity: _MapperEntity,
- path: AbstractEntityRegistry,
+ path: _AbstractEntityRegistry,
mapper: Mapper[Any],
result: Result[Unpack[TupleAny]],
adapter: Optional[ORMAdapter],
_is_compile_state = True
- def process_compile_state(self, compile_state: ORMCompileState) -> None:
+ def process_compile_state(self, compile_state: _ORMCompileState) -> None:
"""Apply a modification to a given :class:`.ORMCompileState`.
This method is part of the implementation of a particular
def process_compile_state_replaced_entities(
self,
- compile_state: ORMCompileState,
+ compile_state: _ORMCompileState,
mapper_entities: Sequence[_MapperEntity],
) -> None:
"""Apply a modification to a given :class:`.ORMCompileState`,
def process_compile_state_replaced_entities(
self,
- compile_state: ORMCompileState,
+ compile_state: _ORMCompileState,
mapper_entities: Sequence[_MapperEntity],
) -> None:
self.process_compile_state(compile_state)
def setup_query(
self,
- compile_state: ORMCompileState,
+ compile_state: _ORMCompileState,
query_entity: _MapperEntity,
- path: AbstractEntityRegistry,
+ path: _AbstractEntityRegistry,
loadopt: Optional[_LoadElement],
adapter: Optional[ORMAdapter],
**kwargs: Any,
def create_row_processor(
self,
- context: ORMCompileState,
+ context: _ORMCompileState,
query_entity: _MapperEntity,
- path: AbstractEntityRegistry,
+ path: _AbstractEntityRegistry,
loadopt: Optional[_LoadElement],
mapper: Mapper[Any],
result: Result[Unpack[TupleAny]],
from .base import _RAISE_FOR_STATE
from .base import _SET_DEFERRED_EXPIRED
from .base import PassiveFlag
+from .context import _ORMCompileState
from .context import FromStatement
-from .context import ORMCompileState
from .context import QueryContext
from .util import _none_set
from .util import state_str
# flush current contents if we expect to load data
session._autoflush()
- ctx = querycontext.ORMSelectCompileState._create_entities_collection(
+ ctx = querycontext._ORMSelectCompileState._create_entities_collection(
statement, legacy=False
)
else:
frozen_result = None
- ctx = querycontext.ORMSelectCompileState._create_entities_collection(
+ ctx = querycontext._ORMSelectCompileState._create_entities_collection(
query, legacy=True
)
return None
-def load_on_ident(
+def _load_on_ident(
session: Session,
statement: Union[Select, FromStatement],
key: Optional[_IdentityKeyType],
else:
ident = identity_token = None
- return load_on_pk_identity(
+ return _load_on_pk_identity(
session,
statement,
ident,
)
-def load_on_pk_identity(
+def _load_on_pk_identity(
session: Session,
statement: Union[Select, FromStatement],
primary_key_identity: Optional[Tuple[Any, ...]],
statement._compile_options
is SelectState.default_select_compile_options
):
- compile_options = ORMCompileState.default_compile_options
+ compile_options = _ORMCompileState.default_compile_options
else:
compile_options = statement._compile_options
_load_supers = [selectin_load_via]
for _selectinload_entity in _load_supers:
- if PostLoad.path_exists(
+ if _PostLoad.path_exists(
context, load_path, _selectinload_entity
):
continue
_polymorphic_from,
option_entities,
)
- PostLoad.callable_for_path(
+ _PostLoad.callable_for_path(
context,
load_path,
_selectinload_entity.mapper,
_selectinload_entity,
)
- post_load = PostLoad.for_context(context, load_path, only_load_props)
+ post_load = _PostLoad.for_context(context, load_path, only_load_props)
if refresh_state:
refresh_identity_key = refresh_state.key
return polymorphic_instance
-class PostLoad:
+class _PostLoad:
"""Track loaders and states for "post load" operations."""
__slots__ = "loaders", "states", "load_keys"
if path.path in context.post_load_paths:
pl = context.post_load_paths[path.path]
else:
- pl = context.post_load_paths[path.path] = PostLoad()
+ pl = context.post_load_paths[path.path] = _PostLoad()
pl.loaders[token] = (
context,
token,
)
-def load_scalar_attributes(mapper, state, attribute_names, passive):
+def _load_scalar_attributes(mapper, state, attribute_names, passive):
"""initiate a column-based attribute refresh operation."""
# assert mapper is _state_mapper(state)
# columns needed already, this implicitly undefers that column
stmt = FromStatement(mapper, statement)
- return load_on_ident(
+ return _load_on_ident(
session,
stmt,
None,
)
return
- result = load_on_ident(
+ result = _load_on_ident(
session,
select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
identity_key,
from ._typing import _ORMColumnExprArgument
from ._typing import _RegistryType
from .decl_api import registry
- from .dependency import DependencyProcessor
+ from .dependency import _DependencyProcessor
from .descriptor_props import CompositeProperty
from .descriptor_props import SynonymProperty
from .events import MapperEvents
from .instrumentation import ClassManager
- from .path_registry import CachingEntityRegistry
+ from .path_registry import _CachingEntityRegistry
from .properties import ColumnProperty
from .relationships import RelationshipProperty
from .state import InstanceState
_identity_class: Type[_O]
_delete_orphans: List[Tuple[str, Type[Any]]]
- _dependency_processors: List[DependencyProcessor]
+ _dependency_processors: List[_DependencyProcessor]
_memoized_values: Dict[Any, Callable[[], Any]]
_inheriting_mappers: util.WeakSequence[Mapper[Any]]
_all_tables: Set[TableClause]
return self.persist_selectable
@util.memoized_property
- def _path_registry(self) -> CachingEntityRegistry:
+ def _path_registry(self) -> _CachingEntityRegistry:
return PathRegistry.per_mapper(self)
def _configure_inheritance(self):
self.class_,
mapper=self,
expired_attribute_loader=util.partial(
- loading.load_scalar_attributes, self
+ loading._load_scalar_attributes, self
),
# finalize flag means instrument the __init__ method
# and call the class_instrument event
def is_root(path: PathRegistry) -> TypeGuard[RootRegistry]: ...
- def is_entity(path: PathRegistry) -> TypeGuard[AbstractEntityRegistry]: ...
+ def is_entity(
+ path: PathRegistry,
+ ) -> TypeGuard[_AbstractEntityRegistry]: ...
else:
is_root = operator.attrgetter("is_root")
return id(self)
@overload
- def __getitem__(self, entity: _StrPathToken) -> TokenRegistry: ...
+ def __getitem__(self, entity: _StrPathToken) -> _TokenRegistry: ...
@overload
def __getitem__(self, entity: int) -> _PathElementType: ...
@overload
def __getitem__(
self, entity: _InternalEntityType[Any]
- ) -> AbstractEntityRegistry: ...
+ ) -> _AbstractEntityRegistry: ...
@overload
def __getitem__(
self, entity: StrategizedProperty[Any]
- ) -> PropRegistry: ...
+ ) -> _PropRegistry: ...
def __getitem__(
self,
StrategizedProperty[Any],
],
) -> Union[
- TokenRegistry,
+ _TokenRegistry,
_PathElementType,
_PathRepresentation,
- PropRegistry,
- AbstractEntityRegistry,
+ _PropRegistry,
+ _AbstractEntityRegistry,
]:
raise NotImplementedError()
@overload
@classmethod
- def per_mapper(cls, mapper: Mapper[Any]) -> CachingEntityRegistry: ...
+ def per_mapper(cls, mapper: Mapper[Any]) -> _CachingEntityRegistry: ...
@overload
@classmethod
- def per_mapper(cls, mapper: AliasedInsp[Any]) -> SlotsEntityRegistry: ...
+ def per_mapper(cls, mapper: AliasedInsp[Any]) -> _SlotsEntityRegistry: ...
@classmethod
def per_mapper(
cls, mapper: _InternalEntityType[Any]
- ) -> AbstractEntityRegistry:
+ ) -> _AbstractEntityRegistry:
if mapper.is_mapper:
- return CachingEntityRegistry(cls.root, mapper)
+ return _CachingEntityRegistry(cls.root, mapper)
else:
- return SlotsEntityRegistry(cls.root, mapper)
+ return _SlotsEntityRegistry(cls.root, mapper)
@classmethod
def coerce(cls, raw: _PathRepresentation) -> PathRegistry:
return f"{self.__class__.__name__}({self.path!r})"
-class CreatesToken(PathRegistry):
+class _CreatesToken(PathRegistry):
__slots__ = ()
is_aliased_class: bool
is_root: bool
- def token(self, token: _StrPathToken) -> TokenRegistry:
+ def token(self, token: _StrPathToken) -> _TokenRegistry:
if token.endswith(f":{_WILDCARD_TOKEN}"):
- return TokenRegistry(self, token)
+ return _TokenRegistry(self, token)
elif token.endswith(f":{_DEFAULT_TOKEN}"):
- return TokenRegistry(self.root, token)
+ return _TokenRegistry(self.root, token)
else:
raise exc.ArgumentError(f"invalid token: {token}")
-class RootRegistry(CreatesToken):
+class RootRegistry(_CreatesToken):
"""Root registry, defers to mappers so that
paths are maintained per-root-mapper.
def _getitem(
self, entity: Any
- ) -> Union[TokenRegistry, AbstractEntityRegistry]:
+ ) -> Union[_TokenRegistry, _AbstractEntityRegistry]:
if entity in PathToken._intern:
if TYPE_CHECKING:
assert isinstance(entity, _StrPathToken)
- return TokenRegistry(self, PathToken._intern[entity])
+ return _TokenRegistry(self, PathToken._intern[entity])
else:
try:
return entity._path_registry # type: ignore
return result
-class TokenRegistry(PathRegistry):
+class _TokenRegistry(PathRegistry):
__slots__ = ("token", "parent", "path", "natural_path")
inherit_cache = True
token: _StrPathToken
- parent: CreatesToken
+ parent: _CreatesToken
- def __init__(self, parent: CreatesToken, token: _StrPathToken):
+ def __init__(self, parent: _CreatesToken, token: _StrPathToken):
token = PathToken.intern(token)
self.token = token
return
if TYPE_CHECKING:
- assert isinstance(parent, AbstractEntityRegistry)
+ assert isinstance(parent, _AbstractEntityRegistry)
if not parent.is_aliased_class:
for mp_ent in parent.mapper.iterate_to_root():
- yield TokenRegistry(parent.parent[mp_ent], self.token)
+ yield _TokenRegistry(parent.parent[mp_ent], self.token)
elif (
parent.is_aliased_class
and cast(
for ent in cast(
"AliasedInsp[Any]", parent.entity
)._with_polymorphic_entities:
- yield TokenRegistry(parent.parent[ent], self.token)
+ yield _TokenRegistry(parent.parent[ent], self.token)
else:
yield self
return
if TYPE_CHECKING:
- assert isinstance(parent, AbstractEntityRegistry)
+ assert isinstance(parent, _AbstractEntityRegistry)
for mp_ent in parent.mapper.iterate_to_root():
- yield TokenRegistry(parent.parent[mp_ent], self.token).natural_path
+ yield _TokenRegistry(
+ parent.parent[mp_ent], self.token
+ ).natural_path
if (
parent.is_aliased_class
and cast(
"AliasedInsp[Any]", parent.entity
)._with_polymorphic_entities:
yield (
- TokenRegistry(parent.parent[ent], self.token).natural_path
+ _TokenRegistry(parent.parent[ent], self.token).natural_path
)
else:
yield self.natural_path
__getitem__ = _getitem
-class PropRegistry(PathRegistry):
+class _PropRegistry(PathRegistry):
__slots__ = (
"prop",
"parent",
entity: Optional[_InternalEntityType[Any]]
def __init__(
- self, parent: AbstractEntityRegistry, prop: StrategizedProperty[Any]
+ self, parent: _AbstractEntityRegistry, prop: StrategizedProperty[Any]
):
# restate this path in terms of the
# given StrategizedProperty's parent.
insp = cast("_InternalEntityType[Any]", parent[-1])
- natural_parent: AbstractEntityRegistry = parent
+ natural_parent: _AbstractEntityRegistry = parent
# inherit "is_unnatural" from the parent
self.is_unnatural = parent.parent.is_unnatural or bool(
self._default_path_loader_key = self.prop._default_path_loader_key
self._loader_key = ("loader", self.natural_path)
- def _truncate_recursive(self) -> PropRegistry:
+ def _truncate_recursive(self) -> _PropRegistry:
earliest = None
for i, token in enumerate(reversed(self.path[:-1])):
if token is self.prop:
return self.coerce(self.path[0 : -(earliest + 1)]) # type: ignore
@property
- def entity_path(self) -> AbstractEntityRegistry:
+ def entity_path(self) -> _AbstractEntityRegistry:
assert self.entity is not None
return self[self.entity]
def _getitem(
self, entity: Union[int, slice, _InternalEntityType[Any]]
- ) -> Union[AbstractEntityRegistry, _PathElementType, _PathRepresentation]:
+ ) -> Union[_AbstractEntityRegistry, _PathElementType, _PathRepresentation]:
if isinstance(entity, (int, slice)):
return self.path[entity]
else:
- return SlotsEntityRegistry(self, entity)
+ return _SlotsEntityRegistry(self, entity)
if not TYPE_CHECKING:
__getitem__ = _getitem
-class AbstractEntityRegistry(CreatesToken):
+class _AbstractEntityRegistry(_CreatesToken):
__slots__ = (
"key",
"parent",
has_entity = True
is_entity = True
- parent: Union[RootRegistry, PropRegistry]
+ parent: Union[RootRegistry, _PropRegistry]
key: _InternalEntityType[Any]
entity: _InternalEntityType[Any]
is_aliased_class: bool
def __init__(
self,
- parent: Union[RootRegistry, PropRegistry],
+ parent: Union[RootRegistry, _PropRegistry],
entity: _InternalEntityType[Any],
):
self.key = entity
else:
self.natural_path = self.path
- def _truncate_recursive(self) -> AbstractEntityRegistry:
+ def _truncate_recursive(self) -> _AbstractEntityRegistry:
return self.parent._truncate_recursive()[self.entity]
@property
if isinstance(entity, (int, slice)):
return self.path[entity]
elif entity in PathToken._intern:
- return TokenRegistry(self, PathToken._intern[entity])
+ return _TokenRegistry(self, PathToken._intern[entity])
else:
- return PropRegistry(self, entity)
+ return _PropRegistry(self, entity)
if not TYPE_CHECKING:
__getitem__ = _getitem
-class SlotsEntityRegistry(AbstractEntityRegistry):
+class _SlotsEntityRegistry(_AbstractEntityRegistry):
# for aliased class, return lightweight, no-cycles created
# version
inherit_cache = True
class _ERDict(Dict[Any, Any]):
- def __init__(self, registry: CachingEntityRegistry):
+ def __init__(self, registry: _CachingEntityRegistry):
self.registry = registry
- def __missing__(self, key: Any) -> PropRegistry:
- self[key] = item = PropRegistry(self.registry, key)
+ def __missing__(self, key: Any) -> _PropRegistry:
+ self[key] = item = _PropRegistry(self.registry, key)
return item
-class CachingEntityRegistry(AbstractEntityRegistry):
+class _CachingEntityRegistry(_AbstractEntityRegistry):
# for long lived mapper, return dict based caching
# version that creates reference cycles
def __init__(
self,
- parent: Union[RootRegistry, PropRegistry],
+ parent: Union[RootRegistry, _PropRegistry],
entity: _InternalEntityType[Any],
):
super().__init__(parent, entity)
if isinstance(entity, (int, slice)):
return self.path[entity]
elif isinstance(entity, PathToken):
- return TokenRegistry(self, entity)
+ return _TokenRegistry(self, entity)
else:
return self._cache[entity]
def path_is_entity(
path: PathRegistry,
- ) -> TypeGuard[AbstractEntityRegistry]: ...
+ ) -> TypeGuard[_AbstractEntityRegistry]: ...
- def path_is_property(path: PathRegistry) -> TypeGuard[PropRegistry]: ...
+ def path_is_property(path: PathRegistry) -> TypeGuard[_PropRegistry]: ...
else:
path_is_entity = operator.attrgetter("is_entity")
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
-def save_obj(base_mapper, states, uowtransaction, single=False):
+def _save_obj(base_mapper, states, uowtransaction, single=False):
"""Issue ``INSERT`` and/or ``UPDATE`` statements for a list
of objects.
# if batch=false, call _save_obj separately for each object
if not single and not base_mapper.batch:
for state in _sort_states(base_mapper, states):
- save_obj(base_mapper, [state], uowtransaction, single=True)
+ _save_obj(base_mapper, [state], uowtransaction, single=True)
return
states_to_update = []
)
-def post_update(base_mapper, states, uowtransaction, post_update_cols):
+def _post_update(base_mapper, states, uowtransaction, post_update_cols):
"""Issue UPDATE statements on behalf of a relationship() which
specifies post_update.
)
-def delete_obj(base_mapper, states, uowtransaction):
+def _delete_obj(base_mapper, states, uowtransaction):
"""Issue ``DELETE`` statements for a list of objects.
This is called within the context of a UOWTransaction during a
# occurs after the UPDATE is emitted however we invoke it here
# explicitly in the absence of our invoking an UPDATE
for m, equated_pairs in mapper._table_to_equated[table]:
- sync.populate(
+ sync._populate(
state,
m,
state,
stmt = future.select(mapper).set_label_style(
LABEL_STYLE_TABLENAME_PLUS_COL
)
- loading.load_on_ident(
+ loading._load_on_ident(
uowtransaction.session,
stmt,
state.key,
# TODO: this still goes a little too often. would be nice to
# have definitive list of "columns that changed" here
for m, equated_pairs in mapper._table_to_equated[table]:
- sync.populate(
+ sync._populate(
state,
m,
state,
def _postfetch_bulk_save(mapper, dict_, table):
for m, equated_pairs in mapper._table_to_equated[table]:
- sync.bulk_populate_inherit_keys(dict_, m, equated_pairs)
+ sync._bulk_populate_inherit_keys(dict_, m, equated_pairs)
def _connections_for_states(base_mapper, uowtransaction, states):
strategies = util.preloaded.orm_strategies
return state.InstanceState._instance_level_callable_processor(
self.parent.class_manager,
- strategies.LoadDeferredColumns(self.key),
+ strategies._LoadDeferredColumns(self.key),
self.key,
)
strategies = util.preloaded.orm_strategies
return state.InstanceState._instance_level_callable_processor(
self.parent.class_manager,
- strategies.LoadDeferredColumns(self.key, True),
+ strategies._LoadDeferredColumns(self.key, True),
self.key,
)
if not self.instrument:
return
- attributes.register_descriptor(
+ attributes._register_descriptor(
mapper.class_,
self.key,
comparator=self.comparator_factory(self, mapper),
from .context import _column_descriptions
from .context import _determine_last_joined_entity
from .context import _legacy_filter_by_entity_zero
+from .context import _ORMCompileState
from .context import FromStatement
-from .context import ORMCompileState
from .context import QueryContext
from .interfaces import ORMColumnDescription
from .interfaces import ORMColumnsClauseRole
_memoized_select_entities = ()
_compile_options: Union[Type[CacheableOptions], CacheableOptions] = (
- ORMCompileState.default_compile_options
+ _ORMCompileState.default_compile_options
)
_with_options: Tuple[ExecutableOption, ...]
# we still implement _get_impl() so that baked query can override
# it
- return self._get_impl(ident, loading.load_on_pk_identity)
+ return self._get_impl(ident, loading._load_on_pk_identity)
def _get_impl(
self,
def _compile_state(
self, for_statement: bool = False, **kw: Any
- ) -> ORMCompileState:
+ ) -> _ORMCompileState:
"""Create an out-of-compiler ORMCompileState object.
The ORMCompileState object is normally created directly as a result
# query._statement is not None as we have the ORM Query here
# however this is the more general path.
compile_state_cls = cast(
- ORMCompileState,
- ORMCompileState._get_plugin_class_for_plugin(stmt, "orm"),
+ _ORMCompileState,
+ _ORMCompileState._get_plugin_class_for_plugin(stmt, "orm"),
)
return compile_state_cls.create_for_statement(stmt, None)
"""
- def process_compile_state(self, compile_state: ORMCompileState) -> None:
+ def process_compile_state(self, compile_state: _ORMCompileState) -> None:
pass
from .clsregistry import _class_resolver
from .clsregistry import _ModNS
from .decl_base import _ClassScanMapperConfig
- from .dependency import DependencyProcessor
+ from .dependency import _DependencyProcessor
from .mapper import Mapper
from .query import Query
from .session import Session
from .state import InstanceState
- from .strategies import LazyLoader
+ from .strategies import _LazyLoader
from .util import AliasedClass
from .util import AliasedInsp
from ..sql._typing import _CoreAdapterProto
_overlaps: Sequence[str]
- _lazy_strategy: LazyLoader
+ _lazy_strategy: _LazyLoader
_persistence_only = dict(
passive_deletes=False,
cascade_backrefs=False,
)
- _dependency_processor: Optional[DependencyProcessor] = None
+ _dependency_processor: Optional[_DependencyProcessor] = None
primaryjoin: ColumnElement[bool]
secondaryjoin: Optional[ColumnElement[bool]]
secondary: Optional[FromClause]
- _join_condition: JoinCondition
+ _join_condition: _JoinCondition
order_by: _RelationshipOrderByArg
_user_defined_foreign_keys: Set[ColumnElement[Any]]
)
def instrument_class(self, mapper: Mapper[Any]) -> None:
- attributes.register_descriptor(
+ attributes._register_descriptor(
mapper.class_,
self.key,
comparator=self.comparator_factory(self, mapper),
self._join_condition._warn_for_conflicting_sync_targets()
super().do_init()
self._lazy_strategy = cast(
- "LazyLoader", self._get_strategy((("lazy", "select"),))
+ "_LazyLoader", self._get_strategy((("lazy", "select"),))
)
def _setup_registry_dependencies(self) -> None:
self.target = self.entity.persist_selectable
def _setup_join_conditions(self) -> None:
- self._join_condition = jc = JoinCondition(
+ self._join_condition = jc = _JoinCondition(
parent_persist_selectable=self.parent.persist_selectable,
child_persist_selectable=self.entity.persist_selectable,
parent_local_selectable=self.parent.local_table,
self.uselist = self.direction is not MANYTOONE
if not self.viewonly:
self._dependency_processor = ( # type: ignore
- dependency.DependencyProcessor.from_relationship
+ dependency._DependencyProcessor.from_relationship
)(self)
@util.memoized_property
return element
-class JoinCondition:
+class _JoinCondition:
primaryjoin_initial: Optional[ColumnElement[bool]]
primaryjoin: ColumnElement[bool]
secondaryjoin: Optional[ColumnElement[bool]]
from .base import object_state
from .base import PassiveFlag
from .base import state_str
+from .context import _ORMCompileState
from .context import FromStatement
-from .context import ORMCompileState
from .identity import IdentityMap
from .query import Query
from .state import InstanceState
"""
- _compile_state_cls: Optional[Type[ORMCompileState]]
+ _compile_state_cls: Optional[Type[_ORMCompileState]]
_starting_event_idx: int
_events_todo: List[Any]
_update_execution_options: Optional[_ExecuteOptions]
parameters: Optional[_CoreAnyExecuteParams],
execution_options: _ExecuteOptions,
bind_arguments: _BindArguments,
- compile_state_cls: Optional[Type[ORMCompileState]],
+ compile_state_cls: Optional[Type[_ORMCompileState]],
events_todo: List[_InstanceLevelDispatch[Session]],
):
"""Construct a new :class:`_orm.ORMExecuteState`.
self,
) -> Optional[
Union[
- context.ORMCompileState.default_compile_options,
- Type[context.ORMCompileState.default_compile_options],
+ context._ORMCompileState.default_compile_options,
+ Type[context._ORMCompileState.default_compile_options],
]
]:
if not self.is_select:
return None
if opts is not None and opts.isinstance(
- context.ORMCompileState.default_compile_options
+ context._ORMCompileState.default_compile_options
):
return opts # type: ignore
else:
def update_delete_options(
self,
) -> Union[
- bulk_persistence.BulkUDCompileState.default_update_options,
- Type[bulk_persistence.BulkUDCompileState.default_update_options],
+ bulk_persistence._BulkUDCompileState.default_update_options,
+ Type[bulk_persistence._BulkUDCompileState.default_update_options],
]:
"""Return the update_delete_options that will be used for this
execution."""
"statement so there are no update options."
)
uo: Union[
- bulk_persistence.BulkUDCompileState.default_update_options,
- Type[bulk_persistence.BulkUDCompileState.default_update_options],
+ bulk_persistence._BulkUDCompileState.default_update_options,
+ Type[bulk_persistence._BulkUDCompileState.default_update_options],
] = self.execution_options.get(
"_sa_orm_update_options",
- bulk_persistence.BulkUDCompileState.default_update_options,
+ bulk_persistence._BulkUDCompileState.default_update_options,
)
return uo
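The ``update_delete_options`` accessor is reachable from the same public
event; continuing the sketch above (the listener body is illustrative
only)::

    @event.listens_for(session, "do_orm_execute")
    def watch_bulk(orm_execute_state):
        if orm_execute_state.is_update or orm_execute_state.is_delete:
            # the options object is now an underscored internal class,
            # but the public accessor on ORMExecuteState is unchanged
            print("bulk options:", orm_execute_state.update_delete_options)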
raise sa_exc.ArgumentError(
"autocommit=True is no longer supported"
)
- self.identity_map = identity.WeakInstanceDict()
+ self.identity_map = identity._WeakInstanceDict()
if not future:
raise sa_exc.ArgumentError(
)
if TYPE_CHECKING:
assert isinstance(
- compile_state_cls, context.AbstractORMCompileState
+ compile_state_cls, context._AbstractORMCompileState
)
else:
compile_state_cls = None
all_states = self.identity_map.all_states() + list(self._new)
self.identity_map._kill()
- self.identity_map = identity.WeakInstanceDict()
+ self.identity_map = identity._WeakInstanceDict()
self._new = {}
self._deleted = {}
stmt: Select[Unpack[TupleAny]] = sql.select(object_mapper(instance))
if (
- loading.load_on_ident(
+ loading._load_on_ident(
self,
stmt,
state.key,
return self._get_impl(
entity,
ident,
- loading.load_on_pk_identity,
+ loading._load_on_pk_identity,
options=options,
populate_existing=populate_existing,
with_for_update=with_for_update,
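The renamed ``_load_on_pk_identity`` continues to back the public
``Session.get()``; typical use is unchanged (the ``User`` model is
hypothetical)::

    user = session.get(User, 5)  # identity map is consulted first
    user = session.get(User, 5, populate_existing=True)  # force a re-SELECT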
from ._typing import _IdentityKeyType
from ._typing import _InstanceDict
from ._typing import _LoaderCallable
- from .attributes import AttributeImpl
+ from .attributes import _AttributeImpl
from .attributes import History
from .base import PassiveFlag
from .collections import _AdaptedCollectionProtocol
def get_history(self, key: str, passive: PassiveFlag) -> History:
return self.manager[key].impl.get_history(self, self.dict, passive)
- def get_impl(self, key: str) -> AttributeImpl:
+ def get_impl(self, key: str) -> _AttributeImpl:
return self.manager[key].impl
def _get_pending_mutation(self, key: str) -> PendingCollection:
def _modified_event(
self,
dict_: _InstanceDict,
- attr: Optional[AttributeImpl],
+ attr: Optional[_AttributeImpl],
previous: Any,
collection: bool = False,
is_userland: bool = False,
del self.callables[key]
def _commit_all(
- self, dict_: _InstanceDict, instance_dict: Optional[IdentityMap] = None
+ self,
+ dict_: _InstanceDict,
+ instance_dict: Optional[IdentityMap] = None,
) -> None:
"""commit all attributes unconditionally.
from .base import PASSIVE_OFF
from .base import PassiveFlag
from .context import _column_descriptions
-from .context import ORMCompileState
-from .context import ORMSelectCompileState
+from .context import _ORMCompileState
+from .context import _ORMSelectCompileState
from .context import QueryContext
from .interfaces import LoaderStrategy
from .interfaces import StrategizedProperty
uselist = useobject and prop.uselist
if useobject and prop.single_parent:
- listen_hooks.append(single_parent_validator)
+ listen_hooks.append(_single_parent_validator)
if prop.key in prop.parent.validators:
fn, opts = prop.parent.validators[prop.key]
)
if useobject:
- listen_hooks.append(unitofwork.track_cascade_events)
+ listen_hooks.append(unitofwork._track_cascade_events)
# need to assemble backref listeners
# after the singleparentvalidator, mapper validator
backref = prop.back_populates
if backref and prop._effective_sync_backref:
listen_hooks.append(
- lambda desc, prop: attributes.backref_listeners(
+ lambda desc, prop: attributes._backref_listeners(
desc, backref, uselist
)
)
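``_backref_listeners`` is the internal wiring behind two-way relationship
synchronization; the public configuration is simply ``back_populates`` on
both sides. A sketch with hypothetical models::

    from __future__ import annotations

    from sqlalchemy import ForeignKey
    from sqlalchemy.orm import (
        DeclarativeBase,
        Mapped,
        mapped_column,
        relationship,
    )

    class Base(DeclarativeBase):
        pass

    class Parent(Base):
        __tablename__ = "parent"
        id: Mapped[int] = mapped_column(primary_key=True)
        children: Mapped[list[Child]] = relationship(
            back_populates="parent"
        )

    class Child(Base):
        __tablename__ = "child"
        id: Mapped[int] = mapped_column(primary_key=True)
        parent_id: Mapped[int] = mapped_column(ForeignKey("parent.id"))
        parent: Mapped[Parent] = relationship(back_populates="children")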
if prop is m._props.get(
prop.key
) and not m.class_manager._attr_has_impl(prop.key):
- desc = attributes.register_attribute_impl(
+ desc = attributes._register_attribute_impl(
m.class_,
prop.key,
parent_token=prop,
@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
-class UninstrumentedColumnLoader(LoaderStrategy):
+class _UninstrumentedColumnLoader(LoaderStrategy):
"""Represent a non-instrumented MapperProperty.
The polymorphic_on argument of mapper() often results in this,
@log.class_logger
@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
-class ColumnLoader(LoaderStrategy):
+class _ColumnLoader(LoaderStrategy):
"""Provide loading behavior for a :class:`.ColumnProperty`."""
__slots__ = "columns", "is_composite"
@log.class_logger
@properties.ColumnProperty.strategy_for(query_expression=True)
-class ExpressionColumnLoader(ColumnLoader):
+class _ExpressionColumnLoader(_ColumnLoader):
def __init__(self, parent, strategy_key):
super().__init__(parent, strategy_key)
deferred=True, instrument=True, raiseload=True
)
@properties.ColumnProperty.strategy_for(do_nothing=True)
-class DeferredColumnLoader(LoaderStrategy):
+class _DeferredColumnLoader(LoaderStrategy):
"""Provide loading behavior for a deferred :class:`.ColumnProperty`."""
__slots__ = "columns", "group", "raiseload"
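``_DeferredColumnLoader`` backs the public deferred-column options, which
keep their spelling; a sketch against a hypothetical ``Book`` model::

    from sqlalchemy import select
    from sqlalchemy.orm import defer, undefer

    # omit the (large) summary column from the initial SELECT
    stmt = select(Book).options(defer(Book.summary))

    # opt a mapping-level deferred column back in for one query
    stmt = select(Book).options(undefer(Book.summary))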
p.key
for p in localparent.iterate_properties
if isinstance(p, StrategizedProperty)
- and isinstance(p.strategy, DeferredColumnLoader)
+ and isinstance(p.strategy, _DeferredColumnLoader)
and p.group == self.group
]
else:
if self.raiseload:
self._invoke_raise_load(state, passive, "raise")
- loading.load_scalar_attributes(
+ loading._load_scalar_attributes(
state.mapper, state, set(group), PASSIVE_OFF
)
)
-class LoadDeferredColumns:
+class _LoadDeferredColumns:
"""serializable loader object used by DeferredColumnLoader"""
def __init__(self, key: str, raiseload: bool = False):
return strategy._load_for_state(state, passive)
-class AbstractRelationshipLoader(LoaderStrategy):
+class _AbstractRelationshipLoader(LoaderStrategy):
"""LoaderStratgies which deal with related objects."""
__slots__ = "mapper", "target", "uselist", "entity"
@log.class_logger
@relationships.RelationshipProperty.strategy_for(do_nothing=True)
-class DoNothingLoader(LoaderStrategy):
+class _DoNothingLoader(LoaderStrategy):
"""Relationship loader that makes no change to the object's state.
- Compared to NoLoader, this loader does not initialize the
+ Compared to _NoLoader, this loader does not initialize the
@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="noload")
@relationships.RelationshipProperty.strategy_for(lazy=None)
-class NoLoader(AbstractRelationshipLoader):
+class _NoLoader(_AbstractRelationshipLoader):
"""Provide loading behavior for a :class:`.Relationship`
with "lazy=None".
@relationships.RelationshipProperty.strategy_for(lazy="raise")
@relationships.RelationshipProperty.strategy_for(lazy="raise_on_sql")
@relationships.RelationshipProperty.strategy_for(lazy="baked_select")
-class LazyLoader(
- AbstractRelationshipLoader, util.MemoizedSlots, log.Identified
+class _LazyLoader(
+ _AbstractRelationshipLoader, util.MemoizedSlots, log.Identified
):
"""Provide loading behavior for a :class:`.Relationship`
with "lazy=True", that is loads when first accessed.
_raw_columns=[clauseelement],
_propagate_attrs=clauseelement._propagate_attrs,
_label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
- _compile_options=ORMCompileState.default_compile_options,
+ _compile_options=_ORMCompileState.default_compile_options,
)
load_options = QueryContext.default_load_options
if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
self._invoke_raise_load(state, passive, "raise_on_sql")
- return loading.load_on_pk_identity(
+ return loading._load_on_pk_identity(
session,
stmt,
primary_key_identity,
if (
rev.direction is interfaces.MANYTOONE
and rev._use_get
- and not isinstance(rev.strategy, LazyLoader)
+ and not isinstance(rev.strategy, _LazyLoader)
):
strategy_options.Load._construct_for_existing_path(
compile_context.compile_options._current_path[
InstanceState._instance_level_callable_processor
)(
mapper.class_manager,
- LoadLazyAttribute(
+ _LoadLazyAttribute(
key,
self,
loadopt,
populators["new"].append((self.key, reset_for_lazy_callable))
-class LoadLazyAttribute:
+class _LoadLazyAttribute:
"""semi-serializable loader object used by LazyLoader
Historically, this object would be carried along with instances that
)
-class PostLoader(AbstractRelationshipLoader):
+class _PostLoader(_AbstractRelationshipLoader):
"""A relationship loader that emits a second SELECT statement."""
__slots__ = ()
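``_PostLoader`` subclasses implement the "second SELECT" strategies; the
most common public entry point is ``selectinload()``::

    from sqlalchemy import select
    from sqlalchemy.orm import selectinload

    stmt = select(Parent).options(selectinload(Parent.children))
    # one SELECT for the parents, then a second
    # SELECT ... WHERE child.parent_id IN (...) for the collections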
}
)
- if loading.PostLoad.path_exists(
+ if loading._PostLoad.path_exists(
context, effective_path, self.parent_property
):
return effective_path, False, execution_options, recursion_depth
@relationships.RelationshipProperty.strategy_for(lazy="immediate")
-class ImmediateLoader(PostLoader):
+class _ImmediateLoader(_PostLoader):
__slots__ = ("join_depth",)
def __init__(self, parent, strategy_key):
else:
flags = attributes.PASSIVE_OFF | PassiveFlag.NO_RAISE
- loading.PostLoad.callable_for_path(
+ loading._PostLoad.callable_for_path(
context,
effective_path,
self.parent,
@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="subquery")
-class SubqueryLoader(PostLoader):
+class _SubqueryLoader(_PostLoader):
__slots__ = ("join_depth",)
def __init__(self, parent, strategy_key):
# compiled query but swapping the params, seems only marginally
# less time spent but more complicated
orig_query = context.query._execution_options.get(
- ("orig_query", SubqueryLoader), context.query
+ ("orig_query", _SubqueryLoader), context.query
)
# make a new compile_state for the query that's probably cached, but
# we're sort of undoing a bit of that caching :(
- compile_state_cls = ORMCompileState._get_plugin_class_for_plugin(
+ compile_state_cls = _ORMCompileState._get_plugin_class_for_plugin(
orig_query, "orm"
)
q._execution_options = context.query._execution_options.merge_with(
context.execution_options,
{
- ("orig_query", SubqueryLoader): orig_query,
+ ("orig_query", _SubqueryLoader): orig_query,
("subquery_paths", None): (subq_path, rewritten_path),
},
)
if not run_loader:
return
- if not isinstance(context.compile_state, ORMSelectCompileState):
+ if not isinstance(context.compile_state, _ORMSelectCompileState):
# issue 7505 - subqueryload() in 1.3 and previous would silently
# degrade for from_statement() without warning. this behavior
# is restored here
@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="joined")
@relationships.RelationshipProperty.strategy_for(lazy=False)
-class JoinedLoader(AbstractRelationshipLoader):
+class _JoinedLoader(_AbstractRelationshipLoader):
"""Provide loading behavior for a :class:`.Relationship`
using joined eager loading.
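By contrast, ``_JoinedLoader`` satisfies the collection within the same
statement; publicly::

    from sqlalchemy.orm import joinedload

    stmt = select(Parent).options(joinedload(Parent.children))
    # a single SELECT ... FROM parent LEFT OUTER JOIN child ...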
@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="selectin")
-class SelectInLoader(PostLoader, util.MemoizedSlots):
+class _SelectInLoader(_PostLoader, util.MemoizedSlots):
__slots__ = (
"join_depth",
"omit_join",
else:
effective_entity = self.entity
- loading.PostLoad.callable_for_path(
+ loading._PostLoad.callable_for_path(
context,
selectin_path,
self.parent,
q = Select._create_raw_select(
_raw_columns=[bundle_sql, entity_sql],
_label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
- _compile_options=ORMCompileState.default_compile_options,
+ _compile_options=_ORMCompileState.default_compile_options,
_propagate_attrs={
"compile_state_plugin": "orm",
"plugin_subject": effective_entity,
)
-def single_parent_validator(desc, prop):
+def _single_parent_validator(desc, prop):
def _do_check(state, value, oldvalue, initiator):
if value is not None and initiator.key == prop.key:
hasparent = initiator.hasparent(attributes.instance_state(value))
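``_single_parent_validator`` enforces the public ``single_parent=True``
contract, raising if an instance becomes associated with more than one
parent at a time. The relevant attribute only, as a variant of the
hypothetical ``Child`` above::

    parent: Mapped[Parent] = relationship(
        back_populates="children",
        cascade="all, delete-orphan",  # delete-orphan on a many-to-one
        single_parent=True,            # requires single_parent=True
    )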
from .attributes import QueryableAttribute
from .base import InspectionAttr
from .interfaces import LoaderOption
+from .path_registry import _AbstractEntityRegistry
from .path_registry import _DEFAULT_TOKEN
from .path_registry import _StrPathToken
+from .path_registry import _TokenRegistry
from .path_registry import _WILDCARD_TOKEN
-from .path_registry import AbstractEntityRegistry
from .path_registry import path_is_property
from .path_registry import PathRegistry
-from .path_registry import TokenRegistry
from .util import _orm_full_deannotate
from .util import AliasedInsp
from .. import exc as sa_exc
from ._typing import _EntityType
from ._typing import _InternalEntityType
from .context import _MapperEntity
- from .context import ORMCompileState
+ from .context import _ORMCompileState
from .context import QueryContext
from .interfaces import _StrategyKey
from .interfaces import MapperProperty
def process_compile_state_replaced_entities(
self,
- compile_state: ORMCompileState,
+ compile_state: _ORMCompileState,
mapper_entities: Sequence[_MapperEntity],
) -> None:
if not compile_state.compile_options._enable_eagerloads:
not bool(compile_state.current_path),
)
- def process_compile_state(self, compile_state: ORMCompileState) -> None:
+ def process_compile_state(self, compile_state: _ORMCompileState) -> None:
if not compile_state.compile_options._enable_eagerloads:
return
def _process(
self,
- compile_state: ORMCompileState,
+ compile_state: _ORMCompileState,
mapper_entities: Sequence[_MapperEntity],
raiseerr: bool,
) -> None:
@classmethod
def _construct_for_existing_path(
- cls, path: AbstractEntityRegistry
+ cls, path: _AbstractEntityRegistry
) -> Load:
load = cls.__new__(cls)
load.path = path
def _process(
self,
- compile_state: ORMCompileState,
+ compile_state: _ORMCompileState,
mapper_entities: Sequence[_MapperEntity],
raiseerr: bool,
) -> None:
if attr.endswith(_DEFAULT_TOKEN):
attr = f"{attr.split(':')[0]}:{_WILDCARD_TOKEN}"
- effective_path = cast(AbstractEntityRegistry, parent.path).token(attr)
+ effective_path = cast(_AbstractEntityRegistry, parent.path).token(attr)
assert effective_path.is_token
("loader", natural_path)
for natural_path in (
cast(
- TokenRegistry, effective_path
+ _TokenRegistry, effective_path
)._generate_natural_for_superclasses()
)
]
from .base import PassiveFlag
-def populate(
+def _populate(
source,
source_mapper,
dest,
uowcommit.attributes[("pk_cascaded", dest, r)] = True
-def bulk_populate_inherit_keys(source_dict, source_mapper, synchronize_pairs):
+def _bulk_populate_inherit_keys(source_dict, source_mapper, synchronize_pairs):
- # a simplified version of populate() used by bulk insert mode
+ # a simplified version of _populate() used by bulk insert mode
for l, r in synchronize_pairs:
try:
_raise_col_to_prop(True, source_mapper, l, source_mapper, r, err)
-def clear(dest, dest_mapper, synchronize_pairs):
+def _clear(dest, dest_mapper, synchronize_pairs):
for l, r in synchronize_pairs:
if (
r.primary_key
_raise_col_to_prop(True, None, l, dest_mapper, r, err)
-def update(source, source_mapper, dest, old_prefix, synchronize_pairs):
+def _update(source, source_mapper, dest, old_prefix, synchronize_pairs):
for l, r in synchronize_pairs:
try:
oldvalue = source_mapper._get_committed_attr_by_column(
dest[old_prefix + r.key] = oldvalue
-def populate_dict(source, source_mapper, dict_, synchronize_pairs):
+def _populate_dict(source, source_mapper, dict_, synchronize_pairs):
for l, r in synchronize_pairs:
try:
value = source_mapper._get_state_attr_by_column(
dict_[r.key] = value
-def source_modified(uowcommit, source, source_mapper, synchronize_pairs):
+def _source_modified(uowcommit, source, source_mapper, synchronize_pairs):
"""return true if the source object has changes from an old to a
new value on the given synchronize pairs
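For orientation, a "synchronize pair" is a (source column, destination
column) tuple whose value is copied during flush; a minimal Core sketch
(table names illustrative)::

    from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table

    m = MetaData()
    parent = Table("parent", m, Column("id", Integer, primary_key=True))
    child = Table(
        "child",
        m,
        Column("id", Integer, primary_key=True),
        Column("parent_id", ForeignKey("parent.id")),
    )

    # at flush time, parent.id is copied into child.parent_id
    # for each dependent row
    pairs = [(parent.c.id, child.c.parent_id)]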
if TYPE_CHECKING:
- from .dependency import DependencyProcessor
+ from .dependency import _DependencyProcessor
from .interfaces import MapperProperty
from .mapper import Mapper
from .session import Session
from .state import InstanceState
-def track_cascade_events(descriptor, prop):
+def _track_cascade_events(descriptor, prop):
"""Establish event listeners on object attributes which handle
cascade-on-set/append.
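``_track_cascade_events`` is what lets save-update cascade fire from plain
attribute operations; the observable effect, reusing the hypothetical
models::

    p = Parent()
    p.children.append(Child())  # append event recorded by cascade hooks
    session.add(p)              # save-update cascade pulls the Child in
    session.flush()             # both rows are INSERTed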
class UOWTransaction:
+ """Manages the internal state of a unit of work flush operation."""
+
session: Session
transaction: SessionTransaction
attributes: Dict[str, Any]
- deps: util.defaultdict[Mapper[Any], Set[DependencyProcessor]]
+ deps: util.defaultdict[Mapper[Any], Set[_DependencyProcessor]]
mappers: util.defaultdict[Mapper[Any], Set[InstanceState[Any]]]
def __init__(self, session: Session):
def register_preprocessor(self, processor, fromparent):
key = (processor, fromparent)
if key not in self.presort_actions:
- self.presort_actions[key] = Preprocess(processor, fromparent)
+ self.presort_actions[key] = _Preprocess(processor, fromparent)
def register_object(
self,
cols.update(post_update_cols)
def _per_mapper_flush_actions(self, mapper):
- saves = SaveUpdateAll(self, mapper.base_mapper)
- deletes = DeleteAll(self, mapper.base_mapper)
+ saves = _SaveUpdateAll(self, mapper.base_mapper)
+ deletes = _DeleteAll(self, mapper.base_mapper)
self.dependencies.add((saves, deletes))
for dep in mapper._dependency_processors:
self.session._register_persistent(other)
-class IterateMappersMixin:
+class _IterateMappersMixin:
__slots__ = ()
def _mappers(self, uow):
return self.dependency_processor.mapper.self_and_descendants
-class Preprocess(IterateMappersMixin):
+class _Preprocess(_IterateMappersMixin):
__slots__ = (
"dependency_processor",
"fromparent",
return False
-class PostSortRec:
+class _PostSortRec:
__slots__ = ("disabled",)
def __new__(cls, uow, *args):
self.execute(uow)
-class ProcessAll(IterateMappersMixin, PostSortRec):
+class _ProcessAll(_IterateMappersMixin, _PostSortRec):
__slots__ = "dependency_processor", "isdelete", "fromparent", "sort_key"
def __init__(self, uow, dependency_processor, isdelete, fromparent):
yield state
-class PostUpdateAll(PostSortRec):
+class _PostUpdateAll(_PostSortRec):
__slots__ = "mapper", "isdelete", "sort_key"
def __init__(self, uow, mapper, isdelete):
states, cols = uow.post_update_states[self.mapper]
states = [s for s in states if uow.states[s][0] == self.isdelete]
- persistence.post_update(self.mapper, states, uow, cols)
+ persistence._post_update(self.mapper, states, uow, cols)
-class SaveUpdateAll(PostSortRec):
+class _SaveUpdateAll(_PostSortRec):
__slots__ = ("mapper", "sort_key")
def __init__(self, uow, mapper):
@util.preload_module("sqlalchemy.orm.persistence")
def execute(self, uow):
- util.preloaded.orm_persistence.save_obj(
+ util.preloaded.orm_persistence._save_obj(
self.mapper,
uow.states_for_mapper_hierarchy(self.mapper, False, False),
uow,
uow.states_for_mapper_hierarchy(self.mapper, False, False)
)
base_mapper = self.mapper.base_mapper
- delete_all = DeleteAll(uow, base_mapper)
+ delete_all = _DeleteAll(uow, base_mapper)
for state in states:
# keep saves before deletes -
# this ensures 'row switch' operations work
- action = SaveUpdateState(uow, state)
+ action = _SaveUpdateState(uow, state)
uow.dependencies.add((action, delete_all))
yield action
return "%s(%s)" % (self.__class__.__name__, self.mapper)
-class DeleteAll(PostSortRec):
+class _DeleteAll(_PostSortRec):
__slots__ = ("mapper", "sort_key")
def __init__(self, uow, mapper):
@util.preload_module("sqlalchemy.orm.persistence")
def execute(self, uow):
- util.preloaded.orm_persistence.delete_obj(
+ util.preloaded.orm_persistence._delete_obj(
self.mapper,
uow.states_for_mapper_hierarchy(self.mapper, True, False),
uow,
uow.states_for_mapper_hierarchy(self.mapper, True, False)
)
base_mapper = self.mapper.base_mapper
- save_all = SaveUpdateAll(uow, base_mapper)
+ save_all = _SaveUpdateAll(uow, base_mapper)
for state in states:
# keep saves before deletes -
# this ensures 'row switch' operations work
- action = DeleteState(uow, state)
+ action = _DeleteState(uow, state)
uow.dependencies.add((save_all, action))
yield action
return "%s(%s)" % (self.__class__.__name__, self.mapper)
-class ProcessState(PostSortRec):
+class _ProcessState(_PostSortRec):
__slots__ = "dependency_processor", "isdelete", "state", "sort_key"
def __init__(self, uow, dependency_processor, isdelete, state):
)
-class SaveUpdateState(PostSortRec):
+class _SaveUpdateState(_PostSortRec):
__slots__ = "state", "mapper", "sort_key"
def __init__(self, uow, state):
r for r in recs if r.__class__ is cls_ and r.mapper is mapper
]
recs.difference_update(our_recs)
- persistence.save_obj(
+ persistence._save_obj(
mapper, [self.state] + [r.state for r in our_recs], uow
)
)
-class DeleteState(PostSortRec):
+class _DeleteState(_PostSortRec):
__slots__ = "state", "mapper", "sort_key"
def __init__(self, uow, state):
]
recs.difference_update(our_recs)
states = [self.state] + [r.state for r in our_recs]
- persistence.delete_obj(
+ persistence._delete_obj(
mapper, [s for s in states if uow.states[s][0]], uow
)
from ._typing import _InternalEntityType
from ._typing import _ORMCOLEXPR
from .context import _MapperEntity
- from .context import ORMCompileState
+ from .context import _ORMCompileState
from .mapper import Mapper
- from .path_registry import AbstractEntityRegistry
+ from .path_registry import _AbstractEntityRegistry
from .query import Query
from .relationships import RelationshipProperty
from ..engine import Row
return self.mapper.class_
@property
- def _path_registry(self) -> AbstractEntityRegistry:
+ def _path_registry(self) -> _AbstractEntityRegistry:
if self._use_mapper_path:
return self.mapper._path_registry
else:
else:
stack.extend(subclass.__subclasses__())
- def _should_include(self, compile_state: ORMCompileState) -> bool:
+ def _should_include(self, compile_state: _ORMCompileState) -> bool:
if (
compile_state.select_statement._annotations.get(
"for_loader_criteria", None
def process_compile_state_replaced_entities(
self,
- compile_state: ORMCompileState,
+ compile_state: _ORMCompileState,
mapper_entities: Iterable[_MapperEntity],
) -> None:
self.process_compile_state(compile_state)
- def process_compile_state(self, compile_state: ORMCompileState) -> None:
+ def process_compile_state(self, compile_state: _ORMCompileState) -> None:
"""Apply a modification to a given :class:`.CompileState`."""
# if options to limit the criteria to immediate query only,
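These hooks are how the public ``with_loader_criteria()`` option applies
itself at compile time; usage is unchanged (a soft-delete filter on a
hypothetical ``User.deleted`` column shown)::

    from sqlalchemy import select
    from sqlalchemy.orm import with_loader_criteria

    stmt = select(User).options(
        with_loader_criteria(User, User.deleted.is_(False))
    )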
def __init__(
self,
- attr: WriteOnlyAttributeImpl,
+ attr: _WriteOnlyAttributeImpl,
state: InstanceState[_T],
passive: PassiveFlag,
apply_to: Optional[WriteOnlyHistory[_T]] = None,
self.deleted_items.add(value)
-class WriteOnlyAttributeImpl(
- attributes.HasCollectionAdapter, attributes.AttributeImpl
+class _WriteOnlyAttributeImpl(
+ attributes._HasCollectionAdapter, attributes._AttributeImpl
):
uses_objects: bool = True
default_accepts_scalar_loader: bool = False
else:
history = self._get_collection_history(state, passive)
data = history.added_plus_unchanged
- return DynamicCollectionAdapter(data) # type: ignore[return-value]
+ return _DynamicCollectionAdapter(data) # type: ignore[return-value]
@util.memoized_property
def _append_token( # type:ignore[override]
@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="write_only")
-class WriteOnlyLoader(strategies.AbstractRelationshipLoader, log.Identified):
- impl_class = WriteOnlyAttributeImpl
+class _WriteOnlyLoader(strategies._AbstractRelationshipLoader, log.Identified):
+ impl_class = _WriteOnlyAttributeImpl
def init_class_attribute(self, mapper: Mapper[Any]) -> None:
self.is_class_level = True
)
-class DynamicCollectionAdapter:
+class _DynamicCollectionAdapter:
"""simplified CollectionAdapter for internal API consistency"""
data: Collection[Any]
return True
-class AbstractCollectionWriter(Generic[_T]):
+class _AbstractCollectionWriter(Generic[_T]):
"""Virtual collection which includes append/remove methods that synchronize
into the attribute event system.
instance: _T
_from_obj: Tuple[FromClause, ...]
- def __init__(self, attr: WriteOnlyAttributeImpl, state: InstanceState[_T]):
+ def __init__(
+ self, attr: _WriteOnlyAttributeImpl, state: InstanceState[_T]
+ ):
instance = state.obj()
if TYPE_CHECKING:
assert instance
)
-class WriteOnlyCollection(AbstractCollectionWriter[_T]):
+class WriteOnlyCollection(_AbstractCollectionWriter[_T]):
"""Write-only collection which can synchronize changes into the
attribute event system.
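``WriteOnlyCollection`` itself remains public; a sketch of the 2.0-style
write-only pattern, with ``Account`` and ``Entry`` as hypothetical mapped
classes following the earlier declarative sketch (``Entry`` is assumed to
have a foreign key to ``account``)::

    from sqlalchemy.orm import WriteOnlyMapped

    class Account(Base):
        __tablename__ = "account"
        id: Mapped[int] = mapped_column(primary_key=True)
        entries: WriteOnlyMapped["Entry"] = relationship()

    account = session.get(Account, 1)
    account.entries.add(Entry())     # staged; no SELECT is emitted
    stmt = account.entries.select()  # explicit SELECT when needed
    rows = session.scalars(stmt).all()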
kw.setdefault("comparator", object())
kw.setdefault("parententity", object())
- attributes.register_attribute(class_, key, **kw)
+ attributes._register_attribute(class_, key, **kw)
@decorator
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
f2 = MockClass(base, "foo.bar.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
gc_collect()
with expect_warnings(
"same class name and module name as foo.bar.Foo, and "
"will be replaced in the string-lookup table."
):
- clsregistry.add_class(
+ clsregistry._add_class(
"Foo",
f2,
base._class_registry,
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
f2 = MockClass(base, "foo.alt.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
- clsregistry.add_class("Foo", f2, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f2, base._class_registry)
name_resolver, resolver = clsregistry._resolver(f1, MockProp())
gc_collect()
f1 = MockClass(base, "foo.bar.Foo")
f2 = MockClass(base, "foo.alt.Foo")
f3 = MockClass(base, "bat.alt.Hoho")
- clsregistry.add_class("Foo", f1, base._class_registry)
- clsregistry.add_class("Foo", f2, base._class_registry)
- clsregistry.add_class("HoHo", f3, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f2, base._class_registry)
+ clsregistry._add_class("HoHo", f3, base._class_registry)
name_resolver, resolver = clsregistry._resolver(f1, MockProp())
gc_collect()
f1 = MockClass(base, "foo.bar.Foo")
f2 = MockClass(base, "foo.alt.Foo")
f3 = MockClass(base, "bat.alt.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
- clsregistry.add_class("Foo", f2, base._class_registry)
- clsregistry.add_class("Foo", f3, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f2, base._class_registry)
+ clsregistry._add_class("Foo", f3, base._class_registry)
name_resolver, resolver = clsregistry._resolver(f1, MockProp())
gc_collect()
f1 = MockClass(registry, "existent.Foo")
f2 = MockClass(registry, "existent.existent.Foo")
- clsregistry.add_class("Foo", f1, registry._class_registry)
- clsregistry.add_class("Foo", f2, registry._class_registry)
+ clsregistry._add_class("Foo", f1, registry._class_registry)
+ clsregistry._add_class("Foo", f2, registry._class_registry)
class MyClass(Base):
__tablename__ = "my_table"
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
f2 = MockClass(base, "foo.alt.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
- clsregistry.add_class("Foo", f2, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f2, base._class_registry)
name_resolver, resolver = clsregistry._resolver(f1, MockProp())
gc_collect()
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
f2 = MockClass(base, "foo.alt.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
- clsregistry.add_class("Foo", f2, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f2, base._class_registry)
gc_collect()
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
f2 = MockClass(base, "foo.alt.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
- clsregistry.add_class("Foo", f2, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f2, base._class_registry)
del f2
gc_collect()
for i in range(3):
f1 = MockClass(base, "foo.bar.Foo")
f2 = MockClass(base, "foo.alt.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
- clsregistry.add_class("Foo", f2, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f2, base._class_registry)
eq_(len(clsregistry._registries), 11)
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
f2 = MockClass(base, "foo.alt.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
- clsregistry.add_class("Foo", f2, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f2, base._class_registry)
dupe_reg = base._class_registry["Foo"]
dupe_reg.contents = [lambda: None]
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
reg = base._class_registry["_sa_module_registry"]
mod_entry = reg["foo"]["bar"]
def test_module_reg_no_class(self):
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
reg = base._class_registry["_sa_module_registry"]
mod_entry = reg["foo"]["bar"] # noqa
name_resolver, resolver = clsregistry._resolver(f1, MockProp())
def test_module_reg_cleanout_two_sub(self):
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
- clsregistry.add_class("Foo", f1, base._class_registry)
+ clsregistry._add_class("Foo", f1, base._class_registry)
reg = base._class_registry["_sa_module_registry"]
f2 = MockClass(base, "foo.alt.Bar")
- clsregistry.add_class("Bar", f2, base._class_registry)
+ clsregistry._add_class("Bar", f2, base._class_registry)
assert reg["foo"]["bar"]
del f1
gc_collect()
def test_module_reg_cleanout_sub_to_base(self):
base = registry()
f3 = MockClass(base, "bat.bar.Hoho")
- clsregistry.add_class("Hoho", f3, base._class_registry)
+ clsregistry._add_class("Hoho", f3, base._class_registry)
reg = base._class_registry["_sa_module_registry"]
assert reg["bat"]["bar"]
def test_module_reg_cleanout_cls_to_base(self):
base = registry()
f4 = MockClass(base, "single.Blat")
- clsregistry.add_class("Blat", f4, base._class_registry)
+ clsregistry._add_class("Blat", f4, base._class_registry)
reg = base._class_registry["_sa_module_registry"]
assert reg["single"]
del f4
from sqlalchemy.orm import Session
from sqlalchemy.orm import undefer
from sqlalchemy.orm import WriteOnlyMapped
-from sqlalchemy.orm.attributes import CollectionAttributeImpl
+from sqlalchemy.orm.attributes import _CollectionAttributeImpl
from sqlalchemy.orm.collections import attribute_keyed_dict
from sqlalchemy.orm.collections import KeyFuncDict
-from sqlalchemy.orm.dynamic import DynamicAttributeImpl
+from sqlalchemy.orm.dynamic import _DynamicAttributeImpl
from sqlalchemy.orm.properties import MappedColumn
-from sqlalchemy.orm.writeonly import WriteOnlyAttributeImpl
+from sqlalchemy.orm.writeonly import _WriteOnlyAttributeImpl
from sqlalchemy.schema import CreateTable
from sqlalchemy.sql.base import _NoArg
from sqlalchemy.sql.sqltypes import Enum
Base.registry.dispose()
@testing.combinations(
- (Relationship, CollectionAttributeImpl),
- (Mapped, CollectionAttributeImpl),
- (WriteOnlyMapped, WriteOnlyAttributeImpl),
- (DynamicMapped, DynamicAttributeImpl),
+ (Relationship, _CollectionAttributeImpl),
+ (Mapped, _CollectionAttributeImpl),
+ (WriteOnlyMapped, _WriteOnlyAttributeImpl),
+ (DynamicMapped, _DynamicAttributeImpl),
argnames="mapped_cls,implcls",
)
def test_use_relationship(self, decl_base, mapped_cls, implcls):
kw.setdefault("comparator", object())
kw.setdefault("parententity", object())
- attributes.register_attribute(class_, key, **kw)
+ attributes._register_attribute(class_, key, **kw)
class AttributeImplAPITest(fixtures.MappedTest):
)
assert attributes.manager_of_class(Foo).is_instrumented("collection")
assert isinstance(Foo().collection, set)
- attributes.unregister_attribute(Foo, "collection")
+ attributes._unregister_attribute(Foo, "collection")
assert not attributes.manager_of_class(Foo).is_instrumented(
"collection"
)
useobject=True,
)
assert isinstance(Foo().collection, MyDict)
- attributes.unregister_attribute(Foo, "collection")
+ attributes._unregister_attribute(Foo, "collection")
class MyColl:
pass
with (
mock.patch(
- "sqlalchemy.orm.context.ORMCompileState."
+ "sqlalchemy.orm.context._ORMCompileState."
"orm_setup_cursor_result"
),
mock.patch(
- "sqlalchemy.orm.context.ORMCompileState.orm_execute_statement"
+ "sqlalchemy.orm.context._ORMCompileState.orm_execute_statement"
),
mock.patch(
"sqlalchemy.orm.bulk_persistence."
- "BulkORMInsert.orm_execute_statement"
+ "_BulkORMInsert.orm_execute_statement"
),
mock.patch(
"sqlalchemy.orm.bulk_persistence."
- "BulkUDCompileState.orm_setup_cursor_result"
+ "_BulkUDCompileState.orm_setup_cursor_result"
),
):
sess.execute(statement)
kw.setdefault("comparator", object())
kw.setdefault("parententity", object())
- return attributes.register_attribute(class_, key, **kw)
+ return attributes._register_attribute(class_, key, **kw)
class Canary:
pass
instrumentation.register_class(Foo)
- attributes.register_attribute(
+ attributes._register_attribute(
Foo,
"attr",
parententity=object(),
u1 = sess.query(User).options(defer(User.name)).first()
assert isinstance(
attributes.instance_state(u1).callables["name"],
- strategies.LoadDeferredColumns,
+ strategies._LoadDeferredColumns,
)
# expire the attr, it gets the InstanceState callable
u1 = sess.query(User).options(lazyload(User.addresses)).first()
assert isinstance(
attributes.instance_state(u1).callables["addresses"],
- strategies.LoadLazyAttribute,
+ strategies._LoadLazyAttribute,
)
# expire, it goes away from callables as of 1.4 and is considered
# to be expired
)
assert isinstance(
attributes.instance_state(u1).callables["addresses"],
- strategies.LoadLazyAttribute,
+ strategies._LoadLazyAttribute,
)
# load the attr, goes away
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
-from sqlalchemy.orm.context import ORMSelectCompileState
+from sqlalchemy.orm.context import _ORMSelectCompileState
from sqlalchemy.sql import column
from sqlalchemy.sql import table
from sqlalchemy.sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
.order_by(User.id)
)
- compile_state = ORMSelectCompileState.create_for_statement(stmt, None)
+ compile_state = _ORMSelectCompileState.create_for_statement(stmt, None)
is_(compile_state._primary_entity, None)
def test_column_queries_one(self):
pass
manager = instrumentation.register_class(A)
- attributes.register_attribute(
+ attributes._register_attribute(
A,
"x",
comparator=object(),
pass
from sqlalchemy.testing import mock
- from sqlalchemy.orm.attributes import register_attribute_impl
+ from sqlalchemy.orm.attributes import _register_attribute_impl
with mock.patch(
- "sqlalchemy.orm.attributes.register_attribute_impl",
- side_effect=register_attribute_impl,
+ "sqlalchemy.orm.attributes._register_attribute_impl",
+ side_effect=_register_attribute_impl,
) as some_mock:
self.mapper(A, users, properties={"bs": relationship(B)})
self.mapper(B, addresses)
else:
return True
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.three_tab_a,
self.three_tab_b,
self.three_tab_a,
)
def _join_fixture_m2m(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.m2mleft,
self.m2mright,
self.m2mleft,
j1 = self._join_fixture_m2m()
return (
j1,
- relationships.JoinCondition(
+ relationships._JoinCondition(
self.m2mright,
self.m2mleft,
self.m2mright,
)
def _join_fixture_o2m(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.left,
self.right,
self.left,
)
def _join_fixture_m2o(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.right,
self.left,
self.right,
)
def _join_fixture_o2m_selfref(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.selfref,
self.selfref,
self.selfref,
)
def _join_fixture_m2o_selfref(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.selfref,
self.selfref,
self.selfref,
)
def _join_fixture_o2m_composite_selfref(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.composite_selfref,
self.composite_selfref,
self.composite_selfref,
)
def _join_fixture_m2o_composite_selfref(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.composite_selfref,
self.composite_selfref,
self.composite_selfref,
)
def _join_fixture_o2m_composite_selfref_func(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.composite_selfref,
self.composite_selfref,
self.composite_selfref,
)
def _join_fixture_o2m_composite_selfref_func_remote_side(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.composite_selfref,
self.composite_selfref,
self.composite_selfref,
)
def _join_fixture_o2m_composite_selfref_func_annotated(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.composite_selfref,
self.composite_selfref,
self.composite_selfref,
)
def _join_fixture_compound_expression_1(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.left,
self.right,
self.left,
)
def _join_fixture_compound_expression_2(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.left,
self.right,
self.left,
)
def _join_fixture_compound_expression_1_non_annotated(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.left,
self.right,
self.left,
right = self.base_w_sub_rel.join(
self.rel_sub, self.base_w_sub_rel.c.id == self.rel_sub.c.id
)
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.base_w_sub_rel,
right,
self.base_w_sub_rel,
left = self.base.join(
self.sub_w_base_rel, self.base.c.id == self.sub_w_base_rel.c.id
)
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
left,
self.base,
self.sub_w_base_rel,
right = self.base.join(
self.sub_w_base_rel, self.base.c.id == self.sub_w_base_rel.c.id
)
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
left,
right,
self.sub,
right = self.base.join(
self.sub_w_sub_rel, self.base.c.id == self.sub_w_sub_rel.c.id
)
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
left,
right,
self.sub,
right = self.base.join(
self.right_w_base_rel, self.base.c.id == self.right_w_base_rel.c.id
)
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.right_w_base_rel,
right,
self.right_w_base_rel,
right = self.base.join(
self.right_w_base_rel, self.base.c.id == self.right_w_base_rel.c.id
)
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.right_w_base_rel,
right,
self.right_w_base_rel,
left = self.base.join(self.sub, self.base.c.id == self.sub.c.id)
# see test_relationships->AmbiguousJoinInterpretedAsSelfRef
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
left,
self.sub,
left,
)
def _join_fixture_o2m_to_annotated_func(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.left,
self.right,
self.left,
)
def _join_fixture_o2m_to_oldstyle_func(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.left,
self.right,
self.left,
)
def _join_fixture_overlapping_composite_fks(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.composite_target,
self.composite_multi_ref,
self.composite_target,
)
def _join_fixture_o2m_o_side_none(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.left,
self.right,
self.left,
)
def _join_fixture_purely_single_o2m(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.purely_single_col,
self.purely_single_col,
self.purely_single_col,
)
def _join_fixture_purely_single_m2o(self, **kw):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.purely_single_col,
self.purely_single_col,
self.purely_single_col,
def fn(a, b):
return (a == b) | (b == a)
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.selfref,
self.selfref,
self.selfref,
sub_w_sub_rel__flag = self.base.c.flag._annotate(
{"parentmapper": prop.mapper}
)
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
local_selectable,
remote_selectable,
local_selectable,
"providing a list of those columns which "
"should be counted as containing a foreign "
"key reference to the parent table.",
- relationships.JoinCondition,
+ relationships._JoinCondition,
self.left,
self.right_multi_fk,
self.left,
def test_determine_join_no_fks_o2m(self):
self._assert_raises_no_join(
- relationships.JoinCondition,
+ relationships._JoinCondition,
"Whatever.foo",
None,
self.left,
def test_determine_join_ambiguous_fks_m2m(self):
self._assert_raises_ambig_join(
- relationships.JoinCondition,
+ relationships._JoinCondition,
"Whatever.foo",
self.m2msecondary_ambig_fks,
self.m2mleft,
def test_determine_join_no_fks_m2m(self):
self._assert_raises_no_join(
- relationships.JoinCondition,
+ relationships._JoinCondition,
"Whatever.foo",
self.m2msecondary_no_fks,
self.m2mleft,
)
def _join_fixture_fks_ambig_m2m(self):
- return relationships.JoinCondition(
+ return relationships._JoinCondition(
self.m2mleft,
self.m2mright,
self.m2mleft,
def go():
with mock.patch(
- "sqlalchemy.orm.strategies.SelectInLoader._chunksize", 47
+ "sqlalchemy.orm.strategies._SelectInLoader._chunksize", 47
):
q = session.query(A).options(selectinload(A.bs)).order_by(A.id)
def go():
with mock.patch(
- "sqlalchemy.orm.strategies.SelectInLoader._chunksize", 47
+ "sqlalchemy.orm.strategies._SelectInLoader._chunksize", 47
):
q = session.query(B).options(selectinload(B.a)).order_by(B.id)
)
with mock.patch(
- "sqlalchemy.orm.session.loading.load_on_ident"
+ "sqlalchemy.orm.session.loading._load_on_ident"
) as load_on_ident:
s.refresh(m1, with_for_update={"read": True})
s.refresh(m1, with_for_update=True)
pairs = [(a_mapper.c.id, b_mapper.c.id)]
a1.obj().id = 7
assert "id" not in b1.obj().__dict__
- sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, False)
+ sync._populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, False)
eq_(b1.obj().id, 7)
eq_(b1.obj().__dict__["id"], 7)
assert ("pk_cascaded", b1, b_mapper.c.id) not in uowcommit.attributes
pairs = [(a_mapper.c.id, b_mapper.c.id)]
a1.obj().id = 7
assert "id" not in b1.obj().__dict__
- sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, True)
+ sync._populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, True)
eq_(b1.obj().id, 7)
eq_(b1.obj().__dict__["id"], 7)
eq_(uowcommit.attributes[("pk_cascaded", b1, b_mapper.c.id)], True)
orm_exc.UnmappedColumnError,
"Can't execute sync rule for source column 't2.id'; "
r"mapper 'Mapper\[A\(t1\)\]' does not map this column.",
- sync.populate,
+ sync._populate,
a1,
a_mapper,
b1,
r"Can't execute sync rule for destination "
r"column 't1.id'; "
r"mapper 'Mapper\[B\(t2\)\]' does not map this column.",
- sync.populate,
+ sync._populate,
a1,
a_mapper,
b1,
pairs = [(a_mapper.c.id, b_mapper.c.t1id)]
b1.obj().t1id = 8
eq_(b1.obj().__dict__["t1id"], 8)
- sync.clear(b1, b_mapper, pairs)
+ sync._clear(b1, b_mapper, pairs)
eq_(b1.obj().__dict__["t1id"], None)
def test_clear_pk(self):
AssertionError,
"Dependency rule on column 't1.id' tried to blank-out primary key "
"column 't2.id' on instance '<B",
- sync.clear,
+ sync._clear,
b1,
b_mapper,
pairs,
"Can't execute sync rule for destination "
r"column 't1.foo'; mapper 'Mapper\[B\(t2\)\]' does not "
"map this column.",
- sync.clear,
+ sync._clear,
b1,
b_mapper,
pairs,
a1.obj().id = 12
pairs = [(a_mapper.c.id, b_mapper.c.id)]
dest = {}
- sync.update(a1, a_mapper, dest, "old_", pairs)
+ sync._update(a1, a_mapper, dest, "old_", pairs)
eq_(dest, {"id": 12, "old_id": 10})
def test_update_unmapped(self):
orm_exc.UnmappedColumnError,
"Can't execute sync rule for source column 't2.id'; "
r"mapper 'Mapper\[A\(t1\)\]' does not map this column.",
- sync.update,
+ sync._update,
a1,
a_mapper,
dest,
a1.obj().id = 10
pairs = [(a_mapper.c.id, b_mapper.c.id)]
dest = {}
- sync.populate_dict(a1, a_mapper, dest, pairs)
+ sync._populate_dict(a1, a_mapper, dest, pairs)
eq_(dest, {"id": 10})
def test_populate_dict_unmapped(self):
orm_exc.UnmappedColumnError,
"Can't execute sync rule for source column 't2.id'; "
r"mapper 'Mapper\[A\(t1\)\]' does not map this column.",
- sync.populate_dict,
+ sync._populate_dict,
a1,
a_mapper,
dest,
uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
a1.obj().id = 10
pairs = [(a_mapper.c.id, b_mapper.c.id)]
- eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), False)
+ eq_(sync._source_modified(uowcommit, a1, a_mapper, pairs), False)
def test_source_modified_no_pairs(self):
uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
- eq_(sync.source_modified(uowcommit, a1, a_mapper, []), False)
+ eq_(sync._source_modified(uowcommit, a1, a_mapper, []), False)
def test_source_modified_modified(self):
uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
a1._commit_all(a1.dict)
a1.obj().id = 12
pairs = [(a_mapper.c.id, b_mapper.c.id)]
- eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), True)
+ eq_(sync._source_modified(uowcommit, a1, a_mapper, pairs), True)
def test_source_modified_composite(self):
uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
(a_mapper.c.id, b_mapper.c.id),
(a_mapper.c.foo, b_mapper.c.id),
]
- eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), True)
+ eq_(sync._source_modified(uowcommit, a1, a_mapper, pairs), True)
def test_source_modified_composite_unmodified(self):
uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
(a_mapper.c.id, b_mapper.c.id),
(a_mapper.c.foo, b_mapper.c.id),
]
- eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), False)
+ eq_(sync._source_modified(uowcommit, a1, a_mapper, pairs), False)
def test_source_modified_no_unmapped(self):
uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
orm_exc.UnmappedColumnError,
"Can't execute sync rule for source column 't2.id'; "
r"mapper 'Mapper\[A\(t1\)\]' does not map this column.",
- sync.source_modified,
+ sync._source_modified,
uowcommit,
a1,
a_mapper,