cloned = {}
column_translate = [{}]
-
def visit(element, **kw):
if element in column_translate[-1]:
return column_translate[-1][element]
right = visit(newelem.right, **kw)
selectable_ = selectable.Select(
- [right.element],
- use_labels=True).alias()
+ [right.element],
+ use_labels=True).alias()
+
for c in selectable_.c:
c._key_label = c.key
c._label = c.name
newelem.onclause = visit(newelem.onclause, **kw)
- elif newelem.is_selectable and newelem._is_from_container:
- # if we hit an Alias or CompoundSelect, put a marker in the
- # stack.
+ elif newelem._is_from_container:
+ # if we hit an Alias, CompoundSelect or ScalarSelect, put a
+ # marker in the stack.
kw['transform_clue'] = 'select_container'
newelem._copy_internals(clone=visit, **kw)
elif newelem.is_selectable and newelem._is_select:
- barrier_select = kw.get('transform_clue', None) == 'select_container'
- # if we're still descended from an Alias/CompoundSelect, we're
+ barrier_select = kw.get('transform_clue', None) == \
+ 'select_container'
+ # if we're still descended from an
+ # Alias/CompoundSelect/ScalarSelect, we're
# in a FROM clause, so start with a new translate collection
if barrier_select:
column_translate.append({})
m = MetaData()
+
a = Table('a', m,
Column('id', Integer, primary_key=True)
)
Column('id', Integer, primary_key=True)
)
+f = Table('f', m,
+ Column('id', Integer, primary_key=True),
+ Column('a_id', ForeignKey('a.id'))
+ )
+
b_key = Table('b_key', m,
Column('id', Integer, primary_key=True, key='bid'),
)
self._b_a_id_double_overlap_annotated
)
+ def test_f_b1a_where_in_b2a(self):
+ # test issue #3130
+ b1a = a.join(b1)
+ b2a = a.join(b2)
+ subq = select([b2.c.id]).select_from(b2a)
+ s = select([f]).select_from(f.join(b1a)).where(b1.c.id.in_(subq))
+
+ s = s.apply_labels()
+ self._test(
+ s,
+ self._f_b1a_where_in_b2a
+ )
+
+
class JoinRewriteTest(_JoinRewriteTestBase, fixtures.TestBase):
"""test rendering of each join with right-nested rewritten as
aliased SELECT statements.."""
"FROM b JOIN b_a ON b.id = b_a.id) AS anon_1"
)
+ _f_b1a_where_in_b2a = (
+ "SELECT f.id AS f_id, f.a_id AS f_a_id "
+ "FROM f JOIN (SELECT a.id AS a_id, b1.id AS b1_id, b1.a_id AS b1_a_id "
+ "FROM a JOIN b1 ON a.id = b1.a_id) AS anon_1 ON anon_1.a_id = f.a_id "
+ "WHERE anon_1.b1_id IN (SELECT b2.id "
+ "FROM a JOIN b2 ON a.id = b2.a_id)"
+ )
+
class JoinPlainTest(_JoinRewriteTestBase, fixtures.TestBase):
"""test rendering of each join with normal nesting."""
@util.classproperty
"FROM b JOIN b_a ON b.id = b_a.id) AS anon_1"
)
+ _f_b1a_where_in_b2a = (
+ "SELECT f.id AS f_id, f.a_id AS f_a_id "
+ "FROM f JOIN (a JOIN b1 ON a.id = b1.a_id) ON a.id = f.a_id "
+ "WHERE b1.id IN (SELECT b2.id "
+ "FROM a JOIN b2 ON a.id = b2.a_id)"
+ )
+
class JoinNoUseLabelsTest(_JoinRewriteTestBase, fixtures.TestBase):
@util.classproperty
def __dialect__(cls):
"FROM b JOIN b_a ON b.id = b_a.id) AS anon_1"
)
+ _f_b1a_where_in_b2a = (
+ "SELECT f.id, f.a_id "
+ "FROM f JOIN (a JOIN b1 ON a.id = b1.a_id) ON a.id = f.a_id "
+ "WHERE b1.id IN (SELECT b2.id "
+ "FROM a JOIN b2 ON a.id = b2.a_id)"
+ )
+
class JoinExecTest(_JoinRewriteTestBase, fixtures.TestBase):
"""invoke the SQL on the current backend to ensure compatibility"""
_a_bc = _a_bc_comma_a1_selbc = _a__b_dc = _a_bkeyassoc = \
_a_bkeyassoc_aliased = _a_atobalias_balias_c_w_exists = \
_a_atobalias_balias = _b_ab1_union_c_ab2 = \
- _b_a_id_double_overlap_annotated = None
+ _b_a_id_double_overlap_annotated = _f_b1a_where_in_b2a = None
@classmethod
def setup_class(cls):