(self.key, self.parent.class_)
)
- def _determine_joins(self):
- """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
- if not passed to the constructor already.
-
- This is based on analysis of the foreign key relationships
- between the parent and target mapped selectables.
-
- """
- if self.secondaryjoin is not None and self.secondary is None:
- raise sa_exc.ArgumentError("Property '" + self.key
- + "' specified with secondary join condition but "
- "no secondary argument")
-
- # if join conditions were not specified, figure them out based
- # on foreign keys
-
- def _search_for_join(mapper, table):
- # find a join between the given mapper's mapped table and
- # the given table. will try the mapper's local table first
- # for more specificity, then if not found will try the more
- # general mapped table, which in the case of inheritance is
- # a join.
- return join_condition(mapper.mapped_table, table,
- a_subset=mapper.local_table)
-
- try:
- if self.secondary is not None:
- if self.secondaryjoin is None:
- self.secondaryjoin = _search_for_join(self.mapper,
- self.secondary)
- if self.primaryjoin is None:
- self.primaryjoin = _search_for_join(self.parent,
- self.secondary)
- else:
- if self.primaryjoin is None:
- self.primaryjoin = _search_for_join(self.parent,
- self.target)
- except sa_exc.ArgumentError, e:
- raise sa_exc.ArgumentError("Could not determine join "
- "condition between parent/child tables on "
- "relationship %s. Specify a 'primaryjoin' "
- "expression. If 'secondary' is present, "
- "'secondaryjoin' is needed as well."
- % self)
-
- def _columns_are_mapped(self, *cols):
- """Return True if all columns in the given collection are
- mapped by the tables referenced by this :class:`.Relationship`.
-
- """
- for c in cols:
- if self.secondary is not None \
- and self.secondary.c.contains_column(c):
- continue
- if not self.parent.mapped_table.c.contains_column(c) and \
- not self.target.c.contains_column(c):
- return False
- return True
-
- def _sync_pairs_from_join(self, join_condition, primary):
- """Determine a list of "source"/"destination" column pairs
- based on the given join condition, as well as the
- foreign keys argument.
-
- "source" would be a column referenced by a foreign key,
- and "destination" would be the column who has a foreign key
- reference to "source".
-
- """
-
- fks = self._user_defined_foreign_keys
- # locate pairs
- eq_pairs = criterion_as_pairs(join_condition,
- consider_as_foreign_keys=fks,
- any_operator=self.viewonly)
-
- # couldn't find any fks, but we have
- # "secondary" - assume the "secondary" columns
- # are the fks
- if not eq_pairs and \
- self.secondary is not None and \
- not fks:
- fks = set(self.secondary.c)
- eq_pairs = criterion_as_pairs(join_condition,
- consider_as_foreign_keys=fks,
- any_operator=self.viewonly)
-
- if eq_pairs:
- util.warn("No ForeignKey objects were present "
- "in secondary table '%s'. Assumed referenced "
- "foreign key columns %s for join condition '%s' "
- "on relationship %s" % (
- self.secondary.description,
- ", ".join(sorted(["'%s'" % col for col in fks])),
- join_condition,
- self
- ))
-
- # Filter out just to columns that are mapped.
- # If viewonly, allow pairs where the FK col
- # was part of "foreign keys" - the column it references
- # may be in an un-mapped table - see
- # test.orm.test_relationships.ViewOnlyComplexJoin.test_basic
- # for an example of this.
- eq_pairs = [(l, r) for (l, r) in eq_pairs
- if self._columns_are_mapped(l, r)
- or self.viewonly and
- r in fks]
-
- if eq_pairs:
- return eq_pairs
-
- # from here below is just determining the best error message
- # to report. Check for a join condition using any operator
- # (not just ==), perhaps they need to turn on "viewonly=True".
- if not self.viewonly and criterion_as_pairs(join_condition,
- consider_as_foreign_keys=self._user_defined_foreign_keys,
- any_operator=True):
-
- err = "Could not locate any "\
- "foreign-key-equated, locally mapped column "\
- "pairs for %s "\
- "condition '%s' on relationship %s." % (
- primary and 'primaryjoin' or 'secondaryjoin',
- join_condition,
- self
- )
-
- if not self._user_defined_foreign_keys:
- err += " Ensure that the "\
- "referencing Column objects have a "\
- "ForeignKey present, or are otherwise part "\
- "of a ForeignKeyConstraint on their parent "\
- "Table, or specify the foreign_keys parameter "\
- "to this relationship."
-
- err += " For more "\
- "relaxed rules on join conditions, the "\
- "relationship may be marked as viewonly=True."
-
- raise sa_exc.ArgumentError(err)
- else:
- if self._user_defined_foreign_keys:
- raise sa_exc.ArgumentError("Could not determine "
- "relationship direction for %s condition "
- "'%s', on relationship %s, using manual "
- "'foreign_keys' setting. Do the columns "
- "in 'foreign_keys' represent all, and "
- "only, the 'foreign' columns in this join "
- "condition? Does the %s Table already "
- "have adequate ForeignKey and/or "
- "ForeignKeyConstraint objects established "
- "(in which case 'foreign_keys' is usually "
- "unnecessary)?"
- % (
- primary and 'primaryjoin' or 'secondaryjoin',
- join_condition,
- self,
- primary and 'mapped' or 'secondary'
- ))
- else:
- raise sa_exc.ArgumentError("Could not determine "
- "relationship direction for %s condition "
- "'%s', on relationship %s. Ensure that the "
- "referencing Column objects have a "
- "ForeignKey present, or are otherwise part "
- "of a ForeignKeyConstraint on their parent "
- "Table, or specify the foreign_keys parameter "
- "to this relationship."
- % (
- primary and 'primaryjoin' or 'secondaryjoin',
- join_condition,
- self
- ))
-
- def _determine_synchronize_pairs(self):
- """Resolve 'primary'/foreign' column pairs from the primaryjoin
- and secondaryjoin arguments.
-
- """
- if self.local_remote_pairs:
- if not self._user_defined_foreign_keys:
- raise sa_exc.ArgumentError(
- "foreign_keys argument is "
- "required with _local_remote_pairs argument")
- self.synchronize_pairs = []
- for l, r in self.local_remote_pairs:
- if r in self._user_defined_foreign_keys:
- self.synchronize_pairs.append((l, r))
- elif l in self._user_defined_foreign_keys:
- self.synchronize_pairs.append((r, l))
- else:
- self.synchronize_pairs = self._sync_pairs_from_join(
- self.primaryjoin,
- True)
-
- self._calculated_foreign_keys = util.column_set(
- r for (l, r) in
- self.synchronize_pairs)
-
- if self.secondaryjoin is not None:
- self.secondary_synchronize_pairs = self._sync_pairs_from_join(
- self.secondaryjoin,
- False)
- self._calculated_foreign_keys.update(
- r for (l, r) in
- self.secondary_synchronize_pairs)
- else:
- self.secondary_synchronize_pairs = None
-
- def _determine_direction(self):
- """Determine if this relationship is one to many, many to one,
- many to many.
-
- This is derived from the primaryjoin, presence of "secondary",
- and in the case of self-referential the "remote side".
-
- """
- if self.secondaryjoin is not None:
- self.direction = MANYTOMANY
- elif self._refers_to_parent_table():
-
- # self referential defaults to ONETOMANY unless the "remote"
- # side is present and does not reference any foreign key
- # columns
-
- if self.local_remote_pairs:
- remote = [r for (l, r) in self.local_remote_pairs]
- elif self.remote_side:
- remote = self.remote_side
- else:
- remote = None
- if not remote or self._calculated_foreign_keys.difference(l for (l,
- r) in self.synchronize_pairs).intersection(remote):
- self.direction = ONETOMANY
- else:
- self.direction = MANYTOONE
- else:
- parentcols = util.column_set(self.parent.mapped_table.c)
- targetcols = util.column_set(self.mapper.mapped_table.c)
-
- # fk collection which suggests ONETOMANY.
- onetomany_fk = targetcols.intersection(
- self._calculated_foreign_keys)
+ def _setup_join_conditions(self):
+ self._join_condition = jc = relationships.JoinCondition(
+ parent_selectable=self.parent.mapped_table,
+ child_selectable=self.mapper.mapped_table,
+ parent_local_selectable=self.parent.local_table,
+ child_local_selectable=self.mapper.local_table,
+ primaryjoin=self.primaryjoin,
+ secondary=self.secondary,
+ secondaryjoin=self.secondaryjoin,
+ parent_equivalents=self.parent._equivalent_columns,
+ child_equivalents=self.mapper._equivalent_columns,
+ consider_as_foreign_keys=self._user_defined_foreign_keys,
+ local_remote_pairs=self.local_remote_pairs,
+ remote_side=self.remote_side,
+ self_referential=self._is_self_referential,
+ prop=self,
+ support_sync=not self.viewonly,
+ can_be_synced_fn=self._columns_are_mapped
+ )
- self.primaryjoin = jc.primaryjoin
- self.secondaryjoin = jc.secondaryjoin
++ self.primaryjoin = jc.deannotated_primaryjoin
++ self.secondaryjoin = jc.deannotated_secondaryjoin
+ self.direction = jc.direction
+ self.local_remote_pairs = jc.local_remote_pairs
+ self.remote_side = jc.remote_columns
+ self.local_columns = jc.local_columns
+ self.synchronize_pairs = jc.synchronize_pairs
+ self._calculated_foreign_keys = jc.foreign_key_columns
+ self.secondary_synchronize_pairs = jc.secondary_synchronize_pairs
- # fk collection which suggests MANYTOONE.
+ def _check_conflicts(self):
+ """Test that this relationship is legal, warn about
+ inheritance conflicts."""
- manytoone_fk = parentcols.intersection(
- self._calculated_foreign_keys)
+ if not self.is_primary() \
+ and not mapper.class_mapper(
+ self.parent.class_,
+ compile=False).has_property(self.key):
+ raise sa_exc.ArgumentError("Attempting to assign a new "
+ "relationship '%s' to a non-primary mapper on "
+ "class '%s'. New relationships can only be added "
+ "to the primary mapper, i.e. the very first mapper "
+ "created for class '%s' " % (self.key,
+ self.parent.class_.__name__,
+ self.parent.class_.__name__))
- if onetomany_fk and manytoone_fk:
- # fks on both sides. do the same test only based on the
- # local side.
- referents = [c for (c, f) in self.synchronize_pairs]
- onetomany_local = parentcols.intersection(referents)
- manytoone_local = targetcols.intersection(referents)
+ # check for conflicting relationship() on superclass
+ if not self.parent.concrete:
+ for inheriting in self.parent.iterate_to_root():
+ if inheriting is not self.parent \
+ and inheriting.has_property(self.key):
+ util.warn("Warning: relationship '%s' on mapper "
+ "'%s' supersedes the same relationship "
+ "on inherited mapper '%s'; this can "
+ "cause dependency issues during flush"
+ % (self.key, self.parent, inheriting))
- if onetomany_local and not manytoone_local:
- self.direction = ONETOMANY
- elif manytoone_local and not onetomany_local:
- self.direction = MANYTOONE
- else:
- raise sa_exc.ArgumentError(
- "Can't determine relationship"
- " direction for relationship '%s' - foreign "
- "key columns are present in both the parent "
- "and the child's mapped tables. Specify "
- "'foreign_keys' argument." % self)
- elif onetomany_fk:
- self.direction = ONETOMANY
- elif manytoone_fk:
- self.direction = MANYTOONE
- else:
- raise sa_exc.ArgumentError("Can't determine relationship "
- "direction for relationship '%s' - foreign "
- "key columns are present in neither the parent "
- "nor the child's mapped tables" % self)
+ def _check_cascade_settings(self):
if self.cascade.delete_orphan and not self.single_parent \
and (self.direction is MANYTOMANY or self.direction
is MANYTOONE):
"'%s' on relationship '%s': property of that "
"name exists on mapper '%s'" % (backref_key,
self, mapper))
+
+ # determine primaryjoin/secondaryjoin for the
+ # backref. Use the one we had, so that
+ # a custom join doesn't have to be specified in
+ # both directions.
if self.secondary is not None:
- pj = kwargs.pop('primaryjoin', self.secondaryjoin)
- sj = kwargs.pop('secondaryjoin', self.primaryjoin)
+ # for many to many, just switch primaryjoin/
- # secondaryjoin.
- pj = kwargs.pop('primaryjoin', self.secondaryjoin)
- sj = kwargs.pop('secondaryjoin', self.primaryjoin)
++ # secondaryjoin. use the annotated
++ # pj/sj on the _join_condition.
++ pj = kwargs.pop('primaryjoin', self._join_condition.secondaryjoin)
++ sj = kwargs.pop('secondaryjoin', self._join_condition.primaryjoin)
else:
- pj = kwargs.pop('primaryjoin', self.primaryjoin)
+ pj = kwargs.pop('primaryjoin',
+ self._join_condition.primaryjoin_reverse_remote)
sj = kwargs.pop('secondaryjoin', None)
if sj:
raise sa_exc.InvalidRequestError(
--- /dev/null
- self.local_remote_pairs = list(lrp)
- self.synchronize_pairs = sync_pairs
- self.secondary_synchronize_pairs = secondary_sync_pairs
+ # orm/relationships.py
+ # Copyright (C) 2005-2012 the SQLAlchemy authors and contributors <see AUTHORS file>
+ #
+ # This module is part of SQLAlchemy and is released under
+ # the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+ """Heuristics related to join conditions as used in
+ :func:`.relationship`.
+
+ Provides the :class:`.JoinCondition` object, which encapsulates
+ SQL annotation and aliasing behavior focused on the `primaryjoin`
+ and `secondaryjoin` aspects of :func:`.relationship`.
+
+ """
+
+ from sqlalchemy import sql, util, log, exc as sa_exc, schema
+ from sqlalchemy.sql.util import ClauseAdapter, criterion_as_pairs, \
+ join_condition, _shallow_annotate, visit_binary_product,\
+ _deep_deannotate
+ from sqlalchemy.sql import operators, expression, visitors
+ from sqlalchemy.orm.interfaces import MANYTOMANY, MANYTOONE, ONETOMANY
+
+ def remote(expr):
+ return _annotate_columns(expr, {"remote":True})
+
+ def foreign(expr):
+ return _annotate_columns(expr, {"foreign":True})
+
+ def remote_foreign(expr):
+ return _annotate_columns(expr, {"foreign":True,
+ "remote":True})
+
+ def _annotate_columns(element, annotations):
+ def clone(elem):
+ if isinstance(elem, expression.ColumnClause):
+ elem = elem._annotate(annotations.copy())
+ elem._copy_internals(clone=clone)
+ return elem
+
+ if element is not None:
+ element = clone(element)
+ return element
+
+ class JoinCondition(object):
+ def __init__(self,
+ parent_selectable,
+ child_selectable,
+ parent_local_selectable,
+ child_local_selectable,
+ primaryjoin=None,
+ secondary=None,
+ secondaryjoin=None,
+ parent_equivalents=None,
+ child_equivalents=None,
+ consider_as_foreign_keys=None,
+ local_remote_pairs=None,
+ remote_side=None,
+ self_referential=False,
+ prop=None,
+ support_sync=True,
+ can_be_synced_fn=lambda *c: True
+ ):
+ self.parent_selectable = parent_selectable
+ self.parent_local_selectable = parent_local_selectable
+ self.child_selectable = child_selectable
+ self.child_local_selectable = child_local_selectable
+ self.parent_equivalents = parent_equivalents
+ self.child_equivalents = child_equivalents
+ self.primaryjoin = primaryjoin
+ self.secondaryjoin = secondaryjoin
+ self.secondary = secondary
+ self.consider_as_foreign_keys = consider_as_foreign_keys
+ self._local_remote_pairs = local_remote_pairs
+ self._remote_side = remote_side
+ self.prop = prop
+ self.self_referential = self_referential
+ self.support_sync = support_sync
+ self.can_be_synced_fn = can_be_synced_fn
+ self._determine_joins()
+ self._annotate_fks()
+ self._annotate_remote()
+ self._annotate_local()
+ self._setup_pairs()
+ self._check_foreign_cols(self.primaryjoin, True)
+ if self.secondaryjoin is not None:
+ self._check_foreign_cols(self.secondaryjoin, False)
+ self._determine_direction()
+ self._check_remote_side()
+ self._log_joins()
+
+ def _log_joins(self):
+ if self.prop is None:
+ return
+ log = self.prop.logger
+ log.info('%s setup primary join %s', self,
+ self.primaryjoin)
+ log.info('%s setup secondary join %s', self,
+ self.secondaryjoin)
+ log.info('%s synchronize pairs [%s]', self,
+ ','.join('(%s => %s)' % (l, r) for (l, r) in
+ self.synchronize_pairs))
+ log.info('%s secondary synchronize pairs [%s]', self,
+ ','.join('(%s => %s)' % (l, r) for (l, r) in
+ self.secondary_synchronize_pairs or []))
+ log.info('%s local/remote pairs [%s]', self,
+ ','.join('(%s / %s)' % (l, r) for (l, r) in
+ self.local_remote_pairs))
+ log.info('%s relationship direction %s', self,
+ self.direction)
+
+ def _determine_joins(self):
+ """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
+ if not passed to the constructor already.
+
+ This is based on analysis of the foreign key relationships
+ between the parent and target mapped selectables.
+
+ """
+ if self.secondaryjoin is not None and self.secondary is None:
+ raise sa_exc.ArgumentError(
+ "Property %s specified with secondary "
+ "join condition but "
+ "no secondary argument" % self.prop)
+
+ # find a join between the given mapper's mapped table and
+ # the given table. will try the mapper's local table first
+ # for more specificity, then if not found will try the more
+ # general mapped table, which in the case of inheritance is
+ # a join.
+ try:
+ if self.secondary is not None:
+ if self.secondaryjoin is None:
+ self.secondaryjoin = \
+ join_condition(
+ self.child_selectable,
+ self.secondary,
+ a_subset=self.child_local_selectable,
+ consider_as_foreign_keys=\
+ self.consider_as_foreign_keys or None
+ )
+ if self.primaryjoin is None:
+ self.primaryjoin = \
+ join_condition(
+ self.parent_selectable,
+ self.secondary,
+ a_subset=self.parent_local_selectable,
+ consider_as_foreign_keys=\
+ self.consider_as_foreign_keys or None
+ )
+ else:
+ if self.primaryjoin is None:
+ self.primaryjoin = \
+ join_condition(
+ self.parent_selectable,
+ self.child_selectable,
+ a_subset=self.parent_local_selectable,
+ consider_as_foreign_keys=\
+ self.consider_as_foreign_keys or None
+ )
+ except sa_exc.NoForeignKeysError, nfke:
+ if self.secondary is not None:
+ raise sa_exc.NoForeignKeysError("Could not determine join "
+ "condition between parent/child tables on "
+ "relationship %s - there are no foreign keys "
+ "linking these tables via secondary table '%s'. "
+ "Ensure that referencing columns are associated "
+ "with a ForeignKey or ForeignKeyConstraint, or "
+ "specify 'primaryjoin' and 'secondaryjoin' "
+ "expressions."
+ % (self.prop, self.secondary))
+ else:
+ raise sa_exc.NoForeignKeysError("Could not determine join "
+ "condition between parent/child tables on "
+ "relationship %s - there are no foreign keys "
+ "linking these tables. "
+ "Ensure that referencing columns are associated "
+ "with a ForeignKey or ForeignKeyConstraint, or "
+ "specify a 'primaryjoin' expression."
+ % self.prop)
+ except sa_exc.AmbiguousForeignKeysError, afke:
+ if self.secondary is not None:
+ raise sa_exc.AmbiguousForeignKeysError(
+ "Could not determine join "
+ "condition between parent/child tables on "
+ "relationship %s - there are multiple foreign key "
+ "paths linking the tables via secondary table '%s'. "
+ "Specify the 'foreign_keys' "
+ "argument, providing a list of those columns which "
+ "should be counted as containing a foreign key "
+ "reference from the secondary table to each of the "
+ "parent and child tables."
+ % (self.prop, self.secondary))
+ else:
+ raise sa_exc.AmbiguousForeignKeysError(
+ "Could not determine join "
+ "condition between parent/child tables on "
+ "relationship %s - there are multiple foreign key "
+ "paths linking the tables. Specify the "
+ "'foreign_keys' argument, providing a list of those "
+ "columns which should be counted as containing a "
+ "foreign key reference to the parent table."
+ % self.prop)
+
+ @util.memoized_property
+ def primaryjoin_reverse_remote(self):
+ """Return the primaryjoin condition suitable for the
+ "reverse" direction.
+
+ If the primaryjoin was delivered here with pre-existing
+ "remote" annotations, the local/remote annotations
+ are reversed. Otherwise, the local/remote annotations
+ are removed.
+
+ """
+ if self._has_remote_annotations:
+ def replace(element):
+ if "remote" in element._annotations:
+ v = element._annotations.copy()
+ del v['remote']
+ v['local'] = True
+ return element._with_annotations(v)
+ elif "local" in element._annotations:
+ v = element._annotations.copy()
+ del v['local']
+ v['remote'] = True
+ return element._with_annotations(v)
+ return visitors.replacement_traverse(
+ self.primaryjoin, {}, replace)
+ else:
+ if self._has_foreign_annotations:
+ # TODO: coverage
+ return _deep_deannotate(self.primaryjoin,
+ values=("local", "remote"))
+ else:
+ return _deep_deannotate(self.primaryjoin)
+
+ def _has_annotation(self, clause, annotation):
+ for col in visitors.iterate(clause, {}):
+ if annotation in col._annotations:
+ return True
+ else:
+ return False
+
+ @util.memoized_property
+ def _has_foreign_annotations(self):
+ return self._has_annotation(self.primaryjoin, "foreign")
+
+ @util.memoized_property
+ def _has_remote_annotations(self):
+ return self._has_annotation(self.primaryjoin, "remote")
+
+ def _annotate_fks(self):
+ """Annotate the primaryjoin and secondaryjoin
+ structures with 'foreign' annotations marking columns
+ considered as foreign.
+
+ """
+ if self._has_foreign_annotations:
+ return
+
+ if self.consider_as_foreign_keys:
+ self._annotate_from_fk_list()
+ else:
+ self._annotate_present_fks()
+
+ def _annotate_from_fk_list(self):
+ def check_fk(col):
+ if col in self.consider_as_foreign_keys:
+ return col._annotate({"foreign":True})
+ self.primaryjoin = visitors.replacement_traverse(
+ self.primaryjoin,
+ {},
+ check_fk
+ )
+ if self.secondaryjoin is not None:
+ self.secondaryjoin = visitors.replacement_traverse(
+ self.secondaryjoin,
+ {},
+ check_fk
+ )
+
+ def _annotate_present_fks(self):
+ if self.secondary is not None:
+ secondarycols = util.column_set(self.secondary.c)
+ else:
+ secondarycols = set()
+ def is_foreign(a, b):
+ if isinstance(a, schema.Column) and \
+ isinstance(b, schema.Column):
+ if a.references(b):
+ return a
+ elif b.references(a):
+ return b
+
+ if secondarycols:
+ if a in secondarycols and b not in secondarycols:
+ return a
+ elif b in secondarycols and a not in secondarycols:
+ return b
+
+ def visit_binary(binary):
+ if not isinstance(binary.left, sql.ColumnElement) or \
+ not isinstance(binary.right, sql.ColumnElement):
+ return
+
+ if "foreign" not in binary.left._annotations and \
+ "foreign" not in binary.right._annotations:
+ col = is_foreign(binary.left, binary.right)
+ if col is not None:
+ if col.compare(binary.left):
+ binary.left = binary.left._annotate(
+ {"foreign":True})
+ elif col.compare(binary.right):
+ binary.right = binary.right._annotate(
+ {"foreign":True})
+
+ self.primaryjoin = visitors.cloned_traverse(
+ self.primaryjoin,
+ {},
+ {"binary":visit_binary}
+ )
+ if self.secondaryjoin is not None:
+ self.secondaryjoin = visitors.cloned_traverse(
+ self.secondaryjoin,
+ {},
+ {"binary":visit_binary}
+ )
+
+ def _refers_to_parent_table(self):
+ """Return True if the join condition contains column
+ comparisons where both columns are in both tables.
+
+ """
+ pt = self.parent_selectable
+ mt = self.child_selectable
+ result = [False]
+ def visit_binary(binary):
+ c, f = binary.left, binary.right
+ if (
+ isinstance(c, expression.ColumnClause) and \
+ isinstance(f, expression.ColumnClause) and \
+ pt.is_derived_from(c.table) and \
+ pt.is_derived_from(f.table) and \
+ mt.is_derived_from(c.table) and \
+ mt.is_derived_from(f.table)
+ ):
+ result[0] = True
+ visitors.traverse(
+ self.primaryjoin,
+ {},
+ {"binary":visit_binary}
+ )
+ return result[0]
+
+ def _tables_overlap(self):
+ """Return True if parent/child tables have some overlap."""
+
+ return self.parent_selectable.is_derived_from(
+ self.child_local_selectable) or \
+ self.child_selectable.is_derived_from(
+ self.parent_local_selectable)
+
+ def _annotate_remote(self):
+ """Annotate the primaryjoin and secondaryjoin
+ structures with 'remote' annotations marking columns
+ considered as part of the 'remote' side.
+
+ """
+ if self._has_remote_annotations:
+ return
+
+ parentcols = util.column_set(self.parent_selectable.c)
+
+ if self.secondary is not None:
+ self._annotate_remote_secondary()
+ elif self._local_remote_pairs or self._remote_side:
+ self._annotate_remote_from_args()
+ elif self._refers_to_parent_table():
+ self._annotate_selfref(lambda col:"foreign" in col._annotations)
+ elif self._tables_overlap():
+ self._annotate_remote_with_overlap()
+ else:
+ self._annotate_remote_distinct_selectables()
+
+ def _annotate_remote_secondary(self):
+ """annotate 'remote' in primaryjoin, secondaryjoin
+ when 'secondary' is present.
+
+ """
+ def repl(element):
+ if self.secondary.c.contains_column(element):
+ return element._annotate({"remote":True})
+ self.primaryjoin = visitors.replacement_traverse(
+ self.primaryjoin, {}, repl)
+ self.secondaryjoin = visitors.replacement_traverse(
+ self.secondaryjoin, {}, repl)
+
+ def _annotate_selfref(self, fn):
+ """annotate 'remote' in primaryjoin, secondaryjoin
+ when the relationship is detected as self-referential.
+
+ """
+ def visit_binary(binary):
+ equated = binary.left.compare(binary.right)
+ if isinstance(binary.left, expression.ColumnClause) and \
+ isinstance(binary.right, expression.ColumnClause):
+ # assume one to many - FKs are "remote"
+ if fn(binary.left):
+ binary.left = binary.left._annotate({"remote":True})
+ if fn(binary.right) and \
+ not equated:
+ binary.right = binary.right._annotate(
+ {"remote":True})
+ else:
+ self._warn_non_column_elements()
+
+ self.primaryjoin = visitors.cloned_traverse(
+ self.primaryjoin, {},
+ {"binary":visit_binary})
+
+ def _annotate_remote_from_args(self):
+ """annotate 'remote' in primaryjoin, secondaryjoin
+ when the 'remote_side' or '_local_remote_pairs'
+ arguments are used.
+
+ """
+ if self._local_remote_pairs:
+ if self._remote_side:
+ raise sa_exc.ArgumentError(
+ "remote_side argument is redundant "
+ "against more detailed _local_remote_side "
+ "argument.")
+
+ remote_side = [r for (l, r) in self._local_remote_pairs]
+ else:
+ remote_side = self._remote_side
+
+ if self._refers_to_parent_table():
+ self._annotate_selfref(lambda col:col in remote_side)
+ else:
+ def repl(element):
+ if element in remote_side:
+ return element._annotate({"remote":True})
+ self.primaryjoin = visitors.replacement_traverse(
+ self.primaryjoin, {}, repl)
+
+ def _annotate_remote_with_overlap(self):
+ """annotate 'remote' in primaryjoin, secondaryjoin
+ when the parent/child tables have some set of
+ tables in common, though is not a fully self-referential
+ relationship.
+
+ """
+ def visit_binary(binary):
+ binary.left, binary.right = proc_left_right(binary.left,
+ binary.right)
+ binary.right, binary.left = proc_left_right(binary.right,
+ binary.left)
+ def proc_left_right(left, right):
+ if isinstance(left, expression.ColumnClause) and \
+ isinstance(right, expression.ColumnClause):
+ if self.child_selectable.c.contains_column(right) and \
+ self.parent_selectable.c.contains_column(left):
+ right = right._annotate({"remote":True})
+ else:
+ self._warn_non_column_elements()
+
+ return left, right
+
+ self.primaryjoin = visitors.cloned_traverse(
+ self.primaryjoin, {},
+ {"binary":visit_binary})
+
+ def _annotate_remote_distinct_selectables(self):
+ """annotate 'remote' in primaryjoin, secondaryjoin
+ when the parent/child tables are entirely
+ separate.
+
+ """
+ def repl(element):
+ if self.child_selectable.c.contains_column(element) and \
+ (
+ not self.parent_local_selectable.c.\
+ contains_column(element)
+ or self.child_local_selectable.c.\
+ contains_column(element)
+ ):
+ return element._annotate({"remote":True})
+ self.primaryjoin = visitors.replacement_traverse(
+ self.primaryjoin, {}, repl)
+
+ def _warn_non_column_elements(self):
+ util.warn(
+ "Non-simple column elements in primary "
+ "join condition for property %s - consider using "
+ "remote() annotations to mark the remote side."
+ % self.prop
+ )
+
+ def _annotate_local(self):
+ """Annotate the primaryjoin and secondaryjoin
+ structures with 'local' annotations.
+
+ This annotates all column elements found
+ simultaneously in the parent table
+ and the join condition that don't have a
+ 'remote' annotation set up from
+ _annotate_remote() or user-defined.
+
+ """
+ if self._has_annotation(self.primaryjoin, "local"):
+ return
+
+ parentcols = util.column_set(self.parent_selectable.c)
+
+ if self._local_remote_pairs:
+ local_side = util.column_set([l for (l, r)
+ in self._local_remote_pairs])
+ else:
+ local_side = util.column_set(self.parent_selectable.c)
+
+ def locals_(elem):
+ if "remote" not in elem._annotations and \
+ elem in local_side:
+ return elem._annotate({"local":True})
+ self.primaryjoin = visitors.replacement_traverse(
+ self.primaryjoin, {}, locals_
+ )
+
+ def _check_remote_side(self):
+ if not self.local_remote_pairs:
+ raise sa_exc.ArgumentError('Relationship %s could '
+ 'not determine any unambiguous local/remote column '
+ 'pairs based on join condition and remote_side '
+ 'arguments. '
+ 'Consider using the remote() annotation to '
+ 'accurately mark those elements of the join '
+ 'condition that are on the remote side of '
+ 'the relationship.'
+ % (self.prop, ))
+
+ def _check_foreign_cols(self, join_condition, primary):
+ """Check the foreign key columns collected and emit error
+ messages."""
+
+ can_sync = False
+
+ foreign_cols = self._gather_columns_with_annotation(
+ join_condition, "foreign")
+
+ has_foreign = bool(foreign_cols)
+
+ if primary:
+ can_sync = bool(self.synchronize_pairs)
+ else:
+ can_sync = bool(self.secondary_synchronize_pairs)
+
+ if self.support_sync and can_sync or \
+ (not self.support_sync and has_foreign):
+ return
+
+ # from here below is just determining the best error message
+ # to report. Check for a join condition using any operator
+ # (not just ==), perhaps they need to turn on "viewonly=True".
+ if self.support_sync and has_foreign and not can_sync:
+ err = "Could not locate any simple equality expressions "\
+ "involving locally mapped foreign key columns for "\
+ "%s join condition "\
+ "'%s' on relationship %s." % (
+ primary and 'primary' or 'secondary',
+ join_condition,
+ self.prop
+ )
+ err += \
+ " Ensure that referencing columns are associated "\
+ "with a ForeignKey or ForeignKeyConstraint, or are "\
+ "annotated in the join condition with the foreign() "\
+ "annotation. To allow comparison operators other than "\
+ "'==', the relationship can be marked as viewonly=True."
+
+ raise sa_exc.ArgumentError(err)
+ else:
+ err = "Could not locate any relevant foreign key columns "\
+ "for %s join condition '%s' on relationship %s." % (
+ primary and 'primary' or 'secondary',
+ join_condition,
+ self.prop
+ )
+ err += \
+ ' Ensure that referencing columns are associated '\
+ 'with a ForeignKey or ForeignKeyConstraint, or are '\
+ 'annotated in the join condition with the foreign() '\
+ 'annotation.'
+ raise sa_exc.ArgumentError(err)
+
+ def _determine_direction(self):
+ """Determine if this relationship is one to many, many to one,
+ many to many.
+
+ """
+ if self.secondaryjoin is not None:
+ self.direction = MANYTOMANY
+ else:
+ parentcols = util.column_set(self.parent_selectable.c)
+ targetcols = util.column_set(self.child_selectable.c)
+
+ # fk collection which suggests ONETOMANY.
+ onetomany_fk = targetcols.intersection(
+ self.foreign_key_columns)
+
+ # fk collection which suggests MANYTOONE.
+
+ manytoone_fk = parentcols.intersection(
+ self.foreign_key_columns)
+
+ if onetomany_fk and manytoone_fk:
+ # fks on both sides. test for overlap of local/remote
+ # with foreign key
+ self_equated = self.remote_columns.intersection(
+ self.local_columns
+ )
+ onetomany_local = self.remote_columns.\
+ intersection(self.foreign_key_columns).\
+ difference(self_equated)
+ manytoone_local = self.local_columns.\
+ intersection(self.foreign_key_columns).\
+ difference(self_equated)
+ if onetomany_local and not manytoone_local:
+ self.direction = ONETOMANY
+ elif manytoone_local and not onetomany_local:
+ self.direction = MANYTOONE
+ else:
+ raise sa_exc.ArgumentError(
+ "Can't determine relationship"
+ " direction for relationship '%s' - foreign "
+ "key columns within the join condition are present "
+ "in both the parent and the child's mapped tables. "
+ "Ensure that only those columns referring "
+ "to a parent column are marked as foreign, "
+ "either via the foreign() annotation or "
+ "via the foreign_keys argument."
+ % self.prop)
+ elif onetomany_fk:
+ self.direction = ONETOMANY
+ elif manytoone_fk:
+ self.direction = MANYTOONE
+ else:
+ raise sa_exc.ArgumentError("Can't determine relationship "
+ "direction for relationship '%s' - foreign "
+ "key columns are present in neither the parent "
+ "nor the child's mapped tables" % self.prop)
+
++ def _deannotate_pairs(self, collection):
++ """provide deannotation for the various lists of
++ pairs, so that using them in hashes doesn't incur
++ high-overhead __eq__() comparisons against
++ original columns mapped.
++
++ """
++ return [(x._deannotate(), y._deannotate())
++ for x, y in collection]
++
+ def _setup_pairs(self):
+ sync_pairs = []
+ lrp = util.OrderedSet([])
+ secondary_sync_pairs = []
+
+ def go(joincond, collection):
+ def visit_binary(binary, left, right):
+ if "remote" in right._annotations and \
+ "remote" not in left._annotations and \
+ self.can_be_synced_fn(left):
+ lrp.add((left, right))
+ elif "remote" in left._annotations and \
+ "remote" not in right._annotations and \
+ self.can_be_synced_fn(right):
+ lrp.add((right, left))
+ if binary.operator is operators.eq and \
+ self.can_be_synced_fn(left, right):
+ if "foreign" in right._annotations:
+ collection.append((left, right))
+ elif "foreign" in left._annotations:
+ collection.append((right, left))
+ visit_binary_product(visit_binary, joincond)
+
+ for joincond, collection in [
+ (self.primaryjoin, sync_pairs),
+ (self.secondaryjoin, secondary_sync_pairs)
+ ]:
+ if joincond is None:
+ continue
+ go(joincond, collection)
+
- return s
++ self.local_remote_pairs = self._deannotate_pairs(lrp)
++ self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
++ self.secondary_synchronize_pairs = self._deannotate_pairs(secondary_sync_pairs)
+
+ @util.memoized_property
+ def remote_columns(self):
+ return self._gather_join_annotations("remote")
+
+ @util.memoized_property
+ def local_columns(self):
+ return self._gather_join_annotations("local")
+
+ @util.memoized_property
+ def foreign_key_columns(self):
+ return self._gather_join_annotations("foreign")
+
++ @util.memoized_property
++ def deannotated_primaryjoin(self):
++ return _deep_deannotate(self.primaryjoin)
++
++ @util.memoized_property
++ def deannotated_secondaryjoin(self):
++ if self.secondaryjoin is not None:
++ return _deep_deannotate(self.secondaryjoin)
++ else:
++ return None
++
+ def _gather_join_annotations(self, annotation):
+ s = set(
+ self._gather_columns_with_annotation(
+ self.primaryjoin, annotation)
+ )
+ if self.secondaryjoin is not None:
+ s.update(
+ self._gather_columns_with_annotation(
+ self.secondaryjoin, annotation)
+ )
- lazywhere = self.primaryjoin
++ return set([x._deannotate() for x in s])
+
+ def _gather_columns_with_annotation(self, clause, *annotation):
+ annotation = set(annotation)
+ return set([
+ col for col in visitors.iterate(clause, {})
+ if annotation.issubset(col._annotations)
+ ])
+
+
+ def join_targets(self, source_selectable,
+ dest_selectable,
+ aliased,
+ single_crit=None):
+ """Given a source and destination selectable, create a
+ join between them.
+
+ This takes into account aliasing the join clause
+ to reference the appropriate corresponding columns
+ in the target objects, as well as the extra child
+ criterion, equivalent column sets, etc.
+
+ """
+
+ # place a barrier on the destination such that
+ # replacement traversals won't ever dig into it.
+ # its internal structure remains fixed
+ # regardless of context.
+ dest_selectable = _shallow_annotate(
+ dest_selectable,
+ {'no_replacement_traverse':True})
+
+ primaryjoin, secondaryjoin, secondary = self.primaryjoin, \
+ self.secondaryjoin, self.secondary
+
+ # adjust the join condition for single table inheritance,
+ # in the case that the join is to a subclass
+ # this is analogous to the
+ # "_adjust_for_single_table_inheritance()" method in Query.
+
+ if single_crit is not None:
+ if secondaryjoin is not None:
+ secondaryjoin = secondaryjoin & single_crit
+ else:
+ primaryjoin = primaryjoin & single_crit
+
+ if aliased:
+ if secondary is not None:
+ secondary = secondary.alias()
+ primary_aliasizer = ClauseAdapter(secondary)
+ secondary_aliasizer = \
+ ClauseAdapter(dest_selectable,
+ equivalents=self.child_equivalents).\
+ chain(primary_aliasizer)
+ if source_selectable is not None:
+ primary_aliasizer = \
+ ClauseAdapter(secondary).\
+ chain(ClauseAdapter(source_selectable,
+ equivalents=self.parent_equivalents))
+ secondaryjoin = \
+ secondary_aliasizer.traverse(secondaryjoin)
+ else:
+ primary_aliasizer = ClauseAdapter(dest_selectable,
+ exclude_fn=lambda c: "local" in c._annotations,
+ equivalents=self.child_equivalents)
+ if source_selectable is not None:
+ primary_aliasizer.chain(
+ ClauseAdapter(source_selectable,
+ exclude_fn=lambda c: "remote" in c._annotations,
+ equivalents=self.parent_equivalents))
+ secondary_aliasizer = None
+
+ primaryjoin = primary_aliasizer.traverse(primaryjoin)
+ target_adapter = secondary_aliasizer or primary_aliasizer
+ target_adapter.exclude_fn = None
+ else:
+ target_adapter = None
+ return primaryjoin, secondaryjoin, secondary, \
+ target_adapter, dest_selectable
+
+ def create_lazy_clause(self, reverse_direction=False):
+ binds = util.column_dict()
+ lookup = util.column_dict()
+ equated_columns = util.column_dict()
+
+ if reverse_direction and self.secondaryjoin is None:
+ for l, r in self.local_remote_pairs:
+ _list = lookup.setdefault(r, [])
+ _list.append((r, l))
+ equated_columns[l] = r
+ else:
+ for l, r in self.local_remote_pairs:
+ _list = lookup.setdefault(l, [])
+ _list.append((l, r))
+ equated_columns[r] = l
+
+ def col_to_bind(col):
+ if col in lookup:
+ for tobind, equated in lookup[col]:
+ if equated in binds:
+ return None
+ if col not in binds:
+ binds[col] = sql.bindparam(
+ None, None, type_=col.type, unique=True)
+ return binds[col]
+ return None
+
- if self.secondaryjoin is None or not reverse_direction:
++ lazywhere = self.deannotated_primaryjoin
+
- if self.secondaryjoin is not None:
- secondaryjoin = self.secondaryjoin
++ if self.deannotated_secondaryjoin is None or not reverse_direction:
+ lazywhere = visitors.replacement_traverse(
+ lazywhere, {}, col_to_bind)
+
++ if self.deannotated_secondaryjoin is not None:
++ secondaryjoin = self.deannotated_secondaryjoin
+ if reverse_direction:
+ secondaryjoin = visitors.replacement_traverse(
+ secondaryjoin, {}, col_to_bind)
+ lazywhere = sql.and_(lazywhere, secondaryjoin)
+
+ bind_to_col = dict((binds[col].key, col) for col in binds)
+
+ return lazywhere, bind_to_col, equated_columns
+
+
+