Fixed issue where referencing a CTE multiple times in conjunction with a
polymorphic SELECT could result in multiple "clones" of the same CTE being
constructed, which would then trigger these two CTEs as duplicates. To
resolve, the two CTEs are deep-compared when this occurs to ensure that
they are equivalent, then are treated as equivalent.
Fixes: #8357
Change-Id: I1f634a9cf7a6c4256912aac1a00506aecea3b0e2
(cherry picked from commit
85fa363c846f4ed287565c43c32e2cca29470e25)
--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 8357
+
+ Fixed issue where referencing a CTE multiple times in conjunction with a
+ polymorphic SELECT could result in multiple "clones" of the same CTE being
+ constructed, which would then trigger these two CTEs as duplicates. To
+ resolve, the two CTEs are deep-compared when this occurs to ensure that
+ they are equivalent, then are treated as equivalent.
+
del self.level_name_by_cte[existing_cte_reference_cte]
else:
- raise exc.CompileError(
- "Multiple, unrelated CTEs found with "
- "the same name: %r" % cte_name
- )
+ # if the two CTEs are deep-copy identical, consider them
+ # the same, **if** they are clones, that is, they came from
+ # the ORM or other visit method
+ if (
+ cte._is_clone_of is not None
+ or existing_cte._is_clone_of is not None
+ ) and cte.compare(existing_cte):
+ is_new_cte = False
+ else:
+ raise exc.CompileError(
+ "Multiple, unrelated CTEs found with "
+ "the same name: %r" % cte_name
+ )
if not asfrom and not is_new_cte:
return None
from sqlalchemy import desc
from sqlalchemy import exc as sa_exc
+from sqlalchemy import exists
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy import testing
)
e1, e2, e3, b1, m1 = cls.e1, cls.e2, cls.e3, cls.b1, cls.m1
+ @testing.requires.ctes
+ def test_cte_clone_issue(self):
+ """test #8357"""
+
+ sess = fixture_session()
+
+ cte = select(Engineer.person_id).cte(name="test_cte")
+
+ stmt = (
+ select(Engineer)
+ .where(exists().where(Engineer.person_id == cte.c.person_id))
+ .where(exists().where(Engineer.person_id == cte.c.person_id))
+ ).order_by(Engineer.person_id)
+
+ self.assert_compile(
+ stmt,
+ "WITH test_cte AS (SELECT engineers.person_id AS person_id "
+ "FROM people JOIN engineers ON people.person_id = "
+ "engineers.person_id) SELECT engineers.person_id, "
+ "people.person_id AS person_id_1, people.company_id, "
+ "people.name, people.type, engineers.status, "
+ "engineers.engineer_name, engineers.primary_language FROM people "
+ "JOIN engineers ON people.person_id = engineers.person_id WHERE "
+ "(EXISTS (SELECT * FROM test_cte WHERE engineers.person_id = "
+ "test_cte.person_id)) AND (EXISTS (SELECT * FROM test_cte "
+ "WHERE engineers.person_id = test_cte.person_id)) "
+ "ORDER BY engineers.person_id",
+ )
+ result = sess.scalars(stmt)
+ eq_(
+ result.all(),
+ [
+ Engineer(name="dilbert"),
+ Engineer(name="wally"),
+ Engineer(name="vlad"),
+ ],
+ )
+
def test_loads_at_once(self):
"""
Test that all objects load from the full query, when
"SELECT cs1.x, cs2.x AS x_1 FROM bar AS cs1, cte AS cs2",
)
- def test_conflicting_names(self):
+ @testing.combinations(True, False, argnames="identical")
+ @testing.combinations(True, False, argnames="use_clone")
+ def test_conflicting_names(self, identical, use_clone):
"""test a flat out name conflict."""
s1 = select(1)
c1 = s1.cte(name="cte1", recursive=True)
- s2 = select(1)
- c2 = s2.cte(name="cte1", recursive=True)
+ if use_clone:
+ c2 = c1._clone()
+ if not identical:
+ c2 = c2.union(select(2))
+ else:
+ if identical:
+ s2 = select(1)
+ else:
+ s2 = select(column("q"))
+ c2 = s2.cte(name="cte1", recursive=True)
s = select(c1, c2)
- assert_raises_message(
- CompileError,
- "Multiple, unrelated CTEs found " "with the same name: 'cte1'",
- s.compile,
- )
+
+ if use_clone and identical:
+ self.assert_compile(
+ s,
+ 'WITH RECURSIVE cte1("1") AS (SELECT 1) SELECT cte1.1, '
+ 'cte1.1 AS "1_1" FROM cte1',
+ )
+ else:
+ assert_raises_message(
+ CompileError,
+ "Multiple, unrelated CTEs found " "with the same name: 'cte1'",
+ s.compile,
+ )
def test_with_recursive_no_name_currently_buggy(self):
s1 = select(1)