cloned = {}
column_translate = [{}]
- # TODO: should we be using isinstance() for this,
- # as this whole system won't work for custom Join/Select
- # subclasses where compilation routines
- # call down to compiler.visit_join(), compiler.visit_select()
- join_name = selectable.Join.__visit_name__
- select_name = selectable.Select.__visit_name__
- alias_name = selectable.Alias.__visit_name__
+
def visit(element, **kw):
if element in column_translate[-1]:
return column_translate[-1][element]
newelem = cloned[element] = element._clone()
- if newelem.__visit_name__ is join_name and \
+ if newelem.is_selectable and newelem._is_join and \
isinstance(newelem.right, selectable.FromGrouping):
newelem._reset_exported()
newelem.right = selectable_
newelem.onclause = visit(newelem.onclause, **kw)
- elif newelem.__visit_name__ is alias_name \
- and newelem.element.__visit_name__ is select_name:
- column_translate.append({})
+
+ elif newelem.is_selectable and newelem._is_from_container:
+ # if we hit an Alias or CompoundSelect, put a marker in the
+ # stack.
+ kw['transform_clue'] = 'select_container'
+ newelem._copy_internals(clone=visit, **kw)
+ elif newelem.is_selectable and newelem._is_select:
+ barrier_select = kw.get('transform_clue', None) == 'select_container'
+ # if we're still descended from an Alias/CompoundSelect, we're
+ # in a FROM clause, so start with a new translate collection
+ if barrier_select:
+ column_translate.append({})
+ kw['transform_clue'] = 'inside_select'
newelem._copy_internals(clone=visit, **kw)
- del column_translate[-1]
+ if barrier_select:
+ del column_translate[-1]
else:
newelem._copy_internals(clone=visit, **kw)
named_with_column = False
_hide_froms = []
+ _is_join = False
+ _is_select = False
+ _is_from_container = False
+
_textual = False
"""a marker that allows us to easily distinguish a :class:`.TextAsFrom`
or similar object from other kinds of :class:`.FromClause` objects."""
"""
__visit_name__ = 'join'
+ _is_join = True
+
def __init__(self, left, right, onclause=None, isouter=False):
"""Construct a new :class:`.Join`.
__visit_name__ = 'alias'
named_with_column = True
+ _is_from_container = True
+
def __init__(self, selectable, name=None):
baseselectable = selectable
while isinstance(baseselectable, Alias):
INTERSECT = util.symbol('INTERSECT')
INTERSECT_ALL = util.symbol('INTERSECT ALL')
+ _is_from_container = True
+
def __init__(self, keyword, *selects, **kwargs):
self._auto_correlate = kwargs.pop('correlate', False)
self.keyword = keyword
_correlate = ()
_correlate_except = None
_memoized_property = SelectBase._memoized_property
+ _is_select = True
def __init__(self,
columns=None,
-from sqlalchemy import Table, Column, Integer, MetaData, ForeignKey, select, exists
+from sqlalchemy import Table, Column, Integer, MetaData, ForeignKey, select, exists, union
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_
from sqlalchemy import util
from sqlalchemy.engine import default
Column('a_id', Integer, ForeignKey('a.id'))
)
+b1 = Table('b1', m,
+ Column('id', Integer, primary_key=True),
+ Column('a_id', Integer, ForeignKey('a.id'))
+ )
+
+b2 = Table('b2', m,
+ Column('id', Integer, primary_key=True),
+ Column('a_id', Integer, ForeignKey('a.id'))
+ )
+
a_to_b = Table('a_to_b', m,
Column('a_id', Integer, ForeignKey('a.id')),
Column('b_id', Integer, ForeignKey('b.id')),
self._a_atobalias_balias
)
+ def test_b_ab1_union_b_ab2(self):
+ j1 = a.join(b1)
+ j2 = a.join(b2)
+
+ b_j1 = b.join(j1)
+ b_j2 = b.join(j2)
+
+ s = union(
+ select([b_j1], use_labels=True),
+ select([b_j2], use_labels=True)
+ ).select(use_labels=True)
+
+ self._test(
+ s,
+ self._b_ab1_union_c_ab2
+ )
+
class JoinRewriteTest(_JoinRewriteTestBase, fixtures.TestBase):
"""test rendering of each join with right-nested rewritten as
"JOIN b AS b_1 ON b_1.id = a_to_b_1.b_id) AS anon_1 ON a.id = anon_1.a_to_b_1_a_id"
)
+ _b_ab1_union_c_ab2 = (
+ "SELECT b_id AS b_id, b_a_id AS b_a_id, a_id AS a_id, b1_id AS b1_id, "
+ "b1_a_id AS b1_a_id FROM "
+ "(SELECT b.id AS b_id, b.a_id AS b_a_id, anon_1.a_id AS a_id, "
+ "anon_1.b1_id AS b1_id, anon_1.b1_a_id AS b1_a_id "
+ "FROM b JOIN (SELECT a.id AS a_id, b1.id AS b1_id, b1.a_id AS b1_a_id "
+ "FROM a JOIN b1 ON a.id = b1.a_id) AS anon_1 ON anon_1.a_id = b.a_id "
+ "UNION "
+ "SELECT b.id AS b_id, b.a_id AS b_a_id, anon_2.a_id AS a_id, "
+ "anon_2.b2_id AS b2_id, anon_2.b2_a_id AS b2_a_id "
+ "FROM b JOIN (SELECT a.id AS a_id, b2.id AS b2_id, b2.a_id AS b2_a_id "
+ "FROM a JOIN b2 ON a.id = b2.a_id) AS anon_2 ON anon_2.a_id = b.a_id)"
+ )
+
class JoinPlainTest(_JoinRewriteTestBase, fixtures.TestBase):
"""test rendering of each join with normal nesting."""
@util.classproperty
"JOIN b AS b_1 ON b_1.id = a_to_b_1.b_id) ON a.id = a_to_b_1.a_id"
)
+ _b_ab1_union_c_ab2 = (
+ "SELECT b_id AS b_id, b_a_id AS b_a_id, a_id AS a_id, b1_id AS b1_id, "
+ "b1_a_id AS b1_a_id FROM "
+ "(SELECT b.id AS b_id, b.a_id AS b_a_id, a.id AS a_id, b1.id AS b1_id, "
+ "b1.a_id AS b1_a_id FROM b "
+ "JOIN (a JOIN b1 ON a.id = b1.a_id) ON a.id = b.a_id "
+ "UNION "
+ "SELECT b.id AS b_id, b.a_id AS b_a_id, a.id AS a_id, b2.id AS b2_id, "
+ "b2.a_id AS b2_a_id FROM b "
+ "JOIN (a JOIN b2 ON a.id = b2.a_id) ON a.id = b.a_id)"
+ )
+
class JoinNoUseLabelsTest(_JoinRewriteTestBase, fixtures.TestBase):
@util.classproperty
def __dialect__(cls):
"JOIN b AS b_1 ON b_1.id = a_to_b_1.b_id) ON a.id = a_to_b_1.a_id"
)
+ _b_ab1_union_c_ab2 = (
+ "SELECT b_id, b_a_id, a_id, b1_id, b1_a_id "
+ "FROM (SELECT b.id AS b_id, b.a_id AS b_a_id, a.id AS a_id, "
+ "b1.id AS b1_id, b1.a_id AS b1_a_id "
+ "FROM b JOIN (a JOIN b1 ON a.id = b1.a_id) ON a.id = b.a_id "
+ "UNION "
+ "SELECT b.id AS b_id, b.a_id AS b_a_id, a.id AS a_id, b2.id AS b2_id, "
+ "b2.a_id AS b2_a_id "
+ "FROM b JOIN (a JOIN b2 ON a.id = b2.a_id) ON a.id = b.a_id)"
+ )
+
class JoinExecTest(_JoinRewriteTestBase, fixtures.TestBase):
"""invoke the SQL on the current backend to ensure compatibility"""
_a_bc = _a_bc_comma_a1_selbc = _a__b_dc = _a_bkeyassoc = \
_a_bkeyassoc_aliased = _a_atobalias_balias_c_w_exists = \
- _a_atobalias_balias = None
+ _a_atobalias_balias = _b_ab1_union_c_ab2 = None
@classmethod
def setup_class(cls):