else:
return self.bindtemplate % {'name':name}
- def visit_alias(self, alias, asfrom=False, **kwargs):
- if asfrom:
+ def visit_alias(self, alias, asfrom=False, ashint=False, fromhints=None, **kwargs):
+ if asfrom or ashint:
alias_name = isinstance(alias.name, sql._generated_label) and \
self._truncated_identifier("alias", alias.name) or alias.name
-
- return self.process(alias.original, asfrom=True, **kwargs) + " AS " + \
+ if ashint:
+ return self.preparer.format_alias(alias, alias_name)
+ elif asfrom:
+ ret = self.process(alias.original, asfrom=True, **kwargs) + " AS " + \
self.preparer.format_alias(alias, alias_name)
+
+ if fromhints and alias in fromhints:
+ hinttext = self.get_from_hint_text(alias, fromhints[alias])
+ if hinttext:
+ ret += " " + hinttext
+
+ return ret
else:
return self.process(alias.original, **kwargs)
else:
return column
+ def get_select_hint_text(self, byfroms):
+ return None
+
+ def get_from_hint_text(self, table, text):
+ return None
+
def visit_select(self, select, asfrom=False, parens=True,
- iswrapper=False, compound_index=1, **kwargs):
+ iswrapper=False, fromhints=None,
+ compound_index=1, **kwargs):
entry = self.stack and self.stack[-1] or {}
]
text = "SELECT " # we're off to a good start !
+
+ if select._hints:
+ byfrom = dict([
+ (from_, hinttext % {'name':self.process(from_, ashint=True)})
+ for (from_, dialect), hinttext in
+ select._hints.iteritems()
+ if dialect in ('*', self.dialect.name)
+ ])
+ hint_text = self.get_select_hint_text(byfrom)
+ if hint_text:
+ text += hint_text + " "
+
if select._prefixes:
text += " ".join(self.process(x, **kwargs) for x in select._prefixes) + " "
text += self.get_select_precolumns(select)
if froms:
text += " \nFROM "
- text += ', '.join(self.process(f, asfrom=True, **kwargs) for f in froms)
+
+ if select._hints:
+ text += ', '.join([self.process(f,
+ asfrom=True, fromhints=byfrom,
+ **kwargs)
+ for f in froms])
+ else:
+ text += ', '.join([self.process(f,
+ asfrom=True, **kwargs)
+ for f in froms])
else:
text += self.default_from()
text += " OFFSET " + str(select._offset)
return text
- def visit_table(self, table, asfrom=False, **kwargs):
- if asfrom:
+ def visit_table(self, table, asfrom=False, ashint=False, fromhints=None, **kwargs):
+ if asfrom or ashint:
if getattr(table, "schema", None):
- return self.preparer.quote_schema(table.schema, table.quote_schema) + \
+ ret = self.preparer.quote_schema(table.schema, table.quote_schema) + \
"." + self.preparer.quote(table.name, table.quote)
else:
- return self.preparer.quote(table.name, table.quote)
+ ret = self.preparer.quote(table.name, table.quote)
+ if fromhints and table in fromhints:
+ hinttext = self.get_from_hint_text(table, fromhints[table])
+ if hinttext:
+ ret += " " + hinttext
+ return ret
else:
return ""
def visit_join(self, join, asfrom=False, **kwargs):
- return (self.process(join.left, asfrom=True) + \
+ return (self.process(join.left, asfrom=True, **kwargs) + \
(join.isouter and " LEFT OUTER JOIN " or " JOIN ") + \
- self.process(join.right, asfrom=True) + " ON " + self.process(join.onclause))
+ self.process(join.right, asfrom=True, **kwargs) + " ON " + \
+ self.process(join.onclause, **kwargs))
def visit_sequence(self, seq):
return None
else:
self.assert_compile(s1, "SELECT %s FROM (SELECT %s FROM mytable)" % (expr,expr))
+ def test_hints(self):
+ s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s")
+
+ s2 = select([table1.c.myid]).\
+ with_hint(table1, "index(%(name)s idx)", 'oracle').\
+ with_hint(table1, "WITH HINT INDEX idx", 'sybase')
+
+ a1 = table1.alias()
+ s3 = select([a1.c.myid]).with_hint(a1, "index(%(name)s hint)")
+
+ subs4 = select([
+ table1, table2
+ ]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).\
+ with_hint(table1, 'hint1')
+
+ s4 = select([table3]).select_from(
+ table3.join(
+ subs4,
+ subs4.c.othername==table3.c.otherstuff
+ )
+ ).\
+ with_hint(table3, 'hint3')
+
+ subs5 = select([
+ table1, table2
+ ]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid))
+ s5 = select([table3]).select_from(
+ table3.join(
+ subs5,
+ subs5.c.othername==table3.c.otherstuff
+ )
+ ).\
+ with_hint(table3, 'hint3').\
+ with_hint(table1, 'hint1')
+
+ t1 = table('QuotedName', column('col1'))
+ s6 = select([t1.c.col1]).where(t1.c.col1>10).with_hint(t1, '%(name)s idx1')
+ a2 = t1.alias('SomeName')
+ s7 = select([a2.c.col1]).where(a2.c.col1>10).with_hint(a2, '%(name)s idx1')
+
+ mysql_d, oracle_d, sybase_d = \
+ mysql.dialect(), \
+ oracle.dialect(), \
+ sybase.dialect()
+
+ for stmt, dialect, expected in [
+ (s, mysql_d,
+ "SELECT mytable.myid FROM mytable test hint mytable"),
+ (s, oracle_d,
+ "SELECT /*+ test hint mytable */ mytable.myid FROM mytable"),
+ (s, sybase_d,
+ "SELECT mytable.myid FROM mytable test hint mytable"),
+ (s2, mysql_d,
+ "SELECT mytable.myid FROM mytable"),
+ (s2, oracle_d,
+ "SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable"),
+ (s2, sybase_d,
+ "SELECT mytable.myid FROM mytable WITH HINT INDEX idx"),
+ (s3, mysql_d,
+ "SELECT mytable_1.myid FROM mytable AS mytable_1 "
+ "index(mytable_1 hint)"),
+ (s3, oracle_d,
+ "SELECT /*+ index(mytable_1 hint) */ mytable_1.myid FROM "
+ "mytable mytable_1"),
+ (s3, sybase_d,
+ "SELECT mytable_1.myid FROM mytable AS mytable_1 "
+ "index(mytable_1 hint)"),
+ (s4, mysql_d,
+ "SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable "
+ "hint3 INNER JOIN (SELECT mytable.myid, mytable.name, "
+ "mytable.description, myothertable.otherid, "
+ "myothertable.othername FROM mytable hint1 INNER "
+ "JOIN myothertable ON mytable.myid = myothertable.otherid) "
+ "ON othername = thirdtable.otherstuff"),
+ (s4, sybase_d,
+ "SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable "
+ "hint3 JOIN (SELECT mytable.myid, mytable.name, "
+ "mytable.description, myothertable.otherid, "
+ "myothertable.othername FROM mytable hint1 "
+ "JOIN myothertable ON mytable.myid = myothertable.otherid) "
+ "ON othername = thirdtable.otherstuff"),
+ (s4, oracle_d,
+ "SELECT /*+ hint3 */ thirdtable.userid, thirdtable.otherstuff "
+ "FROM thirdtable JOIN (SELECT /*+ hint1 */ mytable.myid,"
+ " mytable.name, mytable.description, myothertable.otherid,"
+ " myothertable.othername FROM mytable JOIN myothertable ON"
+ " mytable.myid = myothertable.otherid) ON othername ="
+ " thirdtable.otherstuff"),
+ (s5, oracle_d,
+ "SELECT /*+ hint3 */ /*+ hint1 */ thirdtable.userid, "
+ "thirdtable.otherstuff "
+ "FROM thirdtable JOIN (SELECT mytable.myid,"
+ " mytable.name, mytable.description, myothertable.otherid,"
+ " myothertable.othername FROM mytable JOIN myothertable ON"
+ " mytable.myid = myothertable.otherid) ON othername ="
+ " thirdtable.otherstuff"),
+ (s6, oracle_d,
+ """SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 """
+ """FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1"""),
+ (s7, oracle_d,
+ """SELECT /*+ SomeName idx1 */ "SomeName".col1 FROM """
+ """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1"""),
+ ]:
+ self.assert_compile(
+ stmt,
+ expected,
+ dialect=dialect
+ )
+
class CRUDTest(TestBase, AssertsCompiledSQL):
def test_insert(self):
# generic insert, will create bind params for all columns
- self.assert_compile(insert(table1), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)")
+ self.assert_compile(insert(table1),
+ "INSERT INTO mytable (myid, name, description) "
+ "VALUES (:myid, :name, :description)")
# insert with user-supplied bind params for specific columns,
# cols provided literally
self.assert_compile(
- insert(table1, {table1.c.myid : bindparam('userid'), table1.c.name : bindparam('username')}),
+ insert(table1, {
+ table1.c.myid : bindparam('userid'),
+ table1.c.name : bindparam('username')}),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
# insert with user-supplied bind params for specific columns, cols
)
self.assert_compile(
- insert(table1, values={table1.c.myid : bindparam('userid')}).values({table1.c.name : bindparam('username')}),
+ insert(table1, values={
+ table1.c.myid : bindparam('userid')
+ }).values({table1.c.name : bindparam('username')}),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)"
)
- self.assert_compile(insert(table1, values=dict(myid=func.lala())), "INSERT INTO mytable (myid) VALUES (lala())")
+ self.assert_compile(
+ insert(table1, values=dict(myid=func.lala())),
+ "INSERT INTO mytable (myid) VALUES (lala())")
def test_inline_insert(self):
metadata = MetaData()
table = Table('sometable', metadata,
Column('id', Integer, primary_key=True),
Column('foo', Integer, default=func.foobar()))
- self.assert_compile(table.insert(values={}, inline=True), "INSERT INTO sometable (foo) VALUES (foobar())")
- self.assert_compile(table.insert(inline=True), "INSERT INTO sometable (foo) VALUES (foobar())", params={})
+ self.assert_compile(
+ table.insert(values={}, inline=True),
+ "INSERT INTO sometable (foo) VALUES (foobar())")
+ self.assert_compile(
+ table.insert(inline=True),
+ "INSERT INTO sometable (foo) VALUES (foobar())", params={})
def test_update(self):
- self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", params = {table1.c.name:'fred'})
- self.assert_compile(table1.update().where(table1.c.myid==7).values({table1.c.myid:5}), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", checkparams={'myid':5, 'myid_1':7})
- self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", params = {'name':'fred'})
- self.assert_compile(update(table1, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid")
- self.assert_compile(update(table1, whereclause = table1.c.name == bindparam('crit'), values = {table1.c.name : 'hi'}), "UPDATE mytable SET name=:name WHERE mytable.name = :crit", params = {'crit' : 'notthere'}, checkparams={'crit':'notthere', 'name':'hi'})
- self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :myid_1", params = {'description':'test'}, checkparams={'description':'test', 'myid_1':12})
- self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :myid_1", params = {'myid_1': 12, 'myid': 9, 'description': 'test'})
- self.assert_compile(update(table1, table1.c.myid ==12), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", params={'myid':18}, checkparams={'myid':18, 'myid_1':12})
+ self.assert_compile(
+ update(table1, table1.c.myid == 7),
+ "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
+ params = {table1.c.name:'fred'})
+ self.assert_compile(
+ table1.update().where(table1.c.myid==7).
+ values({table1.c.myid:5}),
+ "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
+ checkparams={'myid':5, 'myid_1':7})
+ self.assert_compile(
+ update(table1, table1.c.myid == 7),
+ "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
+ params = {'name':'fred'})
+ self.assert_compile(
+ update(table1, values = {table1.c.name : table1.c.myid}),
+ "UPDATE mytable SET name=mytable.myid")
+ self.assert_compile(
+ update(table1,
+ whereclause = table1.c.name == bindparam('crit'),
+ values = {table1.c.name : 'hi'}),
+ "UPDATE mytable SET name=:name WHERE mytable.name = :crit",
+ params = {'crit' : 'notthere'},
+ checkparams={'crit':'notthere', 'name':'hi'})
+ self.assert_compile(
+ update(table1, table1.c.myid == 12,
+ values = {table1.c.name : table1.c.myid}),
+ "UPDATE mytable SET name=mytable.myid, description="
+ ":description WHERE mytable.myid = :myid_1",
+ params = {'description':'test'},
+ checkparams={'description':'test', 'myid_1':12})
+ self.assert_compile(
+ update(table1, table1.c.myid == 12,
+ values = {table1.c.myid : 9}),
+ "UPDATE mytable SET myid=:myid, description=:description "
+ "WHERE mytable.myid = :myid_1",
+ params = {'myid_1': 12, 'myid': 9, 'description': 'test'})
+ self.assert_compile(
+ update(table1, table1.c.myid ==12),
+ "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
+ params={'myid':18}, checkparams={'myid':18, 'myid_1':12})
s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'})
c = s.compile(column_keys=['id', 'name'])
- self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :name_1), description=:description WHERE mytable.myid = :myid_1", params = {'description':'test'})
- self.assert_(str(s) == str(c))
+ self.assert_compile(
+ update(table1, table1.c.myid == 12,
+ values = {table1.c.name : table1.c.myid}
+ ).values({table1.c.name:table1.c.name + 'foo'}),
+ "UPDATE mytable SET name=(mytable.name || :name_1), "
+ "description=:description WHERE mytable.myid = :myid_1",
+ params = {'description':'test'})
+ eq_(str(s), str(c))
self.assert_compile(update(table1,
(table1.c.myid == func.hoho(4)) &
values = {
table1.c.name : table1.c.name + "lala",
table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
- }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :name_1) "
- "WHERE mytable.myid = hoho(:hoho_1) AND mytable.name = :param_2 || mytable.name || :param_3")
+ }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), "
+ "name=(mytable.name || :name_1) "
+ "WHERE mytable.myid = hoho(:hoho_1) AND mytable.name = :param_2 || "
+ "mytable.name || :param_3")
def test_correlated_update(self):
# test against a straight text subquery
- u = update(table1, values = {table1.c.name : text("(select name from mytable where id=mytable.id)")})
- self.assert_compile(u, "UPDATE mytable SET name=(select name from mytable where id=mytable.id)")
+ u = update(table1, values = {
+ table1.c.name :
+ text("(select name from mytable where id=mytable.id)")})
+ self.assert_compile(u,
+ "UPDATE mytable SET name=(select name from mytable "
+ "where id=mytable.id)")
mt = table1.alias()
- u = update(table1, values = {table1.c.name : select([mt.c.name], mt.c.myid==table1.c.myid)})
- self.assert_compile(u, "UPDATE mytable SET name=(SELECT mytable_1.name FROM mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)")
+ u = update(table1, values = {
+ table1.c.name :
+ select([mt.c.name], mt.c.myid==table1.c.myid)
+ })
+ self.assert_compile(u,
+ "UPDATE mytable SET name=(SELECT mytable_1.name FROM "
+ "mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)")
# test against a regular constructed subquery
s = select([table2], table2.c.otherid == table1.c.myid)
u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s})
- self.assert_compile(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :name_1")
+ self.assert_compile(u,
+ "UPDATE mytable SET name=(SELECT myothertable.otherid, "
+ "myothertable.othername FROM myothertable WHERE "
+ "myothertable.otherid = mytable.myid) WHERE mytable.name = :name_1")
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
u = update(table1, table1.c.name==s)
- self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "\
- "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :otherid_1)")
+ self.assert_compile(u,
+ "UPDATE mytable SET myid=:myid, name=:name, "
+ "description=:description WHERE mytable.name = "
+ "(SELECT myothertable.othername FROM myothertable "
+ "WHERE myothertable.otherid = :otherid_1)")
# test one that is actually correlated...
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)