- deprecated PassiveDefault - use DefaultClause.
- the BINARY and MSBinary types now generate "BINARY" in all
cases. Omitting the "length" parameter will generate
"BINARY" with no length. Use BLOB to generate an unlengthed
binary column.
- the "quoting='quoted'" argument to MSEnum/ENUM is deprecated.
It's best to rely upon the automatic quoting.
- "shortname" attribute on bindparam() is removed.
- fold_equivalents flag on join is deprecated (will remain
until [ticket:1131] is implemented)
- "scalar" flag on select() is removed, use
select.as_scalar().
- 'transactional' flag on sessionmaker() and others is
removed. Use 'autocommit=True' to indicate 'transactional=False'.
- 'polymorphic_fetch' argument on mapper() is removed.
Loading can be controlled using the 'with_polymorphic'
option.
- 'select_table' argument on mapper() is removed. Use
'with_polymorphic=("*", <some selectable>)' for this
functionality.
- 'proxy' argument on synonym() is removed. This flag
did nothing throughout 0.5, as the "proxy generation"
behavior is now automatic.
- Passing a single list of elements to eagerload(),
eagerload_all(), contains_eager(), lazyload(),
defer(), and undefer() instead of multiple positional
  *args is deprecated.
- Passing a single list of elements to query.order_by(),
query.group_by(), query.join(), or query.outerjoin()
instead of multiple positional *args is deprecated.
- query.iterate_instances() is removed. Use query.instances().
- Query.query_from_parent() is removed. Use the
sqlalchemy.orm.with_parent() function to produce a
"parent" clause, or alternatively query.with_parent().
- query._from_self() is removed; use query.from_self()
  instead.
- the "comparator" argument to composite() is removed.
Use "comparator_factory".
- RelationProperty._get_join() is removed.
- the 'echo_uow' flag on Session is removed. Use
logging on the "sqlalchemy.orm.unitofwork" name.
- session.clear() is removed. Use session.expunge_all().
- session.save(), session.update(), session.save_or_update()
are removed. Use session.add() and session.add_all().
- the "objects" flag on session.flush() remains deprecated.
- the "dont_load=True" flag on session.merge() is deprecated
in favor of "load=False".
- passing an InstanceState (internal SQLAlchemy state object) to
attributes.init_collection() or attributes.get_history() is
deprecated. These functions are public API and normally
expect a regular mapped object instance.
- the 'engine' parameter to declarative_base() is removed.
Use the 'bind' keyword argument.
- The internal BackRef() is gone and backref() returns a plain
  tuple that is understood by RelationProperty.
+ - Deprecated or removed:
+ * 'allow_null_pks' flag on mapper() is deprecated. It does
+ nothing now and the setting is "on" in all cases.
+ * 'transactional' flag on sessionmaker() and others is
+ removed. Use 'autocommit=True' to indicate 'transactional=False'.
+ * 'polymorphic_fetch' argument on mapper() is removed.
+ Loading can be controlled using the 'with_polymorphic'
+ option.
+ * 'select_table' argument on mapper() is removed. Use
+ 'with_polymorphic=("*", <some selectable>)' for this
+ functionality.
+ * 'proxy' argument on synonym() is removed. This flag
+ did nothing throughout 0.5, as the "proxy generation"
+ behavior is now automatic.
+ * Passing a single list of elements to eagerload(),
+ eagerload_all(), contains_eager(), lazyload(),
+ defer(), and undefer() instead of multiple positional
+ *args is deprecated.
+ * Passing a single list of elements to query.order_by(),
+ query.group_by(), query.join(), or query.outerjoin()
+ instead of multiple positional *args is deprecated.
+ * query.iterate_instances() is removed. Use query.instances().
+ * Query.query_from_parent() is removed. Use the
+ sqlalchemy.orm.with_parent() function to produce a
+ "parent" clause, or alternatively query.with_parent().
+    * query._from_self() is removed; use query.from_self()
+      instead.
+ * the "comparator" argument to composite() is removed.
+ Use "comparator_factory".
+ * RelationProperty._get_join() is removed.
+ * the 'echo_uow' flag on Session is removed. Use
+ logging on the "sqlalchemy.orm.unitofwork" name.
+    * session.clear() is removed. Use session.expunge_all().
+ * session.save(), session.update(), session.save_or_update()
+ are removed. Use session.add() and session.add_all().
+ * the "objects" flag on session.flush() remains deprecated.
+ * the "dont_load=True" flag on session.merge() is deprecated
+ in favor of "load=False".
+ * ScopedSession.mapper remains deprecated. See the
+ usage recipe at
+ http://www.sqlalchemy.org/trac/wiki/UsageRecipes/SessionAwareMapper
+ * passing an InstanceState (internal SQLAlchemy state object) to
+ attributes.init_collection() or attributes.get_history() is
+ deprecated. These functions are public API and normally
+ expect a regular mapped object instance.
+ * the 'engine' parameter to declarative_base() is removed.
+ Use the 'bind' keyword argument.
+
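The single-list deprecations above rest on a starargs shim; a minimal standalone sketch of the pattern, with hypothetical names (this is not SQLAlchemy's actual `accepts_a_list_as_starargs` decorator):

```python
import warnings
from functools import wraps

def accepts_list_as_starargs(fn):
    # If the caller passes one list or tuple as the sole positional
    # argument, warn and unpack it into *args; plain *args pass through.
    @wraps(fn)
    def wrapper(*args, **kw):
        if len(args) == 1 and isinstance(args[0], (list, tuple)):
            warnings.warn(
                "passing a single list is deprecated; pass multiple "
                "positional arguments instead",
                DeprecationWarning, stacklevel=2)
            args = tuple(args[0])
        return fn(*args, **kw)
    return wrapper

@accepts_list_as_starargs
def order_by(*criterion):
    # stand-in for query.order_by() and the other affected methods
    return criterion
```

Both calling styles resolve to the same arguments; only the list form warns.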
- sql
- the autoincrement flag on column now indicates the column
which should be linked to cursor.lastrowid, if that method
of bind parameters currently being processed. This
dict is available in the same way regardless of
single-execute or executemany-style statement execution.
-
+
+ - Deprecated or removed:
+ * "scalar" flag on select() is removed, use
+ select.as_scalar().
+ * "shortname" attribute on bindparam() is removed.
+ * postgres_returning, firebird_returning flags on
+ insert(), update(), delete() are deprecated, use
+ the new returning() method.
+ * fold_equivalents flag on join is deprecated (will remain
+ until [ticket:1131] is implemented)
+
- engines
- transaction isolation level may be specified with
create_engine(... isolation_level="..."); available on
disabled for a particular engine even if logging
for "sqlalchemy.engine" is enabled overall. Note that the
default setting of "echo" is `None`. [ticket:1554]
-
+
+  - Deprecated or removed:
+    * result.last_inserted_ids() is deprecated. Use
+      result.inserted_primary_key.
+
- schema
- deprecated MetaData.connect() and
ThreadLocalMetaData.connect() have been removed - send
- deprecated metadata.table_iterator() method removed (use
sorted_tables)
-
+
+ - deprecated PassiveDefault - use DefaultClause.
+
- the "metadata" argument is removed from DefaultGenerator
and subclasses, but remains locally present on Sequence,
which is a standalone construct in DDL.
MySQLdb can't handle % signs in SQL when executemany() is used,
and SQLA doesn't want to add overhead just to treat that one
non-existent use case. [ticket:1279]
-
+
+ - the BINARY and MSBinary types now generate "BINARY" in all
+ cases. Omitting the "length" parameter will generate
+ "BINARY" with no length. Use BLOB to generate an unlengthed
+ binary column.
+
+ - the "quoting='quoted'" argument to MSEnum/ENUM is deprecated.
+ It's best to rely upon the automatic quoting.
+
- oracle
- unit tests pass 100% with cx_oracle !
This is a fixed length type, and short values will be right-padded
with a server-version-specific pad value.
- :param length: Maximum data length, in bytes. If length is not
- specified, this will generate a BLOB. This usage is deprecated.
+ :param length: Maximum data length, in bytes.
"""
super(BINARY, self).__init__(length=length, **kw)
character, then use 'quoted' mode. Otherwise, use 'unquoted' mode.
'quoted': values in enums are already quoted, they will be used
- directly when generating the schema.
+ directly when generating the schema - this usage is deprecated.
'unquoted': values in enums are not quoted, they will be escaped and
surrounded by single quotes when generating the schema.
"""
self.quoting = kw.pop('quoting', 'auto')
- if self.quoting == 'auto':
+ if self.quoting == 'auto' and len(enums):
# What quoting character are we using?
q = None
for e in enums:
self.quoting = 'quoted'
if self.quoting == 'quoted':
- util.warn_pending_deprecation(
+ util.warn_deprecated(
'Manually quoting ENUM value literals is deprecated. Supply '
'unquoted values and use the quoting= option in cases of '
'ambiguity.')
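The 'auto' quoting detection above can be sketched in isolation; `detect_enum_quoting` is a hypothetical helper, not the actual MSEnum code:

```python
import warnings

def detect_enum_quoting(enums):
    # Inspect the first value: if it is already wrapped in a quote
    # character, the legacy 'quoted' mode applies and a deprecation
    # warning is emitted; otherwise values are treated as unquoted.
    if not enums:
        return 'auto'
    first = enums[0]
    if first and first[0] in ("'", '"'):
        warnings.warn(
            "Manually quoting ENUM value literals is deprecated. "
            "Supply unquoted values and use the quoting= option in "
            "cases of ambiguity.", DeprecationWarning, stacklevel=2)
        return 'quoted'
    return 'unquoted'
```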
if type_.length:
return "BINARY(%d)" % type_.length
else:
- return self.visit_BLOB(type_)
+ return "BINARY"
def visit_BLOB(self, type_):
if type_.length:
def declarative_base(bind=None, metadata=None, mapper=None, cls=object,
name='Base', constructor=_declarative_constructor,
- metaclass=DeclarativeMeta, engine=None):
+ metaclass=DeclarativeMeta):
"""Construct a base class for declarative class definitions.
The new base class will be given a metaclass that invokes
:param bind: An optional :class:`~sqlalchemy.engine.base.Connectable`, will be assigned
the ``bind`` attribute on the :class:`~sqlalchemy.MetaData` instance.
- The `engine` keyword argument is a deprecated synonym for `bind`.
+
:param metadata:
An optional :class:`~sqlalchemy.MetaData` instance. All :class:`~sqlalchemy.schema.Table`
"""
lcl_metadata = metadata or MetaData()
- if bind or engine:
- lcl_metadata.bind = bind or engine
+ if bind:
+ lcl_metadata.bind = bind
bases = not isinstance(cls, tuple) and (cls,) or cls
class_dict = dict(_decl_class_registry=dict(),
create_session().
"""
- if 'transactional' in kwargs:
- sa_util.warn_deprecated(
- "The 'transactional' argument to sessionmaker() is deprecated; "
- "use autocommit=True|False instead.")
- if 'autocommit' in kwargs:
- raise TypeError('Specify autocommit *or* transactional, not both.')
- kwargs['autocommit'] = not kwargs.pop('transactional')
-
kwargs.setdefault('autoflush', False)
kwargs.setdefault('autocommit', True)
kwargs.setdefault('expire_on_commit', False)
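The removed shim mapped the legacy flag onto the new one as follows; a standalone sketch with a hypothetical function name:

```python
import warnings

def translate_transactional_flag(kwargs):
    # The legacy 'transactional' flag maps onto 'autocommit':
    # transactional=True meant autocommit=False, and vice versa.
    if 'transactional' in kwargs:
        if 'autocommit' in kwargs:
            raise TypeError(
                'Specify autocommit *or* transactional, not both.')
        warnings.warn(
            "The 'transactional' argument is deprecated; "
            "use autocommit=True|False instead.",
            DeprecationWarning, stacklevel=2)
        kwargs['autocommit'] = not kwargs.pop('transactional')
    return kwargs
```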
A value which will be stored in the Column denoted by polymorphic_on,
corresponding to the *class identity* of this mapper.
- polymorphic_fetch
- Deprecated. Unloaded columns load as deferred in all cases; loading
- can be controlled using the "with_polymorphic" option.
-
properties
A dictionary mapping the string names of object attributes to
``MapperProperty`` instances, which define the persistence behavior of
<selectable> argument is required, since it usually requires more
complex UNION queries.
- select_table
- Deprecated. Synonymous with
- ``with_polymorphic=('*', <selectable>)``.
-
version_id_col
A ``Column`` which must have an integer type that will be used to keep
a running *version id* of mapped entities in the database. this is
"""
return Mapper(class_, local_table, *args, **params)
-def synonym(name, map_column=False, descriptor=None, comparator_factory=None, proxy=False):
+def synonym(name, map_column=False, descriptor=None, comparator_factory=None):
"""Set up `name` as a synonym to another mapped property.
Used with the ``properties`` dictionary sent to :func:`~sqlalchemy.orm.mapper`.
``_status``, and the ``status`` attribute on ``MyClass`` will be used to
proxy access to the column-based attribute.
- The `proxy` keyword argument is deprecated and currently does nothing;
- synonyms now always establish an attribute getter/setter function if one
- is not already available.
-
"""
return SynonymProperty(name, map_column=map_column, descriptor=descriptor, comparator_factory=comparator_factory)
"""
return ExtensionOption(ext)
-@sa_util.accepts_a_list_as_starargs(list_deprecation='pending')
+@sa_util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def eagerload(*keys):
"""Return a ``MapperOption`` that will convert the property of the given
name into an eager load.
"""
return strategies.EagerLazyOption(keys, lazy=False)
-@sa_util.accepts_a_list_as_starargs(list_deprecation='pending')
+@sa_util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def eagerload_all(*keys):
"""Return a ``MapperOption`` that will convert all properties along the
given dot-separated path into an eager load.
"""
return strategies.EagerLazyOption(keys, lazy=False, chained=True)
-@sa_util.accepts_a_list_as_starargs(list_deprecation='pending')
+@sa_util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def lazyload(*keys):
"""Return a ``MapperOption`` that will convert the property of the given
name into a lazy load.
"""
return AliasOption(alias)
-@sa_util.accepts_a_list_as_starargs(list_deprecation='pending')
+@sa_util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def contains_eager(*keys, **kwargs):
"""Return a ``MapperOption`` that will indicate to the query that
the given attribute will be eagerly loaded.
return (strategies.EagerLazyOption(keys, lazy=False, propagate_to_loaders=False), strategies.LoadEagerFromAliasOption(keys, alias=alias))
-@sa_util.accepts_a_list_as_starargs(list_deprecation='pending')
+@sa_util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def defer(*keys):
"""Return a ``MapperOption`` that will convert the column property of the
given name into a deferred load.
"""
return strategies.DeferredOption(keys, defer=True)
-@sa_util.accepts_a_list_as_starargs(list_deprecation='pending')
+@sa_util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def undefer(*keys):
"""Return a ``MapperOption`` that will convert the column property of the
given name into a non-deferred (regular column) load.
def _conditional_instance_state(obj):
if not isinstance(obj, state.InstanceState):
obj = instance_state(obj)
+ else:
+ util.warn_deprecated("Passing an InstanceState to get_history() or init_collection() is deprecated.")
return obj
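The accept-object-or-state check above boils down to this pattern; all names here are hypothetical stand-ins for the internal classes, not the real attributes module:

```python
import warnings

class MappedThing(object):
    # toy mapped object (hypothetical)
    pass

class InstanceStateDemo(object):
    # stand-in for the internal InstanceState (hypothetical name)
    def __init__(self, obj):
        self.obj = obj

def conditional_instance_state(arg):
    # A plain object is converted to its state; passing the internal
    # state directly still works, but emits a deprecation warning.
    if isinstance(arg, InstanceStateDemo):
        warnings.warn(
            "Passing an InstanceState to get_history() or "
            "init_collection() is deprecated.",
            DeprecationWarning, stacklevel=2)
        return arg
    return InstanceStateDemo(arg)
```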
def get_history(obj, key, **kwargs):
query = query.with_parent(instance, self.attr.key)
if self.attr.order_by:
- query = query.order_by(self.attr.order_by)
+ query = query.order_by(*self.attr.order_by)
return query
def append(self, item):
polymorphic_on=None,
_polymorphic_map=None,
polymorphic_identity=None,
- polymorphic_fetch=None,
concrete=False,
- select_table=None,
with_polymorphic=None,
allow_null_pks=None,
batch=True,
if allow_null_pks:
util.warn_deprecated('the allow_null_pks option to Mapper() is deprecated. It is now on in all cases.')
- self.select_table = select_table
- if select_table:
- util.warn_deprecated('select_table option is deprecated. Use with_polymorphic=("*", selectable) '
- 'instead.')
-
- if with_polymorphic:
- raise sa_exc.ArgumentError("select_table can't be used with "
- "with_polymorphic (they define conflicting settings)")
- self.with_polymorphic = ('*', select_table)
- else:
- if with_polymorphic == '*':
- self.with_polymorphic = ('*', None)
- elif isinstance(with_polymorphic, (tuple, list)):
- if isinstance(with_polymorphic[0], (basestring, tuple, list)):
- self.with_polymorphic = with_polymorphic
- else:
- self.with_polymorphic = (with_polymorphic, None)
- elif with_polymorphic is not None:
- raise sa_exc.ArgumentError("Invalid setting for with_polymorphic")
+ if with_polymorphic == '*':
+ self.with_polymorphic = ('*', None)
+ elif isinstance(with_polymorphic, (tuple, list)):
+ if isinstance(with_polymorphic[0], (basestring, tuple, list)):
+ self.with_polymorphic = with_polymorphic
else:
- self.with_polymorphic = None
+ self.with_polymorphic = (with_polymorphic, None)
+ elif with_polymorphic is not None:
+ raise sa_exc.ArgumentError("Invalid setting for with_polymorphic")
+ else:
+ self.with_polymorphic = None
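The normalization above, sketched as a standalone function; plain classes stand in for mapped classes, and this is not the actual Mapper code:

```python
def normalize_with_polymorphic(value):
    # Coerce the with_polymorphic argument into a
    # (classes-or-'*', selectable) pair, mirroring the mapper logic.
    if value == '*':
        return ('*', None)
    elif isinstance(value, (tuple, list)):
        if isinstance(value[0], (str, tuple, list)):
            # already a (spec, selectable) pair
            return tuple(value)
        # a bare sequence of classes: no explicit selectable
        return (value, None)
    elif value is not None:
        raise ValueError('Invalid setting for with_polymorphic')
    return None
```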
if isinstance(self.local_table, expression._SelectBaseMixin):
raise sa_exc.InvalidRequestError(
# indicates this Mapper should be used to construct the object instance for that row.
self.polymorphic_identity = polymorphic_identity
- if polymorphic_fetch:
- util.warn_deprecated('polymorphic_fetch option is deprecated. Unloaded columns '
- 'load as deferred in all cases; loading can be controlled '
- 'using the "with_polymorphic" option.')
-
# a dictionary of 'polymorphic identity' names, associating those names with
# Mappers that will be used to construct object instances upon a select operation.
if _polymorphic_map is None:
"""subclasses ColumnProperty to provide composite type support."""
def __init__(self, class_, *columns, **kwargs):
- if 'comparator' in kwargs:
- util.warn_deprecated("The 'comparator' argument to CompositeProperty is deprecated. Use comparator_factory.")
- kwargs['comparator_factory'] = kwargs['comparator']
super(CompositeProperty, self).__init__(*columns, **kwargs)
self._col_position_map = util.column_dict((c, i) for i, c in enumerate(columns))
self.composite_class = class_
if obj is not None:
dest_list.append(obj)
if not load:
- coll = attributes.init_collection(dest_state, self.key)
+ coll = attributes.init_state_collection(dest_state, dest_dict, self.key)
for c in dest_list:
coll.append_without_event(c)
else:
source_selectable,
dest_selectable, secondary, target_adapter)
- def _get_join(self, parent, primary=True, secondary=True, polymorphic_parent=True):
- """deprecated. use primary_join_against(), secondary_join_against(), full_join_against()"""
-
- pj, sj, source, dest, secondarytable, adapter = self._create_joins(source_polymorphic=polymorphic_parent)
-
- if primary and secondary:
- return pj & sj
- elif primary:
- return pj
- elif secondary:
- return sj
- else:
- raise AssertionError("illegal condition")
-
def register_dependencies(self, uowcommit):
if not self.viewonly:
self._dependency_processor.register_dependencies(uowcommit)
).identity_key_from_primary_key(ident)
return self._get(key, ident)
- @classmethod
- @util.deprecated('Deprecated. Use sqlalchemy.orm.with_parent '
- 'in conjunction with filter().')
- def query_from_parent(cls, instance, property, **kwargs):
- """Return a new Query with criterion corresponding to a parent instance.
-
- Return a newly constructed Query object, with criterion corresponding
- to a relationship to the given parent instance.
-
- instance
- a persistent or detached instance which is related to class
- represented by this query.
-
- property
- string name of the property which relates this query's class to the
- instance.
-
- \**kwargs
- all extra keyword arguments are propagated to the constructor of
- Query.
-
- """
- mapper = object_mapper(instance)
- prop = mapper.get_property(property, resolve_synonyms=True)
- target = prop.mapper
- criterion = prop.compare(operators.eq, instance, value_is_parent=True)
- return Query(target, **kwargs).filter(criterion)
-
@_generative()
def correlate(self, *args):
self._correlate = self._correlate.union(_orm_selectable(s) for s in args)
q._set_entities(entities)
return q
- _from_self = from_self
-
@_generative()
def _from_selectable(self, fromclause):
self._statement = self._criterion = None
@_generative(_no_statement_condition, _no_limit_offset)
- @util.accepts_a_list_as_starargs(list_deprecation='pending')
+ @util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def order_by(self, *criterion):
"""apply one or more ORDER BY criterion to the query and return the newly resulting ``Query``"""
self._order_by = self._order_by + criterion
@_generative(_no_statement_condition, _no_limit_offset)
- @util.accepts_a_list_as_starargs(list_deprecation='pending')
+ @util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def group_by(self, *criterion):
"""apply one or more GROUP BY criterion to the query and return the newly resulting ``Query``"""
expression.except_all(*([self]+ list(q)))
)
- @util.accepts_a_list_as_starargs(list_deprecation='pending')
+ @util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def join(self, *props, **kwargs):
"""Create a join against this ``Query`` object's criterion
and apply generatively, returning the newly resulting ``Query``.
raise TypeError("unknown arguments: %s" % ','.join(kwargs.iterkeys()))
return self._join(props, outerjoin=False, create_aliases=aliased, from_joinpoint=from_joinpoint)
- @util.accepts_a_list_as_starargs(list_deprecation='pending')
+ @util.accepts_a_list_as_starargs(list_deprecation='deprecated')
def outerjoin(self, *props, **kwargs):
"""Create a left outer join against this ``Query`` object's criterion
and apply generatively, returning the newly resulting ``Query``.
if not self._yield_per:
break
- iterate_instances = util.deprecated()(instances)
def _get(self, key=None, ident=None, refresh_state=None, lockmode=None, only_load_props=None, passive=None):
lockmode = lockmode or self._lockmode
that is local to the ``sessionmaker()`` function, and is not sent
directly to the constructor for ``Session``.
- echo_uow
- Deprecated. Use
- ``logging.getLogger('sqlalchemy.orm.unitofwork').setLevel(logging.DEBUG)``.
-
_enable_transaction_accounting
Defaults to ``True``. A legacy-only flag which when ``False``
disables *all* 0.5-style object accounting on transaction boundaries,
present until they are removed using expunge(), clear(), or purge().
"""
- if 'transactional' in kwargs:
- util.warn_deprecated(
- "The 'transactional' argument to sessionmaker() is deprecated; "
- "use autocommit=True|False instead.")
- autocommit = not kwargs.pop('transactional')
-
kwargs['bind'] = bind
kwargs['autoflush'] = autoflush
kwargs['autocommit'] = autocommit
public_methods = (
'__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
- 'clear', 'close', 'commit', 'connection', 'delete', 'execute', 'expire',
+ 'close', 'commit', 'connection', 'delete', 'execute', 'expire',
'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind', 'is_modified',
- 'merge', 'query', 'refresh', 'rollback', 'save',
- 'save_or_update', 'scalar', 'update')
+ 'merge', 'query', 'refresh', 'rollback',
+ 'scalar')
def __init__(self, bind=None, autoflush=True, expire_on_commit=True,
_enable_transaction_accounting=True,
- autocommit=False, twophase=False, echo_uow=None,
+ autocommit=False, twophase=False,
weak_identity_map=True, binds=None, extension=None, query_cls=query.Query):
"""Construct a new Session.
"""
- if echo_uow is not None:
- util.warn_deprecated(
- "echo_uow is deprecated. "
- "Use logging.getLogger('sqlalchemy.orm.unitofwork').setLevel(logging.DEBUG).")
- log.class_logger(UOWTransaction, echo_uow)
-
if weak_identity_map:
self._identity_cls = identity.WeakInstanceDict
else:
self._new = {}
self._deleted = {}
- clear = util.deprecated("Use session.expunge_all()")(expunge_all)
-
# TODO: need much more test coverage for bind_mapper() and similar !
# TODO: + crystalize + document resolution order vis. bind_mapper/bind_table
self.identity_map.discard(state)
self._deleted.pop(state, None)
- @util.deprecated("Use session.add()")
- def save(self, instance):
- """Add a transient (unsaved) instance to this ``Session``.
-
- This operation cascades the `save_or_update` method to associated
- instances if the relation is mapped with ``cascade="save-update"``.
-
-
- """
- state = _state_for_unsaved_instance(instance)
- self._save_impl(state)
- self._cascade_save_or_update(state)
-
def _save_without_cascade(self, instance):
"""Used by scoping.py to save on init without cascade."""
state = _state_for_unsaved_instance(instance, create=True)
self._save_impl(state)
- @util.deprecated("Use session.add()")
- def update(self, instance):
- """Bring a detached (saved) instance into this ``Session``.
-
- If there is a persistent instance with the same instance key, but
- different identity already associated with this ``Session``, an
- InvalidRequestError exception is thrown.
-
- This operation cascades the `save_or_update` method to associated
- instances if the relation is mapped with ``cascade="save-update"``.
-
- """
- try:
- state = attributes.instance_state(instance)
- except exc.NO_STATE:
- raise exc.UnmappedInstanceError(instance)
- self._update_impl(state)
- self._cascade_save_or_update(state)
-
def add(self, instance):
"""Place an object in the ``Session``.
self._save_or_update_impl(state)
self._cascade_save_or_update(state)
- save_or_update = (
- util.deprecated("Use session.add()")(add))
-
def _cascade_save_or_update(self, state):
for state, mapper in _cascade_unknown_state_iterator('save-update', state, halt_on=lambda c:c in self):
self._save_or_update_impl(state)
def __repr__(self):
return "DefaultClause(%r, for_update=%r)" % (self.arg, self.for_update)
-# alias; deprecated starting 0.5.0
-PassiveDefault = DefaultClause
+class PassiveDefault(DefaultClause):
+ def __init__(self, *arg, **kw):
+ util.warn_deprecated("PassiveDefault is deprecated. Use DefaultClause.")
+ DefaultClause.__init__(self, *arg, **kw)
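The PassiveDefault change above uses deprecation-by-subclass; a self-contained sketch with hypothetical demo names:

```python
import warnings

class DefaultClauseDemo(object):
    # stand-in for DefaultClause (hypothetical names throughout)
    def __init__(self, arg, for_update=False):
        self.arg = arg
        self.for_update = for_update

class PassiveDefaultDemo(DefaultClauseDemo):
    # Identical behavior to the parent, plus a warning at
    # construction time; isinstance() checks keep working.
    def __init__(self, *arg, **kw):
        warnings.warn(
            'PassiveDefaultDemo is deprecated. Use DefaultClauseDemo.',
            DeprecationWarning, stacklevel=2)
        DefaultClauseDemo.__init__(self, *arg, **kw)
```

Subclassing rather than aliasing lets existing code construct and type-check the old name unchanged while still being nudged toward the replacement.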
class Constraint(SchemaItem):
"""A table-level SQL constraint."""
if params:
pd = {}
for bindparam, name in self.bind_names.iteritems():
- for paramname in (bindparam.key, bindparam.shortname, name):
+ for paramname in (bindparam.key, name):
if paramname in params:
pd[name] = params[paramname]
break
``Connectable`` instances can be located within its contained
``ClauseElement`` members.
- scalar=False
- deprecated. Use select(...).as_scalar() to create a "scalar
- column" proxy for an existing Select object.
-
"""
- if 'scalar' in kwargs:
- util.warn_deprecated(
- 'scalar option is deprecated; see docs for details')
- scalar = kwargs.pop('scalar', False)
- s = Select(columns, whereclause=whereclause, from_obj=from_obj, **kwargs)
- if scalar:
- return s.as_scalar()
- else:
- return s
+ return Select(columns, whereclause=whereclause, from_obj=from_obj, **kwargs)
def subquery(alias, *args, **kwargs):
"""Return an :class:`~sqlalchemy.sql.expression.Alias` object derived
"""
return TableClause(name, *columns)
-def bindparam(key, value=None, shortname=None, type_=None, unique=False, required=False):
+def bindparam(key, value=None, type_=None, unique=False, required=False):
"""Create a bind parameter clause with the given key.
value
a sqlalchemy.types.TypeEngine object indicating the type of this
bind param, will invoke type-specific bind parameter processing
- shortname
- deprecated.
-
unique
if True, bind params sharing the same name will have their
underlying ``key`` modified to a uniquely generated name.
"""
if isinstance(key, ColumnClause):
- return _BindParamClause(key.name, value, type_=key.type, unique=unique, shortname=shortname, required=required)
+ return _BindParamClause(key.name, value, type_=key.type, unique=unique, required=required)
else:
- return _BindParamClause(key, value, type_=type_, unique=unique, shortname=shortname, required=required)
+ return _BindParamClause(key, value, type_=type_, unique=unique, required=required)
def outparam(key, type_=None):
"""Create an 'OUT' parameter for usage in functions (stored procedures), for
__visit_name__ = 'bindparam'
quote = None
- def __init__(self, key, value, type_=None, unique=False, isoutparam=False, shortname=None, required=False):
+ def __init__(self, key, value, type_=None, unique=False, isoutparam=False, required=False):
"""Construct a _BindParamClause.
key
overridden by the dictionary of parameters sent to statement
compilation/execution.
- shortname
- deprecated.
-
type\_
A ``TypeEngine`` object that will be used to pre-process the
value corresponding to this ``_BindParamClause`` at
self.unique = unique
self.value = value
self.isoutparam = isoutparam
- self.shortname = shortname
self.required = required
if type_ is None:
based on the join criterion of this :class:`Join`. This will
recursively apply to any joins directly nested by this one
as well. This flag is specific to a particular use case
- by the ORM and will be deprecated in 0.6.
+ by the ORM and is deprecated as of 0.6.
:param \**kwargs: all other kwargs are sent to the
underlying :func:`select()` function.
global sql_util
if not sql_util:
from sqlalchemy.sql import util as sql_util
+ util.warn_deprecated("fold_equivalents is deprecated.")
collist = sql_util.folded_equivalents(self)
else:
collist = [self.left, self.right]
:param mutable: defaults to True; implements
:meth:`AbstractType.is_mutable`. When ``True``, incoming
- objects *must* provide an ``__eq__()`` method which
+ objects should provide an ``__eq__()`` method which
performs the desired deep comparison of members, or the
- ``comparator`` argument must be present. Otherwise,
- comparisons are done by comparing pickle strings.
- The pickle form of comparison is a deprecated usage and will
- raise a warning.
+ ``comparator`` argument must be present.
:param comparator: optional. a 2-arg callable predicate used
- to compare values of this type. Otherwise, either
- the == operator is used to compare values, or if mutable==True
- and the incoming object does not implement __eq__(), the value
- of pickle.dumps(obj) is compared. The last option is a deprecated
- usage and will raise a warning.
+ to compare values of this type. Otherwise,
+ the == operator is used to compare values.
"""
self.protocol = protocol
(m.MSBoolean, "t.col"),
(m.MSEnum, "t.col"),
- (m.MSEnum("'1'", "'2'"), "t.col"),
+ (m.MSEnum("1", "2"), "t.col"),
(m.MSSet, "t.col"),
- (m.MSSet("'1'", "'2'"), "t.col"),
+ (m.MSSet("1", "2"), "t.col"),
]
for type_, expected in specs:
def produce_test(parent, child, direction):
"""produce a testcase for A->B->C inheritance with a self-referential
relationship between two of the classes, using either one-to-many or
- many-to-one."""
+ many-to-one.
+
+ the old "no discriminator column" pattern is used.
+
+ """
class ABCTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
child_table.update(values={child_table.c.parent_id:None}).execute()
super(ABCTest, self).teardown()
+
+ @testing.uses_deprecated("fold_equivalents is deprecated.")
def test_roundtrip(self):
parent_table = {"a":ta, "b":tb, "c": tc}[parent]
child_table = {"a":ta, "b":tb, "c": tc}[child]
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('category', String(70)))
+ @testing.uses_deprecated("fold_equivalents is deprecated.")
def test_manytoone_lazyload(self):
"""test that lazy load clause to a polymorphic child mapper generates correctly [ticket:493]"""
class PersistentObject(object):
def go():
# currently, it doesn't matter if we say Company.employees, or Company.employees.of_type(Engineer). eagerloader doesn't
# pick up on the "of_type()" as of yet.
- eq_(sess.query(Company).options(eagerload_all([Company.employees.of_type(Engineer), Engineer.machines])).all(), assert_result)
+ eq_(sess.query(Company).options(eagerload_all(Company.employees.of_type(Engineer), Engineer.machines)).all(), assert_result)
# in the case of select_type='', the eagerload doesn't take in this case;
# it eagerloads company->people, then a load for each of 5 rows, then lazyload of "machines"
sess = create_session()
def go():
# test load People with eagerload to engineers + machines
- eq_(sess.query(Person).with_polymorphic('*').options(eagerload([Engineer.machines])).filter(Person.name=='dilbert').all(),
+ eq_(sess.query(Person).with_polymorphic('*').options(eagerload(Engineer.machines)).filter(Person.name=='dilbert').all(),
[Engineer(name="dilbert", engineer_name="dilbert", primary_language="java", status="regular engineer", machines=[Machine(name="IBM ThinkPad"), Machine(name="IPhone")])]
)
self.assert_sql_count(testing.db, go, 1)
eq_(sess.query(Person).select_from(people.join(engineers)).join(Engineer.machines).all(), [e1, e2, e3])
eq_(sess.query(Person).select_from(people.join(engineers)).join(Engineer.machines).filter(Machine.name.ilike("%ibm%")).all(), [e1, e3])
- eq_(sess.query(Company).join([('employees', people.join(engineers)), Engineer.machines]).all(), [c1, c2])
- eq_(sess.query(Company).join([('employees', people.join(engineers)), Engineer.machines]).filter(Machine.name.ilike("%thinkpad%")).all(), [c1])
+ eq_(sess.query(Company).join(('employees', people.join(engineers)), Engineer.machines).all(), [c1, c2])
+ eq_(sess.query(Company).join(('employees', people.join(engineers)), Engineer.machines).filter(Machine.name.ilike("%thinkpad%")).all(), [c1])
else:
eq_(sess.query(Company).select_from(companies.join(people).join(engineers)).filter(Engineer.primary_language=='java').all(), [c1])
- eq_(sess.query(Company).join(['employees']).filter(Engineer.primary_language=='java').all(), [c1])
+ eq_(sess.query(Company).join('employees').filter(Engineer.primary_language=='java').all(), [c1])
eq_(sess.query(Person).join(Engineer.machines).all(), [e1, e2, e3])
eq_(sess.query(Person).join(Engineer.machines).filter(Machine.name.ilike("%ibm%")).all(), [e1, e3])
- eq_(sess.query(Company).join(['employees', Engineer.machines]).all(), [c1, c2])
- eq_(sess.query(Company).join(['employees', Engineer.machines]).filter(Machine.name.ilike("%thinkpad%")).all(), [c1])
+ eq_(sess.query(Company).join('employees', Engineer.machines).all(), [c1, c2])
+ eq_(sess.query(Company).join('employees', Engineer.machines).filter(Machine.name.ilike("%thinkpad%")).all(), [c1])
# non-polymorphic
eq_(sess.query(Engineer).join(Engineer.machines).all(), [e1, e2, e3])
# here's the new way
eq_(sess.query(Company).join(Company.employees.of_type(Engineer)).filter(Engineer.primary_language=='java').all(), [c1])
- eq_(sess.query(Company).join([Company.employees.of_type(Engineer), 'machines']).filter(Machine.name.ilike("%thinkpad%")).all(), [c1])
+ eq_(sess.query(Company).join(Company.employees.of_type(Engineer), 'machines').filter(Machine.name.ilike("%thinkpad%")).all(), [c1])
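The list-to-positional migration shown in the hunks above (one list argument to join()/options() becomes plain *args) is the kind of transition a library can bridge with a thin shim. The sketch below is illustrative only — `accept_legacy_list` and the toy `Query` class are hypothetical, not SQLAlchemy internals — and shows how a deprecated single-list call can be flattened into *args while emitting a DeprecationWarning:

```python
import warnings
from functools import wraps

def accept_legacy_list(fn):
    """Allow fn(self, [a, b]) as a deprecated alias for fn(self, a, b)."""
    @wraps(fn)
    def wrapped(self, *args, **kw):
        if len(args) == 1 and isinstance(args[0], list):
            warnings.warn(
                "passing a list to %s() is deprecated; "
                "use positional arguments" % fn.__name__,
                DeprecationWarning, stacklevel=2)
            args = tuple(args[0])
        return fn(self, *args, **kw)
    return wrapped

class Query(object):
    """Toy stand-in: records the join path it was given."""
    def __init__(self):
        self.paths = []

    @accept_legacy_list
    def join(self, *props, **kw):
        self.paths.extend(props)
        return self
```

With this in place, `Query().join(['employees', 'paperwork'])` warns but behaves identically to `Query().join('employees', 'paperwork')`, which is what lets both call styles coexist for a deprecation cycle.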
def test_join_through_polymorphic(self):
for aliased in (True, False):
eq_(
sess.query(Company).\
- join(['employees', 'paperwork'], aliased=aliased).filter(Paperwork.description.like('%#2%')).all(),
+ join('employees', 'paperwork', aliased=aliased).filter(Paperwork.description.like('%#2%')).all(),
[c1]
)
eq_(
sess.query(Company).\
- join(['employees', 'paperwork'], aliased=aliased).filter(Paperwork.description.like('%#%')).all(),
+ join('employees', 'paperwork', aliased=aliased).filter(Paperwork.description.like('%#%')).all(),
[c1, c2]
)
eq_(
sess.query(Company).\
- join(['employees', 'paperwork'], aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).filter(Paperwork.description.like('%#2%')).all(),
+ join('employees', 'paperwork', aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).filter(Paperwork.description.like('%#2%')).all(),
[c1]
)
eq_(
sess.query(Company).\
- join(['employees', 'paperwork'], aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).filter(Paperwork.description.like('%#%')).all(),
+ join('employees', 'paperwork', aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).filter(Paperwork.description.like('%#%')).all(),
[c1, c2]
)
join('employees').
filter(Employee.name.startswith('J')).
distinct().
- order_by([sa.desc(Department.name)]))
+ order_by(sa.desc(Department.name)))
eq_(q.count(), 2)
assert q[0] is d2
el = Element()
x = Bar()
x.element = el
- eq_(attributes.get_history(attributes.instance_state(x), 'element'), ([el], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(x), 'element'), ([el], (), ()))
attributes.instance_state(x).commit_all(attributes.instance_dict(x))
- (added, unchanged, deleted) = attributes.get_history(attributes.instance_state(x), 'element')
+ (added, unchanged, deleted) = attributes.get_state_history(attributes.instance_state(x), 'element')
assert added == ()
assert unchanged == [el]
x = Foo()
attributes.instance_state(x).commit_all(attributes.instance_dict(x))
x.col2.append(bar4)
- eq_(attributes.get_history(attributes.instance_state(x), 'col2'), ([bar4], [bar1, bar2, bar3], []))
+ eq_(attributes.get_state_history(attributes.instance_state(x), 'col2'), ([bar4], [bar1, bar2, bar3], []))
def test_parenttrack(self):
class Foo(object):pass
# case 1. new object
f = Foo()
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
f.someattr = "hi"
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), (['hi'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['hi'], (), ()))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), ['hi'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['hi'], ()))
f.someattr = 'there'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), (['there'], (), ['hi']))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['there'], (), ['hi']))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), ['there'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['there'], ()))
del f.someattr
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), (), ['there']))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), (), ['there']))
# case 2. object with direct dictionary settings (similar to a load operation)
f = Foo()
f.__dict__['someattr'] = 'new'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
f.someattr = 'old'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), (['old'], (), ['new']))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['old'], (), ['new']))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), ['old'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['old'], ()))
# setting None on uninitialized is currently a change for a scalar attribute
# no lazyload occurs, so this allows the overwrite operation to proceed
f = Foo()
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
f.someattr = None
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([None], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([None], (), ()))
f = Foo()
f.__dict__['someattr'] = 'new'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
f.someattr = None
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([None], (), ['new']))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([None], (), ['new']))
# set same value twice
f = Foo()
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
f.someattr = 'one'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), (['one'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['one'], (), ()))
f.someattr = 'two'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), (['two'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['two'], (), ()))
def test_mutable_scalar(self):
# case 1. new object
f = Foo()
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
f.someattr = {'foo':'hi'}
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([{'foo':'hi'}], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([{'foo':'hi'}], (), ()))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'hi'}], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'hi'}], ()))
eq_(attributes.instance_state(f).committed_state['someattr'], {'foo':'hi'})
f.someattr['foo'] = 'there'
eq_(attributes.instance_state(f).committed_state['someattr'], {'foo':'hi'})
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([{'foo':'there'}], (), [{'foo':'hi'}]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([{'foo':'there'}], (), [{'foo':'hi'}]))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'there'}], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'there'}], ()))
# case 2. object with direct dictionary settings (similar to a load operation)
f = Foo()
f.__dict__['someattr'] = {'foo':'new'}
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'new'}], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'new'}], ()))
f.someattr = {'foo':'old'}
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([{'foo':'old'}], (), [{'foo':'new'}]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([{'foo':'old'}], (), [{'foo':'new'}]))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'old'}], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'old'}], ()))
def test_use_object(self):
# case 1. new object
f = Foo()
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [None], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [None], ()))
f.someattr = hi
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([hi], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], (), ()))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [hi], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi], ()))
f.someattr = there
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([there], (), [hi]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([there], (), [hi]))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [there], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [there], ()))
del f.someattr
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([None], (), [there]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([None], (), [there]))
# case 2. object with direct dictionary settings (similar to a load operation)
f = Foo()
f.__dict__['someattr'] = 'new'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
f.someattr = old
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([old], (), ['new']))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old], (), ['new']))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [old], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [old], ()))
# setting None on uninitialized is currently not a change for an object attribute
# (this is different from a scalar attribute). a lazyload has occurred, so if it's
# None, it's really None
f = Foo()
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [None], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [None], ()))
f.someattr = None
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [None], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [None], ()))
f = Foo()
f.__dict__['someattr'] = 'new'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
f.someattr = None
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([None], (), ['new']))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([None], (), ['new']))
# set same value twice
f = Foo()
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
f.someattr = 'one'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), (['one'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['one'], (), ()))
f.someattr = 'two'
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), (['two'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['two'], (), ()))
def test_object_collections_set(self):
class Foo(_base.BasicEntity):
# case 1. new object
f = Foo()
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [], ()))
f.someattr = [hi]
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [hi], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi], ()))
f.someattr = [there]
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([there], [], [hi]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([there], [], [hi]))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [there], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [there], ()))
f.someattr = [hi]
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([hi], [], [there]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], [there]))
f.someattr = [old, new]
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([old, new], [], [there]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old, new], [], [there]))
# case 2. object with direct settings (similar to a load operation)
f = Foo()
- collection = attributes.init_collection(attributes.instance_state(f), 'someattr')
+ collection = attributes.init_collection(f, 'someattr')
collection.append_without_event(new)
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [new], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [new], ()))
f.someattr = [old]
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([old], [], [new]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old], [], [new]))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [old], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [old], ()))
def test_dict_collections(self):
class Foo(_base.BasicEntity):
new = Bar(name='new')
f = Foo()
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [], ()))
f.someattr['hi'] = hi
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
f.someattr['there'] = there
- eq_(tuple([set(x) for x in attributes.get_history(attributes.instance_state(f), 'someattr')]), (set([hi, there]), set(), set()))
+ eq_(tuple([set(x) for x in attributes.get_state_history(attributes.instance_state(f), 'someattr')]), (set([hi, there]), set(), set()))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(tuple([set(x) for x in attributes.get_history(attributes.instance_state(f), 'someattr')]), (set(), set([hi, there]), set()))
+ eq_(tuple([set(x) for x in attributes.get_state_history(attributes.instance_state(f), 'someattr')]), (set(), set([hi, there]), set()))
def test_object_collections_mutate(self):
class Foo(_base.BasicEntity):
# case 1. new object
f = Foo(id=1)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [], ()))
f.someattr.append(hi)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [hi], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi], ()))
f.someattr.append(there)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([there], [hi], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([there], [hi], []))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [hi, there], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi, there], ()))
f.someattr.remove(there)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([], [hi], [there]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([], [hi], [there]))
f.someattr.append(old)
f.someattr.append(new)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([old, new], [hi], [there]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old, new], [hi], [there]))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [hi, old, new], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi, old, new], ()))
f.someattr.pop(0)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([], [old, new], [hi]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([], [old, new], [hi]))
# case 2. object with direct settings (similar to a load operation)
f = Foo()
f.__dict__['id'] = 1
- collection = attributes.init_collection(attributes.instance_state(f), 'someattr')
+ collection = attributes.init_collection(f, 'someattr')
collection.append_without_event(new)
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [new], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [new], ()))
f.someattr.append(old)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([old], [new], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old], [new], []))
attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [new, old], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [new, old], ()))
f = Foo()
- collection = attributes.init_collection(attributes.instance_state(f), 'someattr')
+ collection = attributes.init_collection(f, 'someattr')
collection.append_without_event(new)
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [new], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [new], ()))
f.id = 1
f.someattr.remove(new)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([], [], [new]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([], [], [new]))
# case 3. mixing appends with sets
f = Foo()
f.someattr.append(hi)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
f.someattr.append(there)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([hi, there], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi, there], [], []))
f.someattr = [there]
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([there], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([there], [], []))
# case 4. ensure duplicates show up, order is maintained
f = Foo()
f.someattr.append(hi)
f.someattr.append(there)
f.someattr.append(hi)
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([hi, there, hi], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi, there, hi], [], []))
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ((), [hi, there, hi], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi, there, hi], ()))
f.someattr = []
- eq_(attributes.get_history(attributes.instance_state(f), 'someattr'), ([], [], [hi, there, hi]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([], [], [hi, there, hi]))
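For collection attributes, the history triple asserted above amounts to comparing the current collection against the committed snapshot. This is a hedged sketch — `collection_history` is an illustrative name, and it compares members by equality where the real machinery tracks identity — but it reproduces the triples in test_object_collections_mutate:

```python
def collection_history(current, committed):
    """(added, unchanged, deleted) for a collection attribute:
    diff the current list against the committed snapshot,
    preserving order and duplicates."""
    added = [x for x in current if x not in committed]
    unchanged = [x for x in committed if x in current]
    deleted = [x for x in committed if x not in current]
    return (added, unchanged, deleted)
```

E.g. with committed [hi, old, new], popping hi gives `([], [old, new], [hi])`, as the pop(0) assertion above expects.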
def test_collections_via_backref(self):
class Foo(_base.BasicEntity):
f1 = Foo()
b1 = Bar()
- eq_(attributes.get_history(attributes.instance_state(f1), 'bars'), ((), [], ()))
- eq_(attributes.get_history(attributes.instance_state(b1), 'foo'), ((), [None], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ((), [], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(b1), 'foo'), ((), [None], ()))
#b1.foo = f1
f1.bars.append(b1)
- eq_(attributes.get_history(attributes.instance_state(f1), 'bars'), ([b1], [], []))
- eq_(attributes.get_history(attributes.instance_state(b1), 'foo'), ([f1], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ([b1], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(b1), 'foo'), ([f1], (), ()))
b2 = Bar()
f1.bars.append(b2)
- eq_(attributes.get_history(attributes.instance_state(f1), 'bars'), ([b1, b2], [], []))
- eq_(attributes.get_history(attributes.instance_state(b1), 'foo'), ([f1], (), ()))
- eq_(attributes.get_history(attributes.instance_state(b2), 'foo'), ([f1], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ([b1, b2], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(b1), 'foo'), ([f1], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(b2), 'foo'), ([f1], (), ()))
def test_lazy_backref_collections(self):
class Foo(_base.BasicEntity):
f = Foo()
bar4 = Bar()
bar4.foo = f
- eq_(attributes.get_history(attributes.instance_state(f), 'bars'), ([bar4], [bar1, bar2, bar3], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar4], [bar1, bar2, bar3], []))
lazy_load = None
f = Foo()
bar4 = Bar()
bar4.foo = f
- eq_(attributes.get_history(attributes.instance_state(f), 'bars'), ([bar4], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar4], [], []))
lazy_load = [bar1, bar2, bar3]
attributes.instance_state(f).expire_attributes(['bars'])
- eq_(attributes.get_history(attributes.instance_state(f), 'bars'), ((), [bar1, bar2, bar3], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ((), [bar1, bar2, bar3], ()))
def test_collections_via_lazyload(self):
class Foo(_base.BasicEntity):
f = Foo()
f.bars = []
- eq_(attributes.get_history(attributes.instance_state(f), 'bars'), ([], [], [bar1, bar2, bar3]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([], [], [bar1, bar2, bar3]))
f = Foo()
f.bars.append(bar4)
- eq_(attributes.get_history(attributes.instance_state(f), 'bars'), ([bar4], [bar1, bar2, bar3], []) )
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar4], [bar1, bar2, bar3], []) )
f = Foo()
f.bars.remove(bar2)
- eq_(attributes.get_history(attributes.instance_state(f), 'bars'), ([], [bar1, bar3], [bar2]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([], [bar1, bar3], [bar2]))
f.bars.append(bar4)
- eq_(attributes.get_history(attributes.instance_state(f), 'bars'), ([bar4], [bar1, bar3], [bar2]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar4], [bar1, bar3], [bar2]))
f = Foo()
del f.bars[1]
- eq_(attributes.get_history(attributes.instance_state(f), 'bars'), ([], [bar1, bar3], [bar2]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([], [bar1, bar3], [bar2]))
lazy_load = None
f = Foo()
f.bars.append(bar2)
- eq_(attributes.get_history(attributes.instance_state(f), 'bars'), ([bar2], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar2], [], []))
def test_scalar_via_lazyload(self):
class Foo(_base.BasicEntity):
f = Foo()
eq_(f.bar, "hi")
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ((), ["hi"], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), ["hi"], ()))
f = Foo()
f.bar = None
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ([None], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), ()))
f = Foo()
f.bar = "there"
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), (["there"], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), (["there"], (), ()))
f.bar = "hi"
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), (["hi"], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), (["hi"], (), ()))
f = Foo()
eq_(f.bar, "hi")
del f.bar
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ((), (), ["hi"]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), (), ["hi"]))
assert f.bar is None
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ([None], (), ["hi"]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), ["hi"]))
def test_scalar_via_lazyload_with_active(self):
class Foo(_base.BasicEntity):
f = Foo()
eq_(f.bar, "hi")
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ((), ["hi"], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), ["hi"], ()))
f = Foo()
f.bar = None
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ([None], (), ['hi']))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), ['hi']))
f = Foo()
f.bar = "there"
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), (["there"], (), ['hi']))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), (["there"], (), ['hi']))
f.bar = "hi"
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ((), ["hi"], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), ["hi"], ()))
f = Foo()
eq_(f.bar, "hi")
del f.bar
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ((), (), ["hi"]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), (), ["hi"]))
assert f.bar is None
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ([None], (), ["hi"]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), ["hi"]))
def test_scalar_object_via_lazyload(self):
class Foo(_base.BasicEntity):
# operations
f = Foo()
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ((), [bar1], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), [bar1], ()))
f = Foo()
f.bar = None
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ([None], (), [bar1]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), [bar1]))
f = Foo()
f.bar = bar2
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ([bar2], (), [bar1]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([bar2], (), [bar1]))
f.bar = bar1
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ((), [bar1], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), [bar1], ()))
f = Foo()
eq_(f.bar, bar1)
del f.bar
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ([None], (), [bar1]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), [bar1]))
assert f.bar is None
- eq_(attributes.get_history(attributes.instance_state(f), 'bar'), ([None], (), [bar1]))
+ eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), [bar1]))
class ListenerTest(_base.ORMTest):
def test_receive_changes(self):
sess.flush()
from sqlalchemy.orm import attributes
- eq_(attributes.get_history(attributes.instance_state(u1), 'addresses'), ([], [Address(email_address='lala@hoho.com')], []))
+ eq_(attributes.get_history(u1, 'addresses'), ([], [Address(email_address='lala@hoho.com')], []))
sess.expunge_all()
f1 = Foo()
f1.name = 'f1'
- eq_(attributes.get_history(attributes.instance_state(f1), 'name'), (['f1'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'name'), (['f1'], (), ()))
b1 = Bar()
b1.name = 'b1'
f1.bars.append(b1)
- eq_(attributes.get_history(attributes.instance_state(f1), 'bars'), ([b1], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ([b1], [], []))
attributes.instance_state(f1).commit_all(attributes.instance_dict(f1))
attributes.instance_state(b1).commit_all(attributes.instance_dict(b1))
- eq_(attributes.get_history(attributes.instance_state(f1), 'name'), ((), ['f1'], ()))
- eq_(attributes.get_history(attributes.instance_state(f1), 'bars'), ((), [b1], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'name'), ((), ['f1'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ((), [b1], ()))
f1.name = 'f1mod'
b2 = Bar()
b2.name = 'b2'
f1.bars.append(b2)
- eq_(attributes.get_history(attributes.instance_state(f1), 'name'), (['f1mod'], (), ['f1']))
- eq_(attributes.get_history(attributes.instance_state(f1), 'bars'), ([b2], [b1], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'name'), (['f1mod'], (), ['f1']))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ([b2], [b1], []))
f1.bars.remove(b1)
- eq_(attributes.get_history(attributes.instance_state(f1), 'bars'), ([b2], [], [b1]))
+ eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ([b2], [], [b1]))
def test_null_instrumentation(self):
class Foo(MyBaseClass):
@testing.resolve_artifact_names
def test_selectby(self):
res = create_session().query(Foo).filter_by(range=5)
- assert res.order_by([Foo.bar])[0].bar == 5
- assert res.order_by([sa.desc(Foo.bar)])[0].bar == 95
+ assert res.order_by(Foo.bar)[0].bar == 5
+ assert res.order_by(sa.desc(Foo.bar))[0].bar == 95
@testing.fails_on('maxdb', 'FIXME: unknown')
@testing.resolve_artifact_names
@testing.resolve_artifact_names
def test_order_by(self):
query = create_session().query(Foo)
- assert query.order_by([Foo.bar])[0].bar == 0
- assert query.order_by([sa.desc(Foo.bar)])[0].bar == 99
+ assert query.order_by(Foo.bar)[0].bar == 0
+ assert query.order_by(sa.desc(Foo.bar))[0].bar == 99
@testing.resolve_artifact_names
def test_offset(self):
query = create_session().query(Foo)
- assert list(query.order_by([Foo.bar]).offset(10))[0].bar == 10
+ assert list(query.order_by(Foo.bar).offset(10))[0].bar == 10
@testing.resolve_artifact_names
def test_offset(self):
"""Query.join"""
session = create_session()
- q = (session.query(User).join(['orders', 'addresses']).
+ q = (session.query(User).join('orders', 'addresses').
filter(Address.id == 1))
eq_([User(id=7)], q.all())
"""Query.outerjoin"""
session = create_session()
- q = (session.query(User).outerjoin(['orders', 'addresses']).
+ q = (session.query(User).outerjoin('orders', 'addresses').
filter(sa.or_(Order.id == None, Address.id == 1)))
eq_(set([User(id=7), User(id=8), User(id=10)]),
set(q.all()))
session = create_session()
- q = (session.query(User).outerjoin(['orders', 'addresses']).
+ q = (session.query(User).outerjoin('orders', 'addresses').
filter(sa.or_(Order.id == None, Address.id == 1)))
eq_(q.count(), 4)
def test_replace_property(self):
m = mapper(User, users)
m.add_property('_name',users.c.name)
- m.add_property('name', synonym('_name', proxy=True))
+ m.add_property('name', synonym('_name'))
sess = create_session()
u = sess.query(User).filter_by(name='jack').one()
mapper(User, users, properties=dict(
addresses = relation(mapper(Address, addresses), lazy=True),
uname = synonym('name'),
- adlist = synonym('addresses', proxy=True),
+ adlist = synonym('addresses'),
adname = synonym('addresses')
))
mapper(User, users, properties=dict(
addresses = relation(mapper(Address, addresses), lazy=True,
order_by=addresses.c.id),
- adlist = synonym('addresses', proxy=True)))
+ adlist = synonym('addresses')))
def go():
sa.orm.eagerload("addresses"),
sa.orm.defer("name"),
sa.orm.defer(User.name),
- sa.orm.defer([User.name]),
sa.orm.eagerload("addresses", User.addresses),
- sa.orm.eagerload(["addresses", User.addresses]),
]:
opt2 = pickle.loads(pickle.dumps(opt))
eq_(opt.key, opt2.key)
class FromSelfTest(QueryTest, AssertsCompiledSQL):
def test_filter(self):
- assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.id.in_([8,9]))._from_self().all()
+ assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.id.in_([8,9])).from_self().all()
- assert [User(id=8), User(id=9)] == create_session().query(User).order_by(User.id).slice(1,3)._from_self().all()
- assert [User(id=8)] == list(create_session().query(User).filter(User.id.in_([8,9]))._from_self().order_by(User.id)[0:1])
+ assert [User(id=8), User(id=9)] == create_session().query(User).order_by(User.id).slice(1,3).from_self().all()
+ assert [User(id=8)] == list(create_session().query(User).filter(User.id.in_([8,9])).from_self().order_by(User.id)[0:1])
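Straight renames like `query._from_self()` → `query.from_self()` are usually bridged the same way: the old spelling is kept for a release as a deprecated alias that warns and delegates. A minimal sketch of that pattern, using a hypothetical `Query` class rather than SQLAlchemy's:

```python
import warnings

def deprecated_alias(new_name):
    """Return a method that warns and delegates to ``new_name``."""
    def alias(self, *args, **kwargs):
        warnings.warn(
            "renamed to %s()" % new_name,
            DeprecationWarning,
            stacklevel=2,
        )
        return getattr(self, new_name)(*args, **kwargs)
    return alias

class Query:
    def from_self(self):
        return "fromself"   # stand-in for the real behavior

    # old spelling kept for one release cycle
    _from_self = deprecated_alias("from_self")

q = Query()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    old = q._from_self()    # warns, then returns from_self()'s result
```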
def test_join(self):
assert [
(User(id=8), Address(id=3)),
(User(id=8), Address(id=4)),
(User(id=9), Address(id=5))
- ] == create_session().query(User).filter(User.id.in_([8,9]))._from_self().\
+ ] == create_session().query(User).filter(User.id.in_([8,9])).from_self().\
join('addresses').add_entity(Address).order_by(User.id, Address.id).all()
def test_group_by(self):
sess = create_session()
eq_(
- sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5]))._from_self().all(),
+ sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5])).from_self().all(),
[
(User(id=8), Address(id=2)),
(User(id=9), Address(id=5))
)
eq_(
- sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5]))._from_self().options(eagerload('addresses')).first(),
+ sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5])).from_self().options(eagerload('addresses')).first(),
# order_by(User.id, Address.id).first(),
(User(id=8, addresses=[Address(), Address(), Address()]), Address(id=2)),
q6 = q4.union(q5)
for q in (q3, q6):
- eq_(q.all(),
+ eq_(q.order_by(User.id).all(),
[
(User(id=7, name=u'jack'), u'x'),
(User(id=7, name=u'jack'), u'y'),
o = sess.query(Order).filter(with_parent(u1, User.orders)).all()
assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
- # test static method
- @testing.uses_deprecated(".*Use sqlalchemy.orm.with_parent")
- def go():
- o = Query.query_from_parent(u1, property='orders', session=sess).all()
- assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
- go()
-
# test generative criterion
o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all()
assert [Order(description="order 3"), Order(description="order 5")] == o
def test_overlapping_paths(self):
for aliased in (True,False):
# load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
- result = create_session().query(User).join(['orders', 'items'], aliased=aliased).\
- filter_by(id=3).join(['orders','address'], aliased=aliased).filter_by(id=1).all()
+ result = create_session().query(User).join('orders', 'items', aliased=aliased).\
+ filter_by(id=3).join('orders','address', aliased=aliased).filter_by(id=1).all()
assert [User(id=7, name='jack')] == result
def test_overlapping_paths_outerjoin(self):
- result = create_session().query(User).outerjoin(['orders', 'items']).\
- filter_by(id=3).outerjoin(['orders','address']).filter_by(id=1).all()
+ result = create_session().query(User).outerjoin('orders', 'items').\
+ filter_by(id=3).outerjoin('orders','address').filter_by(id=1).all()
assert [User(id=7, name='jack')] == result
def test_from_joinpoint(self):
orderalias = aliased(Order)
itemalias = aliased(Item)
eq_(
- sess.query(User).join([('orders', orderalias), ('items', itemalias)]).filter(itemalias.description == 'item 4').all(),
+ sess.query(User).join(('orders', orderalias), ('items', itemalias)).filter(itemalias.description == 'item 4').all(),
[User(name='jack')]
)
eq_(
- sess.query(User).join([('orders', orderalias), ('items', itemalias)]).filter(orderalias.user_id==9).filter(itemalias.description=='item 4').all(),
+ sess.query(User).join(('orders', orderalias), ('items', itemalias)).filter(orderalias.user_id==9).filter(itemalias.description=='item 4').all(),
[]
)
def test_orderby_arg_bug(self):
sess = create_session()
# no arg error
- result = sess.query(User).join('orders', aliased=True).order_by([Order.id]).reset_joinpoint().order_by(users.c.id).all()
+ result = sess.query(User).join('orders', aliased=True).order_by(Order.id).reset_joinpoint().order_by(users.c.id).all()
def test_no_onclause(self):
sess = create_session()
def test_reset_joinpoint(self):
for aliased in (True, False):
# load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
- result = create_session().query(User).join(['orders', 'items'], aliased=aliased).filter_by(id=3).reset_joinpoint().join(['orders','address'], aliased=aliased).filter_by(id=1).all()
+ result = create_session().query(User).join('orders', 'items', aliased=aliased).filter_by(id=3).reset_joinpoint().join('orders','address', aliased=aliased).filter_by(id=1).all()
assert [User(id=7, name='jack')] == result
- result = create_session().query(User).outerjoin(['orders', 'items'], aliased=aliased).filter_by(id=3).reset_joinpoint().outerjoin(['orders','address'], aliased=aliased).filter_by(id=1).all()
+ result = create_session().query(User).outerjoin('orders', 'items', aliased=aliased).filter_by(id=3).reset_joinpoint().outerjoin('orders','address', aliased=aliased).filter_by(id=1).all()
assert [User(id=7, name='jack')] == result
def test_overlap_with_aliases(self):
oalias = orders.alias('oalias')
- result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join(['orders', 'items']).order_by(User.id).all()
+ result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join('orders', 'items').order_by(User.id).all()
assert [User(id=7, name='jack'), User(id=9, name='fred')] == result
- result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join(['orders', 'items']).filter_by(id=4).all()
+ result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join('orders', 'items').filter_by(id=4).all()
assert [User(id=7, name='jack')] == result
def test_aliased(self):
# test two aliased paths, one to 'orders' and the other to 'orders','items'.
# one row is returned because user 7 has order 3 and also has order 1 which has item 1
# this tests a o2m join and a m2m join.
- q = sess.query(User).join('orders', aliased=True).filter(Order.description=="order 3").join(['orders', 'items'], aliased=True).filter(Item.description=="item 1")
+ q = sess.query(User).join('orders', aliased=True).filter(Order.description=="order 3").join('orders', 'items', aliased=True).filter(Item.description=="item 1")
assert q.count() == 1
assert [User(id=7)] == q.all()
# test the control version - same joins but not aliased. rows are not returned because order 3 does not have item 1
- q = sess.query(User).join('orders').filter(Order.description=="order 3").join(['orders', 'items']).filter(Item.description=="item 1")
+ q = sess.query(User).join('orders').filter(Order.description=="order 3").join('orders', 'items').filter(Item.description=="item 1")
assert [] == q.all()
assert q.count() == 0
['orders', 'items_syn'],
['orders_syn', 'items_syn'],
):
- result = create_session().query(User).join(j).filter_by(id=3).all()
+ result = create_session().query(User).join(*j).filter_by(id=3).all()
assert [User(id=7, name='jack'), User(id=9, name='fred')] == result
def test_with_parent(self):
sel = users.select(User.id.in_([7, 8])).alias()
q = sess.query(User)
u2 = aliased(User)
- q2 = q.select_from(sel).filter(u2.id>1).order_by([User.id, sel.c.id, u2.id]).values(User.name, sel.c.name, u2.name)
+ q2 = q.select_from(sel).filter(u2.id>1).order_by(User.id, sel.c.id, u2.id).values(User.name, sel.c.name, u2.name)
eq_(list(q2), [(u'jack', u'jack', u'jack'), (u'jack', u'jack', u'ed'), (u'jack', u'jack', u'fred'), (u'jack', u'jack', u'chuck'), (u'ed', u'ed', u'jack'), (u'ed', u'ed', u'ed'), (u'ed', u'ed', u'fred'), (u'ed', u'ed', u'chuck')])
@testing.fails_on('mssql', 'FIXME: unknown')
sess = create_session()
q = sess.query(User)
- q2 = q.group_by([User.name.like('%j%')]).order_by(desc(User.name.like('%j%'))).values(User.name.like('%j%'), func.count(User.name.like('%j%')))
+ q2 = q.group_by(User.name.like('%j%')).order_by(desc(User.name.like('%j%'))).values(User.name.like('%j%'), func.count(User.name.like('%j%')))
eq_(list(q2), [(True, 1), (False, 3)])
q2 = q.order_by(desc(User.name.like('%j%'))).values(User.name.like('%j%'))
]
q = sess.query(User)
- q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin('addresses').add_column(func.count(Address.id).label('count'))
+ q = q.group_by(users).order_by(User.id).outerjoin('addresses').add_column(func.count(Address.id).label('count'))
eq_(q.all(), expected)
sess.expunge_all()
adalias = aliased(Address)
q = sess.query(User)
- q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin(('addresses', adalias)).add_column(func.count(adalias.id).label('count'))
+ q = q.group_by(users).order_by(User.id).outerjoin(('addresses', adalias)).add_column(func.count(adalias.id).label('count'))
eq_(q.all(), expected)
sess.expunge_all()
+ # TODO: figure out why group_by(users) doesn't work here
s = select([users, func.count(addresses.c.id).label('count')]).select_from(users.outerjoin(addresses)).group_by(*[c for c in users.c]).order_by(User.id)
q = sess.query(User)
l = q.add_column("count").from_statement(s).all()
adalias = addresses.alias()
q = create_session().query(User).add_column(func.count(adalias.c.id))\
.add_column(("Name:" + users.c.name)).outerjoin(('addresses', adalias))\
- .group_by([c for c in users.c]).order_by(users.c.id)
+ .group_by(users).order_by(users.c.id)
assert q.all() == expected
# test with select_from()
q = create_session().query(User).add_column(func.count(addresses.c.id))\
.add_column(("Name:" + users.c.name)).select_from(users.outerjoin(addresses))\
- .group_by([c for c in users.c]).order_by(users.c.id)
+ .group_by(users).order_by(users.c.id)
assert q.all() == expected
sess.expunge_all()
q = create_session().query(User).add_column(func.count(addresses.c.id))\
.add_column(("Name:" + users.c.name)).outerjoin('addresses')\
- .group_by([c for c in users.c]).order_by(users.c.id)
+ .group_by(users).order_by(users.c.id)
assert q.all() == expected
sess.expunge_all()
q = create_session().query(User).add_column(func.count(adalias.c.id))\
.add_column(("Name:" + users.c.name)).outerjoin(('addresses', adalias))\
- .group_by([c for c in users.c]).order_by(users.c.id)
+ .group_by(users).order_by(users.c.id)
assert q.all() == expected
sess.expunge_all()
])
def go():
- eq_(sess.query(User).select_from(sel).options(eagerload_all('orders.items.keywords')).join(['orders', 'items', 'keywords'], aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
+ eq_(sess.query(User).select_from(sel).options(eagerload_all('orders.items.keywords')).join('orders', 'items', 'keywords', aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
User(name=u'jack',orders=[
Order(description=u'order 1',items=[
Item(description=u'item 1',keywords=[Keyword(name=u'red'), Keyword(name=u'big'), Keyword(name=u'round')]),
sess.expunge_all()
sel2 = orders.select(orders.c.id.in_([1,2,3]))
- eq_(sess.query(Order).select_from(sel2).join(['items', 'keywords']).filter(Keyword.name == 'red').order_by(Order.id).all(), [
+ eq_(sess.query(Order).select_from(sel2).join('items', 'keywords').filter(Keyword.name == 'red').order_by(Order.id).all(), [
Order(description=u'order 1',id=1),
Order(description=u'order 2',id=2),
])
- eq_(sess.query(Order).select_from(sel2).join(['items', 'keywords'], aliased=True).filter(Keyword.name == 'red').order_by(Order.id).all(), [
+ eq_(sess.query(Order).select_from(sel2).join('items', 'keywords', aliased=True).filter(Keyword.name == 'red').order_by(Order.id).all(), [
Order(description=u'order 1',id=1),
Order(description=u'order 2',id=2),
])
))
q = create_session().query(User)
- assert [User(id=7)] == q.join(['open_orders', 'items'], aliased=True).filter(Item.id==4).join(['closed_orders', 'items'], aliased=True).filter(Item.id==3).all()
+ assert [User(id=7)] == q.join('open_orders', 'items', aliased=True).filter(Item.id==4).join('closed_orders', 'items', aliased=True).filter(Item.id==3).all()
class SelfReferentialTest(_base.MappedTest):
run_setup_mappers = 'once'
assert ret == [('n12',)]
- node = sess.query(Node).join(['children', 'children'], aliased=True).filter_by(data='n122').first()
+ node = sess.query(Node).join('children', 'children', aliased=True).filter_by(data='n122').first()
assert node.data=='n1'
node = sess.query(Node).filter_by(data='n122').join('parent', aliased=True).filter_by(data='n12').\
assert c.scalar("select count(1) from users") == 1
- @testing.uses_deprecated()
@engines.close_open_connections
@testing.resolve_artifact_names
- def test_save_update_delete(self):
+ def test_add_delete(self):
s = create_session()
mapper(User, users, properties={
user = User(name='u1')
- assert_raises_message(sa.exc.InvalidRequestError, "is not persisted", s.update, user)
assert_raises_message(sa.exc.InvalidRequestError, "is not persisted", s.delete, user)
s.add(user)
assert user in s
assert user not in s.dirty
- assert_raises_message(sa.exc.InvalidRequestError, "is already persistent", s.save, user)
-
s2 = create_session()
assert_raises_message(sa.exc.InvalidRequestError, "is already attached to session", s2.delete, user)
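The hunk above shows the consolidation: `session.save()`, `session.update()`, and `session.save_or_update()` collapse into a single `session.add()`, while `delete()` still requires a persisted instance. A toy sketch of that API shape, with bare-flag state tracking standing in for SQLAlchemy's real identity management:

```python
class MiniSession:
    """Toy sketch: one add() replaces save()/update()/save_or_update();
    delete() raises on an instance that was never persisted."""
    def __init__(self):
        self._new = set()
        self._persistent = set()

    def add(self, obj):
        # add() accepts both transient and detached instances, unlike
        # the old save()/update() pair which each policed object state
        if obj not in self._persistent:
            self._new.add(obj)

    def delete(self, obj):
        if obj not in self._persistent:
            raise ValueError("Instance is not persisted")
        self._persistent.discard(obj)

    def flush(self):
        self._persistent |= self._new
        self._new.clear()

class User(object):
    pass

s = MiniSession()
u = User()
try:
    s.delete(u)            # mirrors the test: delete before persist raises
    deleted_early = True
except ValueError:
    deleted_early = False
s.add(u)
s.flush()
s.delete(u)                # fine once persisted
```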
raises_('refresh', user_arg)
- raises_('save', user_arg)
-
- raises_('save_or_update', user_arg)
-
- raises_('update', user_arg)
-
instance_methods = self._public_session_methods() - self._class_methods
eq_(watchdog, instance_methods,
# select both
userlist = session.query(User).filter(
- users.c.id.in_([u.id, u2.id])).order_by([users.c.name]).all()
+ users.c.id.in_([u.id, u2.id])).order_by(users.c.name).all()
eq_(u.id, userlist[0].id)
eq_(userlist[0].name, 'modifiedname')
r = s.execute(userid='fred').fetchall()
assert len(r) == 1
- def test_bindparam_shortname(self):
- """test the 'shortname' field on BindParamClause."""
- users.insert().execute(user_id = 7, user_name = 'jack')
- users.insert().execute(user_id = 8, user_name = 'fred')
- u = bindparam('userid', shortname='someshortname')
- s = users.select(users.c.user_name==u)
- r = s.execute(someshortname='fred').fetchall()
- assert len(r) == 1
-
def test_bindparam_detection(self):
dialect = default.DefaultDialect(paramstyle='qmark')
prep = lambda q: str(sql.text(q).compile(dialect=dialect))
except exc.InvalidRequestError, err:
assert str(err) == "Select objects don't have a type. Call as_scalar() on this Select object to return a 'scalar' version of this Select.", str(err)
- s = select([table1.c.myid], scalar=True, correlate=False)
+ s = select([table1.c.myid], correlate=False).as_scalar()
self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable")
- s = select([table1.c.myid], scalar=True)
+ s = select([table1.c.myid]).as_scalar()
self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable")
s = select([table1.c.myid]).correlate(None).as_scalar()
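The `scalar=True` removals above illustrate a different migration shape: a constructor flag replaced by a generative method, so `select(..., scalar=True)` becomes `select(...).as_scalar()`. A minimal sketch of that design, with toy `Select`/`ScalarSelect` classes that are not the real SQLAlchemy constructs:

```python
class Select:
    """Toy sketch: the scalar-ness of a select moves out of the
    constructor and into a chained as_scalar() call."""
    def __init__(self, columns):
        self.columns = list(columns)

    def as_scalar(self):
        # return a new wrapper rather than mutating in place, so the
        # original Select stays usable as a plain FROM-clause element
        return ScalarSelect(self)

class ScalarSelect:
    def __init__(self, stmt):
        self.stmt = stmt

    def __str__(self):
        return "(SELECT %s)" % ", ".join(self.stmt.columns)

s = Select(["myid"]).as_scalar()
```

Keeping `as_scalar()` generative (returning a new object) matches the chained style of the other calls in these hunks, where every step yields a new statement instead of flipping a flag on the old one.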
"FROM places, zips WHERE zips.zipcode = :zipcode_3 ORDER BY dist, places.nm")
zalias = zips.alias('main_zip')
- qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True)
- qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode, scalar=True)
+ qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).as_scalar()
+ qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).as_scalar()
q = select([places.c.id, places.c.nm, zalias.c.zipcode, func.latlondist(qlat, qlng).label('dist')],
order_by = ['dist', places.c.nm]
)
self.assert_compile(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm")
a1 = table2.alias('t2alias')
- s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True)
+ s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid).as_scalar()
j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
s2 = select([table1, s1], from_obj=j1)
self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")