]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- cyclomatic complexity: _join_condition goes from E to a B
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 27 Sep 2014 20:46:23 +0000 (16:46 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 27 Sep 2014 22:15:21 +0000 (18:15 -0400)
lib/sqlalchemy/sql/selectable.py

index 24804866214608b5314b27751ceb73371b658365..b4df87e5435a6e33268c95d1d2378285726fcae2 100644 (file)
@@ -746,6 +746,33 @@ class Join(FromClause):
         providing a "natural join".
 
         """
+        constraints = cls._joincond_scan_left_right(
+            a, a_subset, b, consider_as_foreign_keys)
+
+        if len(constraints) > 1:
+            cls._joincond_trim_constraints(
+                a, b, constraints, consider_as_foreign_keys)
+
+        if len(constraints) == 0:
+            if isinstance(b, FromGrouping):
+                hint = " Perhaps you meant to convert the right side to a "\
+                    "subquery using alias()?"
+            else:
+                hint = ""
+            raise exc.NoForeignKeysError(
+                "Can't find any foreign key relationships "
+                "between '%s' and '%s'.%s" %
+                (a.description, b.description, hint))
+
+        crit = [(x == y) for x, y in list(constraints.values())[0]]
+        if len(crit) == 1:
+            return (crit[0])
+        else:
+            return and_(*crit)
+
+    @classmethod
+    def _joincond_scan_left_right(
+            cls, a, a_subset, b, consider_as_foreign_keys):
         constraints = collections.defaultdict(list)
 
         for left in (a_subset, a):
@@ -780,57 +807,41 @@ class Join(FromClause):
                         if nrte.table_name == b.name:
                             raise
                         else:
-                            # this is totally covered.  can't get
-                            # coverage to mark it.
                             continue
 
                     if col is not None:
                         constraints[fk.constraint].append((col, fk.parent))
             if constraints:
                 break
+        return constraints
 
+    @classmethod
+    def _joincond_trim_constraints(
+            cls, a, b, constraints, consider_as_foreign_keys):
+        # more than one constraint matched.  narrow down the list
+        # to include just those FKCs that match exactly to
+        # "consider_as_foreign_keys".
+        if consider_as_foreign_keys:
+            for const in list(constraints):
+                if set(f.parent for f in const.elements) != set(
+                        consider_as_foreign_keys):
+                    del constraints[const]
+
+        # if still multiple constraints, but
+        # they all refer to the exact same end result, use it.
         if len(constraints) > 1:
-            # more than one constraint matched.  narrow down the list
-            # to include just those FKCs that match exactly to
-            # "consider_as_foreign_keys".
-            if consider_as_foreign_keys:
-                for const in list(constraints):
-                    if set(f.parent for f in const.elements) != set(
-                            consider_as_foreign_keys):
-                        del constraints[const]
-
-            # if still multiple constraints, but
-            # they all refer to the exact same end result, use it.
-            if len(constraints) > 1:
-                dedupe = set(tuple(crit) for crit in constraints.values())
-                if len(dedupe) == 1:
-                    key = list(constraints)[0]
-                    constraints = {key: constraints[key]}
-
-            if len(constraints) != 1:
-                raise exc.AmbiguousForeignKeysError(
-                    "Can't determine join between '%s' and '%s'; "
-                    "tables have more than one foreign key "
-                    "constraint relationship between them. "
-                    "Please specify the 'onclause' of this "
-                    "join explicitly." % (a.description, b.description))
-
-        if len(constraints) == 0:
-            if isinstance(b, FromGrouping):
-                hint = " Perhaps you meant to convert the right side to a "\
-                    "subquery using alias()?"
-            else:
-                hint = ""
-            raise exc.NoForeignKeysError(
-                "Can't find any foreign key relationships "
-                "between '%s' and '%s'.%s" %
-                (a.description, b.description, hint))
-
-        crit = [(x == y) for x, y in list(constraints.values())[0]]
-        if len(crit) == 1:
-            return (crit[0])
-        else:
-            return and_(*crit)
+            dedupe = set(tuple(crit) for crit in constraints.values())
+            if len(dedupe) == 1:
+                key = list(constraints)[0]
+                constraints = {key: constraints[key]}
+
+        if len(constraints) != 1:
+            raise exc.AmbiguousForeignKeysError(
+                "Can't determine join between '%s' and '%s'; "
+                "tables have more than one foreign key "
+                "constraint relationship between them. "
+                "Please specify the 'onclause' of this "
+                "join explicitly." % (a.description, b.description))
 
     def select(self, whereclause=None, **kwargs):
         """Create a :class:`.Select` from this :class:`.Join`.