The Alias object no longer has "element" and "original", it now
has "wrapped" and "element" (the name .original is also left
as a descriptor for legacy access by third party dialects).
These two data members refer to the
dual roles Alias needs to play, where in the Python sense it needs
to refer to the thing it was applied against directly, whereas in the
SQL sense it needs to refer to the ultimate "non-alias" thing it
refers towards. Both are necessary to maintain. However, the change
here has each Alias object access the non-Alias object immediately
so that the "unwrapping" is simpler and does not need any special
logic.
In the SQL sense, Alias objects don't nest, the only potential
was that of the CTE, however there is no such thing as
a nested CTE, see link below.
This change is an interim change along the way to breaking Alias
into more classes and breaking away Select objects from being
FromClause objects.
Change-Id: Ie7a0d064226cb074ca745505129b5ec7d879e389
References: https://stackoverflow.com/questions/
1413516/can-you-create-nested-with-clauses-for-common-table-expressions
)
return (
- self.process(alias.original, asfrom=asfrom, **kwargs)
+ self.process(alias.element, asfrom=asfrom, **kwargs)
+ " "
+ self.preparer.format_alias(alias, alias_name)
)
else:
- return self.process(alias.original, **kwargs)
+ return self.process(alias.element, **kwargs)
def visit_substring_func(self, func, **kw):
s = self.process(func.clauses.clauses[0])
@_with_legacy_schema_aliasing
def visit_alias(self, alias, **kw):
# translate for schema-qualified table aliases
- kw["mssql_aliased"] = alias.original
+ kw["mssql_aliased"] = alias.element
return super(MSSQLCompiler, self).visit_alias(alias, **kw)
@_with_legacy_schema_aliasing
elif (
resolved._is_from_clause
and isinstance(resolved, selectable.Alias)
- and resolved.original._is_select_statement
+ and resolved.element._is_select_statement
):
self._warn_for_scalar_subquery_coercion()
- return resolved.original.scalar_subquery()
+ return resolved.element.scalar_subquery()
else:
self._raise_for_expected(original_element, argname)
if resolved._is_from_clause:
if (
isinstance(resolved, selectable.Alias)
- and resolved.original._is_select_statement
+ and resolved.element._is_select_statement
):
- return resolved.original
+ return resolved.element
else:
return resolved.select()
else:
if resolved._is_from_clause:
if (
isinstance(resolved, selectable.Alias)
- and resolved.original._is_select_statement
+ and resolved.element._is_select_statement
):
- return resolved.original
+ return resolved.element
else:
return resolved.select()
else:
if self.positional:
kwargs["positional_names"] = self.cte_positional[cte] = []
- text += " AS \n" + cte.original._compiler_dispatch(
+ text += " AS \n" + cte.element._compiler_dispatch(
self, asfrom=True, **kwargs
)
if ashint:
return self.preparer.format_alias(alias, alias_name)
elif asfrom:
- ret = alias.original._compiler_dispatch(
+ ret = alias.element._compiler_dispatch(
self, asfrom=True, **kwargs
) + self.get_render_as_alias_suffix(
self.preparer.format_alias(alias, alias_name)
return ret
else:
- return alias.original._compiler_dispatch(self, **kwargs)
+ return alias.element._compiler_dispatch(self, **kwargs)
def visit_lateral(self, lateral, **kw):
kw["lateral"] = True
)
def _init(self, selectable, name=None):
- baseselectable = selectable
- while isinstance(baseselectable, Alias):
- baseselectable = baseselectable.element
- self.original = baseselectable
- self.supports_execution = baseselectable.supports_execution
+ self.wrapped = selectable
+ if isinstance(selectable, Alias):
+ selectable = selectable.element
+ assert not isinstance(selectable, Alias)
+
+ self.supports_execution = selectable.supports_execution
if self.supports_execution:
- self._execution_options = baseselectable._execution_options
+ self._execution_options = selectable._execution_options
self.element = selectable
self._orig_name = name
if name is None:
- if self.original.named_with_column:
- name = getattr(self.original, "name", None)
+ if (
+ isinstance(selectable, FromClause)
+ and selectable.named_with_column
+ ):
+ name = getattr(selectable, "name", None)
name = _anonymous_label("%%(%d %s)s" % (id(self), name or "anon"))
self.name = name
def self_group(self, against=None):
if (
isinstance(against, CompoundSelect)
- and isinstance(self.original, Select)
- and self.original._needs_parens_for_grouping()
+ and isinstance(self.element, Select)
+ and self.element._needs_parens_for_grouping()
):
return FromGrouping(self)
else:
return self.name.encode("ascii", "backslashreplace")
+ @property
+ def original(self):
+ """legacy for dialects that are referring to Alias.original"""
+ return self.element
+
def is_derived_from(self, fromclause):
if fromclause in self._cloned_set:
return True
return self.element.is_derived_from(fromclause)
def _populate_column_collection(self):
- for col in self.element.columns._all_columns:
+ for col in self.wrapped.columns._all_columns:
col._make_proxy(self)
def _refresh_for_new_column(self, column):
- col = self.element._refresh_for_new_column(column)
+ col = self.wrapped._refresh_for_new_column(column)
if col is not None:
if not self._cols_populated:
return None
if isinstance(self.element, TableClause):
return
self._reset_exported()
- self.element = clone(self.element, **kw)
- baseselectable = self.element
- while isinstance(baseselectable, Alias):
- baseselectable = baseselectable.element
- self.original = baseselectable
+ self.wrapped = clone(self.wrapped, **kw)
+ if isinstance(self.wrapped, Alias):
+ self.element = self.wrapped.element
+ else:
+ self.element = self.wrapped
def get_children(self, column_collections=True, **kw):
if column_collections:
for c in self.c:
yield c
- yield self.element
+ yield self.wrapped
def _cache_key(self, **kw):
return (self.__class__, self.element._cache_key(**kw), self._orig_name)
def alias(self, name=None, flat=False):
return CTE._construct(
- self.original,
+ self.element,
name=name,
recursive=self.recursive,
_cte_alias=self,
def union(self, other):
return CTE._construct(
- self.original.union(other),
+ self.element.union(other),
name=self.name,
recursive=self.recursive,
_restates=self._restates.union([self]),
def union_all(self, other):
return CTE._construct(
- self.original.union_all(other),
+ self.element.union_all(other),
name=self.name,
recursive=self.recursive,
_restates=self._restates.union([self]),
s._refresh_for_new_column(q)
assert q in s.c.b_x.proxy_set
+ def test_alias_alias_samename_init(self):
+ a = table("a", column("x"))
+ b = table("b", column("y"))
+ s1 = select([a, b]).apply_labels().alias()
+ s2 = s1.alias()
+
+ s1.c
+ s2.c
+
+ q = column("x")
+ b.append_column(q)
+
+ s2._refresh_for_new_column(q)
+
+ is_(s1.corresponding_column(s2.c.b_x), s1.c.b_x)
+
def test_aliased_select_samename_uninit(self):
a = table("a", column("x"))
b = table("b", column("y"))
"SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1",
dialect="postgresql",
)
+
+
+class AliasTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def test_legacy_original_accessor(self):
+ t = table("t", column("c"))
+ a1 = t.alias()
+ a2 = a1.alias()
+ a3 = a2.alias()
+
+ is_(a1.original, t)
+ is_(a2.original, t)
+ is_(a3.original, t)
+
+ def test_wrapped(self):
+ t = table("t", column("c"))
+ a1 = t.alias()
+ a2 = a1.alias()
+ a3 = a2.alias()
+
+ is_(a1.element, t)
+ is_(a2.element, t)
+ is_(a3.element, t)
+
+ is_(a3.wrapped, a2)
+ is_(a2.wrapped, a1)
+ is_(a1.wrapped, t)
+
+ def test_get_children_preserves_wrapped(self):
+ t = table("t", column("c"))
+ stmt = select([t])
+ a1 = stmt.alias()
+ a2 = a1.alias()
+ eq_(set(a2.get_children(column_collections=False)), {a1})
+
+ def test_wrapped_correspondence(self):
+ t = table("t", column("c"))
+ stmt = select([t])
+ a1 = stmt.alias()
+ a2 = a1.alias()
+
+ is_(a1.corresponding_column(a2.c.c), a1.c.c)
+
+ def test_copy_internals_preserves_wrapped(self):
+ t = table("t", column("c"))
+ stmt = select([t])
+ a1 = stmt.alias()
+ a2 = a1.alias()
+
+ is_(a2.element, a2.wrapped.element)
+
+ a3 = a2._clone()
+ a3._copy_internals()
+ is_(a1.corresponding_column(a3.c.c), a1.c.c)
+ is_(a3.element, a3.wrapped.element)