--- /dev/null
+.. change::
+ :tags: bug, sql
+ :tickets: 6256
+
+ Repaired and solidified issues regarding custom functions and other
+ arbitrary expression constructs which within SQLAlchemy's column labeling
+ mechanics would seek to use ``str(obj)`` to get a string representation to
+ use as an anonymous column name in the ``.c`` collection of a subquery.
+ This is a very legacy behavior that performs poorly and leads to lots of
+ issues, so has been revised to no longer perform any compilation by
+ establishing specific methods on :class:`.FunctionElement` to handle this
+ case, as SQL functions are the only use case that it came into play. An
+ effect of this behavior is that an unlabeled column expression with no
+ derivable name will be given an arbitrary label starting with the prefix
+ ``"_no_label"`` in the ``.c`` collection of a subquery; these were
+ previously being represented either as the generic stringification of that
+ expression, or as an internal symbol.
columns.append(self.process(col_expr, within_columns_clause=False))
self._add_to_result_map(
- getattr(col_expr, "name", col_expr.anon_label),
- getattr(col_expr, "name", col_expr.anon_label),
+ getattr(col_expr, "name", col_expr._anon_name_label),
+ getattr(col_expr, "name", col_expr._anon_name_label),
(
column,
getattr(column, "name", None),
fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy(
self.cursor,
[
- (getattr(col, "name", col.anon_label), None)
+ (getattr(col, "name", col._anon_name_label), None)
for col in self.compiled.returning
],
initial_buffer=[tuple(returning_params)],
):
result_expr = _CompileLabel(
col_expr,
- column.anon_label
+ column._anon_name_label
if not column_is_repeated
else column._dedupe_label_anon_label,
)
elif self.key:
return self.key
else:
- try:
- return str(self)
- except exc.UnsupportedCompilationError:
- return self.anon_label
+ return getattr(self, "name", "_no_label")
def _make_proxy(
- self, selectable, name=None, name_is_truncatable=False, **kw
+ self, selectable, name=None, key=None, name_is_truncatable=False, **kw
):
"""Create a new :class:`_expression.ColumnElement` representing this
- :class:`_expression.ColumnElement`
- as it appears in the select list of a
- descending selectable.
+ :class:`_expression.ColumnElement` as it appears in the select list of
+ a descending selectable.
"""
if name is None:
- name = self.anon_label
- key = self._proxy_key
+ name = self._anon_name_label
+ if key is None:
+ key = self._proxy_key
else:
key = name
return _anonymous_label.safe_construct(hash(self), seed or "anon")
@util.memoized_property
- def anon_label(self):
+ def _anon_name_label(self):
"""Provides a constant 'anonymous label' for this ColumnElement.
This is a label() expression which will be named at compile time.
for expressions that are known to be 'unnamed' like binary
expressions and function calls.
+ .. versionchanged:: 1.4.9 - this attribute was not intended to be
+ public and is renamed to _anon_name_label. anon_name exists
+ for backwards compat
+
"""
name = getattr(self, "name", None)
return self._anon_label(name)
@util.memoized_property
- def anon_key_label(self):
+ def _anon_key_label(self):
"""Provides a constant 'anonymous key label' for this ColumnElement.
Compare to ``anon_label``, except that the "key" of the column,
This is used when a deduplicating key is placed into the columns
collection of a selectable.
+ .. versionchanged:: 1.4.9 - this attribute was not intended to be
+ public and is renamed to _anon_key_label. anon_key_label exists
+ for backwards compat
+
"""
- name = getattr(self, "key", None) or getattr(self, "name", None)
- return self._anon_label(name)
+ return self._anon_label(self._proxy_key)
+
+ @property
+ @util.deprecated(
+ "1.4",
+ "The :attr:`_expression.ColumnElement.anon_label` attribute is now "
+ "private, and the public accessor is deprecated.",
+ )
+ def anon_label(self):
+ return self._anon_name_label
+
+ @property
+ @util.deprecated(
+ "1.4",
+ "The :attr:`_expression.ColumnElement.anon_key_label` attribute is "
+ "now private, and the public accessor is deprecated.",
+ )
+ def anon_key_label(self):
+ return self._anon_key_label
@util.memoized_property
def _dedupe_anon_label(self):
return None
@property
- def anon_label(self):
+ def _anon_name_label(self):
wce = self.wrapped_column_expression
if hasattr(wce, "name"):
return wce.name
- elif hasattr(wce, "anon_label"):
- return wce.anon_label
+ elif hasattr(wce, "_anon_name_label"):
+ return wce._anon_name_label
else:
- return super(WrapsColumnExpression, self).anon_label
+ return super(WrapsColumnExpression, self)._anon_name_label
class BindParameter(roles.InElementRole, ColumnElement):
@property
def _label(self):
- return getattr(self.element, "_label", None) or self.anon_label
+ return getattr(self.element, "_label", None) or self._anon_name_label
@property
def _proxies(self):
return self._Annotated__element.info
@util.memoized_property
- def anon_label(self):
- return self._Annotated__element.anon_label
+ def _anon_name_label(self):
+ return self._Annotated__element._anon_name_label
class _truncated_label(quoted_name):
if name in names:
if not pa:
pa.append(prefix_anon_map())
- name = c.anon_key_label % pa[0]
+ name = c._anon_key_label % pa[0]
else:
names.add(name)
return (None, c, False)
elif use_tablename_labels:
if c._label is None:
- repeated = c.anon_label in names
- names[c.anon_label] = c
+ repeated = c._anon_name_label in names
+ names[c._anon_name_label] = c
return (None, c, repeated)
elif getattr(c, "name", None) is None:
# this is a scalar_select(). need to improve this case
- repeated = c.anon_label in names
- names[c.anon_label] = c
+ repeated = c._anon_name_label in names
+ names[c._anon_name_label] = c
return (None, c, repeated)
if use_tablename_labels:
if use_tablename_labels:
name = c._label_anon_label
else:
- name = c.anon_label
+ name = c._anon_name_label
if anon_for_dupe_key and name in names:
# here, c._label_anon_label is definitely unique to
if key is not None and key in keys_seen:
if pa is None:
pa = prefix_anon_map()
- key = c.anon_key_label % pa
+ key = c._anon_key_label % pa
keys_seen.add(key)
else:
key = c._proxy_key
MyType(), "POSTGRES_FOO", dialect=postgresql.dialect()
)
+ def test_no_compile_for_col_label(self):
+ class MyThingy(FunctionElement):
+ pass
+
+ @compiles(MyThingy)
+ def visit_thingy(thingy, compiler, **kw):
+ raise Exception(
+ "unfriendly exception, dont catch this, dont run this"
+ )
+
+ @compiles(MyThingy, "postgresql")
+ def visit_thingy_pg(thingy, compiler, **kw):
+ return "mythingy"
+
+ subq = select(MyThingy("text")).subquery()
+
+ stmt = select(subq)
+
+ self.assert_compile(
+ stmt,
+ "SELECT anon_2.anon_1 FROM (SELECT mythingy AS anon_1) AS anon_2",
+ dialect="postgresql",
+ )
+
def test_stateful(self):
class MyThingy(ColumnClause):
def __init__(self):
MyThingy(),
)
- def test_no_default_proxy_generation(self):
+ @testing.combinations((True,), (False,))
+ def test_no_default_proxy_generation(self, named):
class my_function(FunctionElement):
- name = "my_function"
+ if named:
+ name = "my_function"
type = Numeric()
@compiles(my_function, "sqlite")
self.assert_compile(
stmt,
- "SELECT my_function(t1.q) AS my_function_1 FROM t1",
+ "SELECT my_function(t1.q) AS my_function_1 FROM t1"
+ if named
+ else "SELECT my_function(t1.q) AS anon_1 FROM t1",
dialect="sqlite",
)
- eq_(stmt.selected_columns.keys(), [stmt._raw_columns[0].anon_label])
+ if named:
+ eq_(stmt.selected_columns.keys(), ["my_function"])
+ else:
+ eq_(stmt.selected_columns.keys(), ["_no_label"])
def test_no_default_message(self):
class MyThingy(ClauseElement):
func.lala(table1.c.name).label("gg"),
)
- eq_(list(s1.subquery().c.keys()), ["myid", "foobar", str(f1), "gg"])
+ eq_(list(s1.subquery().c.keys()), ["myid", "foobar", "hoho", "gg"])
meta = MetaData()
t1 = Table("mytable", meta, Column("col1", Integer))
)
for col, key, expr, lbl in (
(table1.c.name, "name", "mytable.name", None),
- (exprs[0], str(exprs[0]), "mytable.myid = :myid_1", "anon_1"),
- (exprs[1], str(exprs[1]), "hoho(mytable.myid)", "hoho_1"),
+ (
+ exprs[0],
+ "_no_label",
+ "mytable.myid = :myid_1",
+ "anon_1",
+ ),
+ (exprs[1], "hoho", "hoho(mytable.myid)", "hoho_1"),
(
exprs[2],
- str(exprs[2]),
+ "_no_label",
"CAST(mytable.name AS NUMERIC)",
"name", # due to [ticket:4449]
),
t = table1
s1 = select(col).select_from(t)
- assert list(s1.subquery().c.keys()) == [key], list(s1.c.keys())
+ eq_(list(s1.subquery().c.keys()), [key])
if lbl:
self.assert_compile(
"a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type, 0),
"bar": ("bar", (l1, "bar"), l1.type, 1),
"anon_1": (
- tc.anon_label,
+ tc._anon_name_label,
(tc_anon_label, "anon_1", tc),
tc.type,
2,
vis = Vis()
s2 = vis.traverse(s1)
- eq_(list(s2.selected_columns)[0].anon_label, c1.anon_label)
+ eq_(list(s2.selected_columns)[0]._anon_name_label, c1._anon_name_label)
@testing.combinations(
("clone",), ("pickle",), ("conv_to_unique"), ("none"), argnames="meth"
select(f), "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1"
)
- f.anon_label
+ f._anon_name_label
a = t.alias()
f = sql_util.ClauseAdapter(a).traverse(f)
# anon_label, e.g. a truncated_label, is used here because
# the expr has no name, no key, and myop() can't create a
# string, so this is the last resort
- eq_(s.selected_columns.keys(), ["x", "y", expr.anon_label])
+ eq_(s.selected_columns.keys(), ["x", "y", "_no_label"])
s = select(t, expr).subquery()
- eq_(s.c.keys(), ["x", "y", expr.anon_label])
+ eq_(s.c.keys(), ["x", "y", "_no_label"])
def test_cloned_intersection(self):
t1 = table("t1", column("x"))
cloned, literal_column("2").label("b")
)
cloned.add_columns.non_generative(cloned, func.foo())
- eq_(list(cloned.selected_columns.keys()), ["a", "b", "foo()"])
+ eq_(list(cloned.selected_columns.keys()), ["a", "b", "foo"])
def test_clone_col_list_changes_then_proxy(self):
t = table("t", column("q"), column("p"))
# TODO: failing due to proxy_set not correct
assert u1.corresponding_column(table1_new.c.col2) is u1.c.col2
+ def test_unnamed_exprs_keys(self):
+ s1 = select(
+ table1.c.col1 == 5,
+ table1.c.col1 == 10,
+ func.count(table1.c.col1),
+ literal_column("x"),
+ ).subquery()
+
+ eq_(s1.c.keys(), ["_no_label", "_no_label_1", "count", "x"])
+
def test_union_alias_dupe_keys(self):
s1 = select(table1.c.col1, table1.c.col2, table2.c.col1)
s2 = select(table2.c.col1, table2.c.col2, table2.c.col3)
eq_(y_a.key, "q")
is_(x_a.table, t)
eq_(x_a.info, {"q": "p", "z": "h"})
- eq_(t.c.x.anon_label, x_a.anon_label)
+ eq_(t.c.x._anon_name_label, x_a._anon_name_label)
def test_custom_constructions(self):
from sqlalchemy.schema import Column