from .interfaces import _MappedAttribute
from .interfaces import _MapsColumns
from .interfaces import MapperProperty
-from .mapper import Mapper as mapper
from .mapper import Mapper
from .properties import ColumnProperty
from .properties import MappedColumn
_T = TypeVar("_T", bound=Any)
_MapperKwArgs = Mapping[str, Any]
-
_TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]]
-class _DeclMappedClassProtocol(Protocol[_O]):
- metadata: MetaData
+class MappedClassProtocol(Protocol[_O]):
+ """A protocol representing a SQLAlchemy mapped class.
+
+ The protocol is generic on the type of class, use
+ ``MappedClassProtocol[Any]`` to allow any mapped class.
+ """
+
+ __name__: str
__mapper__: Mapper[_O]
- __table__: Table
+ __table__: FromClause
+
+ def __call__(self, **kw: Any) -> _O:
+ ...
+
+
+class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol):
+ "Internal more detailed version of ``MappedClassProtocol``."
+ metadata: MetaData
__tablename__: str
- __mapper_args__: Mapping[str, Any]
+ __mapper_args__: _MapperKwArgs
__table_args__: Optional[_TableArgsType]
_sa_apply_dc_transforms: Optional[_DataclassArguments]
def __declare_first__(self) -> None:
- pass
+ ...
def __declare_last__(self) -> None:
- pass
+ ...
class _DataclassArguments(TypedDict):
mapper_kw: _MapperKwArgs,
) -> Mapper[_O]:
_ImperativeMapperConfig(registry, cls, table, mapper_kw)
- return cast("_DeclMappedClassProtocol[_O]", cls).__mapper__
+ return cast("MappedClassProtocol[_O]", cls).__mapper__
@util.preload_module("sqlalchemy.orm.decl_api")
manager = attributes.opt_manager_of_class(cls)
if manager and manager.class_ is cls_:
raise exc.InvalidRequestError(
- "Class %r already has been " "instrumented declaratively" % cls
+ f"Class {cls!r} already has been instrumented declaratively"
)
if cls_.__dict__.get("__abstract__", False):
self._early_mapping(mapper_kw)
def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
- mapper_cls = mapper
+ mapper_cls = Mapper
return self.set_cls_attribute(
"__mapper__",
% (cls, inherits_search)
)
inherits = inherits_search[0]
- elif isinstance(inherits, mapper):
+ elif isinstance(inherits, Mapper):
inherits = inherits.class_
self.inherits = inherits
def _setup_declared_events(self) -> None:
if _get_immediate_cls_attr(self.cls, "__declare_last__"):
- @event.listens_for(mapper, "after_configured")
+ @event.listens_for(Mapper, "after_configured")
def after_configured() -> None:
cast(
"_DeclMappedClassProtocol[Any]", self.cls
if _get_immediate_cls_attr(self.cls, "__declare_first__"):
- @event.listens_for(mapper, "before_configured")
+ @event.listens_for(Mapper, "before_configured")
def before_configured() -> None:
cast(
"_DeclMappedClassProtocol[Any]", self.cls
def _setup_table(self, table: Optional[FromClause] = None) -> None:
cls = self.cls
- cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls)
+ cls_as_Decl = cast("MappedClassProtocol[Any]", cls)
tablename = self.tablename
table_args = self.table_args
self.local_table = table
def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData:
- if hasattr(self.cls, "metadata"):
- return cast("_DeclMappedClassProtocol[Any]", self.cls).metadata
+ meta: Optional[MetaData] = getattr(self.cls, "metadata", None)
+ if meta is not None:
+ return meta
else:
return manager.registry.metadata
% (cls, inherits_search)
)
inherits = inherits_search[0]
- elif isinstance(inherits, mapper):
+ elif isinstance(inherits, Mapper):
inherits = inherits.class_
self.inherits = inherits
if "inherits" in mapper_args:
inherits_arg = mapper_args["inherits"]
- if isinstance(inherits_arg, mapper):
+ if isinstance(inherits_arg, Mapper):
inherits_arg = inherits_arg.class_
if inherits_arg is not self.inherits:
),
)
else:
- mapper_cls = mapper
+ mapper_cls = Mapper
return self.set_cls_attribute(
"__mapper__",
"""
if "__mapper__" in cls.__dict__:
- mapped_cls = cast("_DeclMappedClassProtocol[Any]", cls)
+ mapped_cls = cast("MappedClassProtocol[Any]", cls)
+
+ def _table_or_raise(mc: MappedClassProtocol[Any]) -> Table:
+ if isinstance(mc.__table__, Table):
+ return mc.__table__
+ raise exc.InvalidRequestError(
+ f"Cannot add a new attribute to mapped class {mc.__name__!r} "
+ "because it's not mapped against a table."
+ )
+
if isinstance(value, Column):
_undefer_column_name(key, value)
- # TODO: raise for this is not a Table
- mapped_cls.__table__.append_column(value, replace_existing=True)
+ _table_or_raise(mapped_cls).append_column(
+ value, replace_existing=True
+ )
mapped_cls.__mapper__.add_property(key, value)
elif isinstance(value, _MapsColumns):
mp = value.mapper_property_to_assign
for col in value.columns_to_assign:
_undefer_column_name(key, col)
- # TODO: raise for this is not a Table
- mapped_cls.__table__.append_column(col, replace_existing=True)
+ _table_or_raise(mapped_cls).append_column(
+ col, replace_existing=True
+ )
if not mp:
mapped_cls.__mapper__.add_property(key, col)
if mp:
def _del_attribute(cls: Type[Any], key: str) -> None:
-
if (
"__mapper__" in cls.__dict__
and key in cls.__dict__
and not cast(
- "_DeclMappedClassProtocol[Any]", cls
+ "MappedClassProtocol[Any]", cls
).__mapper__._dispose_called
):
value = cls.__dict__[key]
else:
type.__delattr__(cls, key)
cast(
- "_DeclMappedClassProtocol[Any]", cls
+ "MappedClassProtocol[Any]", cls
).__mapper__._expire_memoizations()
else:
type.__delattr__(cls, key)