--- /dev/null
+.. change::
+ :tags: change, engine
+ :tickets: 4643
+
+ Remove deprecated method ``get_primary_keys` in the :class:`.Dialect` and
+ :class:`.Inspector` classes. Please refer to the
+ :meth:`.Dialect.get_pk_constraint` and :meth:`.Inspector.get_primary_keys`
+ methods.
+
+ Remove deprecated event ``dbapi_error`` and the method
+ ``ConnectionEvents.dbapi_error`. Please refer to the
+ :meth:`.ConnectionEvents.handle_error` event.
+ This chance also removes the attributes ``ExecutionContext.is_disconnect``
+ and ``ExecutionContext.exception``
+
+.. change::
+ :tags: change, postgresql
+ :tickets: 4643
+
+ Remove support for deprecated engine URLs of the form ``postgres://``;
+ this has emitted a warning for many years and projects should be
+ using ``postgresql://``.
+
+.. change::
+ :tags: change, mysql
+ :tickets: 4643
+
+ Remove deprecated dialect ``mysql+gaerdbms`` that has beed deprecated
+ since version 1.0. Use the MySQLdb dialect directly.
+
+ Remove deprecated parameter ``quoting`` from :class:`.mysql.ENUM`
+ and :class:`.mysql.SET` in the ``mysql`` dialect. The values passed to the
+ enum or the set are quoted by SQLAlchemy when needed automatically.
+
+.. change::
+ :tags: change, orm
+ :tickets: 4643
+
+ Remove deprecated function ``comparable_property``. Please refer to the
+ :mod:`~sqlalchemy.ext.hybrid` extension. This also removes the function
+ ``comparable_using`` in the declarative extension.
+
+ Remove deprecated function ``compile_mappers``. Please use
+ :func:`.configure_mappers`
+
+ Remove deprecated method ``collection.linker``. Please refer to the
+ :meth:`.AttributeEvents.init_collection` and
+ :meth:`.AttributeEvents.dispose_collection` event handlers.
+
+ Remove deprecated method ``Session.prune`` and parameter
+ ``Session.weak_identity_map``. See the recipe at
+ :ref:`session_referencing_behavior` for an event-based approach to
+ maintaining strong identity references.
+ This change also removes the class ``StrongInstanceDict``.
+
+ Remove deprecated parameter ``mapper.order_by``. Use :meth:`.Query.order_by`
+ to determine the ordering of a result set.
+
+ Remove deprecated parameter ``Session._enable_transaction_accounting`.
+
+ Remove deprecated parameter ``Session.is_modified.passive``.
+
+.. change::
+ :tags: change, types
+ :tickets: 4643
+
+ Remove deprecated class ``Binary``. Please use :class:`.LargeBinary`.
+
+.. change::
+ :tags: change, sql, core
+ :tickets: 4643
+
+ Remove deprecated methods ``Compiled.compile``, ``ClauseElement.__and__`` and
+ ``ClauseElement.__or__`` and attribute ``Over.func``.
+
+ Remove deprecated ``FromClause.count`` method. Please use the
+ :class:`.functions.count` function available from the
+ :attr:`.func` namespace.
+
+.. change::
+ :tags: change, sql
+ :tickets: 4643
+
+ Remove deprecated parameters ``text.bindparams`` and ``text.typemap``.
+ Please refer to the :meth:`.TextClause.bindparams` and
+ :meth:`.TextClause.columns` methods.
+
+ Remove deprecated parameter ``Table.useexisting``. Please use
+ :paramref:`.Table.extend_existing`.
.. automodule:: sqlalchemy.dialects.mysql.oursql
-Google App Engine
------------------
-
-.. automodule:: sqlalchemy.dialects.mysql.gaerdbms
-
pyodbc
------
.. autofunction:: synonym_for
-.. autofunction:: comparable_using
-
.. autofunction:: instrument_declarative
.. autoclass:: AbstractConcreteBase
from .types import BIGINT # noqa
from .types import BigInteger # noqa
from .types import BINARY # noqa
-from .types import Binary # noqa
from .types import BLOB # noqa
from .types import BOOLEAN # noqa
from .types import Boolean # noqa
from .. import util
-_translates = {"postgres": "postgresql"}
-
-
def _auto_fn(name):
"""default dialect importer.
dialect = name
driver = "base"
- if dialect in _translates:
- translated = _translates[dialect]
- util.warn_deprecated(
- "The '%s' dialect name has been "
- "renamed to '%s'" % (dialect, translated)
- )
- dialect = translated
try:
module = __import__("sqlalchemy.dialects.%s" % (dialect,)).dialects
except ImportError:
"will correspond to an actual SQL Server "
"CREATE SEQUENCE in "
"a future release. Please use the mssql_identity_start "
- "and mssql_identity_increment parameters."
+ "and mssql_identity_increment parameters.",
+ version="1.3",
)
if column.default.start == 0:
start = 0
"deprecated and will be removed in a future release. This "
"flag has no effect on the behavior of the "
"IdentifierPreparer.quote method; please refer to "
- "quoted_name()."
+ "quoted_name().",
+ version="1.3",
)
dbname, owner = _schema_elements(schema)
from . import base # noqa
from . import cymysql # noqa
-from . import gaerdbms # noqa
from . import mysqlconnector # noqa
from . import mysqldb # noqa
from . import oursql # noqa
)
def visit_ENUM(self, type_, **kw):
- return self._visit_enumerated_values(
- "ENUM", type_, type_._enumerated_values
- )
+ return self._visit_enumerated_values("ENUM", type_, type_.enums)
def visit_SET(self, type_, **kw):
- return self._visit_enumerated_values(
- "SET", type_, type_._enumerated_values
- )
+ return self._visit_enumerated_values("SET", type_, type_.values)
def visit_BOOLEAN(self, type_, **kw):
return "BOOL"
from ... import sql
from ... import util
from ...sql import sqltypes
+from ...sql.base import NO_ARG
-class _EnumeratedValues(_StringType):
- def _init_values(self, values, kw):
- self.quoting = kw.pop("quoting", "auto")
-
- if self.quoting == "auto" and len(values):
- # What quoting character are we using?
- q = None
- for e in values:
- if len(e) == 0:
- self.quoting = "unquoted"
- break
- elif q is None:
- q = e[0]
-
- if len(e) == 1 or e[0] != q or e[-1] != q:
- self.quoting = "unquoted"
- break
- else:
- self.quoting = "quoted"
-
- if self.quoting == "quoted":
- util.warn_deprecated(
- "Manually quoting %s value literals is deprecated. Supply "
- "unquoted values and use the quoting= option in cases of "
- "ambiguity." % self.__class__.__name__
- )
-
- values = self._strip_values(values)
-
- self._enumerated_values = values
- length = max([len(v) for v in values] + [0])
- return values, length
-
- @classmethod
- def _strip_values(cls, values):
- strip_values = []
- for a in values:
- if a[0:1] == '"' or a[0:1] == "'":
- # strip enclosing quotes and unquote interior
- a = a[1:-1].replace(a[0] * 2, a[0])
- strip_values.append(a)
- return strip_values
-
-
-class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum, _EnumeratedValues):
+class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum, _StringType):
"""MySQL ENUM type."""
__visit_name__ = "ENUM"
Column('myenum', ENUM("foo", "bar", "baz"))
- :param enums: The range of valid values for this ENUM. Values will be
- quoted when generating the schema according to the quoting flag (see
- below). This object may also be a PEP-435-compliant enumerated
- type.
+ :param enums: The range of valid values for this ENUM. Values in
+ enums are not quoted, they will be escaped and surrounded by single
+ quotes when generating the schema. This object may also be a
+ PEP-435-compliant enumerated type.
.. versionadded: 1.1 added support for PEP-435-compliant enumerated
types.
BINARY in schema. This does not affect the type of data stored,
only the collation of character data.
- :param quoting: Defaults to 'auto': automatically determine enum value
- quoting. If all enum values are surrounded by the same quoting
- character, then use 'quoted' mode. Otherwise, use 'unquoted' mode.
-
- 'quoted': values in enums are already quoted, they will be used
- directly when generating the schema - this usage is deprecated.
-
- 'unquoted': values in enums are not quoted, they will be escaped and
- surrounded by single quotes when generating the schema.
-
- Previous versions of this type always required manually quoted
- values to be supplied; future versions will always quote the string
- literals for you. This is a transitional option.
+ :param quoting: Not used. A warning will be raised if provided.
"""
-
+ if kw.pop("quoting", NO_ARG) is not NO_ARG:
+ util.warn_deprecated_20(
+ "The 'quoting' parameter to :class:`.mysql.ENUM` is deprecated"
+ " and will be removed in a future release. "
+ "This parameter now has no effect."
+ )
kw.pop("strict", None)
self._enum_init(enums, kw)
_StringType.__init__(self, length=self.length, **kw)
kw.setdefault("values_callable", impl.values_callable)
return cls(**kw)
- def _setup_for_values(self, values, objects, kw):
- values, length = self._init_values(values, kw)
- return super(ENUM, self)._setup_for_values(values, objects, kw)
-
def _object_value_for_elem(self, elem):
# mysql sends back a blank string for any value that
# was persisted that was not in the enums; that is, it does no
)
-class SET(_EnumeratedValues):
+class SET(_StringType):
"""MySQL SET type."""
__visit_name__ = "SET"
set will be used to generate DDL for a table, or if the
:paramref:`.SET.retrieve_as_bitwise` flag is set to True.
- :param values: The range of valid values for this SET.
+ :param values: The range of valid values for this SET. The values
+ are not quoted, they will be escaped and surrounded by single
+ quotes when generating the schema.
:param convert_unicode: Same flag as that of
:paramref:`.String.convert_unicode`.
:param binary: same as that of :paramref:`.VARCHAR.binary`.
- :param quoting: Defaults to 'auto': automatically determine set value
- quoting. If all values are surrounded by the same quoting
- character, then use 'quoted' mode. Otherwise, use 'unquoted' mode.
-
- 'quoted': values in enums are already quoted, they will be used
- directly when generating the schema - this usage is deprecated.
-
- 'unquoted': values in enums are not quoted, they will be escaped and
- surrounded by single quotes when generating the schema.
-
- Previous versions of this type always required manually quoted
- values to be supplied; future versions will always quote the string
- literals for you. This is a transitional option.
-
- .. versionadded:: 0.9.0
-
:param retrieve_as_bitwise: if True, the data for the set type will be
persisted and selected using an integer value, where a set is coerced
into a bitwise mask for persistence. MySQL allows this mode which
.. versionadded:: 1.0.0
+ :param quoting: Not used. A warning will be raised if passed.
"""
+ if kw.pop("quoting", NO_ARG) is not NO_ARG:
+ util.warn_deprecated_20(
+ "The 'quoting' parameter to :class:`.mysql.SET` is deprecated"
+ " and will be removed in a future release. "
+ "This parameter now has no effect."
+ )
self.retrieve_as_bitwise = kw.pop("retrieve_as_bitwise", False)
- values, length = self._init_values(values, kw)
self.values = tuple(values)
if not self.retrieve_as_bitwise and "" in values:
raise exc.ArgumentError(
self._bitmap.update(
(2 ** idx, value) for idx, value in enumerate(self.values)
)
+ length = max([len(v) for v in values] + [0])
kw.setdefault("length", length)
super(SET, self).__init__(**kw)
+++ /dev/null
-# mysql/gaerdbms.py
-# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-r"""
-.. dialect:: mysql+gaerdbms
- :name: Google Cloud SQL
- :dbapi: rdbms
- :connectstring: mysql+gaerdbms:///<dbname>?instance=<instancename>
- :url: https://developers.google.com/appengine/docs/python/cloud-sql/developers-guide
-
- This dialect is based primarily on the :mod:`.mysql.mysqldb` dialect with
- minimal changes.
-
- .. deprecated:: 1.0 This dialect is **no longer necessary** for
- Google Cloud SQL; the MySQLdb dialect can be used directly.
- Cloud SQL now recommends creating connections via the
- mysql dialect using the URL format
-
- ``mysql+mysqldb://root@/<dbname>?unix_socket=/cloudsql/<projectid>:<instancename>``
-
-
-Pooling
--------
-
-Google App Engine connections appear to be randomly recycled,
-so the dialect does not pool connections. The :class:`.NullPool`
-implementation is installed within the :class:`.Engine` by
-default.
-
-""" # noqa
-
-import os
-import re
-
-from sqlalchemy.util import warn_deprecated
-from .mysqldb import MySQLDialect_mysqldb
-from ...pool import NullPool
-
-
-def _is_dev_environment():
- return os.environ.get("SERVER_SOFTWARE", "").startswith("Development/")
-
-
-class MySQLDialect_gaerdbms(MySQLDialect_mysqldb):
- @classmethod
- def dbapi(cls):
-
- warn_deprecated(
- "Google Cloud SQL now recommends creating connections via the "
- "MySQLdb dialect directly, using the URL format "
- "mysql+mysqldb://root@/<dbname>?unix_socket=/cloudsql/"
- "<projectid>:<instancename>"
- )
-
- # from django:
- # http://code.google.com/p/googleappengine/source/
- # browse/trunk/python/google/storage/speckle/
- # python/django/backend/base.py#118
- # see also [ticket:2649]
- # see also http://stackoverflow.com/q/14224679/34549
- from google.appengine.api import apiproxy_stub_map
-
- if _is_dev_environment():
- from google.appengine.api import rdbms_mysqldb
-
- return rdbms_mysqldb
- elif apiproxy_stub_map.apiproxy.GetStub("rdbms"):
- from google.storage.speckle.python.api import rdbms_apiproxy
-
- return rdbms_apiproxy
- else:
- from google.storage.speckle.python.api import rdbms_googleapi
-
- return rdbms_googleapi
-
- @classmethod
- def get_pool_class(cls, url):
- # Cloud SQL connections die at any moment
- return NullPool
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args()
- if not _is_dev_environment():
- # 'dsn' and 'instance' are because we are skipping
- # the traditional google.api.rdbms wrapper
- opts["dsn"] = ""
- opts["instance"] = url.query["instance"]
- return [], opts
-
- def _extract_error_code(self, exception):
- match = re.compile(r"^(\d+)L?:|^\((\d+)L?,").match(str(exception))
- # The rdbms api will wrap then re-raise some types of errors
- # making this regex return no matches.
- code = match.group(1) or match.group(2) if match else None
- if code:
- return int(code)
-
-
-dialect = MySQLDialect_gaerdbms
import re
-from .enumerated import _EnumeratedValues
+from .enumerated import ENUM
from .enumerated import SET
from .types import DATETIME
from .types import TIME
for kw in ("charset", "collate"):
if spec.get(kw, False):
type_kw[kw] = spec[kw]
- if issubclass(col_type, _EnumeratedValues):
- type_args = _EnumeratedValues._strip_values(type_args)
+ if issubclass(col_type, (ENUM, SET)):
+ type_args = _strip_values(type_args)
if issubclass(col_type, SET) and "" in type_args:
type_kw["retrieve_as_bitwise"] = True
"""Compile a string to regex, I and UNICODE."""
return re.compile(regex, re.I | re.UNICODE)
+
+
+def _strip_values(values):
+ "Strip reflected values quotes"
+ strip_values = []
+ for a in values:
+ if a[0:1] == '"' or a[0:1] == "'":
+ # strip enclosing quotes and unquote interior
+ a = a[1:-1].replace(a[0] * 2, a[0])
+ strip_values.append(a)
+ return strip_values
def create_connect_args(self, url):
opts = dict(url.query)
- # deprecated in 1.3
for opt in ("use_ansi", "auto_convert_lobs"):
if opt in opts:
util.warn_deprecated(
"cx_oracle dialect option %r should only be passed to "
- "create_engine directly, not within the URL string" % opt
+ "create_engine directly, not within the URL string" % opt,
+ version="1.3",
)
util.coerce_kw_type(opts, opt, bool)
setattr(self, opt, opts.pop(opt))
):
exc_info = sys.exc_info()
- if context and context.exception is None:
- context.exception = e
-
is_exit_exception = not isinstance(e, Exception)
if not self._is_disconnect:
)
) or (is_exit_exception and not self.closed)
- if context:
- context.is_disconnect = self._is_disconnect
-
invalidate_pool_on_disconnect = not is_exit_exception
if self._reentrant_error:
) and not self._execution_options.get(
"skip_user_error_events", False
):
- # legacy dbapi_error event
- if should_wrap and context:
- self.dispatch.dbapi_error(
- self, cursor, statement, parameters, context, e
- )
-
- # new handle_error event
ctx = ExceptionContextImpl(
e,
sqlalchemy_exception,
"""
return sqltypes.adapt_type(typeobj, self.colspecs)
- def get_pk_constraint(self, conn, table_name, schema=None, **kw):
- """Compatibility method, adapts the result of get_primary_keys()
- for those dialects which don't implement get_pk_constraint().
-
- """
- return {
- "constrained_columns": self.get_primary_keys(
- conn, table_name, schema=schema, **kw
- )
- }
-
def has_index(self, connection, table_name, index_name, schema=None):
if not self.has_table(connection, table_name, schema=schema):
return False
from .interfaces import Dialect
from .. import event
from .. import exc
-from .. import util
class ConnectionEvents(event.Events):
log.info("Received statement: %s", statement)
When the methods are called with a `statement` parameter, such as in
- :meth:`.after_cursor_execute`, :meth:`.before_cursor_execute` and
- :meth:`.dbapi_error`, the statement is the exact SQL string that was
- prepared for transmission to the DBAPI ``cursor`` in the connection's
- :class:`.Dialect`.
+ :meth:`.after_cursor_execute` or :meth:`.before_cursor_execute`,
+ the statement is the exact SQL string that was prepared for transmission
+ to the DBAPI ``cursor`` in the connection's :class:`.Dialect`.
The :meth:`.before_execute` and :meth:`.before_cursor_execute`
events can also be established with the ``retval=True`` flag, which
"""
- @util.deprecated(
- "0.9",
- "The :meth:`.ConnectionEvents.dbapi_error` "
- "event is deprecated and will be removed in a future release. "
- "Please refer to the :meth:`.ConnectionEvents.handle_error` "
- "event.",
- )
- def dbapi_error(
- self, conn, cursor, statement, parameters, context, exception
- ):
- """Intercept a raw DBAPI error.
-
- This event is called with the DBAPI exception instance
- received from the DBAPI itself, *before* SQLAlchemy wraps the
- exception with it's own exception wrappers, and before any
- other operations are performed on the DBAPI cursor; the
- existing transaction remains in effect as well as any state
- on the cursor.
-
- The use case here is to inject low-level exception handling
- into an :class:`.Engine`, typically for logging and
- debugging purposes.
-
- .. warning::
-
- Code should **not** modify
- any state or throw any exceptions here as this will
- interfere with SQLAlchemy's cleanup and error handling
- routines. For exception modification, please refer to the
- new :meth:`.ConnectionEvents.handle_error` event.
-
- Subsequent to this hook, SQLAlchemy may attempt any
- number of operations on the connection/cursor, including
- closing the cursor, rolling back of the transaction in the
- case of connectionless execution, and disposing of the entire
- connection pool if a "disconnect" was detected. The
- exception is then wrapped in a SQLAlchemy DBAPI exception
- wrapper and re-thrown.
-
- :param conn: :class:`.Connection` object
- :param cursor: DBAPI cursor object
- :param statement: string SQL statement, as passed to the DBAPI
- :param parameters: Dictionary, tuple, or list of parameters being
- passed to the ``execute()`` or ``executemany()`` method of the
- DBAPI ``cursor``. In some cases may be ``None``.
- :param context: :class:`.ExecutionContext` object in use. May
- be ``None``.
- :param exception: The **unwrapped** exception emitted directly from the
- DBAPI. The class here is specific to the DBAPI module in use.
-
- """
-
def handle_error(self, exception_context):
r"""Intercept all exceptions processed by the :class:`.Connection`.
raise NotImplementedError()
- @util.deprecated(
- "0.8",
- "The :meth:`.Dialect.get_primary_keys` method is deprecated and "
- "will be removed in a future release. Please refer to the "
- ":meth:`.Dialect.get_pk_constraint` method. ",
- )
- def get_primary_keys(self, connection, table_name, schema=None, **kw):
- """Return information about primary keys in `table_name`.
-
- """
-
- raise NotImplementedError()
-
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
"""Return information about the primary key constraint on
table_name`.
and updates.
"""
- exception = None
- """A DBAPI-level exception that was caught when this ExecutionContext
- attempted to execute a statement.
-
- This attribute is meaningful only within the
- :meth:`.ConnectionEvents.dbapi_error` event.
-
- .. versionadded:: 0.9.7
-
- .. seealso::
-
- :attr:`.ExecutionContext.is_disconnect`
-
- :meth:`.ConnectionEvents.dbapi_error`
-
- """
-
- is_disconnect = None
- """Boolean flag set to True or False when a DBAPI-level exception
- is caught when this ExecutionContext attempted to execute a statement.
-
- This attribute is meaningful only within the
- :meth:`.ConnectionEvents.dbapi_error` event.
-
- .. versionadded:: 0.9.7
-
- .. seealso::
-
- :attr:`.ExecutionContext.exception`
-
- :meth:`.ConnectionEvents.dbapi_error`
-
- """
-
def create_cursor(self):
"""Return a new cursor generated from this ExecutionContext's
connection.
from ..sql import operators
from ..sql import schema as sa_schema
from ..sql.type_api import TypeEngine
-from ..util import deprecated
from ..util import topological
col_def["type"] = coltype()
return col_defs
- @deprecated(
- "0.7",
- "The :meth:`.Inspector.get_primary_keys` method is deprecated and "
- "will be removed in a future release. Please refer to the "
- ":meth:`.Inspector.get_pk_constraint` method.",
- )
- def get_primary_keys(self, table_name, schema=None, **kw):
- """Return information about primary keys in `table_name`.
-
- Given a string `table_name`, and an optional string `schema`, return
- primary key information as a list of column names.
- """
-
- with self._operation_context() as conn:
- return self.dialect.get_pk_constraint(
- conn, table_name, schema, info_cache=self.info_cache, **kw
- )["constrained_columns"]
-
def get_pk_constraint(self, table_name, schema=None, **kw):
"""Return information about primary key constraint on `table_name`.
def _contains(self, value, row):
key = value
if key in self._keymap:
- util.warn_deprecated(
+ util.warn_deprecated_20(
"Using the 'in' operator to test for string or column "
"keys, or integer indexes, in a :class:`.Row` object is "
"deprecated and will "
"be removed in a future release. "
"Use the `Row._fields` or `Row._mapping` attribute, i.e. "
- "'key in row._fields'"
+ "'key in row._fields'",
)
return True
else:
"Retreiving row values using Column objects from a "
"row that was unpickled is deprecated; adequate "
"state cannot be pickled for this to be efficient. "
- "This usage will raise KeyError in a future release."
+ "This usage will raise KeyError in a future release.",
+ version="1.4",
)
else:
util.warn_deprecated(
"matching names as keys is deprecated, and will raise "
"KeyError in a future release; only Column "
"objects that are explicitly part of the statement "
- "object should be used."
+ "object should be used.",
+ version="1.4",
)
if result is None:
if raiseerr:
class SADeprecationWarning(DeprecationWarning):
"""Issued for usage of deprecated APIs."""
+ deprecated_since = None
+ "Indicates the version that started raising this deprecation warning"
+
class RemovedIn20Warning(SADeprecationWarning):
"""Issued for usage of APIs specifically deprecated in SQLAlchemy 2.0.
"""
+ deprecated_since = "1.4"
+ "Indicates the version that started raising this deprecation warning"
+
class SAPendingDeprecationWarning(PendingDeprecationWarning):
"""A similar warning as :class:`.SADeprecationWarning`, this warning
"""
+ deprecated_since = None
+ "Indicates the version that started raising this deprecation warning"
+
class SAWarning(RuntimeWarning):
"""Issued at runtime."""
from .api import AbstractConcreteBase
from .api import as_declarative
-from .api import comparable_using
from .api import ConcreteBase
from .api import declarative_base
from .api import DeclarativeMeta
"declarative_base",
"synonym_for",
"has_inherited_table",
- "comparable_using",
"instrument_declarative",
"declared_attr",
"as_declarative",
from ... import inspection
from ... import util
from ...orm import attributes
-from ...orm import comparable_property
from ...orm import exc as orm_exc
from ...orm import interfaces
from ...orm import relationships
return decorate
-def comparable_using(comparator_factory):
- """Decorator, allow a Python @property to be used in query criteria.
-
- This is a decorator front end to
- :func:`~sqlalchemy.orm.comparable_property` that passes
- through the comparator_factory and the function being decorated::
-
- @comparable_using(MyComparatorType)
- @property
- def prop(self):
- return 'special sauce'
-
- The regular ``comparable_property()`` is also usable directly in a
- declarative setting and may be convenient for read/write properties::
-
- prop = comparable_property(MyComparatorType)
-
- """
-
- def decorate(fn):
- return comparable_property(comparator_factory, fn)
-
- return decorate
-
-
class declared_attr(interfaces._MappedAttribute, property):
"""Mark a class-level method as representing the definition of
a mapped property or special declarative member name.
"on declarative mixin classes."
)
elif isinstance(obj, declarative_props):
- oldclassprop = isinstance(obj, util.classproperty)
- if not oldclassprop and obj._cascading:
+ if obj._cascading:
if name in dict_:
# unfortunately, while we can use the user-
# defined attribute here to allow a clean
] = ret = obj.__get__(obj, cls)
setattr(cls, name, ret)
else:
- if oldclassprop:
- util.warn_deprecated(
- "Use of sqlalchemy.util.classproperty on "
- "declarative classes is deprecated."
- )
# access attribute using normal class access
ret = getattr(cls, name)
from . import exc # noqa
from . import mapper as mapperlib # noqa
from . import strategy_options
-from .descriptor_props import ComparableProperty # noqa
from .descriptor_props import CompositeProperty # noqa
from .descriptor_props import SynonymProperty # noqa
from .interfaces import EXT_CONTINUE # noqa
synonym = public_factory(SynonymProperty, ".orm.synonym")
-comparable_property = public_factory(
- ComparableProperty, ".orm.comparable_property"
-)
-
-
-@_sa_util.deprecated(
- "0.7",
- message=":func:`.compile_mappers` is deprecated and will be removed "
- "in a future release. Please use :func:`.configure_mappers`",
-)
-def compile_mappers():
- """Initialize the inter-mapper relationships of all mappers that have
- been defined.
-
- """
- configure_mappers()
-
def clear_mappers():
"""Remove all mappers from all classes.
using loader callables if the value is not locally present.
"""
- if passive is True:
- util.warn_deprecated(
- "Passing True for 'passive' is deprecated. "
- "Use attributes.PASSIVE_NO_INITIALIZE"
- )
- passive = PASSIVE_NO_INITIALIZE
- elif passive is False:
- util.warn_deprecated(
- "Passing False for 'passive' is "
- "deprecated. Use attributes.PASSIVE_OFF"
- )
- passive = PASSIVE_OFF
return get_state_history(instance_state(obj), key, passive)
The decorators fall into two groups: annotations and interception recipes.
- The annotating decorators (appender, remover, iterator, linker, converter,
+ The annotating decorators (appender, remover, iterator, converter,
internally_instrumented) indicate the method's purpose and take no
arguments. They are not written with parens::
fn._sa_instrumented = True
return fn
- @staticmethod
- @util.deprecated(
- "1.0",
- "The :meth:`.collection.linker` handler is deprecated and will "
- "be removed in a future release. Please refer to the "
- ":meth:`.AttributeEvents.init_collection` "
- "and :meth:`.AttributeEvents.dispose_collection` event handlers. ",
- )
- def linker(fn):
- """Tag the method as a "linked to attribute" event handler.
-
- This optional event handler will be called when the collection class
- is linked to or unlinked from the InstrumentedAttribute. It is
- invoked immediately after the '_sa_adapter' property is set on
- the instance. A single argument is passed: the collection adapter
- that has been linked, or None if unlinking.
-
-
- """
- fn._sa_instrument_role = "linker"
- return fn
-
- link = linker
- """Synonym for :meth:`.collection.linker`.
-
- .. deprecated:: 1.0 - :meth:`.collection.link` is deprecated and will be
- removed in a future release.
-
- """
-
@staticmethod
@util.deprecated(
"1.3",
"appender",
"remover",
"iterator",
- "linker",
"converter",
)
roles.setdefault(role, name)
p._mapped_by_synonym = self.key
self.parent = parent
-
-
-@util.deprecated_cls(
- "0.7",
- ":func:`.comparable_property` is deprecated and will be removed in a "
- "future release. Please refer to the :mod:`~sqlalchemy.ext.hybrid` "
- "extension.",
-)
-class ComparableProperty(DescriptorProperty):
- """Instruments a Python property for use in query expressions."""
-
- def __init__(
- self, comparator_factory, descriptor=None, doc=None, info=None
- ):
- """Provides a method of applying a :class:`.PropComparator`
- to any Python descriptor attribute.
-
-
- Allows any Python descriptor to behave like a SQL-enabled
- attribute when used at the class level in queries, allowing
- redefinition of expression operator behavior.
-
- In the example below we redefine :meth:`.PropComparator.operate`
- to wrap both sides of an expression in ``func.lower()`` to produce
- case-insensitive comparison::
-
- from sqlalchemy.orm import comparable_property
- from sqlalchemy.orm.interfaces import PropComparator
- from sqlalchemy.sql import func
- from sqlalchemy import Integer, String, Column
- from sqlalchemy.ext.declarative import declarative_base
-
- class CaseInsensitiveComparator(PropComparator):
- def __clause_element__(self):
- return self.prop
-
- def operate(self, op, other):
- return op(
- func.lower(self.__clause_element__()),
- func.lower(other)
- )
-
- Base = declarative_base()
-
- class SearchWord(Base):
- __tablename__ = 'search_word'
- id = Column(Integer, primary_key=True)
- word = Column(String)
- word_insensitive = comparable_property(lambda prop, mapper:
- CaseInsensitiveComparator(
- mapper.c.word, mapper)
- )
-
-
- A mapping like the above allows the ``word_insensitive`` attribute
- to render an expression like::
-
- >>> print(SearchWord.word_insensitive == "Trucks")
- lower(search_word.word) = lower(:lower_1)
-
- :param comparator_factory:
- A PropComparator subclass or factory that defines operator behavior
- for this property.
-
- :param descriptor:
- Optional when used in a ``properties={}`` declaration. The Python
- descriptor or property to layer comparison behavior on top of.
-
- The like-named descriptor will be automatically retrieved from the
- mapped class if left blank in a ``properties`` declaration.
-
- :param info: Optional data dictionary which will be populated into the
- :attr:`.InspectionAttr.info` attribute of this object.
-
- .. versionadded:: 1.0.0
-
- """
- super(ComparableProperty, self).__init__()
- self.descriptor = descriptor
- self.comparator_factory = comparator_factory
- self.doc = doc or (descriptor and descriptor.__doc__) or None
- if info:
- self.info = info
- util.set_creation_order(self)
-
- def _comparator_factory(self, mapper):
- return self.comparator_factory(self, mapper)
:param collection_adapter: the :class:`.CollectionAdapter` that will
mediate internal access to the collection.
- .. versionadded:: 1.0.0 the :meth:`.AttributeEvents.init_collection`
- and :meth:`.AttributeEvents.dispose_collection` events supersede
- the :class:`.orm.collection.linker` hook.
+ .. versionadded:: 1.0.0 :meth:`.AttributeEvents.init_collection`
+ and :meth:`.AttributeEvents.dispose_collection` events.
.. seealso::
would be empty.
.. versionadded:: 1.0.0 the :meth:`.AttributeEvents.init_collection`
- and :meth:`.AttributeEvents.dispose_collection` events supersede
- the :class:`.collection.linker` hook.
+ and :meth:`.AttributeEvents.dispose_collection` events.
.. seealso::
import weakref
-from . import attributes
from . import util as orm_util
from .. import exc as sa_exc
from .. import util
if st is state:
self._dict.pop(state.key, None)
self._manage_removed_state(state)
-
- def prune(self):
- return 0
-
-
-class StrongInstanceDict(IdentityMap):
- """A 'strong-referencing' version of the identity map.
-
- .. deprecated 1.1::
- The strong
- reference identity map is legacy. See the
- recipe at :ref:`session_referencing_behavior` for
- an event-based approach to maintaining strong identity
- references.
-
-
- """
-
- if util.py2k:
-
- def itervalues(self):
- return self._dict.itervalues()
-
- def iteritems(self):
- return self._dict.iteritems()
-
- def __iter__(self):
- return iter(self.dict_)
-
- def __getitem__(self, key):
- return self._dict[key]
-
- def __contains__(self, key):
- return key in self._dict
-
- def get(self, key, default=None):
- return self._dict.get(key, default)
-
- def values(self):
- return self._dict.values()
-
- def items(self):
- return self._dict.items()
-
- def all_states(self):
- return [attributes.instance_state(o) for o in self.values()]
-
- def contains_state(self, state):
- return (
- state.key in self
- and attributes.instance_state(self[state.key]) is state
- )
-
- def replace(self, state):
- if state.key in self._dict:
- existing = self._dict[state.key]
- existing = attributes.instance_state(existing)
- if existing is not state:
- self._manage_removed_state(existing)
- else:
- return
- else:
- existing = None
-
- self._dict[state.key] = state.obj()
- self._manage_incoming_state(state)
- return existing
-
- def add(self, state):
- if state.key in self:
- if attributes.instance_state(self._dict[state.key]) is not state:
- raise sa_exc.InvalidRequestError(
- "Can't attach instance "
- "%s; another instance with key %s is already "
- "present in this session."
- % (orm_util.state_str(state), state.key)
- )
- return False
- else:
- self._dict[state.key] = state.obj()
- self._manage_incoming_state(state)
- return True
-
- def _add_unpresent(self, state, key):
- # inlined form of add() called by loading.py
- self._dict[key] = state.obj()
- state._instance_dict = self._wr
-
- def _fast_discard(self, state):
- # used by InstanceState for state being
- # GC'ed, inlines _managed_removed_state
- try:
- obj = self._dict[state.key]
- except KeyError:
- # catch gc removed the key after we just checked for it
- pass
- else:
- if attributes.instance_state(obj) is state:
- self._dict.pop(state.key, None)
-
- def discard(self, state):
- self.safe_discard(state)
-
- def safe_discard(self, state):
- if state.key in self._dict:
- obj = self._dict[state.key]
- st = attributes.instance_state(obj)
- if st is state:
- self._dict.pop(state.key, None)
- self._manage_removed_state(state)
-
- def prune(self):
- """prune unreferenced, non-dirty states."""
-
- ref_count = len(self)
- dirty = [s.obj() for s in self.all_states() if s.modified]
-
- # work around http://bugs.python.org/issue6149
- keepers = weakref.WeakValueDictionary()
- keepers.update(self)
-
- self._dict.clear()
- self._dict.update(keepers)
- self.modified = bool(dirty)
- return ref_count - len(self)
_dispose_called = False
@util.deprecated_params(
- order_by=(
- "1.1",
- "The :paramref:`.mapper.order_by` parameter "
- "is deprecated, and will be removed in a future release. "
- "Use :meth:`.Query.order_by` to determine the ordering of a "
- "result set.",
- ),
non_primary=(
"1.3",
"The :paramref:`.mapper.non_primary` parameter is deprecated, "
inherits=None,
inherit_condition=None,
inherit_foreign_keys=None,
- order_by=False,
always_refresh=False,
version_id_col=None,
version_id_generator=None,
:ref:`relationship_non_primary_mapper`
- :param order_by: A single :class:`.Column` or list of :class:`.Column`
- objects for which selection operations should use as the default
- ordering for entities. By default mappers have no pre-defined
- ordering.
-
:param passive_deletes: Indicates DELETE behavior of foreign key
columns when a joined-table inheritance entity is being deleted.
Defaults to ``False`` for a base mapper; for an inheriting mapper,
self._primary_key_argument = util.to_list(primary_key)
self.non_primary = non_primary
- if order_by is not False:
- self.order_by = util.to_list(order_by)
- else:
- self.order_by = order_by
-
self.always_refresh = always_refresh
if isinstance(version_id_col, MapperProperty):
)
)
- if (
- self.order_by is False
- and not self.concrete
- and self.inherits.order_by is not False
- ):
- self.order_by = self.inherits.order_by
-
self.polymorphic_map = self.inherits.polymorphic_map
self.batch = self.inherits.batch
self.inherits._inheriting_mappers.append(self)
from __future__ import absolute_import
from . import attributes
-from .descriptor_props import ComparableProperty
from .descriptor_props import CompositeProperty
from .descriptor_props import ConcreteInheritedProperty
from .descriptor_props import SynonymProperty
__all__ = [
"ColumnProperty",
- "ComparableProperty",
"CompositeProperty",
"ConcreteInheritedProperty",
"RelationshipProperty",
the newly resulting ``Query``
All existing ORDER BY settings can be suppressed by
- passing ``None`` - this will suppress any ordering configured
- on the :func:`.mapper` object using the deprecated
- :paramref:`.mapper.order_by` parameter.
-
+ passing ``None``.
"""
if len(criterion) == 1:
"Using the Query.instances() method without a context "
"is deprecated and will be disallowed in a future release. "
"Please make use of :meth:`.Query.from_statement` "
- "for linking ORM results to arbitrary select constructs."
+ "for linking ORM results to arbitrary select constructs.",
+ version="1.4",
)
context = QueryContext(self)
# if self._adapted_selectable is None:
context.froms += (self.selectable,)
- if context.order_by is False and self.mapper.order_by:
- context.order_by = self.mapper.order_by
-
- # apply adaptation to the mapper's order_by if needed.
- if adapter:
- context.order_by = adapter.adapt_list(
- util.to_list(context.order_by)
- )
-
loading._setup_entity_query(
context,
self.mapper,
"transaction is in progress"
)
- if self.session._enable_transaction_accounting:
- self._take_snapshot(autobegin=autobegin)
+ self._take_snapshot(autobegin=autobegin)
self.session.dispatch.after_transaction_create(self.session, self)
self._state = COMMITTED
self.session.dispatch.after_commit(self.session)
- if self.session._enable_transaction_accounting:
- self._remove_snapshot()
+ self._remove_snapshot()
self.close()
return self._parent
rollback_err = sys.exc_info()
finally:
transaction._state = DEACTIVE
- if self.session._enable_transaction_accounting:
- transaction._restore_snapshot(
- dirty_only=transaction.nested
- )
+ transaction._restore_snapshot(
+ dirty_only=transaction.nested
+ )
boundary = transaction
break
else:
sess = self.session
- if (
- not rollback_err
- and sess._enable_transaction_accounting
- and not sess._is_clean()
- ):
+ if not rollback_err and not sess._is_clean():
# if items were added, deleted, or mutated
# here, we need to re-restore the snapshot
"scalar",
)
- @util.deprecated_params(
- weak_identity_map=(
- "1.0",
- "The :paramref:`.Session.weak_identity_map` parameter as well as "
- "the strong-referencing identity map are deprecated, and will be "
- "removed in a future release. For the use case where objects "
- "present in a :class:`.Session` need to be automatically strong "
- "referenced, see the recipe at "
- ":ref:`session_referencing_behavior` for an event-based approach "
- "to maintaining strong identity references. ",
- ),
- _enable_transaction_accounting=(
- "0.7",
- "The :paramref:`.Session._enable_transaction_accounting` "
- "parameter is deprecated and will be removed in a future release.",
- ),
- )
def __init__(
self,
bind=None,
autoflush=True,
expire_on_commit=True,
- _enable_transaction_accounting=True,
autocommit=False,
twophase=False,
- weak_identity_map=None,
binds=None,
enable_baked_queries=True,
info=None,
.. versionadded:: 1.2
- :param _enable_transaction_accounting: A
- legacy-only flag which when ``False`` disables *all* 0.5-style
- object accounting on transaction boundaries.
-
:param expire_on_commit: Defaults to ``True``. When ``True``, all
instances will be fully expired after each :meth:`~.commit`,
so that all attribute/object access subsequent to a completed
called. This allows each database to roll back the entire
transaction, before each transaction is committed.
- :param weak_identity_map: Defaults to ``True`` - when set to
- ``False``, objects placed in the :class:`.Session` will be
- strongly referenced until explicitly removed or the
- :class:`.Session` is closed.
-
-
"""
-
- if weak_identity_map in (True, None):
- self._identity_cls = identity.WeakInstanceDict
- else:
- self._identity_cls = identity.StrongInstanceDict
-
- self.identity_map = self._identity_cls()
+ self.identity_map = identity.WeakInstanceDict()
self._new = {} # InstanceState->object, strong refs object
self._deleted = {} # same
self.autocommit = autocommit
self.expire_on_commit = expire_on_commit
self.enable_baked_queries = enable_baked_queries
- self._enable_transaction_accounting = _enable_transaction_accounting
self.twophase = twophase
self._query_cls = query_cls if query_cls else query.Query
"""
all_states = self.identity_map.all_states() + list(self._new)
- self.identity_map = self._identity_cls()
+ self.identity_map = identity.WeakInstanceDict()
self._new = {}
self._deleted = {}
self._new.pop(state)
state._detach(self)
- @util.deprecated(
- "0.7",
- "The :meth:`.Session.prune` method is deprecated along with "
- ":paramref:`.Session.weak_identity_map`. This method will be "
- "removed in a future release.",
- )
- def prune(self):
- """Remove unreferenced instances cached in the identity map.
-
- Note that this method is only meaningful if "weak_identity_map" is set
- to False. The default weak identity map is self-pruning.
-
- Removes any object in this Session's identity map that is not
- referenced in user code, modified, new or scheduled for deletion.
- Returns the number of objects pruned.
-
- """
- return self.identity_map.prune()
-
def expunge(self, instance):
"""Remove the `instance` from this ``Session``.
self._new.pop(state)
def _register_altered(self, states):
- if self._enable_transaction_accounting and self._transaction:
+ if self._transaction:
for state in states:
if state in self._new:
self._transaction._new[state] = True
def _remove_newly_deleted(self, states):
persistent_to_deleted = self.dispatch.persistent_to_deleted or None
for state in states:
- if self._enable_transaction_accounting and self._transaction:
+ if self._transaction:
self._transaction._deleted[state] = True
if persistent_to_deleted is not None:
finally:
self._flushing = False
- @util.deprecated_params(
- passive=(
- "0.8",
- "The :paramref:`.Session.is_modified.passive` flag is deprecated "
- "and will be removed in a future release. The flag is no longer "
- "used and is ignored.",
- )
- )
- def is_modified(self, instance, include_collections=True, passive=None):
+ def is_modified(self, instance, include_collections=True):
r"""Return ``True`` if the given instance has locally
modified attributes.
way to detect only local-column based properties (i.e. scalar columns
or many-to-one foreign keys) that would result in an UPDATE for this
instance upon flush.
- :param passive: not used
"""
state = object_state(instance)
q._distinct = True
break
- if q._order_by is False:
- q._order_by = leftmost_mapper.order_by
-
# don't need ORDER BY if no limit/offset
if q._limit is None and q._offset is None:
q._order_by = None
"name": self.name
}
fn = util.deprecated(
+ # This is used by `baked_lazyload_all` was only deprecated in
+ # version 1.2 so this must stick around until that is removed
"0.9",
"The :func:`.%(name)s_all` function is deprecated, and will be "
"removed in a future release. Please use method chaining with "
"Passing a string name for the 'alias' argument to "
"'contains_eager()` is deprecated, and will not work in a "
"future release. Please use a sqlalchemy.alias() or "
- "sqlalchemy.orm.aliased() construct."
+ "sqlalchemy.orm.aliased() construct.",
+ version="1.4",
)
elif getattr(attr, "_of_type", None):
util.warn_deprecated(
"The *addl_attrs on orm.defer is deprecated. Please use "
"method chaining in conjunction with defaultload() to "
- "indicate a path."
+ "indicate a path.",
+ version="1.3",
)
return _UnboundLoad._from_keys(
_UnboundLoad.defer, (key,) + addl_attrs, False, kw
util.warn_deprecated(
"The *addl_attrs on orm.undefer is deprecated. Please use "
"method chaining in conjunction with defaultload() to "
- "indicate a path."
+ "indicate a path.",
+ version="1.3",
)
return _UnboundLoad._from_keys(
_UnboundLoad.undefer, (key,) + addl_attrs, False, {}
"column-expression context is deprecated in version 1.4; "
"please use the .scalar_subquery() method to produce a scalar "
"subquery. This automatic coercion will be removed in a "
- "future release."
+ "future release.",
+ version="1.4",
)
def _implicit_coercions(
"Implicit coercion of SELECT and textual SELECT "
"constructs into FROM clauses is deprecated; please call "
".subquery() on any Core select or ORM Query object in "
- "order to produce a subquery object."
+ "order to produce a subquery object.",
+ version="1.4",
)
return resolved._implicit_subquery
elif resolved._is_text_clause:
"Implicit coercion of SELECT and textual SELECT constructs "
"into FROM clauses is deprecated; please call .subquery() "
"on any Core select or ORM Query object in order to produce a "
- "subquery object."
+ "subquery object.",
+ version="1.4",
)
return resolved._implicit_subquery
else:
self.string, schema_translate_map
)
- @util.deprecated(
- "0.7",
- "The :meth:`.Compiled.compile` method is deprecated and will be "
- "removed in a future release. The :class:`.Compiled` object "
- "now runs its compilation within the constructor, and this method "
- "does nothing.",
- )
- def compile(self):
- """Produce the internal string representation of this element.
- """
- pass
-
def _execute_on_connection(self, connection, multiparams, params):
if self.can_execute:
return connection._execute_compiled(self, multiparams, params)
"deprecated and will be removed in a future release. This "
"flag has no effect on the behavior of the "
"IdentifierPreparer.quote method; please refer to "
- "quoted_name()."
+ "quoted_name().",
+ # deprecated 0.9. warning from 1.3
+ version="0.9",
)
return self.quote(schema)
"deprecated and will be removed in a future release. This "
"flag has no effect on the behavior of the "
"IdentifierPreparer.quote method; please refer to "
- "quoted_name()."
+ "quoted_name().",
+ # deprecated 0.9. warning from 1.3
+ version="0.9",
)
force = getattr(ident, "quote", None)
"ascii", "backslashreplace"
) # noqa
- @util.deprecated(
- "0.9",
- "The :meth:`.ClauseElement.__and__` method is deprecated and will "
- "be removed in a future release. Conjunctions should only be "
- "used from a :class:`.ColumnElement` subclass, e.g. "
- ":meth:`.ColumnElement.__and__`.",
- )
- def __and__(self, other):
- """'and' at the ClauseElement level.
- """
- return and_(self, other)
-
- @util.deprecated(
- "0.9",
- "The :meth:`.ClauseElement.__or__` method is deprecated and will "
- "be removed in a future release. Conjunctions should only be "
- "used from a :class:`.ColumnElement` subclass, e.g. "
- ":meth:`.ColumnElement.__or__`.",
- )
- def __or__(self, other):
- """'or' at the ClauseElement level.
- """
- return or_(self, other)
-
def __invert__(self):
# undocumented element currently used by the ORM for
# relationship.contains()
self.text = self._bind_params_regex.sub(repl, text)
@classmethod
- @util.deprecated_params(
- bindparams=(
- "0.9",
- "The :paramref:`.text.bindparams` parameter "
- "is deprecated and will be removed in a future release. Please "
- "refer to the :meth:`.TextClause.bindparams` method.",
- ),
- typemap=(
- "0.9",
- "The :paramref:`.text.typemap` parameter is "
- "deprecated and will be removed in a future release. Please "
- "refer to the :meth:`.TextClause.columns` method.",
- ),
- )
@_document_text_coercion("text", ":func:`.text`", ":paramref:`.text.text`")
- def _create_text(self, text, bind=None, bindparams=None, typemap=None):
+ def _create_text(cls, text, bind=None):
r"""Construct a new :class:`.TextClause` clause, representing
a textual SQL string directly.
:param bind:
an optional connection or engine to be used for this text query.
- :param bindparams:
- A list of :func:`.bindparam` instances used to
- provide information about parameters embedded in the statement.
-
- E.g.::
-
- stmt = text("SELECT * FROM table WHERE id=:id",
- bindparams=[bindparam('id', value=5, type_=Integer)])
-
- :param typemap:
- A dictionary mapping the names of columns represented in the columns
- clause of a ``SELECT`` statement to type objects.
-
- E.g.::
-
- stmt = text("SELECT * FROM table",
- typemap={'id': Integer, 'name': String},
- )
-
.. seealso::
:ref:`sqlexpression_text` - in the Core tutorial
:ref:`orm_tutorial_literal_sql` - in the ORM tutorial
"""
- stmt = TextClause(text, bind=bind)
- if bindparams:
- stmt = stmt.bindparams(*bindparams)
- if typemap:
- stmt = stmt.columns(**typemap)
-
- return stmt
+ return TextClause(text, bind=bind)
@_generative
def bindparams(self, *binds, **names_to_values):
"continue_on": "True"
if continue_on is True_._singleton
else "False",
- }
+ },
+ version="1.4",
)
return cls._construct_raw(operator)
return lower, upper
- @property
- @util.deprecated(
- "1.1",
- "the :attr:`.Over.func` member of the :class:`.Over` "
- "class is deprecated and will be removed in a future release. "
- "Please refer to the :attr:`.Over.element` attribute.",
- )
- def func(self):
- """the element referred to by this :class:`.Over`
- clause.
-
-
- """
- return self.element
-
@util.memoized_property
def type(self):
return self.element.type
name, specify the flag ``quote_schema=True`` to the constructor, or use
the :class:`.quoted_name` construct to specify the name.
- :param useexisting: the same as :paramref:`.Table.extend_existing`.
-
:param comment: Optional string that will render an SQL comment on table
creation.
def _gen_cache_key(self, anon_map, bindparams):
return (self,) + self._annotations_cache_key
- @util.deprecated_params(
- useexisting=(
- "0.7",
- "The :paramref:`.Table.useexisting` parameter is deprecated and "
- "will be removed in a future release. Please use "
- ":paramref:`.Table.extend_existing`.",
- )
- )
def __new__(cls, *args, **kw):
if not args:
# python3k pickle seems to call this
_use_schema_map = False
- @util.deprecated(
- "1.1",
- message="The :meth:`.FromClause.count` method is deprecated, "
- "and will be removed in a future release. Please use the "
- ":class:`.functions.count` function available from the "
- ":attr:`.func` namespace.",
- )
- @util.preload_module("sqlalchemy.sql.functions")
- def count(self, whereclause=None, **params):
- """return a SELECT COUNT generated against this
- :class:`.FromClause`.
-
- .. seealso::
-
- :class:`.functions.count`
-
- """
- functions = util.preloaded.sql_functions
- if self.primary_key:
- col = list(self.primary_key)[0]
- else:
- col = list(self.columns)[0]
- return Select._create_select_from_fromclause(
- self,
- [functions.func.count(col).label("tbl_row_count")],
- whereclause,
- from_obj=[self],
- **params
- )
-
def select(self, whereclause=None, **params):
"""return a SELECT of this :class:`.FromClause`.
"unicode_error flag on String are deprecated. All modern "
"DBAPIs now support Python Unicode natively under Python 2, and "
"under Python 3 all strings are inherently Unicode. These flags "
- "will be removed in a future release."
+ "will be removed in a future release.",
+ version="1.3",
)
_Binary.__init__(self, length=length)
-@util.deprecated_cls(
- "0.6",
- "The :class:`.Binary` class is deprecated and will be removed "
- "in a future relase. Please use :class:`.LargeBinary`.",
-)
-class Binary(LargeBinary):
- def __init__(self, *arg, **kw):
- LargeBinary.__init__(self, *arg, **kw)
-
-
class SchemaType(SchemaEventTarget):
"""Mark a type as possibly requiring schema-level DDL for usage.
ensure_kwarg = "get_col_spec"
- class Comparator(TypeEngine.Comparator):
- __slots__ = ()
-
- def _adapt_expression(self, op, other_comparator):
- if hasattr(self.type, "adapt_operator"):
- util.warn_deprecated(
- "UserDefinedType.adapt_operator is deprecated. Create "
- "a UserDefinedType.Comparator subclass instead which "
- "generates the desired expression constructs, given a "
- "particular operator."
- )
- return self.type.adapt_operator(op), self.type
- else:
- return super(
- UserDefinedType.Comparator, self
- )._adapt_expression(op, other_comparator)
-
- comparator_factory = Comparator
-
def coerce_compared_value(self, op, value):
"""Suggest a type for a 'coerced' Python value in an expression.
def our_warn(msg, *arg, **kw):
if isinstance(msg, exc_cls):
- exception = msg
- msg = str(exception)
+ exception = type(msg)
+ msg = str(msg)
elif arg:
exception = arg[0]
else:
import re
import sqlalchemy as sa
-from .. import assert_raises_message
from .. import config
from .. import engines
from .. import eq_
from ..schema import Column
from ..schema import Table
from ... import event
-from ... import exc as sa_exc
from ... import ForeignKey
from ... import inspect
from ... import Integer
def test_get_pk_constraint_with_schema(self):
self._test_get_pk_constraint(schema=testing.config.test_schema)
- @testing.requires.table_reflection
- @testing.provide_metadata
- def test_deprecated_get_primary_keys(self):
- meta = self.metadata
- users = self.tables.users
- insp = inspect(meta.bind)
- assert_raises_message(
- sa_exc.SADeprecationWarning,
- r".*get_primary_keys\(\) method is deprecated",
- insp.get_primary_keys,
- users.name,
- )
-
@testing.provide_metadata
def _test_get_foreign_keys(self, schema=None):
meta = self.metadata
"Date",
"Time",
"LargeBinary",
- "Binary",
"Boolean",
"Unicode",
"Concatenable",
from .sql.sqltypes import BIGINT # noqa
from .sql.sqltypes import BigInteger # noqa
from .sql.sqltypes import BINARY # noqa
-from .sql.sqltypes import Binary # noqa
from .sql.sqltypes import BLOB # noqa
from .sql.sqltypes import BOOLEAN # noqa
from .sql.sqltypes import Boolean # noqa
from .. import exc
-def warn_deprecated(msg, stacklevel=3):
- warnings.warn(msg, exc.SADeprecationWarning, stacklevel=stacklevel)
+def _warn_with_version(msg, version, type_, stacklevel):
+ warn = type_(msg)
+ warn.deprecated_since = version
+ warnings.warn(warn, stacklevel=stacklevel + 1)
+
+
+def warn_deprecated(msg, version, stacklevel=3):
+ _warn_with_version(msg, version, exc.SADeprecationWarning, stacklevel)
def warn_deprecated_20(msg, stacklevel=3):
msg += " (Background on SQLAlchemy 2.0 at: http://sqlalche.me/e/b8d9)"
- warnings.warn(msg, exc.RemovedIn20Warning, stacklevel=stacklevel)
+ _warn_with_version(
+ msg,
+ exc.RemovedIn20Warning.deprecated_since,
+ exc.RemovedIn20Warning,
+ stacklevel,
+ )
def deprecated_cls(version, message, constructor="__init__"):
constructor,
exc.SADeprecationWarning,
message % dict(func=constructor),
+ version,
header,
)
def decorate(cls):
return _decorate_cls_with_warning(
- cls, constructor, exc.RemovedIn20Warning, message, message
+ cls,
+ constructor,
+ exc.RemovedIn20Warning,
+ message,
+ exc.RemovedIn20Warning.deprecated_since,
+ message,
)
return decorate
def decorate(fn):
return _decorate_with_warning(
- fn, warning, message % dict(func=fn.__name__), header
+ fn, warning, message % dict(func=fn.__name__), version, header
)
return decorate
"""
messages = {}
+ versions = {}
version_warnings = {}
for param, (version, message) in specs.items():
+ versions[param] = version
messages[param] = _sanitize_restructured_text(message)
version_warnings[param] = (
exc.RemovedIn20Warning
if (defaults[m] is None and kwargs[m] is not None) or (
defaults[m] is not None and kwargs[m] != defaults[m]
):
- warnings.warn(
- messages[m], version_warnings[m], stacklevel=3
+ _warn_with_version(
+ messages[m],
+ versions[m],
+ version_warnings[m],
+ stacklevel=3,
)
for m in check_kw:
if m in kwargs:
- warnings.warn(
- messages[m], version_warnings[m], stacklevel=3
+ _warn_with_version(
+ messages[m],
+ versions[m],
+ version_warnings[m],
+ stacklevel=3,
)
return fn(*args, **kwargs)
return decorate
-def deprecated_option_value(parameter_value, default_value, warning_text):
- if parameter_value is None:
- return default_value
- else:
- warn_deprecated(warning_text)
- return parameter_value
-
-
def _sanitize_restructured_text(text):
def repl(m):
type_, name = m.group(1, 2)
def _decorate_cls_with_warning(
- cls, constructor, wtype, message, docstring_header=None
+ cls, constructor, wtype, message, version, docstring_header=None
):
doc = cls.__doc__ is not None and cls.__doc__ or ""
if docstring_header is not None:
setattr(
cls,
constructor,
- _decorate_with_warning(constructor_fn, wtype, message, None),
+ _decorate_with_warning(
+ constructor_fn, wtype, message, version, None
+ ),
)
return cls
-def _decorate_with_warning(func, wtype, message, docstring_header=None):
+def _decorate_with_warning(
+ func, wtype, message, version, docstring_header=None
+):
"""Wrap a function with a warnings.warn and augmented docstring."""
message = _sanitize_restructured_text(message)
def warned(fn, *args, **kwargs):
skip_warning = kwargs.pop("_sa_skip_warning", False)
if not skip_warning:
- warnings.warn(message + warning_only, wtype, stacklevel=3)
+ _warn_with_version(
+ message + warning_only, version, wtype, stacklevel=3
+ )
return fn(*args, **kwargs)
doc = func.__doc__ is not None and func.__doc__ or ""
decorated = warned(func)
decorated.__doc__ = doc
- decorated._sa_warn = lambda: warnings.warn(message, wtype, stacklevel=3)
+ decorated._sa_warn = lambda: _warn_with_version(
+ message, version, wtype, stacklevel=3
+ )
return decorated
--- /dev/null
+from sqlalchemy.dialects.mysql import ENUM
+from sqlalchemy.dialects.mysql import SET
+from sqlalchemy.testing import expect_deprecated_20
+from sqlalchemy.testing import fixtures
+
+
+class DeprecateQuoting(fixtures.TestBase):
+ def test_enum_warning(self):
+ ENUM("a", "b")
+ with expect_deprecated_20(
+ "The 'quoting' parameter to :class:`.mysql.ENUM` is deprecated."
+ ):
+ ENUM("a", quoting="foo")
+
+ def test_set_warning(self):
+ SET("a", "b")
+ with expect_deprecated_20(
+ "The 'quoting' parameter to :class:`.mysql.SET` is deprecated.*"
+ ):
+ SET("a", quoting="foo")
]
self._run_test(specs, [])
- @testing.uses_deprecated("Manually quoting ENUM value literals")
def test_legacy_enum_types(self):
- specs = [(mysql.ENUM("''", "'fleem'"), mysql.ENUM("''", "'fleem'"))]
+ specs = [(mysql.ENUM("", "fleem"), mysql.ENUM("", "fleem"))]
self._run_test(specs, ["enums"])
def test_enum(self):
"""Exercise the ENUM type."""
- with testing.expect_deprecated("Manually quoting ENUM value literals"):
- e1, e2 = mysql.ENUM("'a'", "'b'"), mysql.ENUM("'a'", "'b'")
- e3 = mysql.ENUM("'a'", "'b'", strict=True)
- e4 = mysql.ENUM("'a'", "'b'", strict=True)
+ e1 = mysql.ENUM("a", "b")
+ e2 = mysql.ENUM("a", "b")
+ e3 = mysql.ENUM("a", "b", strict=True)
+ e4 = mysql.ENUM("a", "b", strict=True)
enum_table = Table(
"mysql_enum",
eq_(res, expected)
def _set_fixture_one(self):
- with testing.expect_deprecated("Manually quoting SET value literals"):
- e1, e2 = mysql.SET("'a'", "'b'"), mysql.SET("'a'", "'b'")
- e4 = mysql.SET("'a'", "b")
- e5 = mysql.SET("'a'", "'b'", quoting="quoted")
+ e1 = mysql.SET("a", "b")
+ e2 = mysql.SET("a", "b")
+ e4 = mysql.SET("'a'", "b")
+ e5 = mysql.SET("a", "b")
set_table = Table(
"mysql_set",
@testing.exclude("mysql", "<", (4,), "3.23 can't handle an ENUM of ''")
def test_enum_parse(self):
- with testing.expect_deprecated("Manually quoting ENUM value literals"):
- enum_table = Table(
- "mysql_enum",
- self.metadata,
- Column("e1", mysql.ENUM("'a'")),
- Column("e2", mysql.ENUM("''")),
- Column("e3", mysql.ENUM("a")),
- Column("e4", mysql.ENUM("")),
- Column("e5", mysql.ENUM("'a'", "''")),
- Column("e6", mysql.ENUM("''", "'a'")),
- Column("e7", mysql.ENUM("''", "'''a'''", "'b''b'", "''''")),
- )
+ enum_table = Table(
+ "mysql_enum",
+ self.metadata,
+ Column("e1", mysql.ENUM("a")),
+ Column("e2", mysql.ENUM("")),
+ Column("e3", mysql.ENUM("a")),
+ Column("e4", mysql.ENUM("")),
+ Column("e5", mysql.ENUM("a", "")),
+ Column("e6", mysql.ENUM("", "a")),
+ Column("e7", mysql.ENUM("", "'a'", "b'b", "'")),
+ )
for col in enum_table.c:
self.assert_(repr(col))
@testing.provide_metadata
@testing.exclude("mysql", "<", (5,))
def test_set_parse(self):
- with testing.expect_deprecated("Manually quoting SET value literals"):
- set_table = Table(
- "mysql_set",
- self.metadata,
- Column("e1", mysql.SET("'a'")),
- Column("e2", mysql.SET("''", retrieve_as_bitwise=True)),
- Column("e3", mysql.SET("a")),
- Column("e4", mysql.SET("", retrieve_as_bitwise=True)),
- Column("e5", mysql.SET("'a'", "''", retrieve_as_bitwise=True)),
- Column("e6", mysql.SET("''", "'a'", retrieve_as_bitwise=True)),
- Column(
- "e7",
- mysql.SET(
- "''",
- "'''a'''",
- "'b''b'",
- "''''",
- retrieve_as_bitwise=True,
- ),
- ),
- )
+ set_table = Table(
+ "mysql_set",
+ self.metadata,
+ Column("e1", mysql.SET("a")),
+ Column("e2", mysql.SET("", retrieve_as_bitwise=True)),
+ Column("e3", mysql.SET("a")),
+ Column("e4", mysql.SET("", retrieve_as_bitwise=True)),
+ Column("e5", mysql.SET("a", "", retrieve_as_bitwise=True)),
+ Column("e6", mysql.SET("", "a", retrieve_as_bitwise=True)),
+ Column(
+ "e7",
+ mysql.SET("", "'a'", "b'b", "'", retrieve_as_bitwise=True,),
+ ),
+ )
for col in set_table.c:
self.assert_(repr(col))
from sqlalchemy import cast
from sqlalchemy import Column
from sqlalchemy import DateTime
-from sqlalchemy import dialects
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import extract
]:
eq_(dialect._get_server_version_info(mock_conn(string)), version)
- def test_deprecated_dialect_name_still_loads(self):
- dialects.registry.clear()
- with expect_deprecated(
- "The 'postgres' dialect name " "has been renamed to 'postgresql'"
- ):
- dialect = url.URL("postgres").get_dialect()
- is_(dialect, postgresql.dialect)
-
@testing.requires.psycopg2_compatibility
def test_pg_dialect_use_native_unicode_from_config(self):
config = {
import sqlalchemy as tsa
-from sqlalchemy import column
from sqlalchemy import create_engine
-from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import INT
from sqlalchemy import Integer
-from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy import pool
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
-from sqlalchemy import TypeDecorator
from sqlalchemy import VARCHAR
from sqlalchemy.engine import reflection
from sqlalchemy.engine.base import Connection
-from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.mock import MockConnection
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
self.ProgrammingError = sqlite3.ProgrammingError
-class HandleErrorTest(fixtures.TestBase):
- __requires__ = ("ad_hoc_engines",)
- __backend__ = True
-
- def tearDown(self):
- Engine.dispatch._clear()
- Engine._has_events = False
-
- def test_legacy_dbapi_error(self):
- engine = engines.testing_engine()
- canary = Mock()
-
- with testing.expect_deprecated(
- r"The ConnectionEvents.dbapi_error\(\) event is deprecated"
- ):
- event.listen(engine, "dbapi_error", canary)
-
- with engine.connect() as conn:
- try:
- conn.exec_driver_sql("SELECT FOO FROM I_DONT_EXIST")
- assert False
- except tsa.exc.DBAPIError as e:
- eq_(canary.mock_calls[0][1][5], e.orig)
- eq_(canary.mock_calls[0][1][2], "SELECT FOO FROM I_DONT_EXIST")
-
- def test_legacy_dbapi_error_no_ad_hoc_context(self):
- engine = engines.testing_engine()
-
- listener = Mock(return_value=None)
- with testing.expect_deprecated(
- r"The ConnectionEvents.dbapi_error\(\) event is deprecated"
- ):
- event.listen(engine, "dbapi_error", listener)
-
- nope = SomeException("nope")
-
- class MyType(TypeDecorator):
- impl = Integer
-
- def process_bind_param(self, value, dialect):
- raise nope
-
- with engine.connect() as conn:
- assert_raises_message(
- tsa.exc.StatementError,
- r"\(.*SomeException\) " r"nope\n\[SQL\: u?SELECT 1 ",
- conn.execute,
- select([1]).where(column("foo") == literal("bar", MyType())),
- )
- # no legacy event
- eq_(listener.mock_calls, [])
-
- def test_legacy_dbapi_error_non_dbapi_error(self):
- engine = engines.testing_engine()
-
- listener = Mock(return_value=None)
- with testing.expect_deprecated(
- r"The ConnectionEvents.dbapi_error\(\) event is deprecated"
- ):
- event.listen(engine, "dbapi_error", listener)
-
- nope = TypeError("I'm not a DBAPI error")
- with engine.connect() as c:
- c.connection.cursor = Mock(
- return_value=Mock(execute=Mock(side_effect=nope))
- )
-
- assert_raises_message(
- TypeError,
- "I'm not a DBAPI error",
- c.exec_driver_sql,
- "select ",
- )
- # no legacy event
- eq_(listener.mock_calls, [])
-
-
def MockDBAPI(): # noqa
def cursor():
return Mock()
is_(ctx.original_exception, nope)
def test_exception_event_non_dbapi_error(self):
- """test that dbapi_error is called with a context in
+ """test that handle_error is called with a context in
cases where DBAPI raises an exception that is not a DBAPI
exception, e.g. internal errors or encoding problems.
# then this would fail.
eq_(
- Blog.posts.impl.get_history(state, dict_, passive=True),
+ Blog.posts.impl.get_history(
+ state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE
+ ),
([p2], (), ()),
)
([f1], (), ()),
)
- def test_deprecated_flags(self):
- assert_raises_message(
- sa_exc.SADeprecationWarning,
- "Passing True for 'passive' is deprecated. "
- "Use attributes.PASSIVE_NO_INITIALIZE",
- attributes.get_history,
- object(),
- "foo",
- True,
- )
-
- assert_raises_message(
- sa_exc.SADeprecationWarning,
- "Passing False for 'passive' is deprecated. "
- "Use attributes.PASSIVE_OFF",
- attributes.get_history,
- object(),
- "foo",
- False,
- )
-
class LazyloadHistoryTest(fixtures.TestBase):
def test_lazy_backref_collections(self):
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import desc
-from sqlalchemy import event
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import true
-from sqlalchemy.ext.declarative import comparable_using
-from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import aliased
from sqlalchemy.orm import attributes
from sqlalchemy.orm import collections
from sqlalchemy.orm import column_property
-from sqlalchemy.orm import comparable_property
from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm import contains_alias
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import deferred
from sqlalchemy.orm import eagerload
from sqlalchemy.orm import foreign
-from sqlalchemy.orm import identity
from sqlalchemy.orm import instrumentation
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
-from sqlalchemy.orm import PropComparator
from sqlalchemy.orm import relation
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
-from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import synonym
from sqlalchemy.orm import undefer
from sqlalchemy.orm import with_polymorphic
from sqlalchemy.testing import is_true
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-from sqlalchemy.testing.util import gc_collect
from . import _fixtures
from .inheritance import _poly_fixtures
from .test_options import PathTest as OptionsPathTest
from .test_query import QueryTest
-from .test_transaction import _LocalFixture
-
-
-class DeprecationWarningsTest(fixtures.DeclarativeMappedTest):
- run_setup_classes = "each"
- run_setup_mappers = "each"
- run_define_tables = "each"
- run_create_tables = None
-
- def test_session_weak_identity_map(self):
- with testing.expect_deprecated(
- ".*Session.weak_identity_map parameter as well as the"
- ):
- s = Session(weak_identity_map=True)
-
- is_(s._identity_cls, identity.WeakInstanceDict)
-
- with assertions.expect_deprecated(
- "The Session.weak_identity_map parameter as well as"
- ):
- s = Session(weak_identity_map=False)
-
- is_(s._identity_cls, identity.StrongInstanceDict)
-
- s = Session()
- is_(s._identity_cls, identity.WeakInstanceDict)
-
- def test_session_prune(self):
- s = Session()
-
- with assertions.expect_deprecated(
- r"The Session.prune\(\) method is deprecated along with "
- "Session.weak_identity_map"
- ):
- s.prune()
-
- def test_session_enable_transaction_accounting(self):
- with assertions.expect_deprecated(
- "the Session._enable_transaction_accounting parameter is "
- "deprecated"
- ):
- Session(_enable_transaction_accounting=False)
-
- def test_session_is_modified(self):
- class Foo(self.DeclarativeBasic):
- __tablename__ = "foo"
-
- id = Column(Integer, primary_key=True)
-
- f1 = Foo()
- s = Session()
- with assertions.expect_deprecated(
- "The Session.is_modified.passive flag is deprecated"
- ):
- # this flag was for a long time documented as requiring
- # that it be set to True, so we've changed the default here
- # so that the warning emits
- s.is_modified(f1, passive=True)
-
-
-class DeprecatedAccountingFlagsTest(_LocalFixture):
- def test_rollback_no_accounting(self):
- User, users = self.classes.User, self.tables.users
-
- with testing.expect_deprecated(
- "The Session._enable_transaction_accounting parameter"
- ):
- sess = sessionmaker(_enable_transaction_accounting=False)()
- u1 = User(name="ed")
- sess.add(u1)
- sess.commit()
-
- u1.name = "edwardo"
- sess.rollback()
-
- testing.db.execute(
- users.update(users.c.name == "ed").values(name="edward")
- )
-
- assert u1.name == "edwardo"
- sess.expire_all()
- assert u1.name == "edward"
-
- def test_commit_no_accounting(self):
- User, users = self.classes.User, self.tables.users
-
- with testing.expect_deprecated(
- "The Session._enable_transaction_accounting parameter"
- ):
- sess = sessionmaker(_enable_transaction_accounting=False)()
- u1 = User(name="ed")
- sess.add(u1)
- sess.commit()
-
- u1.name = "edwardo"
- sess.rollback()
-
- testing.db.execute(
- users.update(users.c.name == "ed").values(name="edward")
- )
-
- assert u1.name == "edwardo"
- sess.commit()
-
- assert testing.db.execute(select([users.c.name])).fetchall() == [
- ("edwardo",)
- ]
- assert u1.name == "edwardo"
-
- sess.delete(u1)
- sess.commit()
-
- def test_preflush_no_accounting(self):
- User, users = self.classes.User, self.tables.users
-
- with testing.expect_deprecated(
- "The Session._enable_transaction_accounting parameter"
- ):
- sess = Session(
- _enable_transaction_accounting=False,
- autocommit=True,
- autoflush=False,
- )
- u1 = User(name="ed")
- sess.add(u1)
- sess.flush()
-
- sess.begin()
- u1.name = "edwardo"
- u2 = User(name="some other user")
- sess.add(u2)
-
- sess.rollback()
-
- sess.begin()
- assert testing.db.execute(select([users.c.name])).fetchall() == [
- ("ed",)
- ]
-
-
-class DeprecatedSessionFeatureTest(_fixtures.FixtureTest):
- run_inserts = None
-
- def test_fast_discard_race(self):
- # test issue #4068
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
-
- with testing.expect_deprecated(".*identity map are deprecated"):
- sess = Session(weak_identity_map=False)
-
- u1 = User(name="u1")
- sess.add(u1)
- sess.commit()
-
- u1_state = u1._sa_instance_state
- sess.identity_map._dict.pop(u1_state.key)
- ref = u1_state.obj
- u1_state.obj = lambda: None
-
- u2 = sess.query(User).first()
- u1_state._cleanup(ref)
-
- u3 = sess.query(User).first()
-
- is_(u2, u3)
-
- u2_state = u2._sa_instance_state
- assert sess.identity_map.contains_state(u2._sa_instance_state)
- ref = u2_state.obj
- u2_state.obj = lambda: None
- u2_state._cleanup(ref)
- assert not sess.identity_map.contains_state(u2._sa_instance_state)
-
- def test_is_modified_passive_on(self):
- User, Address = self.classes.User, self.classes.Address
- users, addresses = self.tables.users, self.tables.addresses
- mapper(User, users, properties={"addresses": relationship(Address)})
- mapper(Address, addresses)
-
- s = Session()
- u = User(name="fred", addresses=[Address(email_address="foo")])
- s.add(u)
- s.commit()
-
- u.id
-
- def go():
- assert not s.is_modified(u, passive=True)
-
- with testing.expect_deprecated(
- ".*Session.is_modified.passive flag is deprecated "
- ):
- self.assert_sql_count(testing.db, go, 0)
-
- u.name = "newname"
-
- def go():
- assert s.is_modified(u, passive=True)
-
- with testing.expect_deprecated(
- ".*Session.is_modified.passive flag is deprecated "
- ):
- self.assert_sql_count(testing.db, go, 0)
-
-
-class StrongIdentityMapTest(_fixtures.FixtureTest):
- run_inserts = None
-
- def _strong_ident_fixture(self):
- with testing.expect_deprecated(
- ".*Session.weak_identity_map parameter as well as the"
- ):
- sess = create_session(weak_identity_map=False)
-
- def prune():
- with testing.expect_deprecated(".*Session.prune"):
- return sess.prune()
-
- return sess, prune
-
- def _event_fixture(self):
- session = create_session()
-
- @event.listens_for(session, "pending_to_persistent")
- @event.listens_for(session, "deleted_to_persistent")
- @event.listens_for(session, "detached_to_persistent")
- @event.listens_for(session, "loaded_as_persistent")
- def strong_ref_object(sess, instance):
- if "refs" not in sess.info:
- sess.info["refs"] = refs = set()
- else:
- refs = sess.info["refs"]
-
- refs.add(instance)
-
- @event.listens_for(session, "persistent_to_detached")
- @event.listens_for(session, "persistent_to_deleted")
- @event.listens_for(session, "persistent_to_transient")
- def deref_object(sess, instance):
- sess.info["refs"].discard(instance)
-
- def prune():
- if "refs" not in session.info:
- return 0
-
- sess_size = len(session.identity_map)
- session.info["refs"].clear()
- gc_collect()
- session.info["refs"] = set(
- s.obj() for s in session.identity_map.all_states()
- )
- return sess_size - len(session.identity_map)
-
- return session, prune
-
- def test_strong_ref_imap(self):
- self._test_strong_ref(self._strong_ident_fixture)
-
- def test_strong_ref_events(self):
- self._test_strong_ref(self._event_fixture)
-
- def _test_strong_ref(self, fixture):
- s, prune = fixture()
-
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
-
- # save user
- s.add(User(name="u1"))
- s.flush()
- user = s.query(User).one()
- user = None
- print(s.identity_map)
- gc_collect()
- assert len(s.identity_map) == 1
-
- user = s.query(User).one()
- assert not s.identity_map._modified
- user.name = "u2"
- assert s.identity_map._modified
- s.flush()
- eq_(users.select().execute().fetchall(), [(user.id, "u2")])
-
- def test_prune_imap(self):
- self._test_prune(self._strong_ident_fixture)
-
- def test_prune_events(self):
- self._test_prune(self._event_fixture)
-
- @testing.requires.cpython
- def _test_prune(self, fixture):
- s, prune = fixture()
-
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
-
- for o in [User(name="u%s" % x) for x in range(10)]:
- s.add(o)
- # o is still live after this loop...
-
- self.assert_(len(s.identity_map) == 0)
- eq_(prune(), 0)
- s.flush()
- gc_collect()
- eq_(prune(), 9)
- # o is still in local scope here, so still present
- self.assert_(len(s.identity_map) == 1)
-
- id_ = o.id
- del o
- eq_(prune(), 1)
- self.assert_(len(s.identity_map) == 0)
-
- u = s.query(User).get(id_)
- eq_(prune(), 0)
- self.assert_(len(s.identity_map) == 1)
- u.name = "squiznart"
- del u
- eq_(prune(), 0)
- self.assert_(len(s.identity_map) == 1)
- s.flush()
- eq_(prune(), 1)
- self.assert_(len(s.identity_map) == 0)
-
- s.add(User(name="x"))
- eq_(prune(), 0)
- self.assert_(len(s.identity_map) == 0)
- s.flush()
- self.assert_(len(s.identity_map) == 1)
- eq_(prune(), 1)
- self.assert_(len(s.identity_map) == 0)
-
- u = s.query(User).get(id_)
- s.delete(u)
- del u
- eq_(prune(), 0)
- self.assert_(len(s.identity_map) == 1)
- s.flush()
- eq_(prune(), 0)
- self.assert_(len(s.identity_map) == 0)
class DeprecatedQueryTest(_fixtures.FixtureTest, AssertsCompiledSQL):
)
is_true(dep.compare(subq_version))
- def test_cancel_order_by(self):
- users, User = self.tables.users, self.classes.User
-
- with testing.expect_deprecated(
- "The Mapper.order_by parameter is deprecated, and will be "
- "removed in a future release."
- ):
- mapper(User, users, order_by=users.c.name.desc())
-
- assert (
- "order by users.name desc"
- in str(create_session().query(User).statement).lower()
- )
- assert (
- "order by"
- not in str(
- create_session().query(User).order_by(None).statement
- ).lower()
- )
- assert (
- "order by users.name asc"
- in str(
- create_session()
- .query(User)
- .order_by(User.name.asc())
- .statement
- ).lower()
- )
-
- eq_(
- create_session().query(User).all(),
- [
- User(id=7, name="jack"),
- User(id=9, name="fred"),
- User(id=8, name="ed"),
- User(id=10, name="chuck"),
- ],
- )
-
- eq_(
- create_session().query(User).order_by(User.name).all(),
- [
- User(id=10, name="chuck"),
- User(id=8, name="ed"),
- User(id=9, name="fred"),
- User(id=7, name="jack"),
- ],
- )
-
- def test_comparable(self):
- users = self.tables.users
-
- class extendedproperty(property):
- attribute = 123
-
- def method1(self):
- return "method1"
-
- from sqlalchemy.orm.properties import ColumnProperty
-
- class UCComparator(ColumnProperty.Comparator):
- __hash__ = None
-
- def method1(self):
- return "uccmethod1"
-
- def method2(self, other):
- return "method2"
-
- def __eq__(self, other):
- cls = self.prop.parent.class_
- col = getattr(cls, "name")
- if other is None:
- return col is None
- else:
- return sa.func.upper(col) == sa.func.upper(other)
-
- def map_(with_explicit_property):
- class User(object):
- @extendedproperty
- def uc_name(self):
- if self.name is None:
- return None
- return self.name.upper()
-
- if with_explicit_property:
- args = (UCComparator, User.uc_name)
- else:
- args = (UCComparator,)
-
- with assertions.expect_deprecated(
- r"comparable_property\(\) is deprecated and will be "
- "removed in a future release."
- ):
- mapper(
- User,
- users,
- properties=dict(uc_name=sa.orm.comparable_property(*args)),
- )
- return User
-
- for User in (map_(True), map_(False)):
- sess = create_session()
- sess.begin()
- q = sess.query(User)
-
- assert hasattr(User, "name")
- assert hasattr(User, "uc_name")
-
- eq_(User.uc_name.method1(), "method1")
- eq_(User.uc_name.method2("x"), "method2")
-
- assert_raises_message(
- AttributeError,
- "Neither 'extendedproperty' object nor 'UCComparator' "
- "object associated with User.uc_name has an attribute "
- "'nonexistent'",
- getattr,
- User.uc_name,
- "nonexistent",
- )
-
- # test compile
- assert not isinstance(User.uc_name == "jack", bool)
- u = q.filter(User.uc_name == "JACK").one()
-
- assert u.uc_name == "JACK"
- assert u not in sess.dirty
-
- u.name = "some user name"
- eq_(u.name, "some user name")
- assert u in sess.dirty
- eq_(u.uc_name, "SOME USER NAME")
-
- sess.flush()
- sess.expunge_all()
-
- q = sess.query(User)
- u2 = q.filter(User.name == "some user name").one()
- u3 = q.filter(User.uc_name == "SOME USER NAME").one()
-
- assert u2 is u3
-
- eq_(User.uc_name.attribute, 123)
- sess.rollback()
-
def test_comparable_column(self):
users, User = self.tables.users, self.classes.User
"users.name &= :name_1",
)
- def test_info(self):
- class MyComposite(object):
- pass
-
- with assertions.expect_deprecated(
- r"comparable_property\(\) is deprecated and will be "
- "removed in a future release."
- ):
- for constructor, args in [(comparable_property, "foo")]:
- obj = constructor(info={"x": "y"}, *args)
- eq_(obj.info, {"x": "y"})
- obj.info["q"] = "p"
- eq_(obj.info, {"x": "y", "q": "p"})
-
- obj = constructor(*args)
- eq_(obj.info, {})
- obj.info["q"] = "p"
- eq_(obj.info, {"q": "p"})
-
def test_add_property(self):
users = self.tables.users
name = property(_get_name, _set_name)
- def _uc_name(self):
- if self._name is None:
- return None
- return self._name.upper()
-
- uc_name = property(_uc_name)
- uc_name2 = property(_uc_name)
-
m = mapper(User, users)
- class UCComparator(PropComparator):
- __hash__ = None
-
- def __eq__(self, other):
- cls = self.prop.parent.class_
- col = getattr(cls, "name")
- if other is None:
- return col is None
- else:
- return func.upper(col) == func.upper(other)
-
m.add_property("_name", deferred(users.c.name))
m.add_property("name", synonym("_name"))
- with assertions.expect_deprecated(
- r"comparable_property\(\) is deprecated and will be "
- "removed in a future release."
- ):
- m.add_property("uc_name", comparable_property(UCComparator))
- m.add_property(
- "uc_name2", comparable_property(UCComparator, User.uc_name2)
- )
sess = create_session(autocommit=False)
assert sess.query(User).get(7)
def go():
eq_(u.name, "jack")
- eq_(u.uc_name, "JACK")
- eq_(u.uc_name2, "JACK")
eq_(assert_col, [("get", "jack")], str(assert_col))
self.sql_count_(1, go)
- def test_kwarg_accepted(self):
- class DummyComposite(object):
- def __init__(self, x, y):
- pass
-
- class MyFactory(PropComparator):
- pass
-
- with assertions.expect_deprecated(
- r"comparable_property\(\) is deprecated and will be "
- "removed in a future release."
- ):
- for args in ((comparable_property,),):
- fn = args[0]
- args = args[1:]
- fn(comparator_factory=MyFactory, *args)
-
- def test_merge_synonym_comparable(self):
- users = self.tables.users
-
- class User(object):
- class Comparator(PropComparator):
- pass
-
- def _getValue(self):
- return self._value
-
- def _setValue(self, value):
- setattr(self, "_value", value)
-
- value = property(_getValue, _setValue)
-
- with assertions.expect_deprecated(
- r"comparable_property\(\) is deprecated and will be "
- "removed in a future release."
- ):
- mapper(
- User,
- users,
- properties={
- "uid": synonym("id"),
- "foobar": comparable_property(User.Comparator, User.value),
- },
- )
-
- sess = create_session()
- u = User()
- u.name = "ed"
- sess.add(u)
- sess.flush()
- sess.expunge(u)
- sess.merge(u)
-
-
-class DeprecatedDeclTest(fixtures.TestBase):
- @testing.provide_metadata
- def test_comparable_using(self):
- class NameComparator(sa.orm.PropComparator):
- @property
- def upperself(self):
- cls = self.prop.parent.class_
- col = getattr(cls, "name")
- return sa.func.upper(col)
-
- def operate(self, op, other, **kw):
- return op(self.upperself, other, **kw)
-
- Base = declarative_base(metadata=self.metadata)
-
- with testing.expect_deprecated(
- r"comparable_property\(\) is deprecated and will be "
- "removed in a future release."
- ):
-
- class User(Base, fixtures.ComparableEntity):
-
- __tablename__ = "users"
- id = Column(
- "id",
- Integer,
- primary_key=True,
- test_needs_autoincrement=True,
- )
- name = Column("name", String(50))
-
- @comparable_using(NameComparator)
- @property
- def uc_name(self):
- return self.name is not None and self.name.upper() or None
-
- Base.metadata.create_all()
- sess = create_session()
- u1 = User(name="someuser")
- eq_(u1.name, "someuser", u1.name)
- eq_(u1.uc_name, "SOMEUSER", u1.uc_name)
- sess.add(u1)
- sess.flush()
- sess.expunge_all()
- rt = sess.query(User).filter(User.uc_name == "SOMEUSER").one()
- eq_(rt, u1)
- sess.expunge_all()
- rt = sess.query(User).filter(User.uc_name.startswith("SOMEUSE")).one()
- eq_(rt, u1)
-
class DeprecatedOptionAllTest(OptionsPathTest, _fixtures.FixtureTest):
run_inserts = "once"
*options
)
- def test_subqueryload_mapper_order_by(self):
- users, User, Address, addresses = (
- self.tables.users,
- self.classes.User,
- self.classes.Address,
- self.tables.addresses,
- )
-
- mapper(Address, addresses)
-
- with testing.expect_deprecated(
- ".*Mapper.order_by parameter is deprecated"
- ):
- mapper(
- User,
- users,
- properties={
- "addresses": relationship(
- Address, lazy="subquery", order_by=addresses.c.id
- )
- },
- order_by=users.c.id.desc(),
- )
-
- sess = create_session()
- q = sess.query(User)
-
- result = q.limit(2).all()
- eq_(result, list(reversed(self.static.user_address_result[2:4])))
-
- def test_selectinload_mapper_order_by(self):
- users, User, Address, addresses = (
- self.tables.users,
- self.classes.User,
- self.classes.Address,
- self.tables.addresses,
- )
-
- mapper(Address, addresses)
- with testing.expect_deprecated(
- ".*Mapper.order_by parameter is deprecated"
- ):
- mapper(
- User,
- users,
- properties={
- "addresses": relationship(
- Address, lazy="selectin", order_by=addresses.c.id
- )
- },
- order_by=users.c.id.desc(),
- )
-
- sess = create_session()
- q = sess.query(User)
-
- result = q.limit(2).all()
- eq_(result, list(reversed(self.static.user_address_result[2:4])))
-
- def test_join_mapper_order_by(self):
- """test that mapper-level order_by is adapted to a selectable."""
-
- User, users = self.classes.User, self.tables.users
-
- with testing.expect_deprecated(
- ".*Mapper.order_by parameter is deprecated"
- ):
- mapper(User, users, order_by=users.c.id)
-
- sel = users.select(users.c.id.in_([7, 8]))
- sess = create_session()
-
- with DeprecatedQueryTest._expect_implicit_subquery():
- eq_(
- sess.query(User).select_entity_from(sel).all(),
- [User(name="jack", id=7), User(name="ed", id=8)],
- )
-
def test_defer_addtl_attrs(self):
users, User, Address, addresses = (
self.tables.users,
eq_(Sub._sa_iterator(Sub(), 5), "base_iterate")
eq_(Sub._sa_converter(Sub(), 5), "sub_convert")
- def test_link_event(self):
- canary = []
-
- with testing.expect_deprecated(
- r"The collection.linker\(\) handler is deprecated and will "
- "be removed in a future release. Please refer to the "
- "AttributeEvents"
- ):
-
- class Collection(list):
- @collection.linker
- def _on_link(self, obj):
- canary.append(obj)
-
- class Foo(object):
- pass
-
- instrumentation.register_class(Foo)
- attributes.register_attribute(
- Foo, "attr", uselist=True, typecallable=Collection, useobject=True
- )
-
- f1 = Foo()
- f1.attr.append(3)
-
- eq_(canary, [f1.attr._sa_adapter])
- adapter_1 = f1.attr._sa_adapter
-
- l2 = Collection()
- f1.attr = l2
- eq_(canary, [adapter_1, f1.attr._sa_adapter, None])
-
class NonPrimaryRelationshipLoaderTest(_fixtures.FixtureTest):
run_inserts = "once"
)
def test_internal_identity_conflict_warning_weak(self):
- self._test_internal_identity_conflict_warning(True)
-
- def test_internal_identity_conflict_warning_strong(self):
- self._test_internal_identity_conflict_warning(False)
-
- def _test_internal_identity_conflict_warning(self, weak_identity_map):
# test for issue #4890
# see also test_naturalpks::ReversePKsTest::test_reverse
users, User = self.tables.users, self.classes.User
)
mapper(Address, addresses)
- with testing.expect_deprecated():
- session = Session(weak_identity_map=weak_identity_map)
+ session = Session()
@event.listens_for(session, "after_flush")
def load_collections(session, flush_context):
):
self.assert_compile(or_(and_()), "")
- def test_fromclause_count(self):
- with testing.expect_deprecated(
- r"The FromClause.count\(\) method is deprecated, and will be "
- r"removed in a future release."
- ):
- self.assert_compile(
- table("q", column("x")).count(),
- "SELECT count(q.x) AS tbl_row_count FROM q",
- dialect="default",
- )
-
class ConvertUnicodeDeprecationTest(fixtures.TestBase):
)
-class TextTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = "default"
-
- def test_legacy_bindparam(self):
- with testing.expect_deprecated(
- "The text.bindparams parameter is deprecated"
- ):
- t = text(
- "select * from foo where lala=:bar and hoho=:whee",
- bindparams=[bindparam("bar", 4), bindparam("whee", 7)],
- )
-
- self.assert_compile(
- t,
- "select * from foo where lala=:bar and hoho=:whee",
- checkparams={"bar": 4, "whee": 7},
- )
-
- def test_legacy_typemap(self):
- table1 = table(
- "mytable",
- column("myid", Integer),
- column("name", String),
- column("description", String),
- )
- with testing.expect_deprecated(
- "The text.typemap parameter is deprecated"
- ):
- t = text(
- "select id, name from user",
- typemap=dict(id=Integer, name=String),
- ).subquery()
-
- stmt = select([table1.c.myid]).select_from(
- table1.join(t, table1.c.myid == t.c.id)
- )
- compiled = stmt.compile()
- eq_(
- compiled._create_result_map(),
- {
- "myid": (
- "myid",
- (table1.c.myid, "myid", "myid", "mytable_myid"),
- table1.c.myid.type,
- )
- },
- )
-
-
class SelectableTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
subcl.__name__,
), True, subcl, [typ]
- @testing.uses_deprecated(".*Binary.*")
@testing.combinations(_adaptions(), id_="iaaa")
def test_adapt_method(self, is_down_adaption, typ, target_adaptions):
"""ensure all types have a working adapt() method,
-from sqlalchemy.testing import expect_deprecated_20, fixtures
+from sqlalchemy.testing import expect_deprecated_20
+from sqlalchemy.testing import fixtures
from sqlalchemy.util.compat import import_