0.5.2
======
+- orm
+ - Further refined 0.5.1's warning about delete-orphan cascade
+ placed on a many-to-many relation. First, the bad news:
+ the warning will apply to both many-to-many as well as
+ many-to-one relations. This is necessary since in both
+ cases, SQLA does not scan the full set of potential parents
+ when determining "orphan" status - for a persistent object
+ it only detects an in-python de-association event to establish
+ the object as an "orphan". Next, the good news: to support
+ one-to-one via a foreign key or association table, or to
+ support one-to-many via an association table, a new flag
+ single_parent=True may be set which indicates objects
+ linked to the relation are only meant to have a single parent.
+ The relation will raise an error if multiple parent-association
+ events occur within Python.
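The single-parent check amounts to rejecting a second in-Python parent-association event. A minimal plain-Python sketch of the idea (illustrative only, not SQLAlchemy's internals):

```python
class SingleParentError(Exception):
    """Raised when a child already linked to one parent gains another."""


def associate(parent_of, child, parent):
    # parent_of maps each child to its current parent, mirroring the
    # "hasparent" flag SQLAlchemy tracks via attribute events.
    current = parent_of.get(child)
    if current is not None and current is not parent:
        raise SingleParentError(
            "%r is already associated with %r and is only "
            "allowed a single parent" % (child, current))
    parent_of[child] = parent


parent_of = {}
associate(parent_of, "address1", "userA")   # first parent: accepted
associate(parent_of, "address1", "userA")   # same parent again: fine
try:
    associate(parent_of, "address1", "userB")   # second parent: rejected
    raised = False
except SingleParentError:
    raised = True
```

Note that, as in the changelog entry above, only in-Python association events are inspected; no scan of potential parents in the database takes place.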
+
+ - Adjusted the attribute instrumentation change from 0.5.1 to
+ fully establish instrumentation for subclasses where the mapper
+ was created after the superclass had already been fully
+ instrumented. [ticket:1292]
+
+ - Fixed bug in delete-orphan cascade whereby two one-to-one
+ relations from two different parent classes to the same target
+ class would prematurely expunge the instance.
+
+ - Fixed an eager loading bug whereby self-referential eager
+ loading would prevent other eager loads, self referential or not,
+ from joining to the parent JOIN properly. Thanks to Alex K
+ for creating a great test case.
+
+ - session.expire() and related methods will not expire() unloaded
+ deferred attributes. This prevents them from being needlessly
+ loaded when the instance is refreshed.
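In other words, expiration now filters on deferral state. A rough sketch of the rule (illustrative, not the actual implementation):

```python
def keys_to_expire(all_keys, deferred_keys, loaded_keys):
    """Return the attribute keys expire() would mark as expired.

    Deferred attributes that were never loaded are skipped, so a
    subsequent refresh does not pull them in needlessly.
    """
    return [key for key in all_keys
            if key not in deferred_keys or key in loaded_keys]

expired = keys_to_expire(
    all_keys=["id", "name", "photo"],
    deferred_keys={"photo", "name"},
    loaded_keys={"name"},
)
# "photo" (deferred and unloaded) is left alone; "id" and "name" expire
```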
+
+ - query.join()/outerjoin() will now properly join an aliased()
+ construct to the existing left side, even if query.from_self()
+ or query.select_from(someselectable) has been called.
+ [ticket:1293]
+
- sql
- Further fixes to the "percent signs and spaces in column/table
names" functionality. [ticket:1284]
-
+
+- mssql
+ - Restored convert_unicode handling. Results were being passed
+ through without conversion. [ticket:1291]
+
+ - Really fixing the decimal handling this time. [ticket:1282]
+
+ - Modified table reflection code to use only kwargs when
+ constructing tables. [ticket:1289]
+
0.5.1
========
Using Passive Deletes
~~~~~~~~~~~~~~~~~~~~~~
-
Use ``passive_deletes=True`` to disable child object loading on a DELETE operation, in conjunction with "ON DELETE (CASCADE|SET NULL)" on your database to automatically cascade deletes to child objects. Note that "ON DELETE" is not supported on SQLite, and requires ``InnoDB`` tables when using MySQL:
.. sourcecode:: python+sql
Mutable Primary Keys / Update Cascades
---------------------------------------
-
As of SQLAlchemy 0.4.2, the primary key attributes of an instance can be changed freely, and will be persisted upon flush. When the primary key of an entity changes, related items which reference the primary key must be updated as well. For databases which enforce referential integrity, it's required to use the database's ON UPDATE CASCADE functionality in order to propagate primary key changes. For those which don't, the ``passive_updates`` flag can be set to ``False``, which instructs SQLAlchemy to issue UPDATE statements individually. The ``passive_updates`` flag can also be ``False`` in conjunction with ON UPDATE CASCADE functionality, although in that case it issues UPDATE statements unnecessarily.
A typical mutable primary key setup might look like:
The ``customer`` relationship specifies only the "save-update" cascade value, indicating most operations will not be cascaded from a parent ``Order`` instance to a child ``User`` instance except for the ``add()`` operation. "save-update" cascade indicates that an ``add()`` on the parent will cascade to all child items, and also that items added to a parent which is already present in the session will also be added.
+Note that the ``delete-orphan`` cascade only functions for relationships where the target object can have a single parent at a time, meaning it is only appropriate for one-to-one or one-to-many relationships. For a :func:`~sqlalchemy.orm.relation` which establishes one-to-one via a local foreign key, i.e. a many-to-one that stores only a single parent, or one-to-one/one-to-many via a "secondary" (association) table, a warning will be issued if ``delete-orphan`` is configured. To disable this warning, also specify the ``single_parent=True`` flag on the relationship, which constrains objects to allow attachment to only one parent at a time.
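As a hypothetical illustration (reusing the ``Order``/``User`` names from the cascade example above), a many-to-one that is really a one-to-one with orphan deletion might be configured as:

.. sourcecode:: python+sql

    mapper(Order, orders_table, properties={
        'customer': relation(User, single_parent=True,
                             cascade="all, delete-orphan")
    })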
+
The default value for ``cascade`` on :func:`~sqlalchemy.orm.relation()` is ``save-update, merge``.
Managing Transactions
<div style="text-align:right">
<b>Quick Select:</b> <a href="/docs/05/">0.5</a> | <a href="/docs/04/">0.4</a> | <a href="/docs/03/">0.3</a><br/>
-<b>PDF Download:</b> <a href="sqlalchemy_${release.replace('.', '_')}.pdf">download</a>
+<b>PDF Download:</b> <a href="${pathto('sqlalchemy_' + release.replace('.', '_') + '.pdf', 1)}">download</a>
</div>
${'<%text>'}
\fi
}{\end{fulllineitems}}
+% class method ----------------------------------------------------------
+% \begin{classmethoddesc}[classname]{methodname}{args}
+\newcommand{\classmethodline}[3][\@undefined]{
+\py@sigline{class \bfcode{#2}}{#3}}
+\newenvironment{classmethoddesc}[3][\@undefined]{
+ \begin{fulllineitems}
+ \ifx\@undefined#1\relax
+ \classmethodline{#2}{#3}
+ \else
+ \def\py@thisclass{#1}
+ \classmethodline{#2}{#3}
+ \fi
+}{\end{fulllineitems}}
+
+
% object data attribute --------------------------------------------------
% \begin{memberdesc}[classname]{membername}
\newcommand{\memberline}[2][\py@classbadkey]{%
__all__ = sorted(name for name, obj in locals().items()
if not (name.startswith('_') or inspect.ismodule(obj)))
-__version__ = '0.5.2'
+__version__ = '0.5.3'
del inspect, sys
def visit_NVARCHAR(self, type_):
return self._extend("NVARCHAR", type_)
+ def visit_date(self, type_):
+ # pseudocode
+ if self.dialect.version <= 10:
+ return self.visit_DATETIME(type_)
+ else:
+ return self.visit_DATE(type_)
+
+ def visit_time(self, type_):
+ # pseudocode
+ if self.dialect.version <= 10:
+ return self.visit_DATETIME(type_)
+ else:
+ return self.visit_TIME(type_)
+
def visit_binary(self, type_):
if type_.length:
return self.visit_BINARY(type_)
from sqlalchemy.dialects.mssql.base import MSDialect, MSDateTimeAsDate, MSDateTimeAsTime
from sqlalchemy import types as sqltypes
+
class MSDialect_pymssql(MSDialect):
supports_sane_rowcount = False
max_identifier_length = 30
based on the foreign key relationships of the association and
child tables.
+ :param single_parent=(True|False):
+ when True, installs a validator which will prevent objects
+ from being associated with more than one parent at a time.
+ This is used for many-to-one or many-to-many relationships that
+ should be treated either as one-to-one or one-to-many. Its
+ usage is optional unless delete-orphan cascade is also
+ set on this relation(), in which case it's required (new in 0.5.2).
+
:param uselist=(True|False):
a boolean that indicates if this property should be loaded as a
list or a scalar. In most cases, this value is determined
:param viewonly=False:
when set to True, the relation is used only for loading objects
within the relationship, and has no effect on the unit-of-work
- flush process. Relations with viewonly can specify any kind of
+ flush process. Relationships with viewonly can specify any kind of
join conditions to provide additional views of related objects
onto a parent object. Note that the functionality of a viewonly
relationship has its limits - complicated join conditions may
class _ProxyImpl(object):
accepts_scalar_loader = False
-
+ dont_expire_missing = False
+
def __init__(self, key):
self.key = key
def __init__(self, class_, key,
callable_, trackparent=False, extension=None,
- compare_function=None, active_history=False, parent_token=None, **kwargs):
+ compare_function=None, active_history=False, parent_token=None,
+ dont_expire_missing=False,
+ **kwargs):
"""Construct an AttributeImpl.
\class_
Allows multiple AttributeImpls to all match a single
owner attribute.
+ dont_expire_missing
+ if True, don't add an "expiry" callable to this attribute
+ during state.expire_attributes(None), if no value is present
+ for this key.
+
"""
self.class_ = class_
self.key = key
self.is_equal = compare_function
self.extensions = util.to_list(extension or [])
self.active_history = active_history
-
+ self.dont_expire_missing = dont_expire_missing
+
def hasparent(self, state, optimistic=False):
"""Return the boolean value of a `hasparent` flag attached to the given item.
state.modified_event(self, False, previous)
if self.trackparent:
- if value is not None:
- self.sethasparent(instance_state(value), True)
if previous is not value and previous is not None:
self.sethasparent(instance_state(previous), False)
for ext in self.extensions:
value = ext.set(state, value, previous, initiator or self)
+
+ if self.trackparent:
+ if value is not None:
+ self.sethasparent(instance_state(value), True)
+
return value
def fire_append_event(self, state, value, initiator):
state.modified_event(self, True, NEVER_SET, passive=PASSIVE_NO_INITIALIZE)
+ for ext in self.extensions:
+ value = ext.append(state, value, initiator or self)
+
if self.trackparent and value is not None:
self.sethasparent(instance_state(value), True)
- for ext in self.extensions:
- value = ext.append(state, value, initiator or self)
return value
def fire_pre_remove_event(self, state, initiator):
attribute_names = self.manager.keys()
self.expired = True
self.modified = False
+ filter_deferred = True
+ else:
+ filter_deferred = False
for key in attribute_names:
+ impl = self.manager[key].impl
+ if not filter_deferred or \
+ not impl.dont_expire_missing or \
+ key in self.dict:
+ self.expired_attributes.add(key)
+ if impl.accepts_scalar_loader:
+ self.callables[key] = self
self.dict.pop(key, None)
self.committed_state.pop(key, None)
- self.expired_attributes.add(key)
- if self.manager.get_impl(key).accepts_scalar_loader:
- self.callables[key] = self
def reset(self, key):
"""remove the given attribute and any callables associated with it."""
class DynaLoader(strategies.AbstractRelationLoader):
- def init_class_attribute(self):
+ def init_class_attribute(self, mapper):
self.is_class_level = True
strategies._register_attribute(self,
+ mapper,
useobject=True,
impl_class=DynamicAttributeImpl,
target_mapper=self.parent_property.mapper,
def instrument_class(self, mapper):
raise NotImplementedError()
+ _compile_started = False
+ _compile_finished = False
+
def init(self):
- """Called after all mappers are compiled to assemble
- relationships between mappers, establish instrumented class
- attributes.
+ """Called after all mappers are created to assemble
+ relationships between mappers and perform other post-mapper-creation
+ initialization steps.
+
"""
-
- self._compiled = True
+ self._compile_started = True
self.do_init()
-
+ self._compile_finished = True
+
def do_init(self):
- """Perform subclass-specific initialization steps.
+ """Perform subclass-specific initialization post-mapper-creation steps.
This is a *template* method called by the
- ``MapperProperty`` object's init() method."""
-
+ ``MapperProperty`` object's init() method.
+
+ """
pass
-
+
+ def post_instrument_class(self, mapper):
+ """Perform instrumentation adjustments that need to occur
+ after init() has completed.
+
+ """
+ pass
+
def register_dependencies(self, *args, **kwargs):
"""Called by the ``Mapper`` in response to the UnitOfWork
calling the ``Mapper``'s register_dependencies operation.
def do_init(self):
self.__all_strategies = {}
self.strategy = self.__init_strategy(self.strategy_class)
- if self.is_primary():
- self.strategy.init_class_attribute()
+ def post_instrument_class(self, mapper):
+ if self.is_primary():
+ self.strategy.init_class_attribute(mapper)
+
def build_path(entity, key, prev=None):
if prev:
return prev + (entity, key)
def init(self):
raise NotImplementedError("LoaderStrategy")
- def init_class_attribute(self):
+ def init_class_attribute(self, mapper):
pass
def setup_query(self, context, entity, path, adapter, **kwargs):
if init:
prop.init()
+ prop.post_instrument_class(self)
def compile(self):
self._log("_post_configure_properties() started")
l = [(key, prop) for key, prop in self._props.iteritems()]
for key, prop in l:
- if not getattr(prop, '_compiled', False):
- self._log("initialize prop " + key)
+ self._log("initialize prop " + key)
+
+ if not prop._compile_started:
prop.init()
+
+ if prop._compile_finished:
+ prop.post_instrument_class(self)
+
self._log("_post_configure_properties() complete")
self.compiled = True
if prop is None and raiseerr:
raise sa_exc.InvalidRequestError("Mapper '%s' has no property '%s'" % (str(self), key))
return prop
-
+
@property
def iterate_properties(self):
"""return an iterator of all MapperProperty objects."""
passive_updates=True, remote_side=None,
enable_typechecks=True, join_depth=None,
comparator_factory=None,
+ single_parent=False,
strategy_class=None, _local_remote_pairs=None, query_class=None):
self.uselist = uselist
self.direction = None
self.viewonly = viewonly
self.lazy = lazy
+ self.single_parent = single_parent
self._foreign_keys = foreign_keys
self.collection_class = collection_class
self.passive_deletes = passive_deletes
self._determine_direction()
self._determine_local_remote_pairs()
self._post_init()
+ super(RelationProperty, self).do_init()
def _get_target(self):
if not hasattr(self, 'mapper'):
"the child's mapped tables. Specify 'foreign_keys' "
"argument." % (str(self)))
- if self.cascade.delete_orphan and self.direction is MANYTOMANY:
+ if self.cascade.delete_orphan and not self.single_parent and \
+ (self.direction is MANYTOMANY or self.direction is MANYTOONE):
util.warn("On %s, delete-orphan cascade is not supported on a "
- "many-to-many relation. This will raise an error in 0.6." % self)
+ "many-to-many or many-to-one relationship when single_parent is not set. "
+ " Set single_parent=True on the relation()." % self)
def _determine_local_remote_pairs(self):
if not self.local_remote_pairs:
"added to the primary mapper, i.e. the very first "
"mapper created for class '%s' " % (self.key, self.parent.class_.__name__, self.parent.class_.__name__))
- super(RelationProperty, self).do_init()
def _refers_to_parent_table(self):
return self.parent.mapped_table is self.target or self.parent.local_table is self.target
@_generative(__no_statement_condition, __no_limit_offset)
def __join(self, keys, outerjoin, create_aliases, from_joinpoint):
+
+ # copy collections that may mutate so they do not affect
+ # the copied-from query.
self.__currenttables = set(self.__currenttables)
self._polymorphic_adapters = self._polymorphic_adapters.copy()
+ # start from the beginning unless from_joinpoint is set.
if not from_joinpoint:
self.__reset_joinpoint()
+ # join from our from_obj. This is
+ # None unless select_from()/from_self() has been called.
clause = self._from_obj
- right_entity = None
+ # after the method completes,
+ # the query's joinpoint will be set to this.
+ right_entity = None
+
for arg1 in util.to_list(keys):
aliased_entity = False
alias_criterion = False
left_entity = right_entity
prop = of_type = right_entity = right_mapper = None
+ # distinguish between tuples, scalar args
if isinstance(arg1, tuple):
arg1, arg2 = arg1
else:
arg2 = None
+ # determine onclause/right_entity. there
+ # is a little bit of legacy behavior still at work here
+ # which means they might be in either order. may possibly
+ # lock this down to (right_entity, onclause) in 0.6.
if isinstance(arg2, (interfaces.PropComparator, basestring)):
onclause = arg2
right_entity = arg1
onclause = arg2
right_entity = arg1
+ # extract info from the onclause argument, determine
+ # left_entity and right_entity.
if isinstance(onclause, interfaces.PropComparator):
of_type = getattr(onclause, '_of_type', None)
prop = onclause.property
if not right_entity:
right_entity = right_mapper
- elif onclause is None:
- if not left_entity:
- left_entity = self._joinpoint_zero()
- else:
- if not left_entity:
- left_entity = self._joinpoint_zero()
+ elif not left_entity:
+ left_entity = self._joinpoint_zero()
+ # if no initial left-hand clause is set, extract
+ # this from the left_entity or as a last
+ # resort from the onclause argument, if it's
+ # a PropComparator.
if not clause:
- if isinstance(onclause, interfaces.PropComparator):
- clause = onclause.__clause_element__()
-
for ent in self._entities:
if ent.corresponds_to(left_entity):
clause = ent.selectable
break
+
+ if not clause:
+ if isinstance(onclause, interfaces.PropComparator):
+ clause = onclause.__clause_element__()
if not clause:
raise sa_exc.InvalidRequestError("Could not find a FROM clause to join from")
+ # if we have a MapperProperty and the onclause is not already
+ # an instrumented descriptor. this catches of_type()
+ # PropComparators and string-based on clauses.
+ if prop and not isinstance(onclause, attributes.QueryableAttribute):
+ onclause = prop
+
+ # start looking at the right side of the join
+
mp, right_selectable, is_aliased_class = _entity_info(right_entity)
if mp is not None and right_mapper is not None and not mp.common_parent(right_mapper):
if not right_mapper and mp:
right_mapper = mp
+ # determine if we need to wrap the right hand side in an alias.
+ # this occurs based on the create_aliases flag, or if the target
+ # is a selectable, Join, or polymorphically-loading mapper
if right_mapper and not is_aliased_class:
if right_entity is right_selectable:
if not right_selectable.is_derived_from(right_mapper.mapped_table):
- raise sa_exc.InvalidRequestError("Selectable '%s' is not derived from '%s'" % (right_selectable.description, right_mapper.mapped_table.description))
+ raise sa_exc.InvalidRequestError(
+ "Selectable '%s' is not derived from '%s'" %
+ (right_selectable.description, right_mapper.mapped_table.description))
if not isinstance(right_selectable, expression.Alias):
right_selectable = right_selectable.alias()
aliased_entity = True
elif prop:
+ # for joins across plain relation()s, try not to specify the
+ # same joins twice. the __currenttables collection tracks
+ # what plain mapped tables we've joined to already.
+
if prop.table in self.__currenttables:
if prop.secondary is not None and prop.secondary not in self.__currenttables:
# TODO: this check is not strong enough for different paths to the same endpoint which
# does not use secondary tables
- raise sa_exc.InvalidRequestError("Can't join to property '%s'; a path to this table along a different secondary table already exists. Use the `alias=True` argument to `join()`." % descriptor)
-
+ raise sa_exc.InvalidRequestError("Can't join to property '%s'; a path to this "
+ "table along a different secondary table already "
+ "exists. Use the `alias=True` argument to `join()`." % descriptor)
continue
if prop.secondary:
else:
right_entity = prop.mapper
+ # create adapters to the right side, if we've created aliases
if alias_criterion:
right_adapter = ORMAdapter(right_entity,
equivalents=right_mapper._equivalent_columns, chain_to=self._filter_aliases)
- if isinstance(onclause, sql.ClauseElement):
+ # if the onclause is a ClauseElement, adapt it with our right
+ # adapter, then with our query-wide adaptation if any.
+ if isinstance(onclause, expression.ClauseElement):
+ if alias_criterion:
onclause = right_adapter.traverse(onclause)
-
- # TODO: is this a little hacky ?
- if not isinstance(onclause, attributes.QueryableAttribute) or not isinstance(onclause.parententity, AliasedClass):
- if prop:
- # MapperProperty based onclause
- onclause = prop
- else:
- # ClauseElement based onclause
- onclause = self._adapt_clause(onclause, False, True)
-
- clause = orm_join(clause, right_entity, onclause, isouter=outerjoin)
+ onclause = self._adapt_clause(onclause, False, True)
+
+ # determine if we want _ORMJoin to alias the onclause
+ # to the given left side. This is used if we're joining against a
+ # select_from() selectable, from_self() call, or the onclause
+ # has been resolved into a MapperProperty. Otherwise we assume
+ # the onclause itself contains more specific information on how to
+ # construct the onclause.
+ join_to_left = not is_aliased_class or \
+ onclause is prop or \
+ clause is self._from_obj and self._from_obj_alias
+
+ # create the join
+ clause = orm_join(clause, right_entity, onclause, isouter=outerjoin, join_to_left=join_to_left)
+
+ # set up state for the query as a whole
if alias_criterion:
+ # adapt filter() calls based on our right side adaptation
self._filter_aliases = right_adapter
+ # if a polymorphic entity was aliased, establish that
+ # so that MapperEntity/ColumnEntity can pick up on it
+ # and adapt when it renders columns and fetches them from results
if aliased_entity:
- self.__mapper_loads_polymorphically_with(right_mapper, ORMAdapter(right_entity, equivalents=right_mapper._equivalent_columns))
-
+ self.__mapper_loads_polymorphically_with(
+ right_mapper,
+ ORMAdapter(right_entity, equivalents=right_mapper._equivalent_columns)
+ )
+
+ # loop finished. we're selecting from
+ # our final clause now
self._from_obj = clause
+
+ # future joins with from_joinpoint=True join from our established right_entity.
self._joinpoint = right_entity
@_generative(__no_statement_condition)
return util.IdentitySet(self._new.values())
_expire_state = attributes.InstanceState.expire_attributes
+
UOWEventHandler = unitofwork.UOWEventHandler
_sessions = weakref.WeakValueDictionary()
from sqlalchemy import sql, util, log
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql import visitors, expression, operators
-from sqlalchemy.orm import mapper, attributes
+from sqlalchemy.orm import mapper, attributes, interfaces
from sqlalchemy.orm.interfaces import (
LoaderStrategy, StrategizedOption, MapperOption, PropertyOption,
serialize_path, deserialize_path, StrategizedProperty
from sqlalchemy.orm import session as sessionlib
from sqlalchemy.orm import util as mapperutil
-def _register_attribute(strategy, useobject,
+def _register_attribute(strategy, mapper, useobject,
compare_function=None,
typecallable=None,
copy_function=None,
prop = strategy.parent_property
attribute_ext = util.to_list(prop.extension) or []
+
+ if useobject and prop.single_parent:
+ attribute_ext.append(_SingleParentValidator(prop))
+
if getattr(prop, 'backref', None):
attribute_ext.append(prop.backref.extension)
if useobject:
attribute_ext.append(sessionlib.UOWEventHandler(prop.key))
- for mapper in prop.parent.polymorphic_iterator():
- if (mapper is prop.parent or not mapper.concrete) and mapper.has_property(prop.key):
+ for m in mapper.polymorphic_iterator():
+ if (m is prop.parent or not m.concrete) and m.has_property(prop.key):
attributes.register_attribute_impl(
- mapper.class_,
+ m.class_,
prop.key,
parent_token=prop,
mutable_scalars=mutable_scalars,
c = adapter.columns[c]
column_collection.append(c)
- def init_class_attribute(self):
+ def init_class_attribute(self, mapper):
self.is_class_level = True
coltype = self.columns[0].type
active_history = self.columns[0].primary_key # TODO: check all columns? check for foreign key as well?
- _register_attribute(self, useobject=False,
+ _register_attribute(self, mapper, useobject=False,
compare_function=coltype.compare_values,
copy_function=coltype.copy_value,
mutable_scalars=self.columns[0].type.is_mutable(),
class CompositeColumnLoader(ColumnLoader):
"""Strategize the loading of a composite column-based MapperProperty."""
- def init_class_attribute(self):
+ def init_class_attribute(self, mapper):
self.is_class_level = True
self.logger.info("%s register managed composite attribute" % self)
else:
return True
- _register_attribute(self, useobject=False,
+ _register_attribute(self, mapper, useobject=False,
compare_function=compare,
copy_function=copy,
mutable_scalars=True
self.columns = self.parent_property.columns
self.group = self.parent_property.group
- def init_class_attribute(self):
+ def init_class_attribute(self, mapper):
self.is_class_level = True
- _register_attribute(self, useobject=False,
+ _register_attribute(self, mapper, useobject=False,
compare_function=self.columns[0].type.compare_values,
copy_function=self.columns[0].type.copy_value,
mutable_scalars=self.columns[0].type.is_mutable(),
callable_=self.class_level_loader,
+ dont_expire_missing=True
)
def setup_query(self, context, entity, path, adapter, only_load_props=None, **kwargs):
class NoLoader(AbstractRelationLoader):
"""Strategize a relation() that doesn't load data automatically."""
- def init_class_attribute(self):
+ def init_class_attribute(self, mapper):
self.is_class_level = True
- _register_attribute(self,
+ _register_attribute(self, mapper,
useobject=True,
uselist=self.parent_property.uselist,
typecallable = self.parent_property.collection_class,
if self.use_get:
self.logger.info("%s will use query.get() to optimize instance loads" % self)
- def init_class_attribute(self):
+ def init_class_attribute(self, mapper):
self.is_class_level = True
_register_attribute(self,
+ mapper,
useobject=True,
callable_=self.class_level_loader,
uselist = self.parent_property.uselist,
super(EagerLoader, self).init()
self.join_depth = self.parent_property.join_depth
- def init_class_attribute(self):
- self.parent_property._get_strategy(LazyLoader).init_class_attribute()
+ def init_class_attribute(self, mapper):
+ self.parent_property._get_strategy(LazyLoader).init_class_attribute(mapper)
def setup_query(self, context, entity, path, adapter, column_collection=None, parentmapper=None, **kwargs):
"""Add a left outer join to the statement thats being constructed."""
# whether or not the Query will wrap the selectable in a subquery,
# and then attach eager load joins to that (i.e., in the case of LIMIT/OFFSET etc.)
should_nest_selectable = context.query._should_nest_selectable
-
+
if entity in context.eager_joins:
entity_key, default_towrap = entity, entity.selectable
elif should_nest_selectable or not context.from_clause or not sql_util.search(context.from_clause, entity.selectable):
# otherwise, create a single eager join from the from clause.
# Query._compile_context will adapt as needed and append to the
# FROM clause of the select().
- entity_key, default_towrap = None, context.from_clause
-
+ entity_key, default_towrap = None, context.from_clause
+
towrap = context.eager_joins.setdefault(entity_key, default_towrap)
-
+
# create AliasedClauses object to build up the eager query.
clauses = mapperutil.ORMAdapter(mapperutil.AliasedClass(self.mapper),
equivalents=self.mapper._equivalent_columns)
+ join_to_left = False
if adapter:
if getattr(adapter, 'aliased_class', None):
onclause = getattr(adapter.aliased_class, self.key, self.parent_property)
else:
onclause = getattr(mapperutil.AliasedClass(self.parent, adapter.selectable), self.key, self.parent_property)
+
+ if onclause is self.parent_property:
+ # TODO: this is a temporary hack to account for polymorphic eager loads where
+ # the eagerload is referencing via of_type().
+ join_to_left = True
else:
onclause = self.parent_property
-
- context.eager_joins[entity_key] = eagerjoin = mapperutil.outerjoin(towrap, clauses.aliased_class, onclause)
+
+ context.eager_joins[entity_key] = eagerjoin = mapperutil.outerjoin(towrap, clauses.aliased_class, onclause, join_to_left=join_to_left)
# send a hint to the Query as to where it may "splice" this join
eagerjoin.stop_on = entity.selectable
else:
query._attributes[("user_defined_eager_row_processor", paths[-1])] = None
+class _SingleParentValidator(interfaces.AttributeExtension):
+ def __init__(self, prop):
+ self.prop = prop
+
+ def _do_check(self, state, value, oldvalue, initiator):
+ if value is not None:
+ hasparent = initiator.hasparent(attributes.instance_state(value))
+ if hasparent and oldvalue is not value:
+ raise sa_exc.InvalidRequestError("Instance %s is already associated with an instance "
+ "of %s via its %s attribute, and is only allowed a single parent." %
+ (mapperutil.instance_str(value), state.class_, self.prop)
+ )
+ return value
+ def append(self, state, value, initiator):
+ return self._do_check(state, value, None, initiator)
+
+ def set(self, state, value, oldvalue, initiator):
+ return self._do_check(state, value, oldvalue, initiator)
+
+
prop = _state_mapper(state).get_property(self.key)
if newvalue is not None and prop.cascade.save_update and newvalue not in sess:
sess.add(newvalue)
- if prop.cascade.delete_orphan and oldvalue in sess.new:
+ if prop.cascade.delete_orphan and oldvalue in sess.new and \
+ prop.mapper._is_orphan(attributes.instance_state(oldvalue)):
sess.expunge(oldvalue)
return newvalue
__visit_name__ = expression.Join.__visit_name__
- def __init__(self, left, right, onclause=None, isouter=False):
+ def __init__(self, left, right, onclause=None, isouter=False, join_to_left=True):
+ adapt_from = None
+
if hasattr(left, '_orm_mappers'):
left_mapper = left._orm_mappers[1]
- adapt_from = left.right
-
+ if join_to_left:
+ adapt_from = left.right
else:
left_mapper, left, left_is_aliased = _entity_info(left)
- if left_is_aliased or not left_mapper:
+ if join_to_left and (left_is_aliased or not left_mapper):
adapt_from = left
- else:
- adapt_from = None
-
+
right_mapper, right, right_is_aliased = _entity_info(right)
if right_is_aliased:
adapt_to = right
if isinstance(onclause, basestring):
prop = left_mapper.get_property(onclause)
elif isinstance(onclause, attributes.QueryableAttribute):
- # TODO: we might want to honor the current adapt_from,
- # if already set. we would need to adjust how we calculate
- # adapt_from though since it is present in too many cases
- # at the moment (query tests illustrate that).
- adapt_from = onclause.__clause_element__()
+ if not adapt_from:
+ adapt_from = onclause.__clause_element__()
prop = onclause.property
elif isinstance(onclause, MapperProperty):
prop = onclause
prop = None
if prop:
- pj, sj, source, dest, secondary, target_adapter = prop._create_joins(source_selectable=adapt_from, dest_selectable=adapt_to, source_polymorphic=True, dest_polymorphic=True, of_type=right_mapper)
+ pj, sj, source, dest, secondary, target_adapter = prop._create_joins(
+ source_selectable=adapt_from,
+ dest_selectable=adapt_to,
+ source_polymorphic=True,
+ dest_polymorphic=True,
+ of_type=right_mapper)
if sj:
left = sql.join(left, secondary, pj, isouter)
expression.Join.__init__(self, left, right, onclause, isouter)
- def join(self, right, onclause=None, isouter=False):
- return _ORMJoin(self, right, onclause, isouter)
+ def join(self, right, onclause=None, isouter=False, join_to_left=True):
+ return _ORMJoin(self, right, onclause, isouter, join_to_left)
- def outerjoin(self, right, onclause=None):
- return _ORMJoin(self, right, onclause, True)
+ def outerjoin(self, right, onclause=None, join_to_left=True):
+ return _ORMJoin(self, right, onclause, True, join_to_left)
-def join(left, right, onclause=None, isouter=False):
+def join(left, right, onclause=None, isouter=False, join_to_left=True):
"""Produce an inner join between left and right clauses.
In addition to the interface provided by
string name of a relation(), or a class-bound descriptor
representing a relation.
+ join_to_left indicates that an attempt should be made to alias
+ the ON clause, in whatever form it is passed, to the selectable
+ passed as the left side. If False, the onclause
+ is used as is.
+
"""
- return _ORMJoin(left, right, onclause, isouter)
+ return _ORMJoin(left, right, onclause, isouter, join_to_left)
-def outerjoin(left, right, onclause=None):
+def outerjoin(left, right, onclause=None, join_to_left=True):
"""Produce a left outer join between left and right clauses.
In addition to the interface provided by
representing a relation.
"""
- return _ORMJoin(left, right, onclause, True)
+ return _ORMJoin(left, right, onclause, True, join_to_left)
def with_parent(instance, prop):
"""Return criterion which selects instances with a given parent.
+# -*- encoding: utf-8 -*-
import testenv; testenv.configure_for_tests()
import datetime, os, pickleable, re
from sqlalchemy import *
table.drop()
+class QueryUnicodeTest(TestBase):
+ __only_on__ = 'mssql'
+
+ def test_convert_unicode(self):
+ meta = MetaData(testing.db)
+ t1 = Table('unitest_table', meta,
+ Column('id', Integer, primary_key=True),
+ Column('descr', mssql.MSText(200, convert_unicode=True)))
+ meta.create_all()
+ con = testing.db.connect()
+
+ # encode in UTF-8 (string object) because this is the default dialect encoding
+ con.execute(u"insert into unitest_table values ('bien mangé')".encode('UTF-8'))
+
+ try:
+ r = t1.select().execute().fetchone()
+ assert isinstance(r[1], unicode), '%s is %s instead of unicode, working on %s' % (
+ r[1], type(r[1]), meta.bind)
+
+ finally:
+ meta.drop_all()
+
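The restored `convert_unicode` handling exercised by this test comes down to decoding raw byte strings returned by the DBAPI into unicode using the dialect encoding, while passing through values that are already decoded. A rough standalone sketch of that result-processing step, written in Python 3 terms (`convert_unicode_processor` is an illustrative name, not the dialect's actual hook):

```python
def convert_unicode_processor(value, encoding='utf-8'):
    """Decode DBAPI byte strings to text using the dialect encoding;
    values that are already text (or None) pass through unchanged."""
    if isinstance(value, bytes):
        return value.decode(encoding)
    return value

# a row value that came back from the driver as raw UTF-8 bytes
raw = 'bien mangé'.encode('utf-8')
decoded = convert_unicode_processor(raw)
```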
class QueryTest(TestBase):
__only_on__ = 'mssql'
try:
test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000',
'-1500000.00000000000000000000', '1500000',
- '0.0000000000000000002', '0.2', '-0.0000000000000000002',
- '156666.458923543', '-156666.458923543', '1', '-1', '1234',
+ '0.0000000000000000002', '0.2', '-0.0000000000000000002', '-2E-2',
+ '156666.458923543', '-156666.458923543', '1', '-1', '-1234', '1234',
'2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3',
- '1E-4', '1E-5', '1E-6', '1E-7', '1E-8']
+ '1E-4', '1E-5', '1E-6', '1E-7', '1E-1', '1E-8', '0.2732E2', '-0.2432E2', '4.35656E2',
+ '-02452E-2', '45125E-2',
+ '1234.58965E-2', '1.521E+15', '-1E-25', '1E-25', '1254E-25', '-1203E-25',
+ '0', '-0.00', '-0', '4585E12', '000000000000000000012', '000000000000.32E12',
+ '00000000000000.1E+12', '000000000000.2E-32']
+
for value in test_items:
numeric_table.insert().execute(numericcol=value)
import testenv; testenv.configure_for_tests()
-from testlib.sa import Table, Column, Integer, String, ForeignKey, Sequence
+from testlib.sa import Table, Column, Integer, String, ForeignKey, Sequence, exc as sa_exc
from testlib.sa.orm import mapper, relation, create_session, class_mapper, backref
from testlib.sa.orm import attributes, exc as orm_exc
from testlib import testing
assert users.count().scalar() == 1
assert orders.count().scalar() == 0
+class O2OCascadeTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ @testing.resolve_artifact_names
+ def setup_mappers(self):
+ mapper(Address, addresses)
+ mapper(User, users, properties = {
+ 'address':relation(Address, backref=backref("user", single_parent=True), uselist=False)
+ })
+ @testing.resolve_artifact_names
+ def test_single_parent_raise(self):
+ a1 = Address(email_address='some address')
+ u1 = User(name='u1', address=a1)
+
+ self.assertRaises(sa_exc.InvalidRequestError, Address, email_address='asd', user=u1)
+
+ a2 = Address(email_address='asd')
+ u1.address = a2
+ assert u1.address is not a1
+ assert a1.user is None
+
+
+
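The test above exercises the new `single_parent=True` contract described in the changelog: an object already associated with one parent may not be associated with a second, but reassignment through the relation itself detaches the old parent first. A simplified standalone model of that check (the `Parent`/`Child` classes and `SingleParentError` are hypothetical stand-ins, not SQLAlchemy internals):

```python
class SingleParentError(Exception):
    pass

class Child(object):
    def __init__(self):
        self.parent = None

class Parent(object):
    def __init__(self, child=None):
        self.child = None
        if child is not None:
            self.attach(child)

    def attach(self, child):
        # mirror single_parent=True: refuse a child that is already
        # claimed by a different parent
        if child.parent is not None and child.parent is not self:
            raise SingleParentError(
                "instance is already associated with another parent")
        # replacing our own child de-associates the old one, as in
        # the u1.address = a2 case in the test above
        if self.child is not None:
            self.child.parent = None
        self.child = child
        child.parent = self
```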
class O2MBackrefTest(_fixtures.FixtureTest):
run_inserts = None
extra = relation(Extra, cascade="all, delete")
))
mapper(User, users, properties = dict(
- pref = relation(Pref, lazy=False, cascade="all, delete-orphan")
+ pref = relation(Pref, lazy=False, cascade="all, delete-orphan", single_parent=True)
))
@testing.resolve_artifact_names
@testing.resolve_artifact_names
def setup_mappers(self):
mapper(T1, t1, properties=dict(
- t2=relation(T2, cascade="all, delete-orphan")))
+ t2=relation(T2, cascade="all, delete-orphan", single_parent=True)))
mapper(T2, t2, properties=dict(
- t3=relation(T3, cascade="all, delete-orphan")))
+ t3=relation(T3, cascade="all, delete-orphan", single_parent=True, backref=backref('t2', uselist=False))))
mapper(T3, t3)
@testing.resolve_artifact_names
eq_(sess.query(T2).all(), [T2()])
eq_(sess.query(T3).all(), [])
+ @testing.resolve_artifact_names
+ def test_single_parent_raise(self):
+
+ sess = create_session()
+
+ y = T2(data='T2a')
+ x = T1(data='T1a', t2=y)
+ self.assertRaises(sa_exc.InvalidRequestError, T1, data='T1b', t2=y)
+
+ @testing.resolve_artifact_names
+ def test_single_parent_backref(self):
+
+ sess = create_session()
+
+ y = T3(data='T3a')
+ x = T2(data='T2a', t3=y)
+
+ # can't attach the T3 to another T2
+ self.assertRaises(sa_exc.InvalidRequestError, T2, data='T2b', t3=y)
+
+ # setting via the backref is OK, though; it unsets the
+ # previous parent first
+ z = T2(data='T2b')
+ y.t2 = z
+
+ assert z.t3 is y
+ assert x.t3 is None
+
class M2MCascadeTest(_base.MappedTest):
- """delete-orphan cascade is deprecated on many-to-many."""
-
def define_tables(self, metadata):
Table('a', metadata,
Column('id', Integer, primary_key=True),
class C(_fixtures.Base):
pass
- @testing.emits_warning(".*not supported on a many-to-many")
@testing.resolve_artifact_names
def test_delete_orphan(self):
mapper(A, a, properties={
# if no backref here, delete-orphan failed until [ticket:427] was
# fixed
- 'bs': relation(B, secondary=atob, cascade="all, delete-orphan")
+ 'bs': relation(B, secondary=atob, cascade="all, delete-orphan", single_parent=True)
})
mapper(B, b)
assert b.count().scalar() == 0
assert a.count().scalar() == 1
- @testing.emits_warning(".*not supported on a many-to-many")
@testing.resolve_artifact_names
def test_delete_orphan_cascades(self):
mapper(A, a, properties={
# if no backref here, delete-orphan failed until [ticket:427] was
# fixed
- 'bs':relation(B, secondary=atob, cascade="all, delete-orphan")
+ 'bs':relation(B, secondary=atob, cascade="all, delete-orphan", single_parent=True)
})
mapper(B, b, properties={'cs':relation(C, cascade="all, delete-orphan")})
mapper(C, c)
assert a.count().scalar() == 1
assert c.count().scalar() == 0
- @testing.emits_warning(".*not supported on a many-to-many")
@testing.resolve_artifact_names
def test_cascade_delete(self):
mapper(A, a, properties={
- 'bs':relation(B, secondary=atob, cascade="all, delete-orphan")
+ 'bs':relation(B, secondary=atob, cascade="all, delete-orphan", single_parent=True)
})
mapper(B, b)
assert b.count().scalar() == 0
assert a.count().scalar() == 0
- @testing.emits_warning(".*not supported on a many-to-many")
- @testing.fails_on_everything_except('sqlite')
@testing.resolve_artifact_names
- def test_this_doesnt_work(self):
- """illustrates why cascade with m2m should not be supported
- (i.e. many parents...)
-
- """
+ def test_single_parent_raise(self):
mapper(A, a, properties={
- 'bs':relation(B, secondary=atob, cascade="all, delete-orphan")
+ 'bs':relation(B, secondary=atob, cascade="all, delete-orphan", single_parent=True)
})
mapper(B, b)
sess = create_session()
b1 =B(data='b1')
a1 = A(data='a1', bs=[b1])
- a2 = A(data='a2', bs=[b1])
- sess.add(a1)
- sess.add(a2)
- sess.flush()
+
+ self.assertRaises(sa_exc.InvalidRequestError,
+ A, data='a2', bs=[b1]
+ )
- sess.delete(a1)
+ @testing.resolve_artifact_names
+ def test_single_parent_backref(self):
+ """test that setting m2m via a uselist=False backref bypasses the single_parent raise"""
- # this raises an integrity error on DBs that support FKs
- sess.flush()
+ mapper(A, a, properties={
+ 'bs':relation(B,
+ secondary=atob,
+ cascade="all, delete-orphan", single_parent=True,
+ backref=backref('a', uselist=False))
+ })
+ mapper(B, b)
+
+ sess = create_session()
+ b1 = B(data='b1')
+ a1 = A(data='a1', bs=[b1])
- # still a row present !
- assert atob.count().scalar() ==1
+ self.assertRaises(
+ sa_exc.InvalidRequestError,
+ A, data='a2', bs=[b1]
+ )
- # but no bs !
- assert b.count().scalar() == 0
- assert a.count().scalar() == 1
-
+ a2 = A(data='a2')
+ b1.a = a2
+ assert b1 not in a1.bs
+ assert b1 in a2.bs
class UnsavedOrphansTest(_base.MappedTest):
"""Pending entities that are orphans"""
ForeignKey('accounts.account_id')))
@testing.resolve_artifact_names
- def test_double_parent_expunge(self):
- """Removing a pending item from a collection expunges it from the session."""
-
+ def test_double_parent_expunge_o2m(self):
+ """test the delete-orphan uow event for multiple delete-orphan parent relations."""
+
class Customer(_fixtures.Base):
pass
class Account(_fixtures.Base):
sr.customers.remove(c)
assert c not in s, "Should expunge customer when both parents are gone"
+ @testing.resolve_artifact_names
+ def test_double_parent_expunge_o2o(self):
+ """test the delete-orphan uow event for multiple delete-orphan parent relations."""
+
+ class Customer(_fixtures.Base):
+ pass
+ class Account(_fixtures.Base):
+ pass
+ class SalesRep(_fixtures.Base):
+ pass
+
+ mapper(Customer, customers)
+ mapper(Account, accounts, properties=dict(
+ customer=relation(Customer,
+ cascade="all,delete-orphan",
+ backref="account", uselist=False)))
+ mapper(SalesRep, sales_reps, properties=dict(
+ customer=relation(Customer,
+ cascade="all,delete-orphan",
+ backref="sales_rep", uselist=False)))
+ s = create_session()
+
+ a = Account(balance=0)
+ sr = SalesRep(name="John")
+ s.add_all((a, sr))
+ s.flush()
+
+ c = Customer(name="Jane")
+
+ a.customer = c
+ sr.customer = c
+ assert c in s
+
+ a.customer = None
+ assert c in s, "Should not expunge customer yet, still has one parent"
+
+ sr.customer = None
+ assert c not in s, "Should expunge customer when both parents are gone"
+
+
+
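The o2m and o2o variants above both verify that a pending object with several delete-orphan parent relations is expunged only once every parent association is gone, not on the first de-association. A toy standalone model of that bookkeeping (`ToySession` is a hypothetical stand-in, not the real Session):

```python
class ToySession(object):
    """Tracks pending orphan candidates by counting live parent links;
    an object is expunged only when its last parent goes away."""
    def __init__(self):
        self.pending = set()
        self.parent_counts = {}

    def add(self, obj):
        self.pending.add(obj)
        self.parent_counts.setdefault(obj, 0)

    def associate(self, obj):
        self.parent_counts[obj] = self.parent_counts.get(obj, 0) + 1

    def deassociate(self, obj):
        self.parent_counts[obj] -= 1
        # only expunge when no delete-orphan parent remains
        if self.parent_counts[obj] == 0:
            self.pending.discard(obj)

s = ToySession()
customer = object()
s.add(customer)
s.associate(customer)   # account parent
s.associate(customer)   # sales rep parent
s.deassociate(customer) # still has one parent -> stays pending
s.deassociate(customer) # last parent gone -> expunged
```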
class DoubleParentOrphanTest(_base.MappedTest):
"""test orphan detection for an entity with two parent relations"""
pass
mapper(Address, addresses)
- mapper(Home, homes, properties={'address':relation(Address, cascade="all,delete-orphan")})
- mapper(Business, businesses, properties={'address':relation(Address, cascade="all,delete-orphan")})
+ mapper(Home, homes, properties={'address':relation(Address, cascade="all,delete-orphan", single_parent=True)})
+ mapper(Business, businesses, properties={'address':relation(Address, cascade="all,delete-orphan", single_parent=True)})
session = create_session()
h1 = Home(description='home1', address=Address(street='address1'))
pass
mapper(Address, addresses)
- mapper(Home, homes, properties={'address':relation(Address, cascade="all,delete-orphan")})
- mapper(Business, businesses, properties={'address':relation(Address, cascade="all,delete-orphan")})
+ mapper(Home, homes, properties={'address':relation(Address, cascade="all,delete-orphan", single_parent=True)})
+ mapper(Business, businesses, properties={'address':relation(Address, cascade="all,delete-orphan", single_parent=True)})
session = create_session()
a1 = Address()
]) == d
self.assert_sql_count(testing.db, go, 3)
+class MixedSelfReferentialEagerTest(_base.MappedTest):
+ def define_tables(self, metadata):
+ Table('a_table', metadata,
+ Column('id', Integer, primary_key=True)
+ )
+
+ Table('b_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('parent_b1_id', Integer, ForeignKey('b_table.id')),
+ Column('parent_a_id', Integer, ForeignKey('a_table.id')),
+ Column('parent_b2_id', Integer, ForeignKey('b_table.id')))
+
+
+ @testing.resolve_artifact_names
+ def setup_mappers(self):
+ class A(_base.ComparableEntity):
+ pass
+ class B(_base.ComparableEntity):
+ pass
+
+ mapper(A, a_table)
+ mapper(B, b_table, properties={
+ 'parent_b1': relation(B,
+ remote_side=[b_table.c.id],
+ primaryjoin=(b_table.c.parent_b1_id == b_table.c.id),
+ order_by=b_table.c.id
+ ),
+ 'parent_z': relation(A, lazy=True),
+ 'parent_b2': relation(B,
+ remote_side=[b_table.c.id],
+ primaryjoin=(b_table.c.parent_b2_id == b_table.c.id),
+ order_by=b_table.c.id
+ )
+ })
+
+ @testing.resolve_artifact_names
+ def insert_data(self):
+ a_table.insert().execute(dict(id=1), dict(id=2), dict(id=3))
+ b_table.insert().execute(
+ dict(id=1, parent_a_id=2, parent_b1_id=None, parent_b2_id=None),
+ dict(id=2, parent_a_id=1, parent_b1_id=1, parent_b2_id=None),
+ dict(id=3, parent_a_id=1, parent_b1_id=1, parent_b2_id=2),
+ dict(id=4, parent_a_id=3, parent_b1_id=1, parent_b2_id=None),
+ dict(id=5, parent_a_id=3, parent_b1_id=None, parent_b2_id=2),
+ dict(id=6, parent_a_id=1, parent_b1_id=1, parent_b2_id=3),
+ dict(id=7, parent_a_id=2, parent_b1_id=None, parent_b2_id=3),
+ dict(id=8, parent_a_id=2, parent_b1_id=1, parent_b2_id=2),
+ dict(id=9, parent_a_id=None, parent_b1_id=1, parent_b2_id=None),
+ dict(id=10, parent_a_id=3, parent_b1_id=7, parent_b2_id=2),
+ dict(id=11, parent_a_id=3, parent_b1_id=1, parent_b2_id=8),
+ dict(id=12, parent_a_id=2, parent_b1_id=5, parent_b2_id=2),
+ dict(id=13, parent_a_id=3, parent_b1_id=4, parent_b2_id=4),
+ dict(id=14, parent_a_id=3, parent_b1_id=7, parent_b2_id=2),
+ )
+
+ @testing.resolve_artifact_names
+ def test_eager_load(self):
+ session = create_session()
+ def go():
+ eq_(
+ session.query(B).options(eagerload('parent_b1'), eagerload('parent_b2'), eagerload('parent_z')).
+ filter(B.id.in_([2, 8, 11])).order_by(B.id).all(),
+ [
+ B(id=2, parent_z=A(id=1), parent_b1=B(id=1), parent_b2=None),
+ B(id=8, parent_z=A(id=2), parent_b1=B(id=1), parent_b2=B(id=2)),
+ B(id=11, parent_z=A(id=3), parent_b1=B(id=1), parent_b2=B(id=8))
+ ]
+ )
+ self.assert_sql_count(testing.db, go, 1)
+
class SelfReferentialM2MEagerTest(_base.MappedTest):
def define_tables(self, metadata):
Table('widget', metadata,
import gc
from testlib import sa, testing
from testlib.sa import Table, Column, Integer, String, ForeignKey, exc as sa_exc
-from testlib.sa.orm import mapper, relation, create_session, attributes
+from testlib.sa.orm import mapper, relation, create_session, attributes, deferred
from orm import _base, _fixtures
# but now its back, rollback has occured, the _remove_newly_deleted
# is reverted
self.assertEquals(u.name, 'chuck')
-
+
+ @testing.resolve_artifact_names
+ def test_deferred(self):
+ """test that unloaded, deferred attributes aren't included in the expiry list."""
+
+ mapper(Order, orders, properties={'description':deferred(orders.c.description)})
+
+ s = create_session()
+ o1 = s.query(Order).first()
+ assert 'description' not in o1.__dict__
+ s.expire(o1)
+ assert o1.isopen is not None
+ assert 'description' not in o1.__dict__
+ assert o1.description
+
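The test above checks the changelog rule that `expire()` skips deferred attributes which were never loaded, so a subsequent refresh does not needlessly fetch them. A minimal standalone sketch of how the set of keys to expire might be computed (`keys_to_expire` is a hypothetical helper, not the Session API):

```python
def keys_to_expire(obj, mapped_keys, deferred_keys):
    """Compute the attribute keys expire() should mark for reload:
    all mapped keys except deferred ones that were never loaded
    (i.e. absent from the instance __dict__)."""
    loaded = set(obj.__dict__)
    return [k for k in mapped_keys
            if not (k in deferred_keys and k not in loaded)]

class Order(object):
    pass

o = Order()
o.id = 1
o.isopen = True   # 'description' is deferred and unloaded
keys = keys_to_expire(o, ['id', 'isopen', 'description'], {'description'})
```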
@testing.resolve_artifact_names
def test_lazyload_autoflushes(self):
mapper(User, users, properties={
t1 = Table('t1', metadata,
Column('id', Integer, primary_key=True),
Column('type', String(10), nullable=False),
- Column('info', Text))
+ Column('info', String(255)))
t2 = Table('t2', metadata,
Column('id', Integer, ForeignKey('t1.id'), primary_key=True),
Column('data', String(10), nullable=False))
mapper(Foo, addresses, inherits=User)
assert getattr(Foo().__class__, 'name').impl is not None
+ @testing.resolve_artifact_names
+ def test_deferred_subclass_attribute_instrument(self):
+ class Foo(User): pass
+ mapper(User, users)
+ compile_mappers()
+ mapper(Foo, addresses, inherits=User)
+ assert getattr(Foo().__class__, 'name').impl is not None
+
@testing.resolve_artifact_names
def test_compile_on_get_props_1(self):
m =mapper(User, users)
assert not m.compiled
assert m.get_property('name')
assert m.compiled
-
+
@testing.resolve_artifact_names
def test_add_property(self):
assert_col = []
"LEFT OUTER JOIN addresses AS addresses_1 ON anon_1.users_id = addresses_1.user_id ORDER BY addresses_1.id"
)
+ def test_aliases(self):
+ """test that aliased objects are accessible externally to a from_self() call."""
+
+ s = create_session()
+
+ ualias = aliased(User)
+ eq_(
+ s.query(User, ualias).filter(User.id > ualias.id).from_self(User.name, ualias.name).
+ order_by(User.name, ualias.name).all(),
+ [
+ (u'chuck', u'ed'),
+ (u'chuck', u'fred'),
+ (u'chuck', u'jack'),
+ (u'ed', u'jack'),
+ (u'fred', u'ed'),
+ (u'fred', u'jack')
+ ]
+ )
+
+ eq_(
+ s.query(User, ualias).filter(User.id > ualias.id).from_self(User.name, ualias.name).filter(ualias.name=='ed')\
+ .order_by(User.name, ualias.name).all(),
+ [(u'chuck', u'ed'), (u'fred', u'ed')]
+ )
+
+ eq_(
+ s.query(User, ualias).filter(User.id > ualias.id).from_self(ualias.name, Address.email_address).
+ join(ualias.addresses).order_by(ualias.name, Address.email_address).all(),
+ [
+ (u'ed', u'fred@fred.com'),
+ (u'jack', u'ed@bettyboop.com'),
+ (u'jack', u'ed@lala.com'),
+ (u'jack', u'ed@wood.com'),
+ (u'jack', u'fred@fred.com')]
+ )
+
def test_multiple_entities(self):
sess = create_session()
#"save-update, delete-orphan",
"save-update, delete, delete-orphan"):
mapper(B, tableB, properties={
- 'a':relation(A, cascade=cascade)
+ 'a':relation(A, cascade=cascade, single_parent=True)
})
mapper(A, tableA)
Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('datetime', DateTime, nullable=False),
Column('headline', String(500)),
- Column('summary', Text),
+ Column('summary', String(255)),
Column('body', Text),
)