- factored AliasedClauses out of EagerLoader into its own unit; Query makes heavy use of it for joins
- added support to Query for full interaction between aliased and nonalised joins with appended entities, columns, and externally-mapped columns
from sqlalchemy import sql, util, exceptions, sql_util, logging
from sqlalchemy.orm import mapper, object_mapper
+from sqlalchemy.orm import util as mapperutil
from sqlalchemy.orm.interfaces import OperationContext, LoaderStack
import operator
selected from the aliased join created via those methods.
"""
q = self._clone()
+
+ if isinstance(entity, type):
+ entity = mapper.class_mapper(entity)
+ if alias is not None:
+ alias = mapperutil.AliasedClauses(entity.mapped_table, alias=alias)
+
q._entities = q._entities + [(entity, alias, id)]
return q
- def add_column(self, column):
+ def add_column(self, column, id=None):
"""add a SQL ColumnElement to the list of result columns to be returned.
This will have the effect of all result-returning methods returning a tuple
q = self._clone()
# alias non-labeled column elements.
- # TODO: make the generation deterministic
if isinstance(column, sql.ColumnElement) and not hasattr(column, '_label'):
column = column.label(None)
- q._entities = q._entities + [column]
+ q._entities = q._entities + [(column, None, id)]
return q
def options(self, *args):
if self._aliases is not None:
- # adapt only the *last* alias in the list for now.
- # this helps a self-referential join to work, i.e. table.join(table.alias(a)).join(table.alias(b))
- criterion = sql_util.ClauseAdapter(self._aliases[-1]).traverse(criterion, clone=True)
+ criterion = self._aliases.adapt_clause(criterion)
q = self._clone()
if q._criterion is not None:
else:
return self.filter(clause)
- def _join_to(self, keys, outerjoin=False, start=None, create_aliases=False):
+ def _join_to(self, keys, outerjoin=False, start=None, create_aliases=True):
if start is None:
start = self._joinpoint
clause = self._from_obj[-1]
-
+
currenttables = [clause]
class FindJoinedTables(sql.NoColumnVisitor):
def visit_join(self, join):
currenttables.append(join.right)
FindJoinedTables().traverse(clause)
+
mapper = start
alias = None
- aliases = []
for key in util.to_list(keys):
prop = mapper.get_property(key, resolve_synonyms=True)
if prop._is_self_referential() and not create_aliases:
- # TODO: create_aliases automatically ? probably
- raise exceptions.InvalidRequestError("Self-referential query on '%s' property requries create_aliases=True argument." % str(prop))
- # dont re-join to a table already in our from objects
- # TODO: this code has a little bit of overlap with strategies.EagerLoader.AliasedClauses. possibly
- # look into generalizing that functionality for usage in both places
+ raise exceptions.InvalidRequestError("Self-referential query on '%s' property requires create_aliases=True argument." % str(prop))
+
if prop.select_table not in currenttables or create_aliases:
- if outerjoin:
- if prop.secondary:
- clause = clause.outerjoin(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
- clause = clause.outerjoin(prop.select_table, prop.get_join(mapper, primary=False))
+ if prop.secondary:
+ if create_aliases:
+ alias = mapperutil.PropertyAliasedClauses(prop,
+ prop.get_join(mapper, primary=True, secondary=False),
+ prop.get_join(mapper, primary=False, secondary=True),
+ alias
+ )
+ clause = clause.join(alias.secondary, alias.primaryjoin, isouter=outerjoin).join(alias.alias, alias.secondaryjoin, isouter=outerjoin)
else:
- clause = clause.outerjoin(prop.select_table, prop.get_join(mapper))
+ clause = clause.join(prop.secondary, prop.get_join(mapper, primary=True, secondary=False), isouter=outerjoin)
+ clause = clause.join(prop.select_table, prop.get_join(mapper, primary=False), isouter=outerjoin)
else:
- if prop.secondary:
- if create_aliases:
- join = prop.get_join(mapper, primary=True, secondary=False)
- secondary_alias = prop.secondary.alias()
- aliases.append(secondary_alias)
- if alias is not None:
- join = sql_util.ClauseAdapter(alias).traverse(join, clone=True)
- sql_util.ClauseAdapter(secondary_alias).traverse(join)
- clause = clause.join(secondary_alias, join)
- alias = prop.select_table.alias()
- aliases.append(alias)
- join = prop.get_join(mapper, primary=False)
- join = sql_util.ClauseAdapter(secondary_alias).traverse(join, clone=True)
- sql_util.ClauseAdapter(alias).traverse(join)
- clause = clause.join(alias, join)
- else:
- clause = clause.join(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
- clause = clause.join(prop.select_table, prop.get_join(mapper, primary=False))
+ if create_aliases:
+ alias = mapperutil.PropertyAliasedClauses(prop,
+ prop.get_join(mapper, primary=True, secondary=False),
+ None,
+ alias
+ )
+ clause = clause.join(alias.alias, alias.primaryjoin, isouter=outerjoin)
else:
- if create_aliases:
- join = prop.get_join(mapper)
- if alias is not None:
- join = sql_util.ClauseAdapter(alias, exclude=prop.remote_side).traverse(join, clone=True)
- alias = prop.select_table.alias()
- aliases.append(alias)
- join = sql_util.ClauseAdapter(alias, exclude=prop.local_side).traverse(join, clone=True)
- clause = clause.join(alias, join)
- else:
- clause = clause.join(prop.select_table, prop.get_join(mapper))
+ clause = clause.join(prop.select_table, prop.get_join(mapper), isouter=outerjoin)
mapper = prop.mapper
+
if create_aliases:
- return (clause, mapper, aliases)
+ return (clause, mapper, alias)
else:
return (clause, mapper, None)
q._group_by = q._group_by + util.to_list(criterion)
return q
- def join(self, prop, aliased=False, id=None):
+ def join(self, prop, id=None, aliased=False):
"""create a join of this ``Query`` object's criterion
to a relationship and return the newly resulting ``Query``.
'prop' may be a string property name or a list of string
property names.
"""
-
- q = self._clone()
- (clause, mapper, aliases) = self._join_to(prop, outerjoin=False, start=self.mapper, create_aliases=aliased)
- q._from_obj = [clause]
- q._joinpoint = mapper
- q._aliases = aliases
- if id:
- q._alias_ids[id] = aliases[-1]
- return q
- def outerjoin(self, prop, aliased=False, id=None):
+ return self._join(prop, id=id, outerjoin=False, aliased=aliased)
+
+ def outerjoin(self, prop, id=None, aliased=False):
"""create a left outer join of this ``Query`` object's criterion
to a relationship and return the newly resulting ``Query``.
'prop' may be a string property name or a list of string
property names.
"""
+
+ return self._join(prop, id=id, outerjoin=True, aliased=aliased)
+
+ def _join(self, prop, id, outerjoin, aliased):
+ (clause, mapper, aliases) = self._join_to(prop, outerjoin=outerjoin, start=self.mapper, create_aliases=aliased)
q = self._clone()
- (clause, mapper, aliases) = self._join_to(prop, outerjoin=True, start=self.mapper, create_aliases=aliased)
q._from_obj = [clause]
q._joinpoint = mapper
q._aliases = aliases
+
+ a = aliases
+ while a is not None:
+ q._alias_ids.setdefault(a.mapper, []).append(a)
+ q._alias_ids.setdefault(a.table, []).append(a)
+ q._alias_ids.setdefault(a.alias, []).append(a)
+ a = a.parentclauses
+
if id:
- q._alias_ids[id] = aliases[-1]
+ q._alias_ids[id] = aliases
return q
def reset_joinpoint(self):
process = []
mappers_or_columns = tuple(self._entities) + mappers_or_columns
if mappers_or_columns:
- for m in mappers_or_columns:
- if isinstance(m, tuple):
- (m, alias, alias_id) = m
- if alias_id is not None:
- try:
- alias = self._alias_ids[alias_id]
- except KeyError:
- raise exceptions.InvalidRequestError("Query has no alias identified by '%s'" % alias_id)
+ for tup in mappers_or_columns:
+ if isinstance(tup, tuple):
+ (m, alias, alias_id) = tup
+ clauses = self._get_entity_clauses(tup)
else:
- alias = alias_id = None
+ clauses = alias = alias_id = None
+ m = tup
if isinstance(m, type):
m = mapper.class_mapper(m)
if isinstance(m, mapper.Mapper):
def x(m):
- row_adapter = sql_util.create_row_adapter(alias, m.select_table)
+ row_adapter = clauses is not None and clauses.row_decorator or (lambda row: row)
appender = []
def proc(context, row):
if not m._instance(context, row_adapter(row), appender):
x(m)
elif isinstance(m, (sql.ColumnElement, basestring)):
def y(m):
+ row_adapter = clauses is not None and clauses.row_decorator or (lambda row: row)
res = []
def proc(context, row):
- res.append(row[m])
+ res.append(row_adapter(row)[m])
process.append((proc, res))
y(m)
result = []
value.setup(context)
# additional entities/columns, add those to selection criterion
- for m in self._entities:
- if isinstance(m, tuple):
- (m, alias, alias_id) = m
- if alias_id is not None:
- try:
- alias = self._alias_ids[alias_id]
- except KeyError:
- raise exceptions.InvalidRequestError("Query has no alias identified by '%s'" % alias_id)
- if isinstance(m, type):
- m = mapper.class_mapper(m)
- if isinstance(m, mapper.Mapper):
- for value in m.iterate_properties:
- value.setup(context, eagertable=alias)
+ for tup in self._entities:
+ (m, alias, alias_id) = tup
+ clauses = self._get_entity_clauses(tup)
+ if isinstance(m, mapper.Mapper):
+ for value in m.iterate_properties:
+ value.setup(context, parentclauses=clauses)
elif isinstance(m, sql.ColumnElement):
+ if clauses is not None:
+ m = clauses.adapt_clause(m)
statement.append_column(m)
return statement
+ def _get_entity_clauses(self, m):
+ """for tuples added via add_entity() or add_column(), attempt to locate
+ an AliasedClauses object which should be used to formulate the query as well
+ as to process result rows."""
+ (m, alias, alias_id) = m
+ if alias is not None:
+ return alias
+ if alias_id is not None:
+ try:
+ return self._alias_ids[alias_id]
+ except KeyError:
+ raise exceptions.InvalidRequestError("Query has no alias identified by '%s'" % alias_id)
+ if isinstance(m, type):
+ m = mapper.class_mapper(m)
+ if isinstance(m, mapper.Mapper):
+ l = self._alias_ids.get(m)
+ if l:
+ if len(l) > 1:
+ raise exceptions.InvalidRequestError("Ambiguous join for entity '%s'; specify id=<someid> to query.join()/query.add_entity()" % str(m))
+ else:
+ return l[0]
+ else:
+ return None
+ elif isinstance(m, sql.ColumnElement):
+ aliases = []
+ for table in sql_util.TableFinder(m, check_columns=True):
+ for a in self._alias_ids.get(table, []):
+ aliases.append(a)
+ if len(aliases) > 1:
+ raise exceptions.InvalidRequestError("Ambiguous join for entity '%s'; specify id=<someid> to query.join()/query.add_column()" % str(m))
+ elif len(aliases) == 1:
+ return aliases[0]
+ else:
+ return None
+
def __log_debug(self, msg):
self.logger.debug(msg)
self._should_log_debug = logging.is_debug_enabled(self.logger)
self.is_composite = hasattr(self.parent_property, 'composite_class')
- def setup_query(self, context, eagertable=None, parentclauses=None, **kwargs):
+ def setup_query(self, context, parentclauses=None, **kwargs):
for c in self.columns:
if parentclauses is not None:
context.statement.append_column(parentclauses.aliased_column(c))
- elif eagertable is not None:
- context.statement.append_column(eagertable.corresponding_column(c))
else:
context.statement.append_column(c)
self.clauses = {}
self.join_depth = self.parent_property.join_depth
-
- class AliasedClauses(object):
- """Defines a set of join conditions and table aliases which
- are aliased on a randomly-generated alias name, corresponding
- to the connection of an optional parent AliasedClauses object
- and a target mapper.
-
- EagerLoader has a distinct AliasedClauses object per parent
- AliasedClauses object, so that all paths from one mapper to
- another across a chain of eagerloaders generates a distinct
- chain of joins. The AliasedClauses objects are generated and
- cached on an as-needed basis.
-
- E.g.::
-
- mapper A -->
- (EagerLoader 'items') -->
- mapper B -->
- (EagerLoader 'keywords') -->
- mapper C
-
- """
-
- def __init__(self, eagerloader, parentclauses=None):
- self.parent = eagerloader
- self.target = eagerloader.select_table
- self.eagertarget = eagerloader.select_table.alias(None)
- self.extra_cols = {}
- if parentclauses is not None:
- self.path = parentclauses.path + (self.parent.parent, self.parent.key)
- else:
- self.path = (self.parent.parent, self.parent.key)
-
- if eagerloader.secondary:
- self.eagersecondary = eagerloader.secondary.alias(None)
- if parentclauses is not None:
- aliasizer = sql_util.ClauseAdapter(self.eagertarget).\
- chain(sql_util.ClauseAdapter(self.eagersecondary)).\
- chain(sql_util.ClauseAdapter(parentclauses.eagertarget))
- else:
- aliasizer = sql_util.ClauseAdapter(self.eagertarget).\
- chain(sql_util.ClauseAdapter(self.eagersecondary))
- self.eagersecondaryjoin = eagerloader.polymorphic_secondaryjoin
- self.eagersecondaryjoin = aliasizer.traverse(self.eagersecondaryjoin, clone=True)
- self.eagerprimary = eagerloader.polymorphic_primaryjoin
- self.eagerprimary = aliasizer.traverse(self.eagerprimary, clone=True)
- else:
- self.eagerprimary = eagerloader.polymorphic_primaryjoin
-
- if parentclauses is not None:
- aliasizer = sql_util.ClauseAdapter(self.eagertarget, exclude=eagerloader.parent_property.local_side)
- aliasizer.chain(sql_util.ClauseAdapter(parentclauses.eagertarget, exclude=eagerloader.parent_property.remote_side))
- else:
- aliasizer = sql_util.ClauseAdapter(self.eagertarget, exclude=eagerloader.parent_property.local_side)
- self.eagerprimary = aliasizer.traverse(self.eagerprimary, clone=True)
-
- if eagerloader.order_by:
- self.eager_order_by = sql_util.ClauseAdapter(self.eagertarget).copy_and_process(util.to_list(eagerloader.order_by))
- else:
- self.eager_order_by = None
- self._row_decorator = sql_util.create_row_adapter(self.eagertarget, self.target)
-
- def __str__(self):
- return "->".join([str(s) for s in self.path])
-
- def aliased_column(self, column):
- """return the aliased version of the given column, creating a new label for it if not already
- present in this AliasedClauses eagertable."""
-
- conv = self.eagertarget.corresponding_column(column, raiseerr=False)
- if conv:
- return conv
-
- if column in self.extra_cols:
- return self.extra_cols[column]
-
- aliased_column = column
- # for column-level subqueries, swap out its selectable with our
- # eager version as appropriate, and manually build the
- # "correlation" list of the subquery.
- class ModifySubquery(sql.ClauseVisitor):
- def visit_select(s, select):
- select._should_correlate = False
- select.append_correlation(self.eagertarget)
- aliased_column = sql_util.ClauseAdapter(self.eagertarget).chain(ModifySubquery()).traverse(aliased_column, clone=True)
- aliased_column = aliased_column.label(None)
- self._row_decorator.map[column] = aliased_column
- self.extra_cols[column] = aliased_column
- return aliased_column
-
def init_class_attribute(self):
self.parent_property._get_strategy(LazyLoader).init_class_attribute()
- def setup_query(self, context, eagertable=None, parentclauses=None, parentmapper=None, **kwargs):
+ def setup_query(self, context, parentclauses=None, parentmapper=None, **kwargs):
"""Add a left outer join to the statement thats being constructed."""
# build a path as we setup the query. the format of this path
try:
clauses = self.clauses[path]
except KeyError:
- clauses = EagerLoader.AliasedClauses(self, parentclauses)
+ clauses = mapperutil.PropertyAliasedClauses(self.parent_property, self.parent_property.polymorphic_primaryjoin, self.parent_property.polymorphic_secondaryjoin, parentclauses)
self.clauses[path] = clauses
if self.secondaryjoin is not None:
- statement._outerjoin = sql.outerjoin(towrap, clauses.eagersecondary, clauses.eagerprimary).outerjoin(clauses.eagertarget, clauses.eagersecondaryjoin)
+ statement._outerjoin = sql.outerjoin(towrap, clauses.secondary, clauses.primaryjoin).outerjoin(clauses.alias, clauses.secondaryjoin)
if self.order_by is False and self.secondary.default_order_by() is not None:
- statement.append_order_by(*clauses.eagersecondary.default_order_by())
+ statement.append_order_by(*clauses.secondary.default_order_by())
else:
- statement._outerjoin = towrap.outerjoin(clauses.eagertarget, clauses.eagerprimary)
- if self.order_by is False and clauses.eagertarget.default_order_by() is not None:
- statement.append_order_by(*clauses.eagertarget.default_order_by())
+ statement._outerjoin = towrap.outerjoin(clauses.alias, clauses.primaryjoin)
+ if self.order_by is False and clauses.alias.default_order_by() is not None:
+ statement.append_order_by(*clauses.alias.default_order_by())
- if clauses.eager_order_by:
- statement.append_order_by(*util.to_list(clauses.eager_order_by))
+ if clauses.order_by:
+ statement.append_order_by(*util.to_list(clauses.order_by))
statement.append_from(statement._outerjoin)
try:
# decorate the row according to the stored AliasedClauses for this eager load
clauses = self.clauses[path]
- decorator = clauses._row_decorator
+ decorator = clauses.row_decorator
except KeyError, k:
# no stored AliasedClauses: eager loading was not set up in the query and
# AliasedClauses never got initialized
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-from sqlalchemy import sql, util, exceptions
+from sqlalchemy import sql, util, exceptions, sql_util
from sqlalchemy.orm.interfaces import MapperExtension, EXT_PASS
all_cascades = util.Set(["delete", "delete-orphan", "all", "merge",
def visit_binary(self, binary):
self.func(binary)
+class AliasedClauses(object):
+ """Creates aliases of a mapped tables for usage in ORM queries.
+ """
+
+ def __init__(self, mapped_table, alias=None):
+ if alias:
+ self.alias = alias
+ else:
+ self.alias = mapped_table.alias()
+ self.mapped_table = mapped_table
+ self.extra_cols = {}
+ self.row_decorator = self._create_row_adapter()
+
+ def aliased_column(self, column):
+ """return the aliased version of the given column, creating a new label for it if not already
+ present in this AliasedClauses."""
+
+ conv = self.alias.corresponding_column(column, raiseerr=False)
+ if conv:
+ return conv
+
+ if column in self.extra_cols:
+ return self.extra_cols[column]
+
+ aliased_column = column
+ # for column-level subqueries, swap out its selectable with our
+ # eager version as appropriate, and manually build the
+ # "correlation" list of the subquery.
+ class ModifySubquery(sql.ClauseVisitor):
+ def visit_select(s, select):
+ select._should_correlate = False
+ select.append_correlation(self.alias)
+ aliased_column = sql_util.ClauseAdapter(self.alias).chain(ModifySubquery()).traverse(aliased_column, clone=True)
+ aliased_column = aliased_column.label(None)
+ self.row_decorator.map[column] = aliased_column
+ # TODO: this is a little hacky
+ for attr in ('name', '_label'):
+ if hasattr(column, attr):
+ self.row_decorator.map[getattr(column, attr)] = aliased_column
+ self.extra_cols[column] = aliased_column
+ return aliased_column
+
+ def adapt_clause(self, clause):
+ return self.aliased_column(clause)
+# return sql_util.ClauseAdapter(self.alias).traverse(clause, clone=True)
+
+ def _create_row_adapter(self):
+ """Return a callable which,
+ when passed a RowProxy, will return a new dict-like object
+ that translates Column objects to that of this object's Alias before calling upon the row.
+
+ This allows a regular Table to be used to target columns in a row that was in reality generated from an alias
+ of that table, in such a way that the row can be passed to logic which knows nothing about the aliased form
+ of the table.
+ """
+ class AliasedRowAdapter(object):
+ def __init__(self, row):
+ self.row = row
+ def __contains__(self, key):
+ return key in map or key in self.row
+ def has_key(self, key):
+ return key in self
+ def __getitem__(self, key):
+ if key in map:
+ key = map[key]
+ return self.row[key]
+ def keys(self):
+ return map.keys()
+ map = {}
+ for c in self.alias.c:
+ parent = self.mapped_table.corresponding_column(c)
+ map[parent] = c
+ map[parent._label] = c
+ map[parent.name] = c
+ for c in self.extra_cols:
+ map[c] = self.extra_cols[c]
+ # TODO: this is a little hacky
+ for attr in ('name', '_label'):
+ if hasattr(c, attr):
+ map[getattr(c, attr)] = self.extra_cols[c]
+
+ AliasedRowAdapter.map = map
+ return AliasedRowAdapter
+
+
+class PropertyAliasedClauses(AliasedClauses):
+ """extends AliasedClauses to add support for primary/secondary joins on a relation()."""
+
+ def __init__(self, prop, primaryjoin, secondaryjoin, parentclauses=None):
+ super(PropertyAliasedClauses, self).__init__(prop.select_table)
+
+ self.parentclauses = parentclauses
+ if parentclauses is not None:
+ self.path = parentclauses.path + (prop.parent, prop.key)
+ else:
+ self.path = (prop.parent, prop.key)
+
+ self.prop = prop
+
+ if prop.secondary:
+ self.secondary = prop.secondary.alias()
+ if parentclauses is not None:
+ aliasizer = sql_util.ClauseAdapter(self.alias).\
+ chain(sql_util.ClauseAdapter(self.secondary)).\
+ chain(sql_util.ClauseAdapter(parentclauses.alias))
+ else:
+ aliasizer = sql_util.ClauseAdapter(self.alias).\
+ chain(sql_util.ClauseAdapter(self.secondary))
+ self.secondaryjoin = aliasizer.traverse(secondaryjoin, clone=True)
+ self.primaryjoin = aliasizer.traverse(primaryjoin, clone=True)
+ else:
+ if parentclauses is not None:
+ aliasizer = sql_util.ClauseAdapter(self.alias, exclude=prop.local_side)
+ aliasizer.chain(sql_util.ClauseAdapter(parentclauses.alias, exclude=prop.remote_side))
+ else:
+ aliasizer = sql_util.ClauseAdapter(self.alias, exclude=prop.local_side)
+ self.primaryjoin = aliasizer.traverse(primaryjoin, clone=True)
+ self.secondary = None
+ self.secondaryjoin = None
+
+ if prop.order_by:
+ self.order_by = sql_util.ClauseAdapter(self.alias).copy_and_process(util.to_list(prop.order_by))
+ else:
+ self.order_by = None
+
+ mapper = property(lambda self:self.prop.mapper)
+ table = property(lambda self:self.prop.select_table)
+
+ def __str__(self):
+ return "->".join([str(s) for s in self.path])
+
+
def instance_str(instance):
"""Return a string describing an instance."""
meth = getattr(v, "enter_%s" % obj.__visit_name__, None)
if meth:
meth(obj)
-
+
if clone:
obj._copy_internals()
for c in obj.get_children(**self.__traverse_options__):
self.type = sqltypes.to_instance(kwargs.get('type_', None))
self._bind = kwargs.get('bind', None)
self.group = kwargs.pop('group', True)
- self.clauses = ClauseList(operator=kwargs.get('operator', None), group_contents=kwargs.get('group_contents', True), *clauses)
+ clauses = ClauseList(operator=kwargs.get('operator', None), group_contents=kwargs.get('group_contents', True), *clauses)
if self.group:
- self.clause_expr = self.clauses.self_group()
+ self.clause_expr = clauses.self_group()
else:
- self.clause_expr = self.clauses
+ self.clause_expr = clauses
key = property(lambda self:self.name or "_calc_")
def _copy_internals(self):
self.clause_expr = self.clause_expr._clone()
-
+
+ def clauses(self):
+ if isinstance(self.clause_expr, _Grouping):
+ return self.clause_expr.elem
+ else:
+ return self.clause_expr
+ clauses = property(clauses)
+
def get_children(self, **kwargs):
return self.clause_expr,
key = property(lambda self:self.name)
def _copy_internals(self):
+ _CalculatedClause._copy_internals(self)
self._clone_from_clause()
def get_children(self, **kwargs):
columns = c = property(lambda s:s.elem.columns)
def _copy_internals(self):
+ print "GROPING COPY INTERNALS"
self.elem = self.elem._clone()
-
+ print "NEW ID", id(self.elem)
+
def get_children(self, **kwargs):
return self.elem,
corr = self.__correlate
if correlation_state is not None:
corr = correlation_state[self].get('correlate', util.Set()).union(corr)
- return froms.difference(corr)
+ f = froms.difference(corr)
+ if len(f) == 0:
+ raise exceptions.InvalidRequestError("Select statement '%s' is overcorrelated; returned no 'from' clauses" % str(self.__dont_correlate()))
+ return f
else:
return froms
s.append_from(fromclause)
return s
+ def __dont_correlate(self):
+ s = self._generate()
+ s._should_correlate = False
+ return s
+
def correlate(self, fromclause):
s = self._generate()
- s.append_correlation(fromclause)
+ s._should_correlate=False
+ if fromclause is None:
+ s.__correlate = util.Set()
+ else:
+ s.append_correlation(fromclause)
return s
def append_correlation(self, fromclause):
def visit_column(self, column):
if self.check_columns:
- self.traverse(column.table)
+ self.tables.append(column.table)
class ColumnFinder(sql.ClauseVisitor):
def __init__(self):
def visit_clauselist(self, clist):
for i in range(0, len(clist.clauses)):
n = self.convert_element(clist.clauses[i])
+ print "CONVERTEING CLAUSELIST W ID", id(clist)
if n is not None:
clist.clauses[i] = n
col = []
for elem in select._raw_columns:
+ print "RAW COLUMN", elem
n = self.convert_element(elem)
if n is None:
col.append(elem)
newcol = self.selectable.corresponding_column(equiv, raiseerr=False, require_embedded=True, keys_ok=False)
if newcol:
return newcol
+ #if newcol is None:
+ # self.traverse(col)
+ # return col
return newcol
-def create_row_adapter(alias, table):
- """given a sql.Alias and a target selectable, return a callable which,
- when passed a RowProxy, will return a new dict-like object
- that translates Column objects to that of the Alias before calling upon the row.
-
- This allows a regular Table to be used to target columns in a row that was in reality generated from an alias
- of that table, in such a way that the row can be passed to logic which knows nothing about the aliased form
- of the table.
- """
-
- if alias is None:
- return lambda row:row
-
- class AliasedRowAdapter(object):
- def __init__(self, row):
- self.row = row
- def __contains__(self, key):
- return key in map or key in self.row
- def has_key(self, key):
- return key in self
- def __getitem__(self, key):
- if key in map:
- key = map[key]
- return self.row[key]
- def keys(self):
- return map.keys()
- map = {}
- for c in alias.c:
- parent = table.corresponding_column(c)
- map[parent] = c
- map[parent._label] = c
- map[parent.name] = c
- AliasedRowAdapter.map = map
- return AliasedRowAdapter
sess.query(T).join('children').select_by(id=7)
assert False
except exceptions.InvalidRequestError, e:
- assert str(e) == "Self-referential query on 'T.children (T)' property requries create_aliases=True argument.", str(e)
+ assert str(e) == "Self-referential query on 'T.children (T)' property requires create_aliases=True argument.", str(e)
try:
sess.query(T).join(['children']).select_by(id=7)
assert False
except exceptions.InvalidRequestError, e:
- assert str(e) == "Self-referential query on 'T.children (T)' property requries create_aliases=True argument.", str(e)
+ assert str(e) == "Self-referential query on 'T.children (T)' property requires create_aliases=True argument.", str(e)
assert str(usingProperty) == "Engineer E4, status X"
session.clear()
-
+ print "-----------------------------------------------------------------"
# and now for the lightning round, eager !
car1 = session.query(Car).options(eagerload('employee')).get(car1.car_id)
assert str(car1.employee) == "Engineer E4, status X"
assert l[0].concat == l[0].user_id * 2 == 14
assert l[1].concat == l[1].user_id * 2 == 16
- def testexternalcolumns(self):
- """test creating mappings that reference external columns or functions"""
-
- f = (users.c.user_id *2).label('concat')
- try:
- mapper(User, users, properties={
- 'concat': f,
- })
- class_mapper(User)
- except exceptions.ArgumentError, e:
- assert str(e) == "Column '%s' is not represented in mapper's table. Use the `column_property()` function to force this column to be mapped as a read-only attribute." % str(f)
- clear_mappers()
-
- mapper(User, users, properties={
- 'concat': column_property(f),
- 'count': column_property(select([func.count(addresses.c.address_id)], users.c.user_id==addresses.c.user_id, scalar=True).label('count'))
- })
-
- mapper(Address, addresses, properties={
- 'user':relation(User, lazy=False)
- })
-
- sess = create_session()
- l = sess.query(User).select()
- for u in l:
- print "User", u.user_id, u.user_name, u.concat, u.count
- assert l[0].concat == l[0].user_id * 2 == 14
- assert l[1].concat == l[1].user_id * 2 == 16
-
- for option in (None, eagerload('user')):
- for x in range(0, 2):
- sess.clear()
- l = sess.query(Address)
- if option:
- l = l.options(option)
- l = l.all()
- for a in l:
- print "User", a.user.user_id, a.user.user_name, a.user.concat, a.user.count
- assert l[0].user.concat == l[0].user.user_id * 2 == 14
- assert l[1].user.concat == l[1].user.user_id * 2 == 16
- assert l[0].user.count == 1
- assert l[1].user.count == 3
-
-
@testbase.unsupported('firebird')
def testcount(self):
"""test the count function on Query.
})
mapper(Address, addresses)
mapper(Order, orders, properties={
- 'items':relation(Item, secondary=order_items), #m2m
+ 'items':relation(Item, secondary=order_items, order_by=items.c.id), #m2m
'address':relation(Address), # m2o
})
mapper(Item, items, properties={
class JoinTest(QueryTest):
def test_overlapping_paths(self):
- # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
- result = create_session().query(User).join(['orders', 'items']).filter_by(id=3).join(['orders','address']).filter_by(id=1).all()
- assert [User(id=7, name='jack')] == result
+ for aliased in (True,False):
+ # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
+ result = create_session().query(User).join(['orders', 'items'], aliased=aliased).filter_by(id=3).join(['orders','address'], aliased=aliased).filter_by(id=1).all()
+ assert [User(id=7, name='jack')] == result
def test_overlapping_paths_outerjoin(self):
result = create_session().query(User).outerjoin(['orders', 'items']).filter_by(id=3).outerjoin(['orders','address']).filter_by(id=1).all()
assert [User(id=7, name='jack')] == result
def test_reset_joinpoint(self):
- # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
- result = create_session().query(User).join(['orders', 'items']).filter_by(id=3).reset_joinpoint().join(['orders','address']).filter_by(id=1).all()
- assert [User(id=7, name='jack')] == result
+ for aliased in (True, False):
+ # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
+ result = create_session().query(User).join(['orders', 'items'], aliased=aliased).filter_by(id=3).reset_joinpoint().join(['orders','address'], aliased=aliased).filter_by(id=1).all()
+ assert [User(id=7, name='jack')] == result
- result = create_session().query(User).outerjoin(['orders', 'items']).filter_by(id=3).reset_joinpoint().outerjoin(['orders','address']).filter_by(id=1).all()
- assert [User(id=7, name='jack')] == result
+ result = create_session().query(User).outerjoin(['orders', 'items'], aliased=aliased).filter_by(id=3).reset_joinpoint().outerjoin(['orders','address'], aliased=aliased).filter_by(id=1).all()
+ assert [User(id=7, name='jack')] == result
def test_overlap_with_aliases(self):
oalias = orders.alias('oalias')
q = sess.query(User).join('orders').filter(Order.description=="order 3").join(['orders', 'items']).filter(Order.description=="item 1")
assert [] == q.all()
assert q.count() == 0
+
+ q = sess.query(User).join('orders', aliased=True).filter(Order.items.any(Item.description=='item 4'))
+ assert [User(id=7)] == q.all()
def test_aliased_add_entity(self):
"""test the usage of aliased joins with add_entity()"""
assert False
except exceptions.InvalidRequestError, e:
assert str(e) == "Query has no alias identified by 'fakeid'"
-
+
q = q.add_entity(Order, id='order1').add_entity(Item, id='item1')
assert q.count() == 1
assert [(User(id=7), Order(description='order 3'), Item(description='item 1'))] == q.all()
-
+
+ q = sess.query(User).add_entity(Order).join('orders', aliased=True).filter(Order.description=="order 3").join('orders', aliased=True).filter(Order.description=='order 4')
+ try:
+ q.compile()
+ assert False
+ except exceptions.InvalidRequestError, e:
+ assert str(e) == "Ambiguous join for entity 'Mapper|Order|orders'; specify id=<someid> to query.join()/query.add_entity()"
class SynonymTest(QueryTest):
keep_mappers = True
l = q.instances(selectquery.execute(), Address)
assert l == expected
- q = sess.query(User)
- q = q.add_entity(Address).outerjoin('addresses')
- l = q.all()
- assert l == expected
+ for aliased in (False, True):
+ q = sess.query(User)
+ q = q.add_entity(Address).outerjoin('addresses', aliased=aliased)
+ l = q.all()
+ assert l == expected
- q = sess.query(User).add_entity(Address)
- l = q.join('addresses').filter_by(email_address='ed@bettyboop.com').all()
- assert l == [(user8, address3)]
+ q = sess.query(User).add_entity(Address)
+ l = q.join('addresses', aliased=aliased).filter_by(email_address='ed@bettyboop.com').all()
+ assert l == [(user8, address3)]
- q = sess.query(User, Address).join('addresses').filter_by(email_address='ed@bettyboop.com')
- assert q.all() == [(user8, address3)]
+ q = sess.query(User, Address).join('addresses', aliased=aliased).filter_by(email_address='ed@bettyboop.com')
+ assert q.all() == [(user8, address3)]
- q = sess.query(User, Address).join('addresses').options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com')
- assert q.all() == [(user8, address3)]
+ q = sess.query(User, Address).join('addresses', aliased=aliased).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com')
+ assert q.all() == [(user8, address3)]
def test_aliased_multi_mappers(self):
sess = create_session()
assert l == [(user8, address3)]
def test_multi_columns(self):
+ """test aliased/nonalised joins with the usage of add_column()"""
sess = create_session()
(user7, user8, user9, user10) = sess.query(User).all()
expected = [(user7, 1),
(user9, 1),
(user10, 0)
]
-
- q = sess.query(User)
- q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin('addresses').add_column(func.count(addresses.c.id).label('count'))
- l = q.all()
- assert l == expected
+
+ for aliased in (False, True):
+ q = sess.query(User)
+ q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin('addresses', aliased=aliased).add_column(func.count(addresses.c.id).label('count'))
+ l = q.all()
+ assert l == expected
s = select([users, func.count(addresses.c.id).label('count')]).select_from(users.outerjoin(addresses)).group_by(*[c for c in users.c]).order_by(User.id)
q = sess.query(User)
(user9, 1, "Name:fred"),
(user10, 0, "Name:chuck")]
+ # test with a straight statement
s = select([users, func.count(addresses.c.id).label('count'), ("Name:" + users.c.name).label('concat')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.id])
q = create_session().query(User)
l = q.add_column("count").add_column("concat").from_statement(s).all()
assert l == expected
+ # test with select_from()
q = create_session().query(User).add_column(func.count(addresses.c.id))\
.add_column(("Name:" + users.c.name)).select_from(users.outerjoin(addresses))\
.group_by([c for c in users.c]).order_by(users.c.id)
assert q.all() == expected
+ # test with outerjoin() both aliased and non
+ for aliased in (False, True):
+ q = create_session().query(User).add_column(func.count(addresses.c.id))\
+ .add_column(("Name:" + users.c.name)).outerjoin('addresses', aliased=aliased)\
+ .group_by([c for c in users.c]).order_by(users.c.id)
+
+ assert q.all() == expected
+
class CustomJoinTest(QueryTest):
keep_mappers = False
"""test aliasing of joins with a custom join condition"""
mapper(Address, addresses)
mapper(Order, orders, properties={
- 'items':relation(Item, secondary=order_items, lazy=False, order_by=items.c.id),
+ 'items':relation(Item, secondary=order_items, lazy=True, order_by=items.c.id),
})
mapper(Item, items)
mapper(User, users, properties = dict(
- addresses = relation(Address, lazy=False),
- open_orders = relation(Order, primaryjoin = and_(orders.c.isopen == 1, users.c.id==orders.c.user_id), lazy=False),
- closed_orders = relation(Order, primaryjoin = and_(orders.c.isopen == 0, users.c.id==orders.c.user_id), lazy=False)
+ addresses = relation(Address, lazy=True),
+ open_orders = relation(Order, primaryjoin = and_(orders.c.isopen == 1, users.c.id==orders.c.user_id), lazy=True),
+ closed_orders = relation(Order, primaryjoin = and_(orders.c.isopen == 0, users.c.id==orders.c.user_id), lazy=True)
))
q = create_session().query(User)
node = sess.query(Node).join(['children', 'children'], aliased=True).filter_by(data='n122').first()
assert node.data=='n1'
+class ExternalColumnsTest(QueryTest):
+ keep_mappers = False
+
+ def setup_mappers(self):
+ pass
+
+ def test_external_columns(self):
+ """test querying mappings that reference external columns or selectables."""
+
+ f = (users.c.id *2).label('concat')
+ try:
+ mapper(User, users, properties={
+ 'concat': f,
+ })
+ class_mapper(User)
+ except exceptions.ArgumentError, e:
+ assert str(e) == "Column '%s' is not represented in mapper's table. Use the `column_property()` function to force this column to be mapped as a read-only attribute." % str(f)
+ clear_mappers()
+
+ mapper(User, users, properties={
+ 'concat': column_property(f),
+ 'count': column_property(select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id, scalar=True).correlate(users).label('count'))
+ })
+
+ mapper(Address, addresses, properties={
+ 'user':relation(User, lazy=True)
+ })
+
+ sess = create_session()
+ l = sess.query(User).select()
+ assert [
+ User(id=7, concat=14, count=1),
+ User(id=8, concat=16, count=3),
+ User(id=9, concat=18, count=1),
+ User(id=10, concat=20, count=0),
+ ] == l
+
+ address_result = [
+ Address(id=1, user=User(id=7, concat=14, count=1)),
+ Address(id=2, user=User(id=8, concat=16, count=3)),
+ Address(id=3, user=User(id=8, concat=16, count=3)),
+ Address(id=4, user=User(id=8, concat=16, count=3)),
+ Address(id=5, user=User(id=9, concat=18, count=1))
+ ]
+
+ assert address_result == sess.query(Address).all()
+
+ # run the eager version twice to test caching of aliased clauses
+ for x in range(2):
+ sess.clear()
+ def go():
+ assert address_result == sess.query(Address).options(eagerload('user')).all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+ tuple_address_result = [(address, address.user) for address in address_result]
+
+ tuple_address_result == sess.query(Address).join('user').add_entity(User).all()
+
+ assert tuple_address_result == sess.query(Address).join('user', aliased=True, id='ualias').add_entity(User, id='ualias').all()
+
if __name__ == '__main__':
testbase.main()
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
+ ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
+ assert ff._get_from_objects() == [t1alias]
+
self.runtest(vis.traverse(select(['*'], from_obj=[t1]), clone=True), "SELECT * FROM table1 AS t1alias")
self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 WHERE t1alias.col1 = table2.col2")
self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = table2.col2")
-
+
+ ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
+ self.runtest(ff, "count(t1alias.col1) AS foo")
+ assert ff._get_from_objects() == [t1alias]
+
+# TODO:
+# self.runtest(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias")
+
t2alias = t2.alias('t2alias')
vis.chain(sql_util.ClauseAdapter(t2alias))
self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2")
+
+
class SelectTest(selecttests.SQLTest):
"""tests the generative capability of Select"""