return itertools.chain(*[x._cloned_set for x in elements])
+def _de_clone(
+ elements: Iterable[_CLE],
+) -> Iterable[_CLE]:
+ for x in elements:
+ while x._is_clone_of is not None:
+ x = x._is_clone_of
+ yield x
+
+
def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
"""return the intersection of sets a and b, counting
any overlap between 'cloned' predecessors.
from . import util as sql_util
from ._typing import is_column_element
from ._typing import is_dml
+from .base import _de_clone
from .base import _from_objects
from .base import _NONE_NAME
from .base import _SentinelDefaultCharacterization
enclosing_lateral = kw["enclosing_lateral"]
lateral_from_linter.edges.update(
itertools.product(
- binary.left._from_objects + [enclosing_lateral],
- binary.right._from_objects + [enclosing_lateral],
+ _de_clone(
+ binary.left._from_objects + [enclosing_lateral]
+ ),
+ _de_clone(
+ binary.right._from_objects + [enclosing_lateral]
+ ),
)
)
else:
from_linter.edges.update(
itertools.product(
- binary.left._from_objects, binary.right._from_objects
+ _de_clone(binary.left._from_objects),
+ _de_clone(binary.right._from_objects),
)
)
if asfrom:
if from_linter:
- from_linter.froms[cte] = cte_name
+ from_linter.froms[cte._de_clone()] = cte_name
if not is_new_cte and embedded_in_current_named_cte:
return self.preparer.format_alias(cte, cte_name) # type: ignore[no-any-return] # noqa: E501
return self.preparer.format_alias(alias, alias_name)
elif asfrom:
if from_linter:
- from_linter.froms[alias] = alias_name
+ from_linter.froms[alias._de_clone()] = alias_name
inner = alias.element._compiler_dispatch(
self, asfrom=True, lateral=lateral, **kwargs
if asfrom:
if from_linter:
- from_linter.froms[element] = (
+ from_linter.froms[element._de_clone()] = (
name if name is not None else "(unnamed VALUES element)"
)
if from_linter:
from_linter.edges.update(
itertools.product(
- join.left._from_objects, join.right._from_objects
+ _de_clone(join.left._from_objects),
+ _de_clone(join.right._from_objects),
)
)
self.c = self.tables.table_c
self.d = self.tables.table_d
+ @testing.variation(
+ "what_to_clone", ["nothing", "fromclause", "whereclause", "both"]
+ )
+ def test_cloned_aliases(self, what_to_clone):
+ a1 = self.a.alias()
+ b1 = self.b.alias()
+ c = self.c
+
+ j1 = a1.join(b1, a1.c.col_a == b1.c.col_b)
+ j1_from = j1
+ b1_where = b1
+
+ if what_to_clone.fromclause or what_to_clone.both:
+ a1c = a1._clone()
+ b1c = b1._clone()
+ j1_from = a1c.join(b1c, a1c.c.col_a == b1c.c.col_b)
+
+ if what_to_clone.whereclause or what_to_clone.both:
+ b1_where = b1_where._clone()
+
+ query = (
+ select(c)
+ .select_from(c, j1_from)
+ .where(b1_where.c.col_b == c.c.col_c)
+ )
+ for start in None, c:
+ froms, start = find_unmatching_froms(query, start)
+ assert not froms
+
def test_everything_is_connected(self):
query = (
select(self.a)