def visit_grouping(self, grouping, asfrom=False, **kwargs):
return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
+ def visit_select_statement_grouping(self, grouping, **kwargs):
+ return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
+
def visit_label_reference(
self, element, within_columns_clause=False, **kwargs
):
col_source = cte.element.selects[0]
else:
assert False, "cte should only be against SelectBase"
+
+ # TODO: can we get at the .columns_plus_names collection
+ # that is already (or will be?) generated for the SELECT
+ # rather than calling twice?
recur_cols = [
- c
- for c in util.unique_list(
- col_source._all_selected_columns
- )
- if c is not None
+ # TODO: proxy_name is not technically safe,
+ # see test_cte->
+ # test_with_recursive_no_name_currently_buggy. not
+ # clear what should be done with such a case
+ fallback_label_name or proxy_name
+ for (
+ _,
+ proxy_name,
+ fallback_label_name,
+ c,
+ repeated,
+ ) in (col_source._generate_columns_plus_names(True))
+ if not repeated
]
text += "(%s)" % (
", ".join(
- self.preparer.format_column(
+ self.preparer.format_label_name(
ident, anon_map=self.anon_map
)
for ident in recur_cols
return self.quote(name)
+ def format_label_name(
+ self,
+ name,
+ anon_map=None,
+ ):
+ """Prepare a quoted column name."""
+
+ if anon_map is not None and isinstance(
+ name, elements._truncated_label
+ ):
+ name = name.apply_map(anon_map)
+
+ return self.quote(name)
+
def format_column(
self,
column,
self.assert_compile(
select(u_alias),
- "SELECT anon_1.id FROM ((SELECT users.name, users.id FROM users "
- "WHERE users.id = :id_1 UNION SELECT users.name, users.id "
+ "SELECT anon_1.id FROM ((SELECT users.name AS name, "
+ "users.id AS id FROM users "
+ "WHERE users.id = :id_1 UNION SELECT users.name AS name, "
+ "users.id AS id "
"FROM users WHERE users.id = :id_2) "
"UNION SELECT users.name AS name, users.id AS id "
"FROM users WHERE users.id = :id_3) AS anon_1",
self.assert_compile(
select(u_alias).options(undefer(u_alias.name)),
"SELECT anon_1.name, anon_1.id FROM "
- "((SELECT users.name, users.id FROM users "
- "WHERE users.id = :id_1 UNION SELECT users.name, users.id "
+ "((SELECT users.name AS name, users.id AS id FROM users "
+ "WHERE users.id = :id_1 UNION SELECT users.name AS name, "
+ "users.id AS id "
"FROM users WHERE users.id = :id_2) "
"UNION SELECT users.name AS name, users.id AS id "
"FROM users WHERE users.id = :id_3) AS anon_1",
+from sqlalchemy import Column
from sqlalchemy import delete
+from sqlalchemy import Integer
+from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
+from sqlalchemy import MetaData
+from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import update
s.compile,
)
+ def test_with_recursive_no_name_currently_buggy(self):
+ s1 = select(1)
+ c1 = s1.cte(name="cte1", recursive=True)
+
+ # this is nonsensical at the moment
+ self.assert_compile(
+ select(c1),
+ 'WITH RECURSIVE cte1("1") AS (SELECT 1) SELECT cte1.1 FROM cte1',
+ )
+
+ # however, so is subquery, which is worse as it isn't even trying
+ # to quote "1" as a label
+ self.assert_compile(
+ select(s1.subquery()), "SELECT anon_1.1 FROM (SELECT 1) AS anon_1"
+ )
+
+ def test_wrecur_dupe_col_names(self):
+ """test #6710"""
+
+ manager = table("manager", column("id"))
+ employee = table("employee", column("id"), column("manager_id"))
+
+ top_q = select(employee, manager).join_from(
+ employee, manager, employee.c.manager_id == manager.c.id
+ )
+
+ top_q = top_q.cte("cte", recursive=True)
+
+ bottom_q = (
+ select(employee, manager)
+ .join_from(
+ employee, manager, employee.c.manager_id == manager.c.id
+ )
+ .join(top_q, top_q.c.id == employee.c.id)
+ )
+
+ rec_cte = select(top_q.union_all(bottom_q))
+ self.assert_compile(
+ rec_cte,
+ "WITH RECURSIVE cte(id, manager_id, id_1) AS "
+ "(SELECT employee.id AS id, employee.manager_id AS manager_id, "
+ "manager.id AS id_1 FROM employee JOIN manager "
+ "ON employee.manager_id = manager.id UNION ALL "
+ "SELECT employee.id AS id, employee.manager_id AS manager_id, "
+ "manager.id AS id_1 FROM employee JOIN manager ON "
+ "employee.manager_id = manager.id "
+ "JOIN cte ON cte.id = employee.id) "
+ "SELECT cte.id, cte.manager_id, cte.id_1 FROM cte",
+ )
+
+ def test_wrecur_dupe_col_names_w_grouping(self):
+ """test #6710
+
+ by adding order_by() to the top query, the CTE will have
+ a compound select with the first element a SelectStatementGrouping
+ object, which we can test has the correct methods for the compiler
+ to call upon.
+
+ """
+
+ manager = table("manager", column("id"))
+ employee = table("employee", column("id"), column("manager_id"))
+
+ top_q = (
+ select(employee, manager)
+ .join_from(
+ employee, manager, employee.c.manager_id == manager.c.id
+ )
+ .order_by(employee.c.id)
+ .cte("cte", recursive=True)
+ )
+
+ bottom_q = (
+ select(employee, manager)
+ .join_from(
+ employee, manager, employee.c.manager_id == manager.c.id
+ )
+ .join(top_q, top_q.c.id == employee.c.id)
+ )
+
+ rec_cte = select(top_q.union_all(bottom_q))
+
+ self.assert_compile(
+ rec_cte,
+ "WITH RECURSIVE cte(id, manager_id, id_1) AS "
+ "((SELECT employee.id AS id, employee.manager_id AS manager_id, "
+ "manager.id AS id_1 FROM employee JOIN manager "
+ "ON employee.manager_id = manager.id ORDER BY employee.id) "
+ "UNION ALL "
+ "SELECT employee.id AS id, employee.manager_id AS manager_id, "
+ "manager.id AS id_1 FROM employee JOIN manager ON "
+ "employee.manager_id = manager.id "
+ "JOIN cte ON cte.id = employee.id) "
+ "SELECT cte.id, cte.manager_id, cte.id_1 FROM cte",
+ )
+
+ def test_wrecur_ovlp_lbls_plus_dupes_separate_keys_use_labels(self):
+ """test a condition related to #6710.
+
+ also see test_compiler->
+ test_overlapping_labels_plus_dupes_separate_keys_use_labels
+
+ for a non cte form of this test.
+
+ """
+
+ m = MetaData()
+ foo = Table(
+ "foo",
+ m,
+ Column("id", Integer),
+ Column("bar_id", Integer, key="bb"),
+ )
+ foo_bar = Table("foo_bar", m, Column("id", Integer, key="bb"))
+
+ stmt = select(
+ foo.c.id,
+ foo.c.bb,
+ foo_bar.c.bb,
+ foo.c.bb,
+ foo.c.id,
+ foo.c.bb,
+ foo_bar.c.bb,
+ foo_bar.c.bb,
+ ).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+
+ cte = stmt.cte(recursive=True)
+
+ self.assert_compile(
+ select(cte),
+ "WITH RECURSIVE anon_1(foo_id, foo_bar_id, foo_bar_id_1) AS "
+ "(SELECT foo.id AS foo_id, foo.bar_id AS foo_bar_id, "
+ "foo_bar.id AS foo_bar_id_1, foo.bar_id AS foo_bar_id__1, "
+ "foo.id AS foo_id__1, foo.bar_id AS foo_bar_id__1, "
+ "foo_bar.id AS foo_bar_id__2, foo_bar.id AS foo_bar_id__2 "
+ "FROM foo, foo_bar) "
+ "SELECT anon_1.foo_id, anon_1.foo_bar_id, anon_1.foo_bar_id_1, "
+ "anon_1.foo_bar_id AS foo_bar_id_2, anon_1.foo_id AS foo_id_1, "
+ "anon_1.foo_bar_id AS foo_bar_id_3, "
+ "anon_1.foo_bar_id_1 AS foo_bar_id_1_1, "
+ "anon_1.foo_bar_id_1 AS foo_bar_id_1_2 FROM anon_1",
+ )
+
def test_union(self):
orders = table("orders", column("region"), column("amount"))