del self.level_name_by_cte[existing_cte_reference_cte]
else:
- # if the two CTEs are deep-copy identical, consider them
- # the same, **if** they are clones, that is, they came from
- # the ORM or other visit method
if (
- cte._is_clone_of is not None
- or existing_cte._is_clone_of is not None
- ) and cte.compare(existing_cte):
+ # if the two CTEs have the same hash, which we expect
+ # here means that one/both is an annotated of the other
+ (hash(cte) == hash(existing_cte))
+ # or...
+ or (
+ (
+ # if they are clones, i.e. they came from the ORM
+ # or some other visit method
+ cte._is_clone_of is not None
+ or existing_cte._is_clone_of is not None
+ )
+ # and are deep-copy identical
+ and cte.compare(existing_cte)
+ )
+ ):
+ # then consider these two CTEs the same
is_new_cte = False
else:
+ # otherwise these are two CTEs that either will render
+ # differently, or were indicated separately by the user,
+ # with the same name
raise exc.CompileError(
"Multiple, unrelated CTEs found with "
"the same name: %r" % cte_name
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import true
+from sqlalchemy import union_all
from sqlalchemy import update
from sqlalchemy.dialects import mssql
from sqlalchemy.engine import default
)
@testing.combinations(True, False, argnames="identical")
- @testing.combinations(True, False, argnames="use_clone")
- def test_conflicting_names(self, identical, use_clone):
+ @testing.variation("clone_type", ["none", "clone", "annotated"])
+ def test_conflicting_names(self, identical, clone_type):
"""test a flat out name conflict."""
s1 = select(1)
c1 = s1.cte(name="cte1", recursive=True)
- if use_clone:
+ if clone_type.clone:
c2 = c1._clone()
if not identical:
c2 = c2.union(select(2))
+ elif clone_type.annotated:
+ # this does not seem to trigger the issue that was fixed in
+ # #12364 howver is still a worthy test
+ c2 = c1._annotate({"foo": "bar"})
+ if not identical:
+ c2 = c2.union(select(2))
else:
if identical:
s2 = select(1)
s = select(c1, c2)
- if use_clone and identical:
+ if clone_type.clone and identical:
self.assert_compile(
s,
'WITH RECURSIVE cte1("1") AS (SELECT 1) SELECT cte1.1, '
'cte1.1 AS "1_1" FROM cte1',
)
+ elif clone_type.annotated and identical:
+ # annotated seems to have a slightly different rendering
+ # scheme here
+ self.assert_compile(
+ s,
+ 'WITH RECURSIVE cte1("1") AS (SELECT 1) SELECT cte1.1, '
+ 'cte1.1 AS "1__1" FROM cte1',
+ )
else:
assert_raises_message(
CompileError,
s.compile,
)
+ @testing.variation("annotated", [True, False])
+ def test_cte_w_annotated(self, annotated):
+ """test #12364"""
+
+ A = table("a", column("i"), column("j"))
+ B = table("b", column("i"), column("j"))
+
+ a = select(A).where(A.c.i > A.c.j).cte("filtered_a")
+
+ if annotated:
+ a = a._annotate({"foo": "bar"})
+
+ a1 = select(a.c.i, literal(1).label("j"))
+ b = select(B).join(a, a.c.i == B.c.i).where(B.c.j.is_not(None))
+
+ query = union_all(a1, b)
+ self.assert_compile(
+ query,
+ "WITH filtered_a AS "
+ "(SELECT a.i AS i, a.j AS j FROM a WHERE a.i > a.j) "
+ "SELECT filtered_a.i, :param_1 AS j FROM filtered_a "
+ "UNION ALL SELECT b.i, b.j "
+ "FROM b JOIN filtered_a ON filtered_a.i = b.i "
+ "WHERE b.j IS NOT NULL",
+ )
+
def test_with_recursive_no_name_currently_buggy(self):
s1 = select(1)
c1 = s1.cte(name="cte1", recursive=True)