From: Mike Bayer Date: Wed, 5 Dec 2007 03:07:21 +0000 (+0000) Subject: - basic framework for generic functions, [ticket:615] X-Git-Tag: rel_0_4_2~111 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=238c2c8dbe3ca5b92d298b39e96f81eb416d1413;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - basic framework for generic functions, [ticket:615] - changed the various "literal" generation functions to use an anonymous bind parameter. not much changes here except their labels now look like ":param_1", ":param_2" instead of ":literal" - from_obj keyword argument to select() can be a scalar or a list. --- diff --git a/CHANGES b/CHANGES index dea30fd218..935bfb6ce3 100644 --- a/CHANGES +++ b/CHANGES @@ -4,6 +4,15 @@ CHANGES 0.4.2 ----- - sql + - generic functions ! we introduce a database of known SQL functions, such + as current_timestamp, coalesce, and create explicit function objects + representing them. These objects have constrained argument lists, are + type aware, and can compile in a dialect-specific fashion. So saying + func.char_length("foo", "bar") raises an error (too many args), + func.coalesce(datetime.date(2007, 10, 5), datetime.date(2005, 10, 15)) + knows that its return type is a Date. We only have a few functions + represented so far but will continue to add to the system [ticket:615] + - added new flag to String and create_engine(), assert_unicode=(True|False|None). Defaults to `False` or `None` on create_engine() and String, `True` on the Unicode type. When `True`, @@ -12,9 +21,15 @@ CHANGES advised that all unicode-aware applications make proper use of Python unicode objects (i.e. u'hello' and not 'hello'). + - changed the various "literal" generation functions to use an anonymous + bind parameter. not much changes here except their labels now look + like ":param_1", ":param_2" instead of ":literal" + - column labels in the form "tablename.columname", i.e. with a dot, are now supported. + - from_obj keyword argument to select() can be a scalar or a list. + - orm - new synonym() behavior: an attribute will be placed on the mapped diff --git a/lib/sqlalchemy/databases/firebird.py b/lib/sqlalchemy/databases/firebird.py index 7580a750aa..3a13c4b698 100644 --- a/lib/sqlalchemy/databases/firebird.py +++ b/lib/sqlalchemy/databases/firebird.py @@ -354,11 +354,11 @@ class FBCompiler(compiler.DefaultCompiler): else: return self.process(alias.original, **kwargs) - def apply_function_parens(self, func): + def function_argspec(self, func): if func.clauses: - return super(FBCompiler, self).apply_function_parens(func) + return self.process(func.clause_expr) else: - return False + return "" def default_from(self): return " FROM rdb$database" diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 4e62478105..45e4c036f7 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1353,7 +1353,7 @@ class ResultProxy(object): if self.context.result_map: try: - (name, obj, type_) = self.context.result_map[colname] + (name, obj, type_) = self.context.result_map[colname.lower()] except KeyError: (name, obj, type_) = (colname, None, typemap.get(item[1], types.NULLTYPE)) else: diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index d94d3fef67..ea7c1b7345 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -8,15 +8,10 @@ import string, re from sqlalchemy import schema, engine, util, exceptions -from sqlalchemy.sql import operators, visitors +from sqlalchemy.sql import operators, visitors, functions from sqlalchemy.sql import util as sql_util from sqlalchemy.sql import expression as sql -ANSI_FUNCS = util.Set([ - 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP', - 'CURRENT_USER', 'LOCALTIME', 'LOCALTIMESTAMP', - 'SESSION_USER', 'USER']) - RESERVED_WORDS = util.Set([ 'all', 'analyse', 'analyze', 'and', 'any', 'array', 'as', 'asc', 'asymmetric', 'authorization', 'between', @@ -87,6 +82,18 @@ OPERATORS = { operators.isnot : 'IS NOT' } +FUNCTIONS = { + functions.coalesce : 'coalesce%(expr)s', + functions.current_date: 'CURRENT_DATE', + functions.current_time: 'CURRENT_TIME', + functions.current_timestamp: 'CURRENT_TIMESTAMP', + functions.current_user: 'CURRENT_USER', + functions.localtime: 'LOCALTIME', + functions.localtimestamp: 'LOCALTIMESTAMP', + functions.session_user :'SESSION_USER', + functions.user: 'USER' +} + class DefaultCompiler(engine.Compiled): """Default implementation of Compiled. @@ -97,6 +104,7 @@ class DefaultCompiler(engine.Compiled): __traverse_options__ = {'column_collections':False, 'entry':True} operators = OPERATORS + functions = FUNCTIONS def __init__(self, dialect, statement, column_keys=None, inline=False, **kwargs): """Construct a new ``DefaultCompiler`` object. @@ -215,7 +223,7 @@ class DefaultCompiler(engine.Compiled): labelname = self._truncated_identifier("colident", label.name) if result_map is not None: - result_map[labelname] = (label.name, (label, label.obj), label.obj.type) + result_map[labelname.lower()] = (label.name, (label, label.obj), label.obj.type) return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)]) @@ -231,7 +239,7 @@ class DefaultCompiler(engine.Compiled): name = column.name if result_map is not None: - result_map[name] = (name, (column, ), column.type) + result_map[name.lower()] = (name, (column, ), column.type) if column._is_oid: n = self.dialect.oid_column_name(column) @@ -270,7 +278,7 @@ class DefaultCompiler(engine.Compiled): def visit_textclause(self, textclause, **kwargs): if textclause.typemap is not None: for colname, type_ in textclause.typemap.iteritems(): - self.result_map[colname] = (colname, None, type_) + self.result_map[colname.lower()] = (colname, None, type_) def do_bindparam(m): name = m.group(1) @@ -297,9 +305,6 @@ class DefaultCompiler(engine.Compiled): sep = " " + self.operator_string(clauselist.operator) + " " return sep.join([s for s in [self.process(c) for c in clauselist.clauses] if s is not None]) - def apply_function_parens(self, func): - return func.name.upper() not in ANSI_FUNCS or len(func.clauses) > 0 - def visit_calculatedclause(self, clause, **kwargs): return self.process(clause.clause_expr) @@ -308,12 +313,20 @@ class DefaultCompiler(engine.Compiled): def visit_function(self, func, result_map=None, **kwargs): if result_map is not None: - result_map[func.name] = (func.name, None, func.type) + result_map[func.name.lower()] = (func.name, None, func.type) + + name = self.function_string(func) - if not self.apply_function_parens(func): - return ".".join(func.packagenames + [func.name]) + if callable(name): + return name(*[self.process(x) for x in func.clause_expr]) else: - return ".".join(func.packagenames + [func.name]) + (not func.group and " " or "") + self.process(func.clause_expr) + return ".".join(func.packagenames + [name]) % {'expr':self.function_argspec(func)} + + def function_argspec(self, func): + return self.process(func.clause_expr) + + def function_string(self, func): + return self.functions.get(func.__class__, func.name + "%(expr)s") def visit_compound_select(self, cs, asfrom=False, parens=True, **kwargs): stack_entry = {'select':cs} @@ -358,10 +371,11 @@ class DefaultCompiler(engine.Compiled): def operator_string(self, operator): return self.operators.get(operator, str(operator)) - + def visit_bindparam(self, bindparam, **kwargs): - # apply truncation to the ultimate generated name - + # TODO: remove this whole "unique" thing, just use regular + # anonymous params to implement. params used for inserts/updates + # etc. should no longer be "unique". if bindparam.unique: count = 1 key = bindparam.key diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py index c21b102c72..6ad29218f5 100644 --- a/lib/sqlalchemy/sql/expression.py +++ b/lib/sqlalchemy/sql/expression.py @@ -26,10 +26,11 @@ to stay the same in future releases. """ import re +import datetime from sqlalchemy import util, exceptions from sqlalchemy.sql import operators, visitors from sqlalchemy import types as sqltypes - +functions = None __all__ = [ 'Alias', 'ClauseElement', @@ -596,7 +597,7 @@ def literal(value, type_=None): bind-parameter translation for this literal. """ - return _BindParamClause('literal', value, type_=type_, unique=True) + return _BindParamClause(None, value, type_=type_, unique=True) def label(name, obj): """Return a [sqlalchemy.sql.expression#_Label] object for the given [sqlalchemy.sql.expression#ColumnElement]. @@ -766,6 +767,14 @@ class _FunctionGenerator(object): def __call__(self, *c, **kwargs): o = self.opts.copy() o.update(kwargs) + if len(self.__names) == 1: + global functions + if functions is None: + from sqlalchemy.sql import functions + func = getattr(functions, self.__names[-1].lower(), None) + if func is not None: + return func(*c, **o) + return _Function(self.__names[-1], packagenames=self.__names[0:-1], *c, **o) func = _FunctionGenerator() @@ -798,7 +807,7 @@ def _literal_as_column(element): else: return element -def _literal_as_binds(element, name='literal', type_=None): +def _literal_as_binds(element, name=None, type_=None): if isinstance(element, Operators): return element.expression_element() elif _is_literal(element): @@ -1344,7 +1353,7 @@ class _CompareMixin(ColumnOperators): return lambda other: self.__operate(operator, other) def _bind_param(self, obj): - return _BindParamClause('literal', obj, type_=self.type, unique=True) + return _BindParamClause(None, obj, type_=self.type, unique=True) def _check_literal(self, other): if isinstance(other, _BindParamClause) and isinstance(other.type, sqltypes.NullType): @@ -1762,6 +1771,10 @@ class _BindParamClause(ClauseElement, _CompareMixin): unicode : sqltypes.NCHAR, int : sqltypes.Integer, float : sqltypes.Numeric, + datetime.date : sqltypes.Date, + datetime.datetime : sqltypes.DateTime, + datetime.time : sqltypes.Time, + datetime.timedelta : sqltypes.Interval, type(None):sqltypes.NullType } @@ -1997,11 +2010,12 @@ class _Function(_CalculatedClause, FromClause): def __init__(self, name, *clauses, **kwargs): self.packagenames = kwargs.get('packagenames', None) or [] self.oid_column = None - kwargs['operator'] = operators.comma_op - _CalculatedClause.__init__(self, name, **kwargs) - for c in clauses: - self.append(c) - + self.name = name + self._bind = kwargs.get('bind', None) + args = [_literal_as_binds(c, self.name) for c in clauses] + self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args).self_group() + self.type = sqltypes.to_instance(kwargs.get('type_', None)) + key = property(lambda self:self.name) columns = property(lambda self:[self]) @@ -2012,9 +2026,6 @@ class _Function(_CalculatedClause, FromClause): def get_children(self, **kwargs): return _CalculatedClause.get_children(self, **kwargs) - def append(self, clause): - self.clauses.append(_literal_as_binds(clause, self.name)) - class _Cast(ColumnElement): @@ -2984,7 +2995,7 @@ class Select(_SelectBaseMixin, FromClause): if from_obj: self._froms = util.Set([ _is_literal(f) and _TextFromClause(f) or f - for f in from_obj + for f in util.to_list(from_obj) ]) else: self._froms = util.Set() diff --git a/lib/sqlalchemy/sql/functions.py b/lib/sqlalchemy/sql/functions.py new file mode 100644 index 0000000000..d39032b91c --- /dev/null +++ b/lib/sqlalchemy/sql/functions.py @@ -0,0 +1,78 @@ +from sqlalchemy import types as sqltypes +from sqlalchemy.sql.expression import _Function, _literal_as_binds, ClauseList, _FigureVisitName +from sqlalchemy.sql import operators + +class _GenericMeta(_FigureVisitName): + def __init__(cls, clsname, bases, dict): + cls.__visit_name__ = 'function' + type.__init__(cls, clsname, bases, dict) + + def __call__(self, *args, **kwargs): + args = [_literal_as_binds(c) for c in args] + return type.__call__(self, *args, **kwargs) + +class GenericFunction(_Function): + __metaclass__ = _GenericMeta + + def __init__(self, type_=None, group=True, args=(), **kwargs): + self.packagenames = [] + self.oid_column = None + self.name = self.__class__.__name__ + self._bind = kwargs.get('bind', None) + if group: + self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args).self_group() + else: + self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args) + self.type = sqltypes.to_instance(type_ or getattr(self, '__return_type__', None)) + +class AnsiFunction(GenericFunction): + def __init__(self, **kwargs): + GenericFunction.__init__(self, **kwargs) + + +class coalesce(GenericFunction): + def __init__(self, *args, **kwargs): + kwargs.setdefault('type_', _type_from_args(args)) + GenericFunction.__init__(self, args=args, **kwargs) + +class concat(GenericFunction): + __return_type__ = sqltypes.String + def __init__(self, *args, **kwargs): + GenericFunction.__init__(self, args=args, **kwargs) + +class char_length(GenericFunction): + __return_type__ = sqltypes.Integer + + def __init__(self, arg, **kwargs): + GenericFunction.__init__(self, args=[arg], **kwargs) + +class current_date(AnsiFunction): + __return_type__ = sqltypes.Date + +class current_time(AnsiFunction): + __return_type__ = sqltypes.Time + +class current_timestamp(AnsiFunction): + __return_type__ = sqltypes.DateTime + +class current_user(AnsiFunction): + __return_type__ = sqltypes.String + +class localtime(AnsiFunction): + __return_type__ = sqltypes.DateTime + +class localtimestamp(AnsiFunction): + __return_type__ = sqltypes.DateTime + +class session_user(AnsiFunction): + __return_type__ = sqltypes.String + +class user(AnsiFunction): + __return_type__ = sqltypes.String + +def _type_from_args(args): + for a in args: + if not isinstance(a.type, sqltypes.NullType): + return a.type + else: + return sqltypes.NullType \ No newline at end of file diff --git a/test/dialect/firebird.py b/test/dialect/firebird.py index 9d043153c2..0abbdf029c 100644 --- a/test/dialect/firebird.py +++ b/test/dialect/firebird.py @@ -22,7 +22,7 @@ class CompileTest(SQLCompileTest): def test_function(self): self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)") - self.assert_compile(func.current_time(), "current_time") + self.assert_compile(func.current_time(), "CURRENT_TIME") self.assert_compile(func.foo(), "foo") m = MetaData() diff --git a/test/dialect/mssql.py b/test/dialect/mssql.py index 05d9efd786..3b40ed354f 100755 --- a/test/dialect/mssql.py +++ b/test/dialect/mssql.py @@ -46,7 +46,7 @@ class CompileTest(SQLCompileTest): def test_function(self): self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)") - self.assert_compile(func.current_time(), "current_time") + self.assert_compile(func.current_time(), "CURRENT_TIME") self.assert_compile(func.foo(), "foo()") m = MetaData() diff --git a/test/orm/query.py b/test/orm/query.py index ef9e3540d5..f1b40e0b1a 100644 --- a/test/orm/query.py +++ b/test/orm/query.py @@ -170,13 +170,13 @@ class OperatorTest(QueryTest): ): for (lhs, rhs, res) in ( (5, User.id, ':users_id %s users.id'), - (5, literal(6), ':literal %s :literal_1'), + (5, literal(6), ':param_1 %s :param_2'), (User.id, 5, 'users.id %s :users_id'), - (User.id, literal('b'), 'users.id %s :literal'), + (User.id, literal('b'), 'users.id %s :param_1'), (User.id, User.id, 'users.id %s users.id'), - (literal(5), 'b', ':literal %s :literal_1'), - (literal(5), User.id, ':literal %s users.id'), - (literal(5), literal(6), ':literal %s :literal_1'), + (literal(5), 'b', ':param_1 %s :param_2'), + (literal(5), User.id, ':param_1 %s users.id'), + (literal(5), literal(6), ':param_1 %s :param_2'), ): self._test(py_op(lhs, rhs), res % sql_op) @@ -190,13 +190,13 @@ class OperatorTest(QueryTest): (operator.ge, '>=', '<=')): for (lhs, rhs, l_sql, r_sql) in ( ('a', User.id, ':users_id', 'users.id'), - ('a', literal('b'), ':literal_1', ':literal'), # note swap! + ('a', literal('b'), ':param_2', ':param_1'), # note swap! (User.id, 'b', 'users.id', ':users_id'), - (User.id, literal('b'), 'users.id', ':literal'), + (User.id, literal('b'), 'users.id', ':param_1'), (User.id, User.id, 'users.id', 'users.id'), - (literal('a'), 'b', ':literal', ':literal_1'), - (literal('a'), User.id, ':literal', 'users.id'), - (literal('a'), literal('b'), ':literal', ':literal_1'), + (literal('a'), 'b', ':param_1', ':param_2'), + (literal('a'), User.id, ':param_1', 'users.id'), + (literal('a'), literal('b'), ':param_1', ':param_2'), ): # the compiled clause should match either (e.g.): @@ -224,7 +224,7 @@ class OperatorTest(QueryTest): for (expr, compare) in ( (func.max(User.id), "max(users.id)"), (User.id.desc(), "users.id DESC"), - (between(5, User.id, Address.id), ":literal BETWEEN users.id AND addresses.id"), + (between(5, User.id, Address.id), ":param_1 BETWEEN users.id AND addresses.id"), # this one would require adding compile() to InstrumentedScalarAttribute. do we want this ? #(User.id, "users.id") ): diff --git a/test/sql/alltests.py b/test/sql/alltests.py index a669a25f2d..5f5c68904e 100644 --- a/test/sql/alltests.py +++ b/test/sql/alltests.py @@ -17,6 +17,7 @@ def suite(): 'sql.unicode', # assorted round-trip tests + 'sql.functions', 'sql.query', 'sql.quote', 'sql.rowcount', diff --git a/test/sql/functions.py b/test/sql/functions.py new file mode 100644 index 0000000000..177a308b4a --- /dev/null +++ b/test/sql/functions.py @@ -0,0 +1,153 @@ +import testbase +import datetime +from sqlalchemy import * +from sqlalchemy import exceptions, sql +from sqlalchemy.sql.compiler import BIND_TEMPLATES +from sqlalchemy.engine import default +from sqlalchemy import types as sqltypes +from testlib import * + +# TODO: add a helper function to testlib for this +from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql +dialects = [x.dialect() for x in [sqlite, postgres, mysql, oracle, firebird, mssql]] + +class CompileTest(SQLCompileTest): + def test_compile(self): + for dialect in dialects: + bindtemplate = BIND_TEMPLATES[dialect.paramstyle] + self.assert_compile(func.current_timestamp(), "CURRENT_TIMESTAMP", dialect=dialect) + self.assert_compile(func.localtime(), "LOCALTIME", dialect=dialect) + if isinstance(dialect, firebird.dialect): + self.assert_compile(func.nosuchfunction(), "nosuchfunction", dialect=dialect) + else: + self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect) + self.assert_compile(func.char_length('foo'), "char_length(%s)" % bindtemplate % {'name':'param_1', 'position':1}, dialect=dialect) + + def test_constructor(self): + try: + func.current_timestamp('somearg') + assert False + except TypeError: + assert True + + try: + func.char_length('a', 'b') + assert False + except TypeError: + assert True + + try: + func.char_length() + assert False + except TypeError: + assert True + + def test_typing(self): + assert isinstance(func.coalesce(datetime.date(2007, 10, 5), datetime.date(2005, 10, 15)).type, sqltypes.Date) + + assert isinstance(func.coalesce(None, datetime.date(2005, 10, 15)).type, sqltypes.Date) + + assert isinstance(func.concat("foo", "bar").type, sqltypes.String) + +class ExecuteTest(PersistTest): + + def test_standalone_execute(self): + x = testbase.db.func.current_date().execute().scalar() + y = testbase.db.func.current_date().select().execute().scalar() + z = testbase.db.func.current_date().scalar() + assert (x == y == z) is True + + # ansi func + x = testbase.db.func.current_date() + assert isinstance(x.type, Date) + assert isinstance(x.execute().scalar(), datetime.date) + + def test_conn_execute(self): + conn = testbase.db.connect() + try: + x = conn.execute(func.current_date()).scalar() + y = conn.execute(func.current_date().select()).scalar() + z = conn.scalar(func.current_date()) + finally: + conn.close() + assert (x == y == z) is True + + def test_update(self): + """ + Tests sending functions and SQL expressions to the VALUES and SET + clauses of INSERT/UPDATE instances, and that column-level defaults + get overridden. + """ + + meta = MetaData(testbase.db) + t = Table('t1', meta, + Column('id', Integer, Sequence('t1idseq', optional=True), primary_key=True), + Column('value', Integer) + ) + t2 = Table('t2', meta, + Column('id', Integer, Sequence('t2idseq', optional=True), primary_key=True), + Column('value', Integer, default=7), + Column('stuff', String(20), onupdate="thisisstuff") + ) + meta.create_all() + try: + t.insert(values=dict(value=func.length("one"))).execute() + assert t.select().execute().fetchone()['value'] == 3 + t.update(values=dict(value=func.length("asfda"))).execute() + assert t.select().execute().fetchone()['value'] == 5 + + r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute() + id = r.last_inserted_ids()[0] + assert t.select(t.c.id==id).execute().fetchone()['value'] == 9 + t.update(values={t.c.value:func.length("asdf")}).execute() + assert t.select().execute().fetchone()['value'] == 4 + print "--------------------------" + t2.insert().execute() + t2.insert(values=dict(value=func.length("one"))).execute() + t2.insert(values=dict(value=func.length("asfda") + -19)).execute(stuff="hi") + + res = exec_sorted(select([t2.c.value, t2.c.stuff])) + self.assertEquals(res, [(-14, 'hi'), (3, None), (7, None)]) + + t2.update(values=dict(value=func.length("asdsafasd"))).execute(stuff="some stuff") + assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(9,"some stuff"), (9,"some stuff"), (9,"some stuff")] + + t2.delete().execute() + + t2.insert(values=dict(value=func.length("one") + 8)).execute() + assert t2.select().execute().fetchone()['value'] == 11 + + t2.update(values=dict(value=func.length("asfda"))).execute() + assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff") + + t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute() + print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone() + assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo") + + finally: + meta.drop_all() + + @testing.supported('postgres') + def test_as_from(self): + # TODO: shouldnt this work on oracle too ? + x = testbase.db.func.current_date().execute().scalar() + y = testbase.db.func.current_date().select().execute().scalar() + z = testbase.db.func.current_date().scalar() + w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar() + + # construct a column-based FROM object out of a function, like in [ticket:172] + s = select([sql.column('date', type_=DateTime)], from_obj=[testbase.db.func.current_date()]) + q = s.execute().fetchone()[s.c.date] + r = s.alias('datequery').select().scalar() + + assert x == y == z == w == q == r + +def exec_sorted(statement, *args, **kw): + """Executes a statement and returns a sorted list plain tuple rows.""" + + return sorted([tuple(row) + for row in statement.execute(*args, **kw).fetchall()]) + +if __name__ == '__main__': + testbase.main() + \ No newline at end of file diff --git a/test/sql/query.py b/test/sql/query.py index fa88c5fe63..9b35cff1c8 100644 --- a/test/sql/query.py +++ b/test/sql/query.py @@ -417,95 +417,6 @@ class QueryTest(PersistTest): except exceptions.ArgumentError, e: assert str(e).startswith('Not an executable clause: ') - def test_functions(self): - x = testbase.db.func.current_date().execute().scalar() - y = testbase.db.func.current_date().select().execute().scalar() - z = testbase.db.func.current_date().scalar() - assert (x == y == z) is True - - x = testbase.db.func.current_date(type_=Date) - assert isinstance(x.type, Date) - assert isinstance(x.execute().scalar(), datetime.date) - - def test_conn_functions(self): - conn = testbase.db.connect() - try: - x = conn.execute(func.current_date()).scalar() - y = conn.execute(func.current_date().select()).scalar() - z = conn.scalar(func.current_date()) - finally: - conn.close() - assert (x == y == z) is True - - def test_update_functions(self): - """ - Tests sending functions and SQL expressions to the VALUES and SET - clauses of INSERT/UPDATE instances, and that column-level defaults - get overridden. - """ - - meta = MetaData(testbase.db) - t = Table('t1', meta, - Column('id', Integer, Sequence('t1idseq', optional=True), primary_key=True), - Column('value', Integer) - ) - t2 = Table('t2', meta, - Column('id', Integer, Sequence('t2idseq', optional=True), primary_key=True), - Column('value', Integer, default=7), - Column('stuff', String(20), onupdate="thisisstuff") - ) - meta.create_all() - try: - t.insert(values=dict(value=func.length("one"))).execute() - assert t.select().execute().fetchone()['value'] == 3 - t.update(values=dict(value=func.length("asfda"))).execute() - assert t.select().execute().fetchone()['value'] == 5 - - r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute() - id = r.last_inserted_ids()[0] - assert t.select(t.c.id==id).execute().fetchone()['value'] == 9 - t.update(values={t.c.value:func.length("asdf")}).execute() - assert t.select().execute().fetchone()['value'] == 4 - print "--------------------------" - t2.insert().execute() - t2.insert(values=dict(value=func.length("one"))).execute() - t2.insert(values=dict(value=func.length("asfda") + -19)).execute(stuff="hi") - - res = exec_sorted(select([t2.c.value, t2.c.stuff])) - self.assertEquals(res, [(-14, 'hi'), (3, None), (7, None)]) - - t2.update(values=dict(value=func.length("asdsafasd"))).execute(stuff="some stuff") - assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(9,"some stuff"), (9,"some stuff"), (9,"some stuff")] - - t2.delete().execute() - - t2.insert(values=dict(value=func.length("one") + 8)).execute() - assert t2.select().execute().fetchone()['value'] == 11 - - t2.update(values=dict(value=func.length("asfda"))).execute() - assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff") - - t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute() - print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone() - assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo") - - finally: - meta.drop_all() - - @testing.supported('postgres') - def test_functions_with_cols(self): - # TODO: shouldnt this work on oracle too ? - x = testbase.db.func.current_date().execute().scalar() - y = testbase.db.func.current_date().select().execute().scalar() - z = testbase.db.func.current_date().scalar() - w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar() - - # construct a column-based FROM object out of a function, like in [ticket:172] - s = select([sql.column('date', type_=DateTime)], from_obj=[testbase.db.func.current_date()]) - q = s.execute().fetchone()[s.c.date] - r = s.alias('datequery').select().scalar() - - assert x == y == z == w == q == r def test_column_order_with_simple_query(self): diff --git a/test/sql/select.py b/test/sql/select.py index f9aa21f1e2..58c4ea3ddf 100644 --- a/test/sql/select.py +++ b/test/sql/select.py @@ -150,9 +150,9 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def testexistsascolumnclause(self): self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid)", params={'mytable_myid':5}) - self.assert_compile(select([table1, exists([1], from_obj=[table2])]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={}) + self.assert_compile(select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={}) - self.assert_compile(select([table1, exists([1], from_obj=[table2]).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={}) + self.assert_compile(select([table1, exists([1], from_obj=table2).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={}) def test_generative_exists(self): self.assert_compile( @@ -178,7 +178,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def testwheresubquery(self): s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') self.assert_compile( - select([users, s.c.street], from_obj=[s]), + select([users, s.c.street], from_obj=s), """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") self.assert_compile( @@ -205,11 +205,11 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') self.assert_compile( - select([users, s.c.street], from_obj=[s]), + select([users, s.c.street], from_obj=s), """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") # test constructing the outer query via append_column(), which occurs in the ORM's Query object - s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=[table1]) + s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=table1) s.append_column(table1) self.assert_compile( s, @@ -242,9 +242,9 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable") # test expressions against scalar selects - self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal AS anon_1") - self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal AS anon_1") - self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal AS anon_1") + self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :param_2 AS anon_1") + self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :param_2 AS anon_1") + self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :param_2 AS anon_1") self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo") @@ -293,12 +293,12 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A a1 = table2.alias('t2alias') s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True) j1 = table1.join(table2, table1.c.myid==table2.c.otherid) - s2 = select([table1, s1], from_obj=[j1]) + s2 = select([table1, s1], from_obj=j1) self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") def testlabelcomparison(self): x = func.lala(table1.c.myid).label('foo') - self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :literal") + self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :param_1") def testand(self): self.assert_compile( @@ -345,13 +345,13 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ): for (lhs, rhs, res) in ( (5, table1.c.myid, ':mytable_myid %s mytable.myid'), - (5, literal(5), ':literal %s :literal_1'), + (5, literal(5), ':param_1 %s :param_2'), (table1.c.myid, 'b', 'mytable.myid %s :mytable_myid'), - (table1.c.myid, literal(2.7), 'mytable.myid %s :literal'), + (table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'), (table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'), - (literal(5), 8, ':literal %s :literal_1'), - (literal(6), table1.c.myid, ':literal %s mytable.myid'), - (literal(7), literal(5.5), ':literal %s :literal_1'), + (literal(5), 8, ':param_1 %s :param_2'), + (literal(6), table1.c.myid, ':param_1 %s mytable.myid'), + (literal(7), literal(5.5), ':param_1 %s :param_2'), ): self.assert_compile(py_op(lhs, rhs), res % sql_op) @@ -364,13 +364,13 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A (operator.ge, '>=', '<=')): for (lhs, rhs, l_sql, r_sql) in ( ('a', table1.c.myid, ':mytable_myid', 'mytable.myid'), - ('a', literal('b'), ':literal_1', ':literal'), # note swap! + ('a', literal('b'), ':param_2', ':param_1'), # note swap! (table1.c.myid, 'b', 'mytable.myid', ':mytable_myid'), - (table1.c.myid, literal('b'), 'mytable.myid', ':literal'), + (table1.c.myid, literal('b'), 'mytable.myid', ':param_1'), (table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'), - (literal('a'), 'b', ':literal', ':literal_1'), - (literal('a'), table1.c.myid, ':literal', 'mytable.myid'), - (literal('a'), literal('b'), ':literal', ':literal_1'), + (literal('a'), 'b', ':param_1', ':param_2'), + (literal('a'), table1.c.myid, ':param_1', 'mytable.myid'), + (literal('a'), literal('b'), ':param_1', ':param_2'), ): # the compiled clause should match either (e.g.): @@ -404,13 +404,13 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ) self.assert_compile( - literal("a") + literal("b") * literal("c"), ":literal || :literal_1 * :literal_2" + literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3" ) # test the op() function, also that its results are further usable in expressions self.assert_compile( table1.select(table1.c.myid.op('hoho')(12)==14), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :literal" + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :param_1" ) # test that clauses can be pickled (operators need to be module-level, etc.) @@ -555,19 +555,19 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = def testtextcolumns(self): self.assert_compile( - select(["column1", "column2"], from_obj=[table1]).alias('somealias').select(), + select(["column1", "column2"], from_obj=table1).alias('somealias').select(), "SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias" ) # test that use_labels doesnt interfere with literal columns self.assert_compile( - select(["column1", "column2", table1.c.myid], from_obj=[table1], use_labels=True), + select(["column1", "column2", table1.c.myid], from_obj=table1, use_labels=True), "SELECT column1, column2, mytable.myid AS mytable_myid FROM mytable" ) # test that use_labels doesnt interfere with literal columns that have textual labels self.assert_compile( - select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1], use_labels=True), + select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=table1, use_labels=True), "SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable" ) @@ -640,7 +640,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today def testliteral(self): self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]), - "SELECT :literal || :literal_1 AS anon_1 FROM mytable") + "SELECT :param_2 || :param_3 AS anon_1 FROM mytable") def testcalculatedcolumns(self): value_tbl = table('values', @@ -658,19 +658,19 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today self.assert_compile( select([value_tbl.c.id], (value_tbl.c.val2 - value_tbl.c.val1)/value_tbl.c.val1 > 2.0), - "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :literal" + "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :param_1" ) self.assert_compile( select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0), - "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :literal" + "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :param_1" ) def testfunction(self): """tests the generation of functions using the func keyword""" # test an expression with a function self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid, - "lala(:lala, :lala_1, :literal, mytable.myid) * myothertable.otherid") + "lala(:lala, :lala_1, :param_1, mytable.myid) * myothertable.otherid") # test it in a SELECT self.assert_compile(select([func.count(table1.c.myid)]), @@ -985,7 +985,7 @@ EXISTS (select yay from foo where boo = lar)", t = table('foo', column('id')) s = select([t, literal('lala').label('hoho')]) - self.assert_compile(s, "SELECT foo.id, :literal AS hoho FROM foo") + self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo") assert [str(c) for c in s.c] == ["id", "hoho"] def testin(self): @@ -1002,31 +1002,31 @@ EXISTS (select yay from foo where boo = lar)", "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a')])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), 'b'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), literal('b')])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :literal_1)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :param_2)") self.assert_compile(select([table1], table1.c.myid.in_(['a', literal('b')])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1)") self.assert_compile(select([table1], table1.c.myid.in_([literal(1) + 'a'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal + :literal_1)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 + :param_2)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a') +'a', 'b'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :mytable_myid)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a') + literal('a'), literal('b')])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :literal_2)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :param_3)") self.assert_compile(select([table1], table1.c.myid.in_([1, literal(3) + 4])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal + :literal_1)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1 + :param_2)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a') < 'b'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal < :literal_1)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 < :param_2)") self.assert_compile(select([table1], table1.c.myid.in_([table1.c.myid])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (mytable.myid)") @@ -1035,13 +1035,13 @@ EXISTS (select yay from foo where boo = lar)", "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, mytable.myid)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid +'a'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid + :mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :mytable_myid)") self.assert_compile(select([table1], table1.c.myid.in_([literal(1), 'a' + table1.c.myid])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid + mytable.myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid + mytable.myid)") self.assert_compile(select([table1], table1.c.myid.in_([1, 2, 3])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1, :mytable_myid_2)") @@ -1115,10 +1115,10 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) AS anon_1 \nFROM casttest") # first test with Postgres engine - check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(literal)s') + check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s') # then the Oracle engine - check_results(oracle.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':literal') + check_results(oracle.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':param_1') # then the sqlite engine check_results(sqlite.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?') @@ -1132,7 +1132,7 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE Column('date', Date)) self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date AND :dt_date_1", checkparams={'dt_date':datetime.date(2006,6,1), 'dt_date_1':datetime.date(2006,6,5)}) - self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :literal AND :literal_1", checkparams={'literal':datetime.date(2006,6,1), 'literal_1':datetime.date(2006,6,5)}) + self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)}) def test_operator_precedence(self): table = Table('op', metadata, @@ -1142,13 +1142,13 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE self.assert_compile(table.select((table.c.field + 5) == table.c.field), "SELECT op.field FROM op WHERE op.field + :op_field = op.field") self.assert_compile(table.select((table.c.field + 5) * 6), - "SELECT op.field FROM op WHERE (op.field + :op_field) * :literal") + "SELECT op.field FROM op WHERE (op.field + :op_field) * :param_1") self.assert_compile(table.select((table.c.field * 5) + 6), - "SELECT op.field FROM op WHERE op.field * :op_field + :literal") + "SELECT op.field FROM op WHERE op.field * :op_field + :param_1") self.assert_compile(table.select(5 + table.c.field.in_([5,6])), - "SELECT op.field FROM op WHERE :literal + (op.field IN (:op_field, :op_field_1))") + "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:op_field, :op_field_1))") self.assert_compile(table.select((5 + table.c.field).in_([5,6])), - "SELECT op.field FROM op WHERE :op_field + op.field IN (:literal, :literal_1)") + "SELECT op.field FROM op WHERE :op_field + op.field IN (:param_1, :param_2)") self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))), "SELECT op.field FROM op WHERE NOT (op.field = :op_field AND op.field = :op_field_1)") self.assert_compile(table.select(not_(table.c.field == 5)), @@ -1156,11 +1156,11 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE self.assert_compile(table.select(not_(table.c.field.between(5, 6))), "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :op_field AND :op_field_1)") self.assert_compile(table.select(not_(table.c.field) == 5), - "SELECT op.field FROM op WHERE (NOT op.field) = :literal") + "SELECT op.field FROM op WHERE (NOT op.field) = :param_1") self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)), - "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :literal AND :literal_1") + "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2") self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)), - "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :literal AND :literal_1") + "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2") class CRUDTest(SQLCompileTest): def testinsert(self): @@ -1223,8 +1223,8 @@ class CRUDTest(SQLCompileTest): values = { table1.c.name : table1.c.name + "lala", table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho')) - }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :literal), name=(mytable.name || :mytable_name) " - "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :literal_1 || mytable.name || :literal_2") + }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :mytable_name) " + "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :param_2 || mytable.name || :param_3") def testcorrelatedupdate(self): # test against a straight text subquery