including their ddl listener and other event callables.
[ticket:1694] [ticket:1698]
++ - Some platforms will now interpret certain literal values
++ as non-bind parameters, rendered literally into the SQL
++ statement. This to support strict SQL-92 rules that are
++ enforced by some platforms including MS-SQL and Sybase.
++ In this model, bind parameters aren't allowed in the
++ columns clause of a SELECT, nor are certain ambiguous
++ expressions like "?=?". When this mode is enabled, the base
++ compiler will render the binds as inline literals, but only across
++ strings and numeric values. Other types such as dates
++ will raise an error, unless the dialect subclass defines
++ a literal rendering function for those. The bind parameter
++ must have an embedded literal value already or an error
++ is raised (i.e. won't work with straight bindparam('x')).
++ Dialects can also expand upon the areas where binds are not
++ accepted, such as within argument lists of functions
++ (which don't work on MS-SQL when native SQL binding is used).
++
- Added "unicode_errors" parameter to String, Unicode, etc.
Behaves like the 'errors' keyword argument to
the standard library's string.decode() functions. This flag
will be returned as a string. [ticket:1685]
- sybase
-- - Implemented a preliminary working dialect for Sybase
-- based on the Python-Sybase driver. Handles table
++ - Implemented a preliminary working dialect for Sybase,
++ with sub-implementations for Python-Sybase as well
++ as Pyodbc. Handles table
creates/drops and basic round trip functionality.
Does not yet include reflection or comprehensive
support of unicode/special expressions/etc.
**Sybase ASE**
-------------------------------------------------------------------------------------------------------------------------------
mxodbc_ ``sybase+mxodbc`` development development no yes yes
--pyodbc_ ``sybase+pyodbc`` development development no unknown unknown
++pyodbc_ ``sybase+pyodbc`` partial development no unknown unknown
python-sybase_ ``sybase+pysybase``\* partial development no yes yes
========================= =========================== =========== =========== =========== ================= ============
return (self.process(join.left, asfrom=True) + (join.isouter and " LEFT OUTER JOIN " or " INNER JOIN ") + \
self.process(join.right, asfrom=True) + " ON " + self.process(join.onclause))
-- def visit_extract(self, extract):
++ def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
-- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
++ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
class AccessDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
class MSSQLCompiler(compiler.SQLCompiler):
returning_precedes_values = True
-- extract_map = compiler.SQLCompiler.extract_map.copy()
-- extract_map.update ({
++ extract_map = util.update_copy(
++ compiler.SQLCompiler.extract_map,
++ {
'doy': 'dayofyear',
'dow': 'weekday',
'milliseconds': 'millisecond',
kwargs['mssql_aliased'] = True
return super(MSSQLCompiler, self).visit_alias(alias, **kwargs)
-- def visit_extract(self, extract):
++ def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
-- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
++ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
def visit_rollback_to_savepoint(self, savepoint_stmt):
return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(savepoint_stmt)
else:
return ""
++class MSSQLStrictCompiler(MSSQLCompiler):
++ """A subclass of MSSQLCompiler which disables the usage of bind
++ parameters where not allowed natively by MS-SQL.
++
++ A dialect may use this compiler on a platform where native
++ binds are used.
++
++ """
++ ansi_bind_rules = True
++
++ def visit_in_op(self, binary, **kw):
++ kw['literal_binds'] = True
++ return "%s IN %s" % (
++ self.process(binary.left, **kw),
++ self.process(binary.right, **kw)
++ )
++
++ def visit_notin_op(self, binary, **kw):
++ kw['literal_binds'] = True
++ return "%s NOT IN %s" % (
++ self.process(binary.left, **kw),
++ self.process(binary.right, **kw)
++ )
++
++ def visit_function(self, func, **kw):
++ kw['literal_binds'] = True
++ return super(MSSQLStrictCompiler, self).visit_function(func, **kw)
++
++ #def render_literal_value(self, value):
++ # TODO! use mxODBC's literal quoting services here
++
class MSDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
from sqlalchemy import types as sqltypes
from sqlalchemy.connectors.mxodbc import MxODBCConnector
from sqlalchemy.dialects.mssql.pyodbc import MSExecutionContext_pyodbc
- from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect
-
-from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect, MSSQLCompiler
++from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect, \
++ MSSQLCompiler, MSSQLStrictCompiler
-# TODO: does Pyodbc on windows have the same limitations ?
-# if so this compiler can be moved to a common "odbc.py" module
-# here
-# *or* - should we implement this for MS-SQL across the board
-# since its technically MS-SQL's behavior ?
-# perhaps yes, with a dialect flag "strict_binds" to turn it off
-class MSSQLCompiler_mxodbc(MSSQLCompiler):
- binds_in_columns_clause = False
-
- def visit_in_op(self, binary, **kw):
- kw['literal_binds'] = True
- return "%s IN %s" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw)
- )
-
- def visit_notin_op(self, binary, **kw):
- kw['literal_binds'] = True
- return "%s NOT IN %s" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw)
- )
-
- def visit_function(self, func, **kw):
- kw['literal_binds'] = True
- return super(MSSQLCompiler_mxodbc, self).visit_function(func, **kw)
-
- def render_literal_value(self, value):
- # TODO! use mxODBC's literal quoting services here
- if isinstance(value, basestring):
- value = value.replace("'", "''")
- return "'%s'" % value
- else:
- return repr(value)
-
+
class MSExecutionContext_mxodbc(MSExecutionContext_pyodbc):
"""
The pyodbc execution context is useful for enabling
class MSDialect_mxodbc(MxODBCConnector, MSDialect):
execution_ctx_cls = MSExecutionContext_mxodbc
-
+
+ # TODO: may want to use this only if FreeTDS is not in use,
+ # since FreeTDS doesn't seem to use native binds.
- statement_compiler = MSSQLCompiler_mxodbc
++ statement_compiler = MSSQLStrictCompiler
+
def __init__(self, description_encoding='latin-1', **params):
super(MSDialect_mxodbc, self).__init__(**params)
self.description_encoding = description_encoding
++"""
++Support for MS-SQL via pyodbc.
++
++http://pypi.python.org/pypi/pyodbc/
++
++Connect strings are of the form::
++
++ mssql+pyodbc://<username>:<password>@<dsn>/
++ mssql+pyodbc://<username>:<password>@<host>/<database>
++
++
++"""
++
from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect
from sqlalchemy.connectors.pyodbc import PyODBCConnector
from sqlalchemy import types as sqltypes
else:
return self.process(cast.clause)
-- def visit_extract(self, extract):
++ def visit_extract(self, extract, **kw):
try:
return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
-- self.extract_map[extract.field], self.process(extract.expr))
++ self.extract_map[extract.field], self.process(extract.expr, **kw))
except KeyError:
raise exc.ArgumentError(
"%s is not a valid extract argument." % extract.field)
return lastrowid
class SybaseSQLCompiler(compiler.SQLCompiler):
++ ansi_bind_rules = True
-- extract_map = compiler.SQLCompiler.extract_map.copy()
-- extract_map.update ({
++ extract_map = util.update_copy(
++ compiler.SQLCompiler.extract_map,
++ {
'doy': 'dayofyear',
'dow': 'weekday',
'milliseconds': 'millisecond'
# Limit in sybase is after the select keyword
return ""
-- def dont_visit_binary(self, binary):
-- """Move bind parameters to the right-hand side of an operator, where possible."""
-- if isinstance(binary.left, expression._BindParamClause) and binary.operator == operator.eq:
-- return self.process(expression._BinaryExpression(binary.right, binary.left, binary.operator))
-- else:
-- return super(SybaseSQLCompiler, self).visit_binary(binary)
--
-- def dont_label_select_column(self, select, column, asfrom):
-- if isinstance(column, expression.Function):
-- return column.label(None)
-- else:
-- return super(SybaseSQLCompiler, self).label_select_column(select, column, asfrom)
--
--# def visit_getdate_func(self, fn, **kw):
-- # TODO: need to cast? something ?
--# pass
--
-- def visit_extract(self, extract):
++ def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
-- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
++ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
def for_update_clause(self, select):
# "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
return ''
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
+ def order_by_clause(self, select, **kw):
++ kw['literal_binds'] = True
+ order_by = self.process(select._order_by_clause, **kw)
# SybaseSQL only allows ORDER BY in subqueries if there is a LIMIT
if order_by and (not self.is_subquery() or select._limit):
"""
Support for Sybase via pyodbc.
--This dialect is a stub only and is likely non functional at this time.
++http://pypi.python.org/pypi/pyodbc/
++
++Connect strings are of the form::
++
++ sybase+pyodbc://<username>:<password>@<dsn>/
++ sybase+pyodbc://<username>:<password>@<host>/<database>
"""
supports_sane_rowcount = True
supports_sane_multi_rowcount = True
dbapi_type_map = {}
++ colspecs = {}
default_paramstyle = 'named'
supports_default_values = False
supports_empty_insert = True
from sqlalchemy import schema, engine, util, exc
from sqlalchemy.sql import operators, functions, util as sql_util, visitors
from sqlalchemy.sql import expression as sql
++import decimal
RESERVED_WORDS = set([
'all', 'analyse', 'analyze', 'and', 'any', 'array',
# clauses before the VALUES or WHERE clause (i.e. MSSQL)
returning_precedes_values = False
- # in the columns clause of a SELECT. A compiler
+ # SQL 92 doesn't allow bind parameters to be used
- binds_in_columns_clause = True
++ # in the columns clause of a SELECT, nor does it allow
++ # ambiguous expressions like "? = ?". A compiler
+ # subclass can set this flag to False if the target
+ # driver/DB enforces this
++ ansi_bind_rules = False
def __init__(self, dialect, statement, column_keys=None, inline=False, **kwargs):
"""Construct a new ``DefaultCompiler`` object.
"""
return ""
- def visit_grouping(self, grouping, **kwargs):
- return "(" + self.process(grouping.element) + ")"
+ def visit_grouping(self, grouping, asfrom=False, **kwargs):
+ return "(" + self.process(grouping.element, **kwargs) + ")"
-- def visit_label(self, label, result_map=None, within_columns_clause=False):
++ def visit_label(self, label, result_map=None, within_columns_clause=False, **kw):
# only render labels within the columns clause
# or ORDER BY clause of a select. dialect-specific compilers
# can modify this behavior.
result_map[labelname.lower()] = \
(label.name, (label, label.element, labelname), label.element.type)
-- return self.process(label.element) + \
++ return self.process(label.element,
++ within_columns_clause=within_columns_clause,
++ **kw) + \
OPERATORS[operators.as_] + \
self.preparer.format_label(label, labelname)
else:
-- return self.process(label.element)
++ return self.process(label.element,
++ within_columns_clause=within_columns_clause,
++ **kw)
def visit_column(self, column, result_map=None, **kwargs):
name = column.name
s = s + OPERATORS[unary.modifier]
return s
-- def visit_binary(self, binary, **kwargs):
--
++ def visit_binary(self, binary, **kw):
++ # don't allow "? = ?" to render
++ if self.ansi_bind_rules and \
++ isinstance(binary.left, sql._BindParamClause) and \
++ isinstance(binary.right, sql._BindParamClause):
++ kw['literal_binds'] = True
++
return self._operator_dispatch(binary.operator,
binary,
- lambda opstr: self.process(binary.left) + opstr + self.process(binary.right),
- **kwargs
- lambda opstr: self.process(binary.left, **kwargs) +
++ lambda opstr: self.process(binary.left, **kw) +
+ opstr +
- self.process(binary.right, **kwargs),
- **kwargs
++ self.process(binary.right, **kw),
++ **kw
)
def visit_like_op(self, binary, **kw):
else:
return fn(" " + operator + " ")
- def visit_bindparam(self, bindparam, **kwargs):
+ def visit_bindparam(self, bindparam, within_columns_clause=False,
+ literal_binds=False, **kwargs):
+ if literal_binds or \
+ (within_columns_clause and \
- not self.binds_in_columns_clause) and \
- bindparam.value is not None:
++ self.ansi_bind_rules):
++ if bindparam.value is None:
++ raise exc.CompileError("Bind parameter without a "
++ "renderable value not allowed here.")
+ return self.render_literal_bindparam(bindparam, within_columns_clause=True, **kwargs)
+
name = self._truncate_bindparam(bindparam)
if name in self.binds:
existing = self.binds[name]
self.binds[bindparam.key] = self.binds[name] = bindparam
return self.bindparam_string(name)
-
+
+ def render_literal_bindparam(self, bindparam, **kw):
+ value = bindparam.value
+ processor = bindparam.bind_processor(self.dialect)
+ if processor:
+ value = processor(value)
- return self.render_literal_value(value)
++ return self.render_literal_value(value, bindparam.type)
+
- def render_literal_value(self, value):
++ def render_literal_value(self, value, type_):
+ """Render the value of a bind parameter as a quoted literal.
+
+ This is used for statement sections that do not accept bind paramters
+ on the target driver/database.
+
+ This should be implemented by subclasses using the quoting services
+ of the DBAPI.
+
+ """
- raise NotImplementedError()
++ if isinstance(value, basestring):
++ value = value.replace("'", "''")
++ return "'%s'" % value
++ elif value is None:
++ return "NULL"
++ elif isinstance(value, (float, int, long)):
++ return repr(value)
++ elif isinstance(value, decimal.Decimal):
++ return str(value)
++ else:
++ raise NotImplementedError("Don't know how to literal-quote value %r" % value)
+
def _truncate_bindparam(self, bindparam):
if bindparam in self.bind_names:
return self.bind_names[bindparam]
def test_order_by_label(self):
"""test that a label within an ORDER BY works on each backend.
-- simple labels in ORDER BYs now render as the actual labelname
-- which not every database supports.
++ This test should be modified to support [ticket:1068] when that ticket
++ is implemented. For now, you need to put the actual string in the
++ ORDER BY.
"""
users.insert().execute(
concat = ("test: " + users.c.user_name).label('thedata')
eq_(
-- select([concat]).order_by(concat).execute().fetchall(),
++ select([concat]).order_by("thedata").execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
eq_(
-- select([concat]).order_by(concat).execute().fetchall(),
++ select([concat]).order_by("thedata").execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
concat = ("test: " + users.c.user_name).label('thedata')
eq_(
-- select([concat]).order_by(desc(concat)).execute().fetchall(),
++ select([concat]).order_by(desc('thedata')).execute().fetchall(),
[("test: jack",), ("test: fred",), ("test: ed",)]
)
-- concat = ("test: " + users.c.user_name).label('thedata')
-- eq_(
-- select([concat]).order_by(concat + "x").execute().fetchall(),
-- [("test: ed",), ("test: fred",), ("test: jack",)]
-- )
++ @testing.fails_on('postgresql', 'only simple labels allowed')
++ @testing.fails_on('sybase', 'only simple labels allowed')
++ def go():
++ concat = ("test: " + users.c.user_name).label('thedata')
++ eq_(
++ select([concat]).order_by(literal_column('thedata') + "x").execute().fetchall(),
++ [("test: ed",), ("test: fred",), ("test: jack",)]
++ )
++ go()
def test_row_comparison(self):
assert len(r) == 0
@testing.emits_warning('.*empty sequence.*')
-- @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
++ @testing.fails_on('firebird', "uses sql-92 rules")
++ @testing.fails_on('sybase', "uses sql-92 rules")
@testing.fails_if(lambda:
testing.against('mssql+pyodbc') and not testing.db.dialect.freetds,
-- "not supported by Windows ODBC driver")
++ "uses sql-92 rules")
def test_bind_in(self):
++ """test calling IN against a bind parameter.
++
++ this isn't allowed on several platforms since we
++ generate ? = ?.
++
++ """
users.insert().execute(user_id = 7, user_name = 'jack')
users.insert().execute(user_id = 8, user_name = 'fred')
users.insert().execute(user_id = 9, user_name = None)
assert len(r) == 3
r = s.execute(search_key=None).fetchall()
assert len(r) == 0
++
++ @testing.emits_warning('.*empty sequence.*')
++ def test_literal_in(self):
++ """similar to test_bind_in but use a bind with a value."""
++
++ users.insert().execute(user_id = 7, user_name = 'jack')
++ users.insert().execute(user_id = 8, user_name = 'fred')
++ users.insert().execute(user_id = 9, user_name = None)
++ s = users.select(not_(literal("john").in_([])))
++ r = s.execute().fetchall()
++ assert len(r) == 3
++
++
++
@testing.emits_warning('.*empty sequence.*')
@testing.fails_on('firebird', 'FIXME: unknown')
@testing.fails_on('maxdb', 'FIXME: unknown')
def test_orderby_groupby(self):
self.assert_compile(
table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]),
-- "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
++ "SELECT myothertable.otherid, myothertable.othername FROM "
++ "myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
)
self.assert_compile(
table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]),
-- "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
++ "SELECT myothertable.otherid, myothertable.othername FROM "
++ "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
# generative order_by
self.assert_compile(
table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
-- "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
++ "SELECT myothertable.otherid, myothertable.othername FROM "
++ "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
self.assert_compile(
-- table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()).order_by(None),
++ table2.select().order_by(table2.c.otherid).
++ order_by(table2.c.othername.desc()).order_by(None),
"SELECT myothertable.otherid, myothertable.othername FROM myothertable"
)
self.assert_compile(
-- select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername]),
-- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername"
++ select(
++ [table2.c.othername, func.count(table2.c.otherid)],
++ group_by = [table2.c.othername]),
++ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
++ "FROM myothertable GROUP BY myothertable.othername"
)
# generative group by
self.assert_compile(
-- select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername),
-- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername"
++ select([table2.c.othername, func.count(table2.c.otherid)]).
++ group_by(table2.c.othername),
++ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
++ "FROM myothertable GROUP BY myothertable.othername"
)
self.assert_compile(
-- select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername).group_by(None),
-- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable"
++ select([table2.c.othername, func.count(table2.c.otherid)]).
++ group_by(table2.c.othername).group_by(None),
++ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
++ "FROM myothertable"
)
self.assert_compile(
-- select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]),
-- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
++ select([table2.c.othername, func.count(table2.c.otherid)],
++ group_by = [table2.c.othername],
++ order_by = [table2.c.othername]),
++ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
++ "FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
)
def test_for_update(self):
-- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
++ self.assert_compile(
++ table1.select(table1.c.myid==7, for_update=True),
++ "SELECT mytable.myid, mytable.name, mytable.description "
++ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
-- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
++ self.assert_compile(
++ table1.select(table1.c.myid==7, for_update="nowait"),
++ "SELECT mytable.myid, mytable.name, mytable.description "
++ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
-- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT", dialect=oracle.dialect())
++ self.assert_compile(
++ table1.select(table1.c.myid==7, for_update="nowait"),
++ "SELECT mytable.myid, mytable.name, mytable.description "
++ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT",
++ dialect=oracle.dialect())
-- self.assert_compile(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect())
++ self.assert_compile(
++ table1.select(table1.c.myid==7, for_update="read"),
++ "SELECT mytable.myid, mytable.name, mytable.description "
++ "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE",
++ dialect=mysql.dialect())
-- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect())
++ self.assert_compile(
++ table1.select(table1.c.myid==7, for_update=True),
++ "SELECT mytable.myid, mytable.name, mytable.description "
++ "FROM mytable WHERE mytable.myid = %s FOR UPDATE",
++ dialect=mysql.dialect())
-- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", dialect=oracle.dialect())
++ self.assert_compile(
++ table1.select(table1.c.myid==7, for_update=True),
++ "SELECT mytable.myid, mytable.name, mytable.description "
++ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
++ dialect=oracle.dialect())
def test_alias(self):
# test the alias for a table1. column names stay the same, table name "changes" to "foo".
,dialect=dialect)
self.assert_compile(
-- select([table1.alias()])
-- ,"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM mytable AS mytable_1")
++ select([table1.alias()]),
++ "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
++ "FROM mytable AS mytable_1")
-- # create a select for a join of two tables. use_labels means the column names will have
-- # labels tablename_columnname, which become the column keys accessible off the Selectable object.
-- # also, only use one column from the second table and all columns from the first table1.
-- q = select([table1, table2.c.otherid], table1.c.myid == table2.c.otherid, use_labels = True)
++ # create a select for a join of two tables. use_labels
++ # means the column names will have labels tablename_columnname,
++ # which become the column keys accessible off the Selectable object.
++ # also, only use one column from the second table and all columns
++ # from the first table1.
++ q = select(
++ [table1, table2.c.otherid],
++ table1.c.myid == table2.c.otherid, use_labels = True
++ )
-- # make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view".
++ # make an alias of the "selectable". column names
++ # stay the same (i.e. the labels), table name "changes" to "t2view".
a = alias(q, 't2view')
# select from that alias, also using labels. two levels of labels should produce two underscores.
# also, reference the column "mytable_myid" off of the t2view alias.
self.assert_compile(
a.select(a.c.mytable_myid == 9, use_labels = True),
-- "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \
--t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \
--(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \
--myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
--WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
++ "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name "
++ "AS t2view_mytable_name, t2view.mytable_description AS t2view_mytable_description, "
++ "t2view.myothertable_otherid AS t2view_myothertable_otherid FROM "
++ "(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, "
++ "mytable.description AS mytable_description, myothertable.otherid AS "
++ "myothertable_otherid FROM mytable, myothertable WHERE mytable.myid = "
++ "myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
)
def test_prefixes(self):
self.assert_compile(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"),
-- "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable"
++ "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING "
++ "mytable.myid, mytable.name, mytable.description FROM mytable"
)
def test_text(self):
["foobar(a)", "pk_foo_bar(syslaal)"],
"a = 12",
from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"]
-- ),
-- "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
++ ),
++ "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
++ "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
++ )
# test unicode
self.assert_compile(select(
[u"foobar(a)", u"pk_foo_bar(syslaal)"],
u"a = 12",
from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"]
-- ),
-- u"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
++ ),
++ "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
++ "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
++ )
# test building a select query programmatically with text
s = select()
s.append_whereclause("column2=19")
s = s.order_by("column1")
s.append_from("table1")
-- self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1")
++ self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE "
++ "column1=12 AND column2=19 ORDER BY column1")
self.assert_compile(
select(["column1", "column2"], from_obj=table1).alias('somealias').select(),
-- "SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias"
++ "SELECT somealias.column1, somealias.column2 FROM "
++ "(SELECT column1, column2 FROM mytable) AS somealias"
)
# test that use_labels doesnt interfere with literal columns
"SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable"
)
-- print "---------------------------------------------"
s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1])
-- print "---------------------------------------------"
# test that "auto-labeling of subquery columns" doesnt interfere with literal columns,
# exported columns dont get quoted
self.assert_compile(
select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]).select(),
-- "SELECT column1 AS foobar, column2 AS hoho, myid FROM (SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)"
++ "SELECT column1 AS foobar, column2 AS hoho, myid FROM "
++ "(SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)"
)
self.assert_compile(
def test_binds_in_text(self):
self.assert_compile(
-- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
++ text("select * from foo where lala=:bar and hoho=:whee",
++ bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
"select * from foo where lala=:bar and hoho=:whee",
checkparams={'bar':4, 'whee': 7},
)
dialect = postgresql.dialect()
self.assert_compile(
-- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
++ text("select * from foo where lala=:bar and hoho=:whee",
++ bindparams=[bindparam('bar',4), bindparam('whee',7)]),
"select * from foo where lala=%(bar)s and hoho=%(whee)s",
checkparams={'bar':4, 'whee': 7},
dialect=dialect
dialect = sqlite.dialect()
self.assert_compile(
-- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
++ text("select * from foo where lala=:bar and hoho=:whee",
++ bindparams=[bindparam('bar',4), bindparam('whee',7)]),
"select * from foo where lala=? and hoho=?",
checkparams={'bar':4, 'whee':7},
dialect=dialect
table1.c.myid == table2.c.otherid,
)
),
-- "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \
--FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid")
++ "SELECT mytable.myid, mytable.name, mytable.description, "
++ "myothertable.otherid, sysdate(), foo, bar, lala "
++ "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND "
++ "datetime(foo) = Today AND mytable.myid = myothertable.otherid")
self.assert_compile(select(
[alias(table1, 't'), "foo.f"],
"foo.f = t.id",
from_obj = ["(select f from bar where lala=heyhey) foo"]
),
-- "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
++ "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
++ "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
# test Text embedded within select_from(), using binds
-- generate_series = text("generate_series(:x, :y, :z) as s(a)", bindparams=[bindparam('x'), bindparam('y'), bindparam('z')])
--
-- s =select([(func.current_date() + literal_column("s.a")).label("dates")]).select_from(generate_series)
-- self.assert_compile(s, "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': None, 'x': None, 'z': None})
++ generate_series = text(
++ "generate_series(:x, :y, :z) as s(a)",
++ bindparams=[bindparam('x'), bindparam('y'), bindparam('z')]
++ )
++
++ s =select([
++ (func.current_date() + literal_column("s.a")).label("dates")
++ ]).select_from(generate_series)
++ self.assert_compile(
++ s,
++ "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
++ checkparams={'y': None, 'x': None, 'z': None}
++ )
++
++ self.assert_compile(
++ s.params(x=5, y=6, z=7),
++ "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
++ checkparams={'y': 6, 'x': 5, 'z': 7}
++ )
++
++ @testing.emits_warning('.*empty sequence.*')
++ def test_render_binds_as_literal(self):
++ """test a compiler that renders binds inline into
++ SQL in the columns clause."""
-- self.assert_compile(s.params(x=5, y=6, z=7), "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': 6, 'x': 5, 'z': 7})
++ dialect = default.DefaultDialect()
++ class Compiler(dialect.statement_compiler):
++ ansi_bind_rules = True
++ dialect.statement_compiler = Compiler
++ self.assert_compile(
++ select([literal("someliteral")]),
++ "SELECT 'someliteral'",
++ dialect=dialect
++ )
++ self.assert_compile(
++ select([table1.c.myid + 3]),
++ "SELECT mytable.myid + 3 AS anon_1 FROM mytable",
++ dialect=dialect
++ )
++
++ self.assert_compile(
++ select([table1.c.myid.in_([4, 5, 6])]),
++ "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
++ dialect=dialect
++ )
++
++ self.assert_compile(
++ select([literal("foo").in_([])]),
++ "SELECT 'foo' != 'foo' AS anon_1",
++ dialect=dialect
++ )
++
++ assert_raises(
++ exc.CompileError,
++ bindparam("foo").in_([]).compile, dialect=dialect
++ )
++
++
def test_literal(self):
self.assert_compile(select([literal('foo')]), "SELECT :param_1")