.. change::
:tags: firebird, bug
- :ticket: 2622
+ :tickets: 2622
Added missing import for "fdb" to the experimental
"firebird+fdb" dialect.
.. change::
:tags: orm, bug
- :ticket: 2614
-
- Added a new exception to detect the case where two
- subclasses are being loaded using with_polymorphic(),
- each subclass contains a relationship attribute of the same
- name, and eager loading is being applied to one or both.
- This is an ongoing bug which can't be immediately fixed,
- so since the results are usually wrong in any case it raises an
- exception for now. 0.7 has the same issue, so an exception
- raise here probably means the code was returning the wrong
- data in 0.7.
+ :tickets: 2614
+
+ A second overhaul of aliasing/internal pathing mechanics
+ now allows two subclasses to have different relationships
+ of the same name, supported with subquery or joined eager
+ loading on both simultaneously when a full polymorphic
+ load is used.
+
+ .. change::
+ :tags: orm, bug
+ :tickets: 2617
+
+ Fixed bug whereby a multi-hop subqueryload within
+ a particular with_polymorphic load would produce a KeyError.
+ Takes advantage of the same internal pathing overhaul
+ as :ticket:`2614`.
.. change::
:tags: sql, bug
from ..orm import class_mapper
from ..orm.session import Session
from ..orm.mapper import Mapper
+from ..orm.interfaces import MapperProperty
from ..orm.attributes import QueryableAttribute
from .. import Table, Column
from ..engine import Engine
id = "attribute:" + key + ":" + b64encode(pickle.dumps(cls))
elif isinstance(obj, Mapper) and not obj.non_primary:
id = "mapper:" + b64encode(pickle.dumps(obj.class_))
+ elif isinstance(obj, MapperProperty) and not obj.parent.non_primary:
+ id = "mapperprop:" + b64encode(pickle.dumps(obj.parent.class_)) + \
+ ":" + obj.key
elif isinstance(obj, Table):
id = "table:" + str(obj)
elif isinstance(obj, Column) and isinstance(obj.table, Table):
elif type_ == "mapper":
cls = pickle.loads(b64decode(args))
return class_mapper(cls)
+ elif type_ == "mapperprop":
+ mapper, keyname = args.split(':')
+ cls = pickle.loads(b64decode(args))
+ return class_mapper(cls).attrs[keyname]
elif type_ == "table":
return metadata.tables[args]
elif type_ == "column":
return operator(self.comparator, value)
+ def __repr__(self):
+ return '<%s at 0x%x; %s>' % (
+ self.__class__.__name__,
+ id(self), self.key)
class PropComparator(operators.ColumnOperators):
"""Defines boolean, comparison, and other operators for
return None
def _get_context_strategy(self, context, path):
- # this is essentially performance inlining.
- key = ('loaderstrategy', path.reduced_path + (self.key,))
- cls = None
- if key in context.attributes:
- cls = context.attributes[key]
- else:
+ strategy_cls = path._inlined_get_for(self, context, 'loaderstrategy')
+
+ if not strategy_cls:
wc_key = self._wildcard_path
if wc_key and wc_key in context.attributes:
- cls = context.attributes[wc_key]
+ strategy_cls = context.attributes[wc_key]
- if cls:
+ if strategy_cls:
try:
- return self._strategies[cls]
+ return self._strategies[strategy_cls]
except KeyError:
- return self.__init_strategy(cls)
+ return self.__init_strategy(strategy_cls)
return self.strategy
def _get_strategy(self, cls):
def _find_entity_prop_comparator(self, query, token, mapper, raiseerr):
if orm_util._is_aliased_class(mapper):
searchfor = mapper
- isa = False
else:
searchfor = orm_util._class_to_mapper(mapper)
- isa = True
for ent in query._mapper_entities:
if ent.corresponds_to(searchfor):
return ent
# exhaust current_path before
# matching tokens to entities
if current_path:
- if current_path[1] == token:
+ if current_path[1].key == token:
current_path = current_path[2:]
continue
else:
# matching tokens to entities
if current_path:
if current_path[0:2] == \
- [token._parententity, prop.key]:
+ [token._parententity, prop]:
current_path = current_path[2:]
continue
else:
raiseerr)
if not entity:
return no_result
+
path_element = entity.entity_zero
mapper = entity.mapper
else:
raise sa_exc.ArgumentError("Attribute '%s' does not "
"link from element '%s'" % (token, path_element))
- path = path[path_element][prop.key]
+ path = path[path_element][prop]
paths.append(path)
if not ext_info.is_aliased_class:
ac = orm_util.with_polymorphic(
ext_info.mapper.base_mapper,
- ext_info.mapper, aliased=True)
+ ext_info.mapper, aliased=True,
+ _use_mapper_path=True)
ext_info = inspect(ac)
path.set(query, "path_with_polymorphic", ext_info)
else:
new_populators = []
existing_populators = []
eager_populators = []
+
load_path = context.query._current_path + path \
if context.query._current_path.path \
else path
delayed_populators = []
pops = (new_populators, existing_populators, delayed_populators,
eager_populators)
+
for prop in mapper._props.itervalues():
+
for i, pop in enumerate(prop.create_row_processor(
- context, path,
+ context,
+ path,
mapper, row, adapter)):
if pop is not None:
pops[i].append((prop.key, pop))
if sub_mapper is mapper:
return None
- # replace the tip of the path info with the subclass mapper
- # being used, that way accurate "load_path" info is available
- # for options invoked during deferred loads, e.g.
- # query(Person).options(defer(Engineer.machines, Machine.name)).
- # for AliasedClass paths, disregard this step (new in 0.8).
return instance_processor(
sub_mapper,
context,
- path.parent[sub_mapper]
- if not path.is_aliased_class
- else path,
+ path,
adapter,
polymorphic_from=mapper)
return configure_subclass_mapper
dispatch = event.dispatcher(events.MapperEvents)
@util.memoized_property
- def _sa_path_registry(self):
+ def _path_registry(self):
return PathRegistry.per_mapper(self)
def _configure_inheritance(self):
if _new_mappers:
configure_mappers()
if not self.with_polymorphic:
- return [self]
+ return []
return self._mappers_from_spec(*self.with_polymorphic)
@_memoized_configured_property
return list(self._iterate_polymorphic_properties(
self._with_polymorphic_mappers))
+
def _iterate_polymorphic_properties(self, mappers=None):
"""Return an iterator of MapperProperty objects which will render into
a SELECT."""
-
if mappers is None:
mappers = self._with_polymorphic_mappers
ent.setup_entity(*d[entity])
def _mapper_loads_polymorphically_with(self, mapper, adapter):
- for m2 in mapper._with_polymorphic_mappers:
+ for m2 in mapper._with_polymorphic_mappers or [mapper]:
self._polymorphic_adapters[m2] = adapter
for m in m2.iterate_to_root():
self._polymorphic_adapters[m.local_table] = adapter
self._with_polymorphic = ext_info.with_polymorphic_mappers
self._polymorphic_discriminator = \
ext_info.polymorphic_on
+ self.entity_zero = ext_info
if ext_info.is_aliased_class:
- self.entity_zero = ext_info.entity
- self._label_name = self.entity_zero._sa_label_name
+ self._label_name = self.entity_zero.name
else:
- self.entity_zero = self.mapper
self._label_name = self.mapper.class_.__name__
- self.path = self.entity_zero._sa_path_registry
+ self.path = self.entity_zero._path_registry
def set_with_polymorphic(self, query, cls_or_mappers,
selectable, polymorphic_on):
+ """Receive an update from a call to query.with_polymorphic().
+
+ Note the newer style of using a free standing with_polymporphic()
+ construct doesn't make use of this method.
+
+
+ """
if self.is_aliased_class:
+ # TODO: invalidrequest ?
raise NotImplementedError(
"Can't use with_polymorphic() against "
"an Aliased object"
return self.entity_zero
def corresponds_to(self, entity):
- entity_info = inspect(entity)
- if entity_info.is_aliased_class or self.is_aliased_class:
- return entity is self.entity_zero \
- or \
- entity in self._with_polymorphic
- else:
- return entity.common_parent(self.entity_zero)
+ if entity.is_aliased_class:
+ if self.is_aliased_class:
+ if entity._base_alias is self.entity_zero._base_alias:
+ return True
+ return False
+ elif self.is_aliased_class:
+ if self.entity_zero._use_mapper_path:
+ return entity in self._with_polymorphic
+ else:
+ return entity is self.entity_zero
+
+ return entity.common_parent(self.entity_zero)
def adapt_to_selectable(self, query, sel):
query._entities.append(self)
if self.entity_zero is None:
return False
elif _is_aliased_class(entity):
+ # TODO: polymorphic subclasses ?
return entity is self.entity_zero
else:
return not _is_aliased_class(self.entity_zero) and \
self.uselist = self.parent_property.uselist
- def _warn_existing_path(self):
- raise sa_exc.InvalidRequestError(
- "Eager loading cannot currently function correctly when two or "
- "more "
- "same-named attributes associated with multiple polymorphic "
- "classes "
- "of the same base are present. Encountered more than one "
- r"eager path for attribute '%s' on mapper '%s'." %
- (self.key, self.parent.base_mapper, ))
-
class NoLoader(AbstractRelationshipLoader):
"""Provide loading behavior for a :class:`.RelationshipProperty`
q = q.autoflush(False)
if state.load_path:
- q = q._with_current_path(state.load_path[self.key])
+ q = q._with_current_path(state.load_path[self.parent_property])
if state.load_options:
q = q._conditional_options(*state.load_options)
if not context.query._enable_eagerloads:
return
- path = path[self.key]
+ path = path[self.parent_property]
# build up a path indicating the path from the leftmost
# entity to the thing we're subquery loading.
# add new query to attributes to be picked up
# by create_row_processor
- existing = path.replace(context, "subquery", q)
- if existing:
- self._warn_existing_path()
+ path.set(context, "subquery", q)
def _get_leftmost(self, subq_path):
subq_path = subq_path.path
subq_mapper = orm_util._class_to_mapper(subq_path[0])
# determine attributes of the leftmost mapper
- if self.parent.isa(subq_mapper) and self.key == subq_path[1]:
+ if self.parent.isa(subq_mapper) and self.parent_property is subq_path[1]:
leftmost_mapper, leftmost_prop = \
self.parent, self.parent_property
else:
leftmost_mapper, leftmost_prop = \
subq_mapper, \
- subq_mapper._props[subq_path[1]]
+ subq_path[1]
leftmost_cols = leftmost_prop.local_columns
# the original query now becomes a subquery
# which we'll join onto.
embed_q = q.with_labels().subquery()
- left_alias = orm_util.AliasedClass(leftmost_mapper, embed_q)
+ left_alias = orm_util.AliasedClass(leftmost_mapper, embed_q,
+ use_mapper_path=True)
return left_alias
def _prep_for_joins(self, left_alias, subq_path):
- subq_path = subq_path.path
-
# figure out what's being joined. a.k.a. the fun part
- to_join = [
- (subq_path[i], subq_path[i + 1])
- for i in xrange(0, len(subq_path), 2)
- ]
+ to_join = []
+ pairs = list(subq_path.pairs())
+
+ for i, (mapper, prop) in enumerate(pairs):
+ if i > 0:
+ # look at the previous mapper in the chain -
+ # if it is as or more specific than this prop's
+ # mapper, use that instead.
+ # note we have an assumption here that
+ # the non-first element is always going to be a mapper,
+ # not an AliasedClass
+
+ prev_mapper = pairs[i - 1][1].mapper
+ to_append = prev_mapper if prev_mapper.isa(mapper) else mapper
+ else:
+ to_append = mapper
+
+ to_join.append((to_append, prop.key))
# determine the immediate parent class we are joining from,
# which needs to be aliased.
-
if len(to_join) > 1:
- info = inspect(subq_path[-2])
+ info = inspect(to_join[-1][0])
if len(to_join) < 2:
# in the case of a one level eager load, this is the
# in the vast majority of cases, and [ticket:2014]
# illustrates a case where sub_path[-2] is a subclass
# of self.parent
- parent_alias = orm_util.AliasedClass(subq_path[-2])
+ parent_alias = orm_util.AliasedClass(to_join[-1][0],
+ use_mapper_path=True)
else:
# if of_type() were used leading to this relationship,
# self.parent is more specific than subq_path[-2]
- parent_alias = orm_util.AliasedClass(self.parent)
+ parent_alias = orm_util.AliasedClass(self.parent,
+ use_mapper_path=True)
local_cols = self.parent_property.local_columns
"population - eager loading cannot be applied." %
self)
- path = path[self.key]
+ path = path[self.parent_property]
subq = path.get(context, 'subquery')
+
if subq is None:
return None, None, None
if not context.query._enable_eagerloads:
return
- path = path[self.key]
+ path = path[self.parent_property]
with_polymorphic = None
with_polymorphic = None
path = path[self.mapper]
+
for value in self.mapper._iterate_polymorphic_properties(
mappers=with_polymorphic):
value.setup(
if with_poly_info:
to_adapt = with_poly_info.entity
else:
- to_adapt = orm_util.AliasedClass(self.mapper)
+ to_adapt = orm_util.AliasedClass(self.mapper,
+ use_mapper_path=True)
clauses = orm_util.ORMAdapter(
to_adapt,
equivalents=self.mapper._equivalent_columns,
)
add_to_collection = context.secondary_columns
- existing = path.replace(context, "eager_row_processor", clauses)
- if existing:
- self._warn_existing_path()
+ path.set(context, "eager_row_processor", clauses)
+
return clauses, adapter, add_to_collection, allow_innerjoin
def _create_eager_join(self, context, entity,
onclause = getattr(
orm_util.AliasedClass(
self.parent,
- adapter.selectable
+ adapter.selectable,
+ use_mapper_path=True
),
self.key, self.parent_property
)
"population - eager loading cannot be applied." %
self)
- our_path = path[self.key]
+ our_path = path[self.parent_property]
eager_adapter = self._create_eager_adapter(
context,
def process_query_property(self, query, paths):
if self.chained:
for path in paths[0:-1]:
- (root_mapper, propname) = path.path[-2:]
- prop = root_mapper._props[propname]
+ (root_mapper, prop) = path.path[-2:]
adapter = query._polymorphic_adapters.get(prop.mapper, None)
path.setdefault(query,
"user_defined_eager_row_processor",
adapter)
- root_mapper, propname = paths[-1].path[-2:]
- prop = root_mapper._props[propname]
+ root_mapper, prop = paths[-1].path[-2:]
if self.alias is not None:
if isinstance(self.alias, basestring):
self.alias = prop.target.alias(self.alias)
else:
return None
+def _unreduce_path(path):
+ return PathRegistry.deserialize(path)
class PathRegistry(object):
"""Represent query load paths and registry functions.
self.path == other.path
def set(self, reg, key, value):
- reg._attributes[(key, self.reduced_path)] = value
-
- def replace(self, reg, key, value):
- path_key = (key, self.reduced_path)
- existing = reg._attributes.get(path_key, None)
- reg._attributes[path_key] = value
- return existing
+ reg._attributes[(key, self.path)] = value
def setdefault(self, reg, key, value):
- reg._attributes.setdefault((key, self.reduced_path), value)
+ reg._attributes.setdefault((key, self.path), value)
def get(self, reg, key, value=None):
- key = (key, self.reduced_path)
+ key = (key, self.path)
if key in reg._attributes:
return reg._attributes[key]
else:
def length(self):
return len(self.path)
+ def pairs(self):
+ path = self.path
+ for i in xrange(0, len(path), 2):
+ yield path[i], path[i + 1]
+
def contains_mapper(self, mapper):
return mapper in self.path
def contains(self, reg, key):
- return (key, self.reduced_path) in reg._attributes
+ return (key, self.path) in reg._attributes
+
+ def __reduce__(self):
+ return _unreduce_path, (self.serialize(), )
def serialize(self):
path = self.path
return zip(
[m.class_ for m in [path[i] for i in range(0, len(path), 2)]],
- [path[i] for i in range(1, len(path), 2)] + [None]
+ [path[i].key for i in range(1, len(path), 2)] + [None]
)
@classmethod
if path is None:
return None
- p = tuple(chain(*[(class_mapper(mcls), key) for mcls, key in path]))
+ p = tuple(chain(*[(class_mapper(mcls),
+ class_mapper(mcls).attrs[key]
+ if key is not None else None)
+ for mcls, key in path]))
if p and p[-1] is None:
p = p[0:-1]
return cls.coerce(p)
@classmethod
def token(cls, token):
- return KeyRegistry(cls.root, token)
+ return TokenRegistry(cls.root, token)
def __add__(self, other):
return util.reduce(
"""
path = ()
- reduced_path = ()
- def __getitem__(self, mapper):
- return mapper._sa_path_registry
+ def __getitem__(self, entity):
+ return entity._path_registry
PathRegistry.root = RootRegistry()
+class TokenRegistry(PathRegistry):
+ def __init__(self, parent, token):
+ self.token = token
+ self.parent = parent
+ self.path = parent.path + (token,)
-class KeyRegistry(PathRegistry):
- def __init__(self, parent, key):
- self.key = key
+ def __getitem__(self, entity):
+ raise NotImplementedError()
+
+class PropRegistry(PathRegistry):
+ def __init__(self, parent, prop):
+ # restate this path in terms of the
+ # given MapperProperty's parent.
+ insp = inspection.inspect(parent[-1])
+ if not insp.is_aliased_class or insp._use_mapper_path:
+ parent = parent.parent[prop.parent]
+ elif insp.is_aliased_class and insp.with_polymorphic_mappers:
+ if prop.parent is not insp.mapper and \
+ prop.parent in insp.with_polymorphic_mappers:
+ subclass_entity = parent[-1]._entity_for_mapper(prop.parent)
+ parent = parent.parent[subclass_entity]
+
+ self.prop = prop
self.parent = parent
- self.path = parent.path + (key,)
- self.reduced_path = parent.reduced_path + (key,)
+ self.path = parent.path + (prop,)
def __getitem__(self, entity):
if isinstance(entity, (int, slice)):
is_aliased_class = False
def __init__(self, parent, entity):
- self.key = reduced_key = entity
+ self.key = entity
self.parent = parent
- if hasattr(entity, 'base_mapper'):
- reduced_key = entity.base_mapper
- else:
- self.is_aliased_class = True
+ self.is_aliased_class = entity.is_aliased_class
self.path = parent.path + (entity,)
- self.reduced_path = parent.reduced_path + (reduced_key,)
def __nonzero__(self):
return True
else:
return dict.__getitem__(self, entity)
+ def _inlined_get_for(self, prop, context, key):
+ """an inlined version of:
+
+ cls = path[mapperproperty].get(context, key)
+
+ Skips the isinstance() check in __getitem__
+ and the extra method call for get().
+ Used by StrategizedProperty for its
+ very frequent lookup.
+
+ """
+ path = dict.__getitem__(self, prop)
+ path_key = (key, path.path)
+ if path_key in context._attributes:
+ return context._attributes[path_key]
+ else:
+ return None
+
def __missing__(self, key):
- self[key] = item = KeyRegistry(self, key)
+ self[key] = item = PropRegistry(self, key)
return item
def __init__(self, cls, alias=None,
name=None,
adapt_on_names=False,
+ # TODO: None for default here?
with_polymorphic_mappers=(),
- with_polymorphic_discriminator=None):
+ with_polymorphic_discriminator=None,
+ base_alias=None,
+ use_mapper_path=False):
mapper = _class_to_mapper(cls)
if alias is None:
alias = mapper._with_polymorphic_selectable.alias(name=name)
mapper,
alias,
name,
- with_polymorphic_mappers,
+ with_polymorphic_mappers
+ if with_polymorphic_mappers
+ else mapper.with_polymorphic_mappers,
with_polymorphic_discriminator
+ if with_polymorphic_discriminator is not None
+ else mapper.polymorphic_on,
+ base_alias,
+ use_mapper_path
)
+
self._setup(self._aliased_insp, adapt_on_names)
+
def _setup(self, aliased_insp, adapt_on_names):
self.__adapt_on_names = adapt_on_names
mapper = aliased_insp.mapper
equivalents=mapper._equivalent_columns,
adapt_on_names=self.__adapt_on_names)
for poly in aliased_insp.with_polymorphic_mappers:
- setattr(self, poly.class_.__name__,
- AliasedClass(poly.class_, alias))
+ if poly is not mapper:
+ setattr(self, poly.class_.__name__,
+ AliasedClass(poly.class_, alias, base_alias=self,
+ use_mapper_path=self._aliased_insp._use_mapper_path))
- # used to assign a name to the RowTuple object
- # returned by Query.
- self._sa_label_name = aliased_insp.name
self.__name__ = 'AliasedClass_%s' % self.__target.__name__
- @util.memoized_property
- def _sa_path_registry(self):
- return PathRegistry.per_mapper(self)
-
def __getstate__(self):
return {
'mapper': self._aliased_insp.mapper,
'with_polymorphic_mappers':
self._aliased_insp.with_polymorphic_mappers,
'with_polymorphic_discriminator':
- self._aliased_insp.polymorphic_on
+ self._aliased_insp.polymorphic_on,
+ 'base_alias': self._aliased_insp._base_alias.entity,
+ 'use_mapper_path': self._aliased_insp._use_mapper_path
}
def __setstate__(self, state):
state['mapper'],
state['alias'],
state['name'],
- state.get('with_polymorphic_mappers'),
- state.get('with_polymorphic_discriminator')
+ state['with_polymorphic_mappers'],
+ state['with_polymorphic_discriminator'],
+ state['base_alias'],
+ state['use_mapper_path']
)
self._setup(self._aliased_insp, state['adapt_on_names'])
queryattr = attributes.QueryableAttribute(
self, key,
impl=existing.impl,
- parententity=self,
+ parententity=self._aliased_insp,
comparator=comparator)
setattr(self, key, queryattr)
return queryattr
id(self), self.__target.__name__)
-AliasedInsp = util.namedtuple("AliasedInsp", [
- "entity",
- "mapper",
- "selectable",
- "name",
- "with_polymorphic_mappers",
- "polymorphic_on"
- ])
-
-
-class AliasedInsp(_InspectionAttr, AliasedInsp):
+class AliasedInsp(_InspectionAttr):
"""Provide an inspection interface for an
:class:`.AliasedClass` object.
"""
+ def __init__(self, entity, mapper, selectable, name,
+ with_polymorphic_mappers, polymorphic_on,
+ _base_alias, _use_mapper_path):
+ self.entity = entity
+ self.mapper = mapper
+ self.selectable = selectable
+ self.name = name
+ self.with_polymorphic_mappers = with_polymorphic_mappers
+ self.polymorphic_on = polymorphic_on
+
+ # a little dance to get serialization to work
+ self._base_alias = _base_alias._aliased_insp if _base_alias \
+ and _base_alias is not entity else self
+ self._use_mapper_path = _use_mapper_path
+
+
is_aliased_class = True
"always returns True"
:class:`.AliasedInsp`."""
return self.mapper.class_
+ @util.memoized_property
+ def _path_registry(self):
+ if self._use_mapper_path:
+ return self.mapper._path_registry
+ else:
+ return PathRegistry.per_mapper(self)
+
+ def _entity_for_mapper(self, mapper):
+ self_poly = self.with_polymorphic_mappers
+ if mapper in self_poly:
+ return getattr(self.entity, mapper.class_.__name__)._aliased_insp
+ elif mapper.isa(self.mapper):
+ return self
+ else:
+ assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
+
+ def __repr__(self):
+ return '<AliasedInsp at 0x%x; %s>' % (
+ id(self), self.class_.__name__)
+
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
+inspection._inspects(AliasedInsp)(lambda target: target)
def aliased(element, alias=None, name=None, adapt_on_names=False):
def with_polymorphic(base, classes, selectable=False,
polymorphic_on=None, aliased=False,
- innerjoin=False):
+ innerjoin=False, _use_mapper_path=False):
"""Produce an :class:`.AliasedClass` construct which specifies
columns for descendant mappers of the given base.
return AliasedClass(base,
selectable,
with_polymorphic_mappers=mappers,
- with_polymorphic_discriminator=polymorphic_on)
+ with_polymorphic_discriminator=polymorphic_on,
+ use_mapper_path=_use_mapper_path)
def _orm_annotate(element, exclude=None):
description = entity
entity = insp.c
elif insp.is_aliased_class:
+ entity = insp.entity
description = entity
elif hasattr(insp, "mapper"):
description = entity = insp.mapper.class_
metadata = MetaData()
a = Table("a", metadata,
Column('id', Integer, primary_key=True),
+ Column('foo', Integer),
+ Column('bar', Integer)
)
m1 = mapper(A, a)
@profile_memory()
def go():
- ma = aliased(A)
- m1._sa_path_registry['foo'][ma]['bar']
+ ma = sa.inspect(aliased(A))
+ m1._path_registry[m1.attrs.foo][ma][m1.attrs.bar]
go()
clear_mappers()
.all(),
expected)
- # TODO: this fails due to the change
- # in _configure_subclass_mapper. however we might not
- # need it anymore.
- def test_polymorphic_option(self):
- """
- Test that polymorphic loading sets state.load_path with its
- actual mapper on a subclass, and not the superclass mapper.
-
- This only works for non-aliased mappers.
- """
- paths = []
- class MyOption(interfaces.MapperOption):
- propagate_to_loaders = True
- def process_query_conditionally(self, query):
- paths.append(query._current_path.path)
-
- sess = create_session()
- names = ['dilbert', 'pointy haired boss']
- dilbert, boss = (
- sess.query(Person)
- .options(MyOption())
- .filter(Person.name.in_(names))
- .order_by(Person.name).all())
-
- dilbert.machines
- boss.paperwork
-
- eq_(paths,
- [(class_mapper(Engineer), 'machines'),
- (class_mapper(Boss), 'paperwork')])
def test_subclass_option_pathing(self):
from sqlalchemy.orm import defer
sess = create_session()
- names = ['dilbert', 'pointy haired boss']
dilbert = sess.query(Person).\
options(defer(Engineer.machines, Machine.name)).\
filter(Person.name == 'dilbert').first()
.filter(palias.name.in_(['dilbert', 'wally'])).all(),
[e1, e2])
- def test_self_referential(self):
+ def test_self_referential_one(self):
sess = create_session()
palias = aliased(Person)
expected = [(m1, e1), (m1, e2), (m1, b1)]
.order_by(Person.person_id, palias.person_id).all(),
expected)
+ def test_self_referential_two(self):
+ sess = create_session()
+ palias = aliased(Person)
+ expected = [(m1, e1), (m1, e2), (m1, b1)]
+
eq_(sess.query(Person, palias)
.filter(Person.company_id == palias.company_id)
.filter(Person.name == 'dogbert')
from sqlalchemy.orm import create_session, relationship, mapper, \
contains_eager, joinedload, subqueryload, subqueryload_all,\
- Session, aliased
+ Session, aliased, with_polymorphic
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy.engine import default
def test_subq_through_related(self):
Parent = self.classes.Parent
- Sub = self.classes.Sub
+ Base = self.classes.Base
sess = Session()
+
def go():
eq_(sess.query(Parent)
- .options(subqueryload_all(Parent.children, Sub.related))
+ .options(subqueryload_all(Parent.children, Base.related))
.order_by(Parent.data).all(),
[p1, p2])
self.assert_sql_count(testing.db, go, 3)
+ def test_subq_through_related_aliased(self):
+ Parent = self.classes.Parent
+ Base = self.classes.Base
+ pa = aliased(Parent)
+ sess = Session()
+
+ def go():
+ eq_(sess.query(pa)
+ .options(subqueryload_all(pa.children, Base.related))
+ .order_by(pa.data).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 3)
+
class SubClassEagerToSubClassTest(fixtures.MappedTest):
"""Test joinedloads from subclass to subclass mappers"""
[p1, p2])
self.assert_sql_count(testing.db, go, 2)
+class SameNamedPropTwoPolymorphicSubClassesTest(fixtures.MappedTest):
+ """test pathing when two subclasses contain a different property
+ for the same name, and polymorphic loading is used.
+
+ #2614
+
+ """
+ run_setup_classes = 'once'
+ run_setup_mappers = 'once'
+ run_inserts = 'once'
+ run_deletes = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('a', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('type', String(10))
+ )
+ Table('b', metadata,
+ Column('id', Integer, ForeignKey('a.id'), primary_key=True)
+ )
+ Table('btod', metadata,
+ Column('bid', Integer, ForeignKey('b.id'), nullable=False),
+ Column('did', Integer, ForeignKey('d.id'), nullable=False)
+ )
+ Table('c', metadata,
+ Column('id', Integer, ForeignKey('a.id'), primary_key=True)
+ )
+ Table('ctod', metadata,
+ Column('cid', Integer, ForeignKey('c.id'), nullable=False),
+ Column('did', Integer, ForeignKey('d.id'), nullable=False)
+ )
+ Table('d', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class A(cls.Comparable):
+ pass
+ class B(A):
+ pass
+ class C(A):
+ pass
+ class D(cls.Comparable):
+ pass
+
+ @classmethod
+ def setup_mappers(cls):
+ A = cls.classes.A
+ B = cls.classes.B
+ C = cls.classes.C
+ D = cls.classes.D
+
+ mapper(A, cls.tables.a, polymorphic_on=cls.tables.a.c.type)
+ mapper(B, cls.tables.b, inherits=A, polymorphic_identity='b',
+ properties={
+ 'related': relationship(D, secondary=cls.tables.btod)
+ })
+ mapper(C, cls.tables.c, inherits=A, polymorphic_identity='c',
+ properties={
+ 'related': relationship(D, secondary=cls.tables.ctod)
+ })
+ mapper(D, cls.tables.d)
+
+
+ @classmethod
+ def insert_data(cls):
+ B = cls.classes.B
+ C = cls.classes.C
+ D = cls.classes.D
+
+ session = Session()
+
+ d = D()
+ session.add_all([
+ B(related=[d]),
+ C(related=[d])
+ ])
+ session.commit()
+
+ def test_free_w_poly_subquery(self):
+ A = self.classes.A
+ B = self.classes.B
+ C = self.classes.C
+ D = self.classes.D
+
+ session = Session()
+ d = session.query(D).one()
+ a_poly = with_polymorphic(A, [B, C])
+ def go():
+ for a in session.query(a_poly).\
+ options(
+ subqueryload(a_poly.B.related),
+ subqueryload(a_poly.C.related)):
+ eq_(a.related, [d])
+ self.assert_sql_count(testing.db, go, 3)
+
+ def test_fixed_w_poly_subquery(self):
+ A = self.classes.A
+ B = self.classes.B
+ C = self.classes.C
+ D = self.classes.D
+
+ session = Session()
+ d = session.query(D).one()
+ def go():
+ for a in session.query(A).with_polymorphic([B, C]).\
+ options(subqueryload(B.related), subqueryload(C.related)):
+ eq_(a.related, [d])
+ self.assert_sql_count(testing.db, go, 3)
+
+ def test_free_w_poly_joined(self):
+ A = self.classes.A
+ B = self.classes.B
+ C = self.classes.C
+ D = self.classes.D
+
+ session = Session()
+ d = session.query(D).one()
+ a_poly = with_polymorphic(A, [B, C])
+ def go():
+ for a in session.query(a_poly).\
+ options(
+ joinedload(a_poly.B.related),
+ joinedload(a_poly.C.related)):
+ eq_(a.related, [d])
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_fixed_w_poly_joined(self):
+ A = self.classes.A
+ B = self.classes.B
+ C = self.classes.C
+ D = self.classes.D
+
+ session = Session()
+ d = session.query(D).one()
+ def go():
+ for a in session.query(A).with_polymorphic([B, C]).\
+ options(joinedload(B.related), joinedload(C.related)):
+ eq_(a.related, [d])
+ self.assert_sql_count(testing.db, go, 1)
+
+
+class SubClassToSubClassFromParentTest(fixtures.MappedTest):
+ """test #2617
+
+ """
+ run_setup_classes = 'once'
+ run_setup_mappers = 'once'
+ run_inserts = 'once'
+ run_deletes = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('z', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
+ )
+ Table('a', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('type', String(10)),
+ Column('z_id', Integer, ForeignKey('z.id'))
+ )
+ Table('b', metadata,
+ Column('id', Integer, ForeignKey('a.id'), primary_key=True)
+ )
+ Table('d', metadata,
+ Column('id', Integer, ForeignKey('a.id'), primary_key=True),
+ Column('b_id', Integer, ForeignKey('b.id'))
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class Z(cls.Comparable):
+ pass
+ class A(cls.Comparable):
+ pass
+ class B(A):
+ pass
+ class D(A):
+ pass
+
+ @classmethod
+ def setup_mappers(cls):
+ Z = cls.classes.Z
+ A = cls.classes.A
+ B = cls.classes.B
+ D = cls.classes.D
+
+ mapper(Z, cls.tables.z)
+ mapper(A, cls.tables.a, polymorphic_on=cls.tables.a.c.type,
+ with_polymorphic='*',
+ properties={
+ 'zs': relationship(Z, lazy="subquery")
+ })
+ mapper(B, cls.tables.b, inherits=A, polymorphic_identity='b',
+ properties={
+ 'related': relationship(D, lazy="subquery",
+ primaryjoin=cls.tables.d.c.b_id ==
+ cls.tables.b.c.id)
+ })
+ mapper(D, cls.tables.d, inherits=A, polymorphic_identity='d')
+
+
+ @classmethod
+ def insert_data(cls):
+ B = cls.classes.B
+
+ session = Session()
+ session.add(B())
+ session.commit()
+
+ def test_2617(self):
+ A = self.classes.A
+ session = Session()
+ def go():
+ a1 = session.query(A).first()
+ eq_(a1.related, [])
+ self.assert_sql_count(testing.db, go, 3)
sess = create_session()
context = sess.query(Manager).options(subqueryload('stuff'))._compile_context()
- subq = context.attributes[('subquery', (class_mapper(Employee), 'stuff'))]
+ subq = context.attributes[('subquery',
+ (class_mapper(Manager), class_mapper(Manager).attrs.stuff))]
self.assert_compile(subq,
'SELECT employee_stuff.id AS '
})
mapper(Keyword, keywords)
+
+
for opt, count in [
((
joinedload(User.orders, Order.items),
assert len(list(session)) == 3
-
-class WarnFor2614TestBase(object):
-
- @classmethod
- def define_tables(cls, metadata):
- Table('a', metadata,
- Column('id', Integer, primary_key=True),
- Column('type', String(50)),
- )
- Table('b', metadata,
- Column('id', Integer, ForeignKey('a.id'), primary_key=True),
- )
- Table('c', metadata,
- Column('id', Integer, ForeignKey('a.id'), primary_key=True),
- )
- Table('d', metadata,
- Column('id', Integer, primary_key=True),
- Column('bid', Integer, ForeignKey('b.id')),
- Column('cid', Integer, ForeignKey('c.id')),
- )
-
- def _mapping(self, lazy_b=True, lazy_c=True):
- class A(object):
- pass
-
- class B(A):
- pass
-
- class C(A):
- pass
-
- class D(object):
- pass
-
- mapper(A, self.tables.a, polymorphic_on=self.tables.a.c.type)
- mapper(B, self.tables.b, inherits=A, polymorphic_identity='b',
- properties={
- 'ds': relationship(D, lazy=lazy_b)
- })
- mapper(C, self.tables.c, inherits=A, polymorphic_identity='c',
- properties={
- 'ds': relationship(D, lazy=lazy_c)
- })
- mapper(D, self.tables.d)
-
-
- return A, B, C, D
-
- def _assert_raises(self, fn):
- assert_raises_message(
- sa.exc.InvalidRequestError,
- "Eager loading cannot currently function correctly when two or more "
- r"same\-named attributes associated with multiple polymorphic classes "
- "of the same base are present. Encountered more than one "
- r"eager path for attribute 'ds' on mapper 'Mapper\|A\|a'.",
- fn
- )
-
- def test_poly_both_eager(self):
- A, B, C, D = self._mapping(lazy_b=self.eager_name,
- lazy_c=self.eager_name)
-
- session = Session()
- self._assert_raises(
- session.query(A).with_polymorphic('*').all
- )
-
- def test_poly_one_eager(self):
- A, B, C, D = self._mapping(lazy_b=self.eager_name, lazy_c=True)
-
- session = Session()
- session.query(A).with_polymorphic('*').all()
-
- def test_poly_both_option(self):
- A, B, C, D = self._mapping()
-
- session = Session()
- self._assert_raises(
- session.query(A).with_polymorphic('*').options(
- self.eager_option(B.ds), self.eager_option(C.ds)).all
- )
-
- def test_poly_one_option_bs(self):
- A, B, C, D = self._mapping()
-
- session = Session()
-
- # sucks, can't even do eager() on just one of them, as B.ds
- # hits for both
- self._assert_raises(
- session.query(A).with_polymorphic('*').\
- options(self.eager_option(B.ds)).all
- )
-
- def test_poly_one_option_cs(self):
- A, B, C, D = self._mapping()
-
- session = Session()
-
- # sucks, can't even do eager() on just one of them, as B.ds
- # hits for both
- self._assert_raises(
- session.query(A).with_polymorphic('*').\
- options(self.eager_option(C.ds)).all
- )
-
- def test_single_poly_one_option_bs(self):
- A, B, C, D = self._mapping()
-
- session = Session()
-
- session.query(A).with_polymorphic(B).\
- options(self.eager_option(B.ds)).all()
-
- def test_lazy_True(self):
- A, B, C, D = self._mapping()
-
- session = Session()
- session.query(A).with_polymorphic('*').all()
-
-class WarnFor2614Test(WarnFor2614TestBase, fixtures.MappedTest):
- eager_name = "joined"
-
- def eager_option(self, arg):
- return joinedload(arg)
def test_with_polymorphic(self):
User = self.classes.User
insp = inspect(User)
- eq_(insp.with_polymorphic_mappers, [insp])
+ eq_(insp.with_polymorphic_mappers, [])
def test_col_property(self):
User = self.classes.User
is_(prop._parentmapper, class_mapper(User))
is_(prop.mapper, class_mapper(Address))
- is_(prop._parententity, ua)
+ is_(prop._parententity, inspect(ua))
def test_insp_column_prop(self):
User = self.classes.User
assert not hasattr(prop, "mapper")
- is_(prop._parententity, ua)
+ is_(prop._parententity, inspect(ua))
def test_rel_accessors(self):
User = self.classes.User
)
- def test_multiple_explicit_entities(self):
+ def test_multiple_explicit_entities_one(self):
Node = self.classes.Node
sess = create_session()
(Node(data='n122'), Node(data='n12'), Node(data='n1'))
)
+ def test_multiple_explicit_entities_two(self):
+ Node = self.classes.Node
+
+ sess = create_session()
+
+ parent = aliased(Node)
+ grandparent = aliased(Node)
eq_(
sess.query(Node, parent, grandparent).\
join(parent, Node.parent).\
(Node(data='n122'), Node(data='n12'), Node(data='n1'))
)
+ def test_multiple_explicit_entities_three(self):
+ Node = self.classes.Node
+
+ sess = create_session()
+
+ parent = aliased(Node)
+ grandparent = aliased(Node)
# same, change order around
eq_(
sess.query(parent, grandparent, Node).\
(Node(data='n12'), Node(data='n1'), Node(data='n122'))
)
+ def test_multiple_explicit_entities_four(self):
+ Node = self.classes.Node
+
+ sess = create_session()
+
+ parent = aliased(Node)
+ grandparent = aliased(Node)
eq_(
sess.query(Node, parent, grandparent).\
join(parent, Node.parent).\
(Node(data='n122'), Node(data='n12'), Node(data='n1'))
)
+ def test_multiple_explicit_entities_five(self):
+ Node = self.classes.Node
+
+ sess = create_session()
+
+ parent = aliased(Node)
+ grandparent = aliased(Node)
eq_(
sess.query(Node, parent, grandparent).\
join(parent, Node.parent).\
reserved: maps.c.state})
-
from sqlalchemy import MetaData, null, exists, text, union, literal, \
literal_column, func, between, Unicode, desc, and_, bindparam, \
select, distinct, or_, collate
+from sqlalchemy import inspect
from sqlalchemy import exc as sa_exc, util
from sqlalchemy.sql import compiler, table, column
from sqlalchemy.sql import expression
if i % 2 == 0:
if isinstance(item, type):
item = class_mapper(item)
+ else:
+ if isinstance(item, basestring):
+ item = inspect(r[-1]).mapper.attrs[item]
r.append(item)
return tuple(r)
q = sess.query(User)
opt = self._option_fixture('email_address', 'id')
q = sess.query(Address)._with_current_path(
- orm_util.PathRegistry.coerce([class_mapper(User), 'addresses'])
+ orm_util.PathRegistry.coerce([inspect(User),
+ inspect(User).attrs.addresses])
)
self._assert_path_result(opt, q, [])
class SubAddr(Address):
pass
mapper(SubAddr, inherits=Address, properties={
- 'flub':relationship(Dingaling)
+ 'flub': relationship(Dingaling)
})
q = sess.query(Address)
opt = self._option_fixture(SubAddr.flub)
- self._assert_path_result(opt, q, [(Address, 'flub')])
+ self._assert_path_result(opt, q, [(SubAddr, 'flub')])
def test_from_subclass_to_subclass_attr(self):
Dingaling, Address = self.classes.Dingaling, self.classes.Address
class SubAddr(Address):
pass
mapper(SubAddr, inherits=Address, properties={
- 'flub':relationship(Dingaling)
+ 'flub': relationship(Dingaling)
})
q = sess.query(SubAddr)
class SubAddr(Address):
pass
mapper(SubAddr, inherits=Address, properties={
- 'flub':relationship(Dingaling)
+ 'flub': relationship(Dingaling)
})
q = sess.query(Address)
opt = self._option_fixture(SubAddr.user)
- self._assert_path_result(opt, q, [(Address, 'user')])
+ self._assert_path_result(opt, q,
+ [(Address, inspect(Address).attrs.user)])
def test_of_type(self):
User, Address = self.classes.User, self.classes.Address
q = sess.query(User)
opt = self._option_fixture(User.addresses.of_type(SubAddr), SubAddr.user)
+ u_mapper = inspect(User)
+ a_mapper = inspect(Address)
self._assert_path_result(opt, q, [
- (User, 'addresses'),
- (User, 'addresses', SubAddr, 'user')
+ (u_mapper, u_mapper.attrs.addresses),
+ (u_mapper, u_mapper.attrs.addresses, a_mapper, a_mapper.attrs.user)
])
def test_of_type_plus_level(self):
class SubAddr(Address):
pass
mapper(SubAddr, inherits=Address, properties={
- 'flub':relationship(Dingaling)
+ 'flub': relationship(Dingaling)
})
q = sess.query(User)
opt = self._option_fixture(User.addresses.of_type(SubAddr), SubAddr.flub)
+ u_mapper = inspect(User)
+ sa_mapper = inspect(SubAddr)
self._assert_path_result(opt, q, [
- (User, 'addresses'),
- (User, 'addresses', SubAddr, 'flub')
+ (u_mapper, u_mapper.attrs.addresses),
+ (u_mapper, u_mapper.attrs.addresses, sa_mapper, sa_mapper.attrs.flub)
])
def test_aliased_single(self):
ualias = aliased(User)
q = sess.query(ualias)
opt = self._option_fixture(ualias.addresses)
- self._assert_path_result(opt, q, [(ualias, 'addresses')])
+ self._assert_path_result(opt, q, [(inspect(ualias), 'addresses')])
def test_with_current_aliased_single(self):
User, Address = self.classes.User, self.classes.Address
self._make_path_registry([Address, 'user'])
)
opt = self._option_fixture(Address.user, ualias.addresses)
- self._assert_path_result(opt, q, [(ualias, 'addresses')])
+ self._assert_path_result(opt, q, [(inspect(ualias), 'addresses')])
def test_with_current_aliased_single_nonmatching_option(self):
User, Address = self.classes.User, self.classes.Address
cls.tables.addresses, cls.classes.Address,
cls.tables.orders, cls.classes.Order)
mapper(User, users, properties={
- 'addresses':relationship(Address),
- 'orders':relationship(Order)
+ 'addresses': relationship(Address),
+ 'orders': relationship(Order)
})
mapper(Address, addresses)
mapper(Order, orders)
cls.classes.Keyword,
cls.classes.Item)
mapper(Keyword, keywords, properties={
- "keywords":column_property(keywords.c.name + "some keyword")
+ "keywords": column_property(keywords.c.name + "some keyword")
})
mapper(Item, items,
properties=dict(keywords=relationship(Keyword,
q = create_session().query(*entity_list).\
options(joinedload(option))
- key = ('loaderstrategy', (class_mapper(Item), 'keywords'))
+ key = ('loaderstrategy', (inspect(Item), inspect(Item).attrs.keywords))
assert key in q._attributes
def _assert_eager_with_entity_exception(self, entity_list, options,
assert_raises_message(sa.exc.ArgumentError, message,
create_session().query(column).options,
joinedload(eager_option))
+
@classmethod
def fixtures(cls):
return dict(
- foo = [
+ foo=[
('id', 'type', 'related_id'),
(1, 'bar', 1),
(2, 'bar', 2),
(3, 'baz', 1),
(4, 'baz', 2),
],
- bar = [
+ bar=[
('id', ),
(1,),
(2,)
],
- baz = [
+ baz=[
('id', ),
(3,),
(4,)
],
- related = [
+ related=[
('id', ),
(1,),
(2,)
@classmethod
def setup_mappers(cls):
mapper(cls.classes.Foo, cls.tables.foo, properties={
- 'related':relationship(cls.classes.Related)
+ 'related': relationship(cls.classes.Related)
}, polymorphic_on=cls.tables.foo.c.type)
mapper(cls.classes.Bar, cls.tables.bar, polymorphic_identity='bar',
inherits=cls.classes.Foo)
inherits=cls.classes.Foo)
mapper(cls.classes.Related, cls.tables.related)
- def test_caches_query_per_base(self):
+ def test_caches_query_per_base_subq(self):
Foo, Bar, Baz, Related = self.classes.Foo, self.classes.Bar, \
self.classes.Baz, self.classes.Related
s = Session(testing.db)
def go():
eq_(
- s.query(Foo).with_polymorphic([Bar, Baz]).order_by(Foo.id).options(subqueryload(Foo.related)).all(),
+ s.query(Foo).with_polymorphic([Bar, Baz]).\
+ order_by(Foo.id).\
+ options(subqueryload(Foo.related)).all(),
[
- Bar(id=1,related=Related(id=1)),
- Bar(id=2,related=Related(id=2)),
- Baz(id=3,related=Related(id=1)),
- Baz(id=4,related=Related(id=2))
+ Bar(id=1, related=Related(id=1)),
+ Bar(id=2, related=Related(id=2)),
+ Baz(id=3, related=Related(id=1)),
+ Baz(id=4, related=Related(id=2))
]
)
self.assert_sql_count(testing.db, go, 2)
+ def test_caches_query_per_base_joined(self):
+ # technically this should be in test_eager_relations
+ Foo, Bar, Baz, Related = self.classes.Foo, self.classes.Bar, \
+ self.classes.Baz, self.classes.Related
+ s = Session(testing.db)
+ def go():
+ eq_(
+ s.query(Foo).with_polymorphic([Bar, Baz]).\
+ order_by(Foo.id).\
+ options(joinedload(Foo.related)).all(),
+ [
+ Bar(id=1, related=Related(id=1)),
+ Bar(id=2, related=Related(id=2)),
+ Baz(id=3, related=Related(id=1)),
+ Baz(id=4, related=Related(id=2))
+ ]
+ )
+ self.assert_sql_count(testing.db, go, 1)
+
class CyclicalInheritingEagerTestOne(fixtures.MappedTest):
@classmethod
def test_from_subclass(self):
Director = self.classes.Director
- PersistentObject = self.classes.PersistentObject
-
s = create_session()
ctx = s.query(Director).options(subqueryload('*'))._compile_context()
- q = ctx.attributes[('subquery', (inspect(PersistentObject), 'movies'))]
+ q = ctx.attributes[('subquery',
+ (inspect(Director), inspect(Director).attrs.movies))]
self.assert_compile(q,
"SELECT anon_1.movie_id AS anon_1_movie_id, "
"anon_1.persistent_id AS anon_1_persistent_id, "
d = session.query(Director).options(subqueryload('*')).first()
assert len(list(session)) == 3
-from . import test_eager_relations
-
-class WarnFor2614Test(test_eager_relations.WarnFor2614TestBase, fixtures.MappedTest):
- eager_name = "subquery"
-
- def eager_option(self, arg):
- return subqueryload(arg)
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import Table
-from sqlalchemy.orm import aliased
+from sqlalchemy.orm import aliased, with_polymorphic
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.testing import fixtures
from test.orm import _fixtures
key = util.identity_key(User, row=row)
eq_(key, (User, (1,)))
+
class PathRegistryTest(_fixtures.FixtureTest):
run_setup_mappers = 'once'
run_inserts = None
umapper = inspect(self.classes.User)
is_(
util.RootRegistry()[umapper],
- umapper._sa_path_registry
+ umapper._path_registry
)
eq_(
util.RootRegistry()[umapper],
path = PathRegistry.coerce((umapper,))
eq_(
- path['addresses'][amapper]['email_address'],
- PathRegistry.coerce((umapper, 'addresses',
- amapper, 'email_address'))
+ path[umapper.attrs.addresses][amapper]
+ [amapper.attrs.email_address],
+ PathRegistry.coerce((umapper, umapper.attrs.addresses,
+ amapper, amapper.attrs.email_address))
)
def test_entity_boolean(self):
def test_key_boolean(self):
umapper = inspect(self.classes.User)
- path = PathRegistry.coerce((umapper, 'addresses'))
+ path = PathRegistry.coerce((umapper, umapper.attrs.addresses))
is_(bool(path), True)
def test_aliased_class(self):
User = self.classes.User
ua = aliased(User)
- path = PathRegistry.coerce((ua, 'addresses'))
+ ua_insp = inspect(ua)
+ path = PathRegistry.coerce((ua_insp, ua_insp.mapper.attrs.addresses))
assert path.parent.is_aliased_class
def test_indexed_entity(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- path = PathRegistry.coerce((umapper, 'addresses',
- amapper, 'email_address'))
+ path = PathRegistry.coerce((umapper, umapper.attrs.addresses,
+ amapper, amapper.attrs.email_address))
is_(path[0], umapper)
is_(path[2], amapper)
def test_indexed_key(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- path = PathRegistry.coerce((umapper, 'addresses',
- amapper, 'email_address'))
- eq_(path[1], 'addresses')
- eq_(path[3], 'email_address')
+ path = PathRegistry.coerce((umapper, umapper.attrs.addresses,
+ amapper, amapper.attrs.email_address))
+ eq_(path[1], umapper.attrs.addresses)
+ eq_(path[3], amapper.attrs.email_address)
def test_slice(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- path = PathRegistry.coerce((umapper, 'addresses',
- amapper, 'email_address'))
- eq_(path[1:3], ('addresses', amapper))
+ path = PathRegistry.coerce((umapper, umapper.attrs.addresses,
+ amapper, amapper.attrs.email_address))
+ eq_(path[1:3], (umapper.attrs.addresses, amapper))
def test_addition(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses'))
- p2 = PathRegistry.coerce((amapper, 'email_address'))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
+ p2 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
eq_(
p1 + p2,
- PathRegistry.coerce((umapper, 'addresses',
- amapper, 'email_address'))
+ PathRegistry.coerce((umapper, umapper.attrs.addresses,
+ amapper, amapper.attrs.email_address))
)
def test_length(self):
amapper = inspect(self.classes.Address)
pneg1 = PathRegistry.coerce(())
p0 = PathRegistry.coerce((umapper,))
- p1 = PathRegistry.coerce((umapper, 'addresses'))
- p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
- p3 = PathRegistry.coerce((umapper, 'addresses',
- amapper, 'email_address'))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
+ p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
+ p3 = PathRegistry.coerce((umapper, umapper.attrs.addresses,
+ amapper, amapper.attrs.email_address))
eq_(len(pneg1), 0)
eq_(len(p0), 1)
def test_eq(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses'))
- p2 = PathRegistry.coerce((umapper, 'addresses'))
- p3 = PathRegistry.coerce((umapper, 'other'))
- p4 = PathRegistry.coerce((amapper, 'addresses'))
- p5 = PathRegistry.coerce((umapper, 'addresses', amapper))
- p6 = PathRegistry.coerce((amapper, 'user', umapper, 'addresses'))
- p7 = PathRegistry.coerce((amapper, 'user', umapper, 'addresses',
- amapper, 'email_address'))
+ u_alias = inspect(aliased(self.classes.User))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
+ p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
+ p3 = PathRegistry.coerce((umapper, umapper.attrs.name))
+ p4 = PathRegistry.coerce((u_alias, umapper.attrs.addresses))
+ p5 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
+ p6 = PathRegistry.coerce((amapper, amapper.attrs.user, umapper,
+ umapper.attrs.addresses))
+ p7 = PathRegistry.coerce((amapper, amapper.attrs.user, umapper,
+ umapper.attrs.addresses,
+ amapper, amapper.attrs.email_address))
is_(p1 == p2, True)
is_(p1 == p3, False)
def test_contains_mapper(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses'))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
assert p1.contains_mapper(umapper)
assert not p1.contains_mapper(amapper)
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses'))
- p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
- p3 = PathRegistry.coerce((amapper, 'email_address'))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
+ p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
+ p3 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
eq_(
- p1.path, (umapper, 'addresses')
+ p1.path, (umapper, umapper.attrs.addresses)
)
eq_(
- p2.path, (umapper, 'addresses', amapper)
+ p2.path, (umapper, umapper.attrs.addresses, amapper)
)
eq_(
- p3.path, (amapper, 'email_address')
+ p3.path, (amapper, amapper.attrs.email_address)
)
def test_registry_set(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses'))
- p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
- p3 = PathRegistry.coerce((amapper, 'email_address'))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
+ p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
+ p3 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
p1.set(reg, "p1key", "p1value")
p2.set(reg, "p2key", "p2value")
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses'))
- p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
- p3 = PathRegistry.coerce((amapper, 'email_address'))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
+ p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
+ p3 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
reg.update(
{
('p1key', p1.path): 'p1value',
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses'))
- p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
- p3 = PathRegistry.coerce((amapper, 'email_address'))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
+ p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
+ p3 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
reg.update(
{
('p1key', p1.path): 'p1value',
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses'))
- p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
+ p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
reg.update(
{
('p1key', p1.path): 'p1value',
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses', amapper,
- 'email_address'))
- p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
- p3 = PathRegistry.coerce((umapper, 'addresses'))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper,
+ amapper.attrs.email_address))
+ p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
+ p3 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
eq_(
p1.serialize(),
[(User, "addresses"), (Address, "email_address")]
amapper = inspect(self.classes.Address)
- p1 = PathRegistry.coerce((umapper, 'addresses', amapper,
- 'email_address'))
- p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
- p3 = PathRegistry.coerce((umapper, 'addresses'))
+ p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper,
+ amapper.attrs.email_address))
+ p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
+ p3 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
eq_(
PathRegistry.deserialize([(User, "addresses"),
PathRegistry.deserialize([(User, "addresses")]),
p3
)
+
+from .inheritance import _poly_fixtures
+class PathRegistryInhTest(_poly_fixtures._Polymorphic):
+ run_setup_mappers = 'once'
+ run_inserts = None
+ run_deletes = None
+
+ def test_plain(self):
+ Person = _poly_fixtures.Person
+ Engineer = _poly_fixtures.Engineer
+ pmapper = inspect(Person)
+ emapper = inspect(Engineer)
+
+ p1 = PathRegistry.coerce((pmapper, emapper.attrs.machines))
+
+ # given a mapper and an attribute on a subclass,
+ # the path converts what you get to be against that subclass
+ eq_(
+ p1.path,
+ (emapper, emapper.attrs.machines)
+ )
+
+ def test_plain_compound(self):
+ Company = _poly_fixtures.Company
+ Person = _poly_fixtures.Person
+ Engineer = _poly_fixtures.Engineer
+ cmapper = inspect(Company)
+ pmapper = inspect(Person)
+ emapper = inspect(Engineer)
+
+ p1 = PathRegistry.coerce((cmapper, cmapper.attrs.employees,
+ pmapper, emapper.attrs.machines))
+
+ # given a mapper and an attribute on a subclass,
+ # the path converts what you get to be against that subclass
+ eq_(
+ p1.path,
+ (cmapper, cmapper.attrs.employees, emapper, emapper.attrs.machines)
+ )
+
+ def test_plain_aliased(self):
+ Person = _poly_fixtures.Person
+ Engineer = _poly_fixtures.Engineer
+ emapper = inspect(Engineer)
+
+ p_alias = aliased(Person)
+ p_alias = inspect(p_alias)
+
+ p1 = PathRegistry.coerce((p_alias, emapper.attrs.machines))
+ # plain AliasedClass - the path keeps that AliasedClass directly
+ # as is in the path
+ eq_(
+ p1.path,
+ (p_alias, emapper.attrs.machines)
+ )
+
+ def test_plain_aliased_compound(self):
+ Company = _poly_fixtures.Company
+ Person = _poly_fixtures.Person
+ Engineer = _poly_fixtures.Engineer
+ cmapper = inspect(Company)
+ emapper = inspect(Engineer)
+
+ c_alias = aliased(Company)
+ p_alias = aliased(Person)
+
+ c_alias = inspect(c_alias)
+ p_alias = inspect(p_alias)
+
+ p1 = PathRegistry.coerce((c_alias, cmapper.attrs.employees,
+ p_alias, emapper.attrs.machines))
+ # plain AliasedClass - the path keeps that AliasedClass directly
+ # as is in the path
+ eq_(
+ p1.path,
+ (c_alias, cmapper.attrs.employees, p_alias, emapper.attrs.machines)
+ )
+
+ def test_with_poly_sub(self):
+ Person = _poly_fixtures.Person
+ Engineer = _poly_fixtures.Engineer
+ emapper = inspect(Engineer)
+
+ p_poly = with_polymorphic(Person, [Engineer])
+ e_poly = inspect(p_poly.Engineer)
+ p_poly = inspect(p_poly)
+
+ p1 = PathRegistry.coerce((p_poly, emapper.attrs.machines))
+
+ # polymorphic AliasedClass - the path uses _entity_for_mapper()
+ # to get the most specific sub-entity
+ eq_(
+ p1.path,
+ (e_poly, emapper.attrs.machines)
+ )
+
+ def test_with_poly_base(self):
+ Person = _poly_fixtures.Person
+ Engineer = _poly_fixtures.Engineer
+ pmapper = inspect(Person)
+ emapper = inspect(Engineer)
+
+ p_poly = with_polymorphic(Person, [Engineer])
+ p_poly = inspect(p_poly)
+
+ # "name" is actually on Person, not Engineer
+ p1 = PathRegistry.coerce((p_poly, emapper.attrs.name))
+
+ # polymorphic AliasedClass - because "name" is on Person,
+ # we get Person, not Engineer
+ eq_(
+ p1.path,
+ (p_poly, pmapper.attrs.name)
+ )
+
+ def test_with_poly_use_mapper(self):
+ Person = _poly_fixtures.Person
+ Engineer = _poly_fixtures.Engineer
+ emapper = inspect(Engineer)
+
+ p_poly = with_polymorphic(Person, [Engineer], _use_mapper_path=True)
+ p_poly = inspect(p_poly)
+
+ p1 = PathRegistry.coerce((p_poly, emapper.attrs.machines))
+
+ # polymorphic AliasedClass with the "use_mapper_path" flag -
+ # the AliasedClass acts just like the base mapper
+ eq_(
+ p1.path,
+ (emapper, emapper.attrs.machines)
+ )
+