if stack:
self.stack.append(stack)
try:
- x = self.traverse_single(obj, **kwargs)
- if x is None:
- raise "hi " + repr(obj)
- return x
+ return self.traverse_single(obj, **kwargs)
finally:
if stack:
self.stack.pop(-1)
return ".".join(func.packagenames + [func.name]) + (not func.group and " " or "") + self.process(func.clause_expr)
def visit_compound_select(self, cs, asfrom=False, parens=True, **kwargs):
+ stack_entry = {'select':cs}
+
+ if asfrom:
+ stack_entry['is_selected_from'] = stack_entry['is_subquery'] = True
+ elif self.stack and self.stack[-1].get('select'):
+ stack_entry['is_subquery'] = True
+ self.stack.append(stack_entry)
+
text = string.join([self.process(c, asfrom=asfrom, parens=False) for c in cs.selects], " " + cs.keyword + " ")
group_by = self.process(cs._group_by_clause, asfrom=asfrom)
if group_by:
text += " GROUP BY " + group_by
+
text += self.order_by_clause(cs)
text += (cs._limit or cs._offset) and self.limit_clause(cs) or ""
+ self.stack.pop(-1)
+
if asfrom and parens:
return "(" + text + ")"
else:
t = self.process(select._having)
if t:
text += " \nHAVING " + t
-
+
text += self.order_by_clause(select)
text += (select._limit or select._offset) and self.limit_clause(select) or ""
text += self.for_update_clause(select)
\r
def test_update(self):\r
t = table('sometable', column('somecolumn'))\r
- self._test(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :sometable_somecolumn", somecolumn=10)\r
+ self._test(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :sometable_somecolumn", somecolumn=10)\r
\r
def test_count(self):\r
t = table('sometable', column('somecolumn'))\r
- self._test(t.count(), "SELECT count(sometable.somecolumn) AS tbl_row_count FROM sometable")\r
+ self._test(t.count(), "SELECT count(sometable.somecolumn) AS tbl_row_count FROM sometable")\r
\r
+ def test_union(self):\r
+ t1 = table('t1', \r
+ column('col1'),\r
+ column('col2'),\r
+ column('col3'),\r
+ column('col4')\r
+ )\r
+ t2 = table('t2',\r
+ column('col1'),\r
+ column('col2'),\r
+ column('col3'),\r
+ column('col4'))\r
+ \r
+ (s1, s2) = (\r
+ select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], t1.c.col2.in_("t1col2r1", "t1col2r2")),\r
+ select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_("t2col2r2", "t2col2r3"))\r
+ ) \r
+ u = union(s1, s2, order_by=['col3', 'col4'])\r
+ self._test(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1) ORDER BY col3, col4")\r
+\r
+ self._test(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1)) AS bar")\r
+ \r
if __name__ == "__main__":\r
testbase.main()\r