)
aliased_adapter = None
elif ext_info.is_aliased_class:
- aliased_adapter = sql_util.ColumnAdapter(
- ext_info.selectable,
- ext_info.mapper._equivalent_columns
- )
+ aliased_adapter = ext_info._adapter
else:
aliased_adapter = None
clauses = orm_util.ORMAdapter(
to_adapt,
equivalents=self.mapper._equivalent_columns,
- adapt_required=True, allow_label_resolve=False)
+ adapt_required=True, allow_label_resolve=False,
+ anonymize_labels=True)
assert clauses.aliased_class is not None
if self.parent_property.direction != interfaces.MANYTOONE:
"""
def __init__(self, entity, equivalents=None, adapt_required=False,
- chain_to=None, allow_label_resolve=True):
+ chain_to=None, allow_label_resolve=True,
+ anonymize_labels=False):
info = inspection.inspect(entity)
self.mapper = info.mapper
sql_util.ColumnAdapter.__init__(
self, selectable, equivalents, chain_to,
adapt_required=adapt_required,
- allow_label_resolve=allow_label_resolve)
+ allow_label_resolve=allow_label_resolve,
+ anonymize_labels=anonymize_labels)
def replace(self, elem):
entity = elem._annotations.get('parentmapper', None)
if alias is None:
alias = mapper._with_polymorphic_selectable.alias(
name=name, flat=flat)
+
self._aliased_insp = AliasedInsp(
self,
mapper,
self._base_alias = _base_alias or self
self._use_mapper_path = _use_mapper_path
- self._adapter = sql_util.ClauseAdapter(
+ self._adapter = sql_util.ColumnAdapter(
selectable, equivalents=mapper._equivalent_columns,
- adapt_on_names=adapt_on_names)
+ adapt_on_names=adapt_on_names, anonymize_labels=True)
self._adapt_on_names = adapt_on_names
self._target = mapper.class_
# here; we can only add a label in the ORDER BY for an individual
# label expression in the columns clause.
- # TODO: we should see if we can bring _resolve_label
- # into this
-
-
- raw_col = set(l._order_by_label_element.name
- for l in order_by_select._raw_columns
- if l._order_by_label_element is not None)
+ raw_col = set(order_by_select._label_resolve_dict.keys())
return ", ".join(
s for s in
self,
render_label_as_label=c._order_by_label_element if
c._order_by_label_element is not None and
- c._order_by_label_element.name in raw_col
+ c._order_by_label_element._label in raw_col
else None,
**kw)
for c in clauselist.clauses)
"""
+ _allow_label_resolve = True
+ """A flag that can be flipped to prevent a column from being resolvable
+ by string label name."""
+
_alt_names = ()
def self_group(self, against=None):
else:
return super(ColumnElement, self)._negate()
- _allow_label_resolve = True
-
@util.memoized_property
def type(self):
return type_api.NULLTYPE
# interpreted in a column expression situation
key = _label = _resolve_label = None
+ _allow_label_resolve = False
+
def __init__(
self,
text,
def get_children(self, **kwargs):
return self.element,
- def _copy_internals(self, clone=_clone, **kw):
+ def _copy_internals(self, clone=_clone, anonymize_labels=False, **kw):
self.element = clone(self.element, **kw)
+ if anonymize_labels:
+ self.name = _anonymous_label(
+ '%%(%d %s)s' % (
+ id(self), getattr(self.element, 'name', 'anon'))
+ )
+ self.key = self._label = self._key_label = self.name
@property
def _from_objects(self):
def __init__(self, selectable, equivalents=None,
include=None, exclude=None,
include_fn=None, exclude_fn=None,
- adapt_on_names=False):
- self.__traverse_options__ = {'stop_on': [selectable]}
+ adapt_on_names=False, anonymize_labels=False):
+ self.__traverse_options__ = {
+ 'stop_on': [selectable],
+ 'anonymize_labels': anonymize_labels}
self.selectable = selectable
if include:
assert not include_fn
def __init__(self, selectable, equivalents=None,
chain_to=None, include=None,
exclude=None, adapt_required=False,
- allow_label_resolve=True):
+ adapt_on_names=False,
+ allow_label_resolve=True,
+ anonymize_labels=False):
ClauseAdapter.__init__(self, selectable, equivalents,
- include, exclude)
+ include, exclude,
+ adapt_on_names=adapt_on_names,
+ anonymize_labels=anonymize_labels)
+
if chain_to:
self.chain(chain_to)
self.columns = util.populate_column_dict(self._locate_col)
ac.columns = util.populate_column_dict(ac._locate_col)
return ac
- adapt_clause = ClauseAdapter.traverse
+ def traverse(self, obj):
+ new_obj = ClauseAdapter.traverse(self, obj)
+ if new_obj is not obj:
+ self.columns[obj] = new_obj
+ return new_obj
+
+ adapt_clause = traverse
adapt_list = ClauseAdapter.copy_and_process
def _wrap(self, local, wrapped):
if c is None:
c = self.adapt_clause(col)
- # anonymize labels in case they have a hardcoded name
- # see test_eager_relations.py -> SubqueryTest.test_label_anonymizing
- if isinstance(c, Label):
- c = c.label(None)
-
# adapt_required used by eager loading to indicate that
# we don't trust a result row column that is not translated.
# this is to prevent a column from being interpreted as that
"WHERE c.bid = anon_1.b_aid"
)
+ t1 = table("table1",
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
+ t2 = table("table2",
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
+
+ def test_label_anonymize_one(self):
+ t1a = t1.alias()
+ adapter = sql_util.ClauseAdapter(t1a, anonymize_labels=True)
+
+ expr = select([t1.c.col2]).where(t1.c.col3 == 5).label('expr')
+ expr_adapted = adapter.traverse(expr)
+
+ stmt = select([expr, expr_adapted]).order_by(expr, expr_adapted)
+ self.assert_compile(
+ stmt,
+ "SELECT "
+ "(SELECT table1.col2 FROM table1 WHERE table1.col3 = :col3_1) "
+ "AS expr, "
+ "(SELECT table1_1.col2 FROM table1 AS table1_1 "
+ "WHERE table1_1.col3 = :col3_2) AS anon_1 "
+ "ORDER BY expr, anon_1"
+ )
+
+ def test_label_anonymize_two(self):
+ t1a = t1.alias()
+ adapter = sql_util.ClauseAdapter(t1a, anonymize_labels=True)
+
+ expr = select([t1.c.col2]).where(t1.c.col3 == 5).label(None)
+ expr_adapted = adapter.traverse(expr)
+
+ stmt = select([expr, expr_adapted]).order_by(expr, expr_adapted)
+ self.assert_compile(
+ stmt,
+ "SELECT "
+ "(SELECT table1.col2 FROM table1 WHERE table1.col3 = :col3_1) "
+ "AS anon_1, "
+ "(SELECT table1_1.col2 FROM table1 AS table1_1 "
+ "WHERE table1_1.col3 = :col3_2) AS anon_2 "
+ "ORDER BY anon_1, anon_2"
+ )
class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
"""Test the TextClause and related constructs."""
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_, \
- assert_raises_message, expect_warnings
+ assert_raises_message, expect_warnings, assert_warnings
from sqlalchemy import text, select, Integer, String, Float, \
bindparam, and_, func, literal_column, exc, MetaData, Table, Column,\
asc, func, desc, union
"somelabel DESC"
)
- def test_anonymized_via_columnadapter(self):
+ def test_columnadapter_anonymized(self):
+ """test issue #3148
+
+ Testing the anonymization applied from the ColumnAdapter.columns
+ collection, typically as used in eager loading.
+
+ """
+ exprs = [
+ table1.c.myid,
+ table1.c.name.label('t1name'),
+ func.foo("hoho").label('x')]
+
+ ta = table1.alias()
+ adapter = sql_util.ColumnAdapter(ta, anonymize_labels=True)
+
+ s1 = select([adapter.columns[expr] for expr in exprs]).\
+ apply_labels().order_by("myid", "t1name", "x")
+
+ def go():
+ # the labels here are anonymized, so label naming
+ # can't catch these.
+ self.assert_compile(
+ s1,
+ "SELECT mytable_1.myid AS mytable_1_myid, "
+ "mytable_1.name AS name_1, foo(:foo_2) AS foo_1 "
+ "FROM mytable AS mytable_1 ORDER BY mytable_1.myid, t1name, x"
+ )
+
+ assert_warnings(
+ go,
+ ["Can't resolve label reference 't1name'",
+ "Can't resolve label reference 'x'"], regex=True)
+
+ def test_columnadapter_non_anonymized(self):
"""test issue #3148
Testing the anonymization applied from the ColumnAdapter.columns
s1 = select([adapter.columns[expr] for expr in exprs]).\
apply_labels().order_by("myid", "t1name", "x")
- # our "t1name" and "x" labels get modified
+ # labels are maintained
self.assert_compile(
s1,
"SELECT mytable_1.myid AS mytable_1_myid, "
- "mytable_1.name AS name_1, foo(:foo_2) AS foo_1 "
- "FROM mytable AS mytable_1 ORDER BY mytable_1.myid, name_1, foo_1"
+ "mytable_1.name AS t1name, foo(:foo_1) AS x "
+ "FROM mytable AS mytable_1 ORDER BY mytable_1.myid, t1name, x"
)
+