From: Mike Bayer Date: Thu, 18 Feb 2021 15:43:16 +0000 (-0500) Subject: Extract table names when comparing to nrte error X-Git-Tag: rel_1_3_24~18 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=38fb3d60284d35dc9446dd9a07f6ce48a177314b;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Extract table names when comparing to nrte error Fixed issue where the process of joining two tables could fail if one of the tables had an unrelated, unresolvable foreign key constraint which would raise :class:`_exc.NoReferenceError` within the join process, which nonetheless could be bypassed to allow the join to complete. The logic which tested the exception for signficance within the process would make assumptions about the construct which would fail. Fixes: #5952 Change-Id: I492dacd082ddcf8abb1310ed447a6ed734595bb7 (cherry picked from commit 498db831718cb5df213b1afdd2027878e0e72fd4) --- diff --git a/doc/build/changelog/unreleased_13/5952.rst b/doc/build/changelog/unreleased_13/5952.rst new file mode 100644 index 0000000000..7166e92675 --- /dev/null +++ b/doc/build/changelog/unreleased_13/5952.rst @@ -0,0 +1,11 @@ +.. change:: + :tags: bug, orm + :tickets: 5952 + + Fixed issue where the process of joining two tables could fail if one of + the tables had an unrelated, unresolvable foreign key constraint which + would raise :class:`_exc.NoReferenceError` within the join process, which + nonetheless could be bypassed to allow the join to complete. The logic + which tested the exception for signficance within the process would make + assumptions about the construct which would fail. + diff --git a/lib/sqlalchemy/sql/selectable.py b/lib/sqlalchemy/sql/selectable.py index 4c9348e662..37187c1949 100644 --- a/lib/sqlalchemy/sql/selectable.py +++ b/lib/sqlalchemy/sql/selectable.py @@ -1002,8 +1002,9 @@ class Join(FromClause): return bool(constraints) @classmethod + @util.dependencies("sqlalchemy.sql.util") def _joincond_scan_left_right( - cls, a, a_subset, b, consider_as_foreign_keys + cls, sql_util, a, a_subset, b, consider_as_foreign_keys ): constraints = collections.defaultdict(list) @@ -1021,7 +1022,8 @@ class Join(FromClause): try: col = fk.get_referent(left) except exc.NoReferenceError as nrte: - if nrte.table_name == left.name: + table_names = {t.name for t in sql_util.find_tables(left)} + if nrte.table_name in table_names: raise else: continue @@ -1040,7 +1042,8 @@ class Join(FromClause): try: col = fk.get_referent(b) except exc.NoReferenceError as nrte: - if nrte.table_name == b.name: + table_names = {t.name for t in sql_util.find_tables(b)} + if nrte.table_name in table_names: raise else: continue diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index fd8b997c42..d15e3fa21a 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1334,6 +1334,55 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id) assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id) + def test_join_cond_no_such_unrelated_table_dont_compare_names(self): + m = MetaData() + t1 = Table( + "t1", + m, + Column("y", Integer, ForeignKey("t22.id")), + Column("x", Integer, ForeignKey("t2.id")), + Column("q", Integer, ForeignKey("t22.id")), + ) + t2 = Table( + "t2", + m, + Column("id", Integer), + Column("t3id", ForeignKey("t3.id")), + Column("z", ForeignKey("t33.id")), + ) + t3 = Table( + "t3", m, Column("id", Integer), Column("q", ForeignKey("t4.id")) + ) + + j1 = t1.join(t2) + + assert sql_util.join_condition(j1, t3).compare(t2.c.t3id == t3.c.id) + + def test_join_cond_no_such_unrelated_column_dont_compare_names(self): + m = MetaData() + t1 = Table( + "t1", + m, + Column("x", Integer, ForeignKey("t2.id")), + ) + t2 = Table( + "t2", + m, + Column("id", Integer), + Column("t3id", ForeignKey("t3.id")), + Column("q", ForeignKey("t5.q")), + ) + t3 = Table( + "t3", m, Column("id", Integer), Column("t4id", ForeignKey("t4.id")) + ) + t4 = Table("t4", m, Column("id", Integer)) + Table("t5", m, Column("id", Integer)) + j1 = t1.join(t2) + + j2 = t3.join(t4) + + assert sql_util.join_condition(j1, j2).compare(t2.c.t3id == t3.c.id) + def test_join_cond_no_such_related_table(self): m1 = MetaData() m2 = MetaData()