class Immutable(object):
"""mark a ClauseElement as 'immutable' when expressions are cloned."""
+ _is_immutable = True
+
def unique_params(self, *optionaldict, **kwargs):
raise NotImplementedError("Immutable objects do not support copying")
class SingletonConstant(Immutable):
"""Represent SQL constants like NULL, TRUE, FALSE"""
+ _is_singleton_constant = True
+
def __new__(cls, *arg, **kw):
return cls._singleton
def _create_singleton(cls):
obj = object.__new__(cls)
obj.__init__()
- cls._singleton = obj
- # don't proxy singletons. this means that a SingletonConstant
- # will never be a "corresponding column" in a statement; the constant
- # can be named directly and as it is often/usually compared against using
- # "IS", it can't be adapted to a subquery column in any case.
- # see :ticket:`6259`.
- proxy_set = frozenset()
+ # for a long time this was an empty frozenset, meaning
+ # a SingletonConstant would never be a "corresponding column" in
+ # a statement. This referred to #6259. However, in #7154 we see
+ # that we do in fact need "correspondence" to work when matching cols
+ # in result sets, so the non-correspondence was moved to a more
+ # specific level when we are actually adapting expressions for SQL
+ # render only.
+ obj.proxy_set = frozenset([obj])
+ cls._singleton = obj
def _from_objects(*elements):
_is_bind_parameter = False
_is_clause_list = False
_is_lambda_element = False
+ _is_singleton_constant = False
+ _is_immutable = False
_order_by_label_element = None
"""
return Label(name, self, self.type)
- def _anon_label(self, seed):
+ def _anon_label(self, seed, add_hash=None):
while self._is_clone_of is not None:
self = self._is_clone_of
# as of 1.4 anonymous label for ColumnElement uses hash(), not id(),
# as the identifier, because a column and its annotated version are
# the same thing in a SQL statement
+ hash_value = hash(self)
+
+ if add_hash:
+ # this path is used for disambiguating anon labels that would
+ # otherwise be the same name for the same element repeated.
+ # an additional numeric value is factored in for each label.
+
+ # shift hash(self) (which is id(self), typically 8 byte integer)
+ # 16 bits leftward. fill extra add_hash on right
+ assert add_hash < (2 << 15)
+ assert seed
+ hash_value = (hash_value << 16) | add_hash
+
+ # extra underscore is added for labels with extra hash
+ # values, to isolate the "deduped anon" namespace from the
+ # regular namespace. eliminates chance of these
+ # manufactured hash values overlapping with regular ones for some
+ # undefined python interpreter
+ seed = seed + "_"
+
if isinstance(seed, _anonymous_label):
return _anonymous_label.safe_construct(
- hash(self), "", enclosing_label=seed
+ hash_value, "", enclosing_label=seed
)
- return _anonymous_label.safe_construct(hash(self), seed or "anon")
+ return _anonymous_label.safe_construct(hash_value, seed or "anon")
@util.memoized_property
def _anon_name_label(self):
def anon_key_label(self):
return self._anon_key_label
- @util.memoized_property
- def _dedupe_anon_label(self):
+ def _dedupe_anon_label_idx(self, idx):
"""label to apply to a column that is anon labeled, but repeated
in the SELECT, so that we have to make an "extra anon" label that
disambiguates it from the previous appearance.
# "CAST(casttest.v1 AS DECIMAL) AS anon__1"
if label is None:
- return self._dedupe_anon_tq_label
+ return self._dedupe_anon_tq_label_idx(idx)
else:
- return self._anon_label(label + "_")
+ return self._anon_label(label, add_hash=idx)
@util.memoized_property
def _anon_tq_label(self):
def _anon_tq_key_label(self):
return self._anon_label(getattr(self, "_tq_key_label", None))
- @util.memoized_property
- def _dedupe_anon_tq_label(self):
+ def _dedupe_anon_tq_label_idx(self, idx):
label = getattr(self, "_tq_label", None) or "anon"
- return self._anon_label(label + "_")
+
+ return self._anon_label(label, add_hash=idx)
class WrapsColumnExpression(object):
return wce._anon_name_label
return super(WrapsColumnExpression, self)._anon_name_label
- @property
- def _dedupe_anon_label(self):
+ def _dedupe_anon_label_idx(self, idx):
wce = self.wrapped_column_expression
nal = wce._non_anon_label
if nal:
return self._anon_label(nal + "_")
else:
- return self._dedupe_anon_tq_label
+ return self._dedupe_anon_tq_label_idx(idx)
class BindParameter(roles.InElementRole, ColumnElement):
table_qualified = self._label_style is LABEL_STYLE_TABLENAME_PLUS_COL
label_style_none = self._label_style is LABEL_STYLE_NONE
+ # a counter used for "dedupe" labels, which have double underscores
+ # in them and are never referred by name; they only act
+ # as positional placeholders. they need only be unique within
+ # the single columns clause they're rendered within (required by
+ # some dbs such as mysql). So their anon identity is tracked against
+ # a fixed counter rather than hash() identity.
+ dedupe_hash = 1
+
for c in cols:
repeated = False
# here, "required_label_name" is sent as
# "None" and "fallback_label_name" is sent.
if table_qualified:
- fallback_label_name = c._dedupe_anon_tq_label
+ fallback_label_name = (
+ c._dedupe_anon_tq_label_idx(dedupe_hash)
+ )
+ dedupe_hash += 1
else:
- fallback_label_name = c._dedupe_anon_label
+ fallback_label_name = c._dedupe_anon_label_idx(
+ dedupe_hash
+ )
+ dedupe_hash += 1
else:
fallback_label_name = c._anon_name_label
else:
if table_qualified:
required_label_name = (
fallback_label_name
- ) = c._dedupe_anon_tq_label
+ ) = c._dedupe_anon_tq_label_idx(dedupe_hash)
+ dedupe_hash += 1
else:
required_label_name = (
fallback_label_name
- ) = c._dedupe_anon_label
+ ) = c._dedupe_anon_label_idx(dedupe_hash)
+ dedupe_hash += 1
repeated = True
else:
names[required_label_name] = c
if table_qualified:
required_label_name = (
fallback_label_name
- ) = c._dedupe_anon_tq_label
+ ) = c._dedupe_anon_tq_label_idx(dedupe_hash)
+ dedupe_hash += 1
else:
required_label_name = (
fallback_label_name
- ) = c._dedupe_anon_label
+ ) = c._dedupe_anon_label_idx(dedupe_hash)
+ dedupe_hash += 1
repeated = True
else:
names[effective_name] = c
return newcol
@util.preload_module("sqlalchemy.sql.functions")
- def replace(self, col):
+ def replace(self, col, _include_singleton_constants=False):
functions = util.preloaded.sql_functions
if isinstance(col, FromClause) and not isinstance(
elif not isinstance(col, ColumnElement):
return None
+ elif not _include_singleton_constants and col._is_singleton_constant:
+ # dont swap out NULL, TRUE, FALSE for a label name
+ # in a SQL statement that's being rewritten,
+ # leave them as the constant. This is first noted in #6259,
+ # however the logic to check this moved here as of #7154 so that
+ # it is made specific to SQL rewriting and not all column
+ # correspondence
+ return None
if "adapt_column" in col._annotations:
col = col._annotations["adapt_column"]
return newcol
def _locate_col(self, col):
-
- c = ClauseAdapter.traverse(self, col)
+ # both replace and traverse() are overly complicated for what
+ # we are doing here and we would do better to have an inlined
+ # version that doesn't build up as much overhead. the issue is that
+ # sometimes the lookup does in fact have to adapt the insides of
+ # say a labeled scalar subquery. However, if the object is an
+ # Immutable, i.e. Column objects, we can skip the "clone" /
+ # "copy internals" part since those will be no-ops in any case.
+ # additionally we want to catch singleton objects null/true/false
+ # and make sure they are adapted as well here.
+
+ if col._is_immutable:
+ for vis in self.visitor_iterator:
+ c = vis.replace(col, _include_singleton_constants=True)
+ if c is not None:
+ break
+ else:
+ c = col
+ else:
+ c = ClauseAdapter.traverse(self, col)
if self._wrap:
c2 = self._wrap._locate_col(c)
# test that the outer IS NULL is rendered, not adapted
# test that the inner query includes the NULL we asked for
+
+ # ironically, this statement would not actually fetch due to the NULL
+ # not allowing adaption and therefore failing on the result set
+ # matching, this was addressed in #7154.
self.assert_compile(
stmt,
"SELECT anon_2.anon_1, anon_2.id, anon_2.name, "
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
+from sqlalchemy import null
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
+from sqlalchemy import true
from sqlalchemy.orm import aliased
from sqlalchemy.orm import backref
from sqlalchemy.orm import close_all_sessions
class A(Base):
__tablename__ = "a"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
b_id = Column(ForeignKey("b.id"))
c_id = Column(ForeignKey("c.id"))
class B(Base):
__tablename__ = "b"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
c_id = Column(ForeignKey("c.id"))
c = relationship("C")
class C(Base):
__tablename__ = "c"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
d_id = Column(ForeignKey("d.id"))
d = relationship("D")
class D(Base):
__tablename__ = "d"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
@classmethod
def define_tables(cls, metadata):
class User(Base):
__tablename__ = "cs_user"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
data = Column(Integer)
class LD(Base):
__tablename__ = "cs_ld"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
user_id = Column(Integer, ForeignKey("cs_user.id"))
user = relationship(User, primaryjoin=user_id == User.id)
__tablename__ = "cs_a"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
ld_id = Column(Integer, ForeignKey("cs_ld.id"))
ld = relationship(LD, primaryjoin=ld_id == LD.id)
__tablename__ = "cs_lda"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
ld_id = Column(Integer, ForeignKey("cs_ld.id"))
a_id = Column(Integer, ForeignKey("cs_a.id"))
a = relationship(A, primaryjoin=a_id == A.id)
class A(Base):
__tablename__ = "a"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
bs = relationship("B")
class B(Base):
__tablename__ = "b"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
a_id = Column(ForeignKey("a.id"))
cs = relationship("C")
class C(Base):
__tablename__ = "c"
- id = Column(Integer, primary_key=True)
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
b_id = Column(ForeignKey("b.id"))
@classmethod
{"pk_1": 4},
),
)
+
+
+class SingletonConstantSubqTest(_fixtures.FixtureTest):
+ """POC test for both #7153 and #7154"""
+
+ run_inserts = "once"
+ run_deletes = None
+
+ __backend__ = True
+
+ @classmethod
+ def setup_mappers(cls):
+ cls._setup_stock_mapping()
+
+ def test_limited_eager_w_null(self):
+ User = self.classes.User
+ Address = self.classes.Address
+
+ stmt = (
+ select(User, null())
+ .options(joinedload(User.addresses))
+ .where(User.id == 8)
+ .limit(10)
+ )
+
+ session = fixture_session()
+
+ def go():
+ eq_(
+ session.execute(stmt).unique().all(),
+ [
+ (
+ User(
+ id=8, addresses=[Address(), Address(), Address()]
+ ),
+ None,
+ )
+ ],
+ )
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_limited_eager_w_multi_null_booleans(self):
+ User = self.classes.User
+ Address = self.classes.Address
+
+ stmt = (
+ select(User, null(), null(), null(), true(), true())
+ .options(joinedload(User.addresses))
+ .where(User.id == 8)
+ .limit(10)
+ )
+
+ session = fixture_session()
+
+ def go():
+ eq_(
+ session.execute(stmt).unique().all(),
+ [
+ (
+ User(
+ id=8, addresses=[Address(), Address(), Address()]
+ ),
+ None,
+ None,
+ None,
+ True,
+ True,
+ )
+ ],
+ )
+
+ self.assert_sql_count(testing.db, go, 1)
(
lambda s, User: (User.id,) + tuple([null()] * 3),
"users.id AS users_id, NULL AS anon_1, NULL AS anon__1, "
- "NULL AS anon__1",
+ "NULL AS anon__2",
(7, None, None, None),
),
)
from sqlalchemy.sql import table
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql.elements import BooleanClauseList
+from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy.sql.expression import ClauseElement
from sqlalchemy.sql.expression import ClauseList
from sqlalchemy.sql.expression import HasPrefixes
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import eq_ignore_whitespace
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
"foo_bar.id AS foo_bar_id_1, " # 2. 1st foo_bar.id, disamb from 1
"foo.bar_id AS foo_bar_id__1, " # 3. 2nd foo.bar_id, dedupe from 1
"foo.id AS foo_id__1, "
- "foo.bar_id AS foo_bar_id__1, " # 4. 3rd foo.bar_id, same as 3
- "foo_bar.id AS foo_bar_id__2, " # 5. 2nd foo_bar.id
- "foo_bar.id AS foo_bar_id__2 " # 6. 3rd foo_bar.id, same as 5
+ "foo.bar_id AS foo_bar_id__2, " # 4. 3rd foo.bar_id, dedupe again
+ "foo_bar.id AS foo_bar_id__3, " # 5. 2nd foo_bar.id
+ "foo_bar.id AS foo_bar_id__4 " # 6. 3rd foo_bar.id, dedupe again
"FROM foo, foo_bar",
)
eq_(
"foo_bar.id AS foo_bar_id_1, " # 2. 1st foo_bar.id, disamb from 1
"foo.bar_id AS foo_bar_id__1, " # 3. 2nd foo.bar_id, dedupe from 1
"foo.id AS foo_id__1, "
- "foo.bar_id AS foo_bar_id__1, " # 4. 3rd foo.bar_id, same as 3
- "foo_bar.id AS foo_bar_id__2, " # 5. 2nd foo_bar.id
- "foo_bar.id AS foo_bar_id__2 " # 6. 3rd foo_bar.id, same as 5
+ "foo.bar_id AS foo_bar_id__2, " # 4. 3rd foo.bar_id, dedupe again
+ "foo_bar.id AS foo_bar_id__3, " # 5. 2nd foo_bar.id
+ "foo_bar.id AS foo_bar_id__4 " # 6. 3rd foo_bar.id, dedupe again
"FROM foo, foo_bar"
") AS anon_1",
)
"foo_bar.id AS foo_bar_id_1, " # 2. 1st foo_bar.id, disamb from 1
"foo.bar_id AS foo_bar_id__1, " # 3. 2nd foo.bar_id, dedupe from 1
"foo.id AS foo_id__1, "
- "foo.bar_id AS foo_bar_id__1, " # 4. 3rd foo.bar_id, same as 3
- "foo_bar.id AS foo_bar_id__2, " # 5. 2nd foo_bar.id
- "foo_bar.id AS foo_bar_id__2 " # 6. 3rd foo_bar.id, same as 5
+ "foo.bar_id AS foo_bar_id__2, " # 4. 3rd foo.bar_id, dedupe again
+ "foo_bar.id AS foo_bar_id__3, " # 5. 2nd foo_bar.id
+ "foo_bar.id AS foo_bar_id__4 " # 6. 3rd foo_bar.id, dedupe again
"FROM foo, foo_bar",
)
LABEL_STYLE_TABLENAME_PLUS_COL
),
"SELECT t.a AS t_a, t.a AS t_a__1, t.b AS t_b, "
- "t.a AS t_a__1 FROM t",
+ "t.a AS t_a__2 FROM t",
)
def test_dupe_columns_use_labels_derived_selectable(self):
self.assert_compile(
select(stmt).set_label_style(LABEL_STYLE_NONE),
"SELECT anon_1.t_a, anon_1.t_a, anon_1.t_b, anon_1.t_a FROM "
- "(SELECT t.a AS t_a, t.a AS t_a__1, t.b AS t_b, t.a AS t_a__1 "
+ "(SELECT t.a AS t_a, t.a AS t_a__1, t.b AS t_b, t.a AS t_a__2 "
"FROM t) AS anon_1",
)
LABEL_STYLE_TABLENAME_PLUS_COL
),
"SELECT t.a AS t_a, t.a AS t_a__1, t.b AS t_b, "
- "t.a AS t_a__1 FROM t",
+ "t.a AS t_a__2 FROM t",
)
self.assert_compile(
LABEL_STYLE_TABLENAME_PLUS_COL
),
"SELECT t.a AS t_a, t.a AS t_a__1, t.b AS t_b, "
- "t.a AS t_a__1 FROM t",
+ "t.a AS t_a__2 FROM t",
)
self.assert_compile(
LABEL_STYLE_TABLENAME_PLUS_COL
),
"SELECT t.a AS t_a, t.a AS t_a__1, t.b AS t_b, "
- "t.a AS t_a__1 FROM t",
+ "t.a AS t_a__2 FROM t",
)
def test_dupe_columns_use_labels_derived_selectable_mix_annotations(self):
self.assert_compile(
select(stmt).set_label_style(LABEL_STYLE_NONE),
"SELECT anon_1.t_a, anon_1.t_a, anon_1.t_b, anon_1.t_a FROM "
- "(SELECT t.a AS t_a, t.a AS t_a__1, t.b AS t_b, t.a AS t_a__1 "
+ "(SELECT t.a AS t_a, t.a AS t_a__1, t.b AS t_b, t.a AS t_a__2 "
"FROM t) AS anon_1",
)
self.assert_compile(
stmt,
"SELECT foo.bar_id AS foo_bar_id, foo_bar.id AS foo_bar_id_1, "
- "foo_bar.id AS foo_bar_id__1, foo_bar.id AS foo_bar_id__1, "
- "foo_bar.id AS foo_bar_id__1 FROM foo, foo_bar",
+ "foo_bar.id AS foo_bar_id__1, foo_bar.id AS foo_bar_id__2, "
+ "foo_bar.id AS foo_bar_id__3 FROM foo, foo_bar",
)
def test_dupe_columns_use_labels_from_anon(self):
LABEL_STYLE_TABLENAME_PLUS_COL
),
"SELECT t_1.a AS t_1_a, t_1.a AS t_1_a__1, t_1.b AS t_1_b, "
- "t_1.a AS t_1_a__1 "
+ "t_1.a AS t_1_a__2 "
"FROM t AS t_1",
)
"WHERE mytable.myid = myothertable.otherid ORDER BY myid",
)
+ def test_deduping_hash_algo(self):
+ """related to #7153.
+
+ testing new feature "add_hash" of _anon_label which adds an additional
+ integer value as part of what the anon label is deduplicated upon.
+
+ """
+
+ class Thing(ColumnElement):
+ def __init__(self, hash_):
+ self._hash = hash_
+
+ def __hash__(self):
+ return self._hash
+
+ t1 = Thing(10)
+ t2 = Thing(11)
+
+ # this is the collision case. therefore we assert that this
+ # add_hash has to be below 16 bits.
+ # eq_(
+ # t1._anon_label('hi', add_hash=65537),
+ # t2._anon_label('hi', add_hash=1)
+ # )
+ with expect_raises(AssertionError):
+ t1._anon_label("hi", add_hash=65536)
+
+ for i in range(50):
+ ne_(
+ t1._anon_label("hi", add_hash=i),
+ t2._anon_label("hi", add_hash=1),
+ )
+
+ def test_deduping_unique_across_selects(self):
+ """related to #7153
+
+ looking to see that dedupe anon labels use a unique hash not only
+ within each statement but across multiple statements.
+
+ """
+
+ s1 = select(null(), null())
+
+ s2 = select(true(), true())
+
+ s3 = union(s1, s2)
+
+ self.assert_compile(
+ s3,
+ "SELECT NULL AS anon_1, NULL AS anon__1 " "UNION "
+ # without the feature tested in test_deduping_hash_algo we'd get
+ # "SELECT true AS anon_2, true AS anon__1",
+ "SELECT true AS anon_2, true AS anon__2",
+ dialect="default_enhanced",
+ )
+
def test_compound_grouping(self):
s = select(column("foo"), column("bar")).select_from(text("bat"))
# the label name, but that's a different issue
self.assert_compile(
stmt1,
- "SELECT :foo_1 AS anon_1, :foo_1 AS anon__1, :foo_1 AS anon__1",
+ "SELECT :foo_1 AS anon_1, :foo_1 AS anon__1, :foo_1 AS anon__2",
)
def _test_binds_no_hash_collision(self):
"WITH RECURSIVE anon_1(foo_id, foo_bar_id, foo_bar_id_1) AS "
"(SELECT foo.id AS foo_id, foo.bar_id AS foo_bar_id, "
"foo_bar.id AS foo_bar_id_1, foo.bar_id AS foo_bar_id__1, "
- "foo.id AS foo_id__1, foo.bar_id AS foo_bar_id__1, "
- "foo_bar.id AS foo_bar_id__2, foo_bar.id AS foo_bar_id__2 "
+ "foo.id AS foo_id__1, foo.bar_id AS foo_bar_id__2, "
+ "foo_bar.id AS foo_bar_id__3, foo_bar.id AS foo_bar_id__4 "
"FROM foo, foo_bar) "
"SELECT anon_1.foo_id, anon_1.foo_bar_id, anon_1.foo_bar_id_1, "
"anon_1.foo_bar_id AS foo_bar_id_2, anon_1.foo_id AS foo_id_1, "
"keyed2_a",
"keyed3_a",
"keyed2_a__1",
- "keyed2_a__1",
- "keyed3_a__1",
+ "keyed2_a__2",
"keyed3_a__1",
+ "keyed3_a__2",
"keyed3_d",
"keyed3_d__1",
],
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import exists
+from sqlalchemy import false
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import text
+from sqlalchemy import true
from sqlalchemy import type_coerce
from sqlalchemy import TypeDecorator
from sqlalchemy import union
eq_(list(stmt.c.keys()), ["q"])
eq_(list(stmt2.c.keys()), ["q", "p"])
+ @testing.combinations(
+ func.now(), null(), true(), false(), literal_column("10"), column("x")
+ )
+ def test_const_object_correspondence(self, c):
+ """test #7154"""
+
+ stmt = select(c).subquery()
+
+ stmt2 = select(stmt)
+
+ is_(
+ stmt2.selected_columns.corresponding_column(c),
+ stmt2.selected_columns[0],
+ )
+
def test_append_column_after_visitor_replace(self):
# test for a supported idiom that matches the deprecated / removed
# replace_selectable method
repr(obj)
-class WithLabelsTest(fixtures.TestBase):
+class WithLabelsTest(AssertsCompiledSQL, fixtures.TestBase):
def _assert_result_keys(self, s, keys):
compiled = s.compile()
eq_(list(sel.subquery().c.keys()), ["t1_x", "t1_y", "t1_x_1"])
self._assert_result_keys(sel, ["t1_x__1", "t1_x", "t1_y"])
+ def _columns_repeated_identity(self):
+ m = MetaData()
+ t1 = Table("t1", m, Column("x", Integer), Column("y", Integer))
+ return select(t1.c.x, t1.c.y, t1.c.x, t1.c.x, t1.c.x).set_label_style(
+ LABEL_STYLE_NONE
+ )
+
+ def _anon_columns_repeated_identity_one(self):
+ m = MetaData()
+ t1 = Table("t1", m, Column("x", Integer), Column("y", Integer))
+ return select(t1.c.x, null(), null(), null()).set_label_style(
+ LABEL_STYLE_NONE
+ )
+
+ def _anon_columns_repeated_identity_two(self):
+ fn = func.now()
+ return select(fn, fn, fn, fn).set_label_style(LABEL_STYLE_NONE)
+
+ def test_columns_repeated_identity_disambiguate(self):
+ """test #7153"""
+ sel = self._columns_repeated_identity().set_label_style(
+ LABEL_STYLE_DISAMBIGUATE_ONLY
+ )
+
+ self.assert_compile(
+ sel,
+ "SELECT t1.x, t1.y, t1.x AS x__1, t1.x AS x__2, "
+ "t1.x AS x__3 FROM t1",
+ )
+
+ def test_columns_repeated_identity_subquery_disambiguate(self):
+ """test #7153"""
+ sel = self._columns_repeated_identity()
+
+ stmt = select(sel.subquery()).set_label_style(
+ LABEL_STYLE_DISAMBIGUATE_ONLY
+ )
+
+ # databases like MySQL won't allow the subquery to have repeated labels
+ # even if we don't try to access them
+ self.assert_compile(
+ stmt,
+ "SELECT anon_1.x, anon_1.y, anon_1.x AS x_1, anon_1.x AS x_2, "
+ "anon_1.x AS x_3 FROM "
+ "(SELECT t1.x AS x, t1.y AS y, t1.x AS x__1, t1.x AS x__2, "
+ "t1.x AS x__3 FROM t1) AS anon_1",
+ )
+
def _labels_overlap(self):
m = MetaData()
t1 = Table("t", m, Column("x_id", Integer))