:tags: change, orm
:tickets: 4412
- Added a new function :func:`.close_all_sessions` which takes
+ Added a new function :func:`_orm.close_all_sessions` which takes
over the task of the :meth:`.Session.close_all` method, which
is now deprecated, as it was confusing as a classmethod.
Pull request courtesy Augustin Trancart.
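A minimal usage sketch, assuming the function is available from the
``sqlalchemy.orm`` namespace as the :func:`_orm.close_all_sessions` reference
implies::

    from sqlalchemy.orm import close_all_sessions

    # closes all sessions in memory; replaces the deprecated
    # Session.close_all() classmethod
    close_all_sessions()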
Status: sqlalchemy.Enum(Status, length=50, native_enum=False)
}
+By default, :class:`_sqltypes.Enum` types that are automatically generated are
+not associated with the :class:`_sql.MetaData` instance used by the ``Base``,
+so if the metadata defines a schema it will not be automatically applied to the
+enum. To automatically associate the enum with the schema of the metadata or
+table it belongs to, the :paramref:`_sqltypes.Enum.inherit_schema` parameter
+can be set::
+
+ from enum import Enum
+ import sqlalchemy as sa
+ from sqlalchemy.orm import DeclarativeBase
+
+
+ class Base(DeclarativeBase):
+ metadata = sa.MetaData(schema="my_schema")
+ type_annotation_map = {Enum: sa.Enum(Enum, inherit_schema=True)}
+
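As a follow-up usage sketch, assume a hypothetical ``Status`` enum and
``Order`` mapped class built on the ``Base`` above; the ``Enum`` type generated
for ``Mapped[Status]`` is configured from the ``type_annotation_map`` entry and
therefore inherits the ``my_schema`` schema of the table::

    from sqlalchemy.orm import Mapped, mapped_column


    class Status(Enum):
        PENDING = "pending"
        COMPLETE = "complete"


    class Order(Base):
        __tablename__ = "orders"

        id: Mapped[int] = mapped_column(primary_key=True)
        status: Mapped[Status]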
Linking Specific ``enum.Enum`` or ``typing.Literal`` to other datatypes
-++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
The above examples feature the use of an :class:`_sqltypes.Enum` that
automatically configures itself from the arguments / attributes present on
from .pool import PoolProxiedConnection as PoolProxiedConnection
from .pool import PoolResetState as PoolResetState
from .pool import QueuePool as QueuePool
-from .pool import SingletonThreadPool as SingleonThreadPool
+from .pool import SingletonThreadPool as SingletonThreadPool
from .pool import StaticPool as StaticPool
from .schema import BaseDDLElement as BaseDDLElement
from .schema import BLANK_SCHEMA as BLANK_SCHEMA
def __go(lcls: Any) -> None:
- from . import util as _sa_util
-
- _sa_util.preloaded.import_prefix("sqlalchemy")
+ _util.preloaded.import_prefix("sqlalchemy")
from . import exc
__go(locals())
+
+
+def __getattr__(name: str) -> Any:
+ if name == "SingleonThreadPool":
+ _util.warn_deprecated(
+ "SingleonThreadPool was a typo in the v2 series. "
+ "Please use the correct SingletonThreadPool name.",
+ "2.0.24",
+ )
+ return SingletonThreadPool
+ raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
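A minimal sketch of how this module-level ``__getattr__`` hook behaves,
assuming it is defined in the top-level ``sqlalchemy`` package::

    import sqlalchemy

    # accessing the misspelled name emits a deprecation warning and
    # falls back to the correctly spelled class
    pool_cls = sqlalchemy.SingleonThreadPool
    assert pool_cls is sqlalchemy.SingletonThreadPool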
):
util.warn(
"Loader depth for query is excessively deep; caching will "
- "be disabled for additional loaders. Consider using the "
- "recursion_depth feature for deeply nested recursive eager "
- "loaders. Use the compiled_cache=None execution option to "
+ "be disabled for additional loaders. For recursive eager "
+ "loaders consider using the recursion_depth feature. "
+ "Use the compiled_cache=None execution option to "
"skip this warning."
)
execution_options = execution_options.union(
from itertools import zip_longest
import typing
from typing import Any
+from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Iterator
class _CacheKeyTraversalDispatchType(Protocol):
def __call__(
s, self: HasCacheKey, visitor: _CacheKeyTraversal
- ) -> CacheKey:
+ ) -> _CacheKeyTraversalDispatchTypeReturn:
...
ANON_NAME,
) = tuple(CacheTraverseTarget)
+_CacheKeyTraversalDispatchTypeReturn = Sequence[
+ Tuple[
+ str,
+ Any,
+ Union[
+ Callable[..., Tuple[Any, ...]],
+ CacheTraverseTarget,
+ InternalTraversal,
+ ],
+ ]
+]
+
class HasCacheKey:
"""Mixin for objects which can produce a cache key.
),
)
else:
- result += meth(
+ result += meth( # type: ignore
attrname, obj, self, anon_map, bindparams
)
return result
return operators.getitem, index, return_type
def contains(self, *arg, **kw):
+ """``ARRAY.contains()`` not implemented for the base ARRAY type.
+ Use the dialect-specific ARRAY type.
+
+ .. seealso::
+
+ :class:`_postgresql.ARRAY` - PostgreSQL specific version.
+ """
raise NotImplementedError(
"ARRAY.contains() not implemented for the base "
"ARRAY type; please use the dialect-specific ARRAY type"