0.4.2
-----
- sql
+ - generic functions ! we introduce a database of known SQL functions, such
+ as current_timestamp, coalesce, and create explicit function objects
+ representing them. These objects have constrained argument lists, are
+ type aware, and can compile in a dialect-specific fashion. So saying
+ func.char_length("foo", "bar") raises an error (too many args),
+ func.coalesce(datetime.date(2007, 10, 5), datetime.date(2005, 10, 15))
+ knows that its return type is a Date. We only have a few functions
+ represented so far but will continue to add to the system [ticket:615]
+
- added new flag to String and create_engine(),
assert_unicode=(True|False|None). Defaults to `False` or `None` on
create_engine() and String, `True` on the Unicode type. When `True`,
advised that all unicode-aware applications make proper use of Python
unicode objects (i.e. u'hello' and not 'hello').
+ - changed the various "literal" generation functions to use an anonymous
+ bind parameter. not much changes here except their labels now look
+ like ":param_1", ":param_2" instead of ":literal"
+
- column labels in the form "tablename.columname", i.e. with a dot, are now
supported.
+ - from_obj keyword argument to select() can be a scalar or a list.
+
- orm
- new synonym() behavior: an attribute will be placed on the mapped
else:
return self.process(alias.original, **kwargs)
- def apply_function_parens(self, func):
+ def function_argspec(self, func):
if func.clauses:
- return super(FBCompiler, self).apply_function_parens(func)
+ return self.process(func.clause_expr)
else:
- return False
+ return ""
def default_from(self):
return " FROM rdb$database"
if self.context.result_map:
try:
- (name, obj, type_) = self.context.result_map[colname]
+ (name, obj, type_) = self.context.result_map[colname.lower()]
except KeyError:
(name, obj, type_) = (colname, None, typemap.get(item[1], types.NULLTYPE))
else:
import string, re
from sqlalchemy import schema, engine, util, exceptions
-from sqlalchemy.sql import operators, visitors
+from sqlalchemy.sql import operators, visitors, functions
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql import expression as sql
-ANSI_FUNCS = util.Set([
- 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP',
- 'CURRENT_USER', 'LOCALTIME', 'LOCALTIMESTAMP',
- 'SESSION_USER', 'USER'])
-
RESERVED_WORDS = util.Set([
'all', 'analyse', 'analyze', 'and', 'any', 'array',
'as', 'asc', 'asymmetric', 'authorization', 'between',
operators.isnot : 'IS NOT'
}
+FUNCTIONS = {
+ functions.coalesce : 'coalesce%(expr)s',
+ functions.current_date: 'CURRENT_DATE',
+ functions.current_time: 'CURRENT_TIME',
+ functions.current_timestamp: 'CURRENT_TIMESTAMP',
+ functions.current_user: 'CURRENT_USER',
+ functions.localtime: 'LOCALTIME',
+ functions.localtimestamp: 'LOCALTIMESTAMP',
+ functions.session_user :'SESSION_USER',
+ functions.user: 'USER'
+}
+
class DefaultCompiler(engine.Compiled):
"""Default implementation of Compiled.
__traverse_options__ = {'column_collections':False, 'entry':True}
operators = OPERATORS
+ functions = FUNCTIONS
def __init__(self, dialect, statement, column_keys=None, inline=False, **kwargs):
"""Construct a new ``DefaultCompiler`` object.
labelname = self._truncated_identifier("colident", label.name)
if result_map is not None:
- result_map[labelname] = (label.name, (label, label.obj), label.obj.type)
+ result_map[labelname.lower()] = (label.name, (label, label.obj), label.obj.type)
return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)])
name = column.name
if result_map is not None:
- result_map[name] = (name, (column, ), column.type)
+ result_map[name.lower()] = (name, (column, ), column.type)
if column._is_oid:
n = self.dialect.oid_column_name(column)
def visit_textclause(self, textclause, **kwargs):
if textclause.typemap is not None:
for colname, type_ in textclause.typemap.iteritems():
- self.result_map[colname] = (colname, None, type_)
+ self.result_map[colname.lower()] = (colname, None, type_)
def do_bindparam(m):
name = m.group(1)
sep = " " + self.operator_string(clauselist.operator) + " "
return sep.join([s for s in [self.process(c) for c in clauselist.clauses] if s is not None])
- def apply_function_parens(self, func):
- return func.name.upper() not in ANSI_FUNCS or len(func.clauses) > 0
-
def visit_calculatedclause(self, clause, **kwargs):
return self.process(clause.clause_expr)
def visit_function(self, func, result_map=None, **kwargs):
if result_map is not None:
- result_map[func.name] = (func.name, None, func.type)
+ result_map[func.name.lower()] = (func.name, None, func.type)
+
+ name = self.function_string(func)
- if not self.apply_function_parens(func):
- return ".".join(func.packagenames + [func.name])
+ if callable(name):
+ return name(*[self.process(x) for x in func.clause_expr])
else:
- return ".".join(func.packagenames + [func.name]) + (not func.group and " " or "") + self.process(func.clause_expr)
+ return ".".join(func.packagenames + [name]) % {'expr':self.function_argspec(func)}
+
+ def function_argspec(self, func):
+ return self.process(func.clause_expr)
+
+ def function_string(self, func):
+ return self.functions.get(func.__class__, func.name + "%(expr)s")
def visit_compound_select(self, cs, asfrom=False, parens=True, **kwargs):
stack_entry = {'select':cs}
def operator_string(self, operator):
return self.operators.get(operator, str(operator))
-
+
def visit_bindparam(self, bindparam, **kwargs):
- # apply truncation to the ultimate generated name
-
+ # TODO: remove this whole "unique" thing, just use regular
+ # anonymous params to implement. params used for inserts/updates
+ # etc. should no longer be "unique".
if bindparam.unique:
count = 1
key = bindparam.key
"""
import re
+import datetime
from sqlalchemy import util, exceptions
from sqlalchemy.sql import operators, visitors
from sqlalchemy import types as sqltypes
-
+functions = None
__all__ = [
'Alias', 'ClauseElement',
bind-parameter translation for this literal.
"""
- return _BindParamClause('literal', value, type_=type_, unique=True)
+ return _BindParamClause(None, value, type_=type_, unique=True)
def label(name, obj):
"""Return a [sqlalchemy.sql.expression#_Label] object for the given [sqlalchemy.sql.expression#ColumnElement].
def __call__(self, *c, **kwargs):
o = self.opts.copy()
o.update(kwargs)
+ if len(self.__names) == 1:
+ global functions
+ if functions is None:
+ from sqlalchemy.sql import functions
+ func = getattr(functions, self.__names[-1].lower(), None)
+ if func is not None:
+ return func(*c, **o)
+
return _Function(self.__names[-1], packagenames=self.__names[0:-1], *c, **o)
func = _FunctionGenerator()
else:
return element
-def _literal_as_binds(element, name='literal', type_=None):
+def _literal_as_binds(element, name=None, type_=None):
if isinstance(element, Operators):
return element.expression_element()
elif _is_literal(element):
return lambda other: self.__operate(operator, other)
def _bind_param(self, obj):
- return _BindParamClause('literal', obj, type_=self.type, unique=True)
+ return _BindParamClause(None, obj, type_=self.type, unique=True)
def _check_literal(self, other):
if isinstance(other, _BindParamClause) and isinstance(other.type, sqltypes.NullType):
unicode : sqltypes.NCHAR,
int : sqltypes.Integer,
float : sqltypes.Numeric,
+ datetime.date : sqltypes.Date,
+ datetime.datetime : sqltypes.DateTime,
+ datetime.time : sqltypes.Time,
+ datetime.timedelta : sqltypes.Interval,
type(None):sqltypes.NullType
}
def __init__(self, name, *clauses, **kwargs):
self.packagenames = kwargs.get('packagenames', None) or []
self.oid_column = None
- kwargs['operator'] = operators.comma_op
- _CalculatedClause.__init__(self, name, **kwargs)
- for c in clauses:
- self.append(c)
-
+ self.name = name
+ self._bind = kwargs.get('bind', None)
+ args = [_literal_as_binds(c, self.name) for c in clauses]
+ self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args).self_group()
+ self.type = sqltypes.to_instance(kwargs.get('type_', None))
+
key = property(lambda self:self.name)
columns = property(lambda self:[self])
def get_children(self, **kwargs):
return _CalculatedClause.get_children(self, **kwargs)
- def append(self, clause):
- self.clauses.append(_literal_as_binds(clause, self.name))
-
class _Cast(ColumnElement):
if from_obj:
self._froms = util.Set([
_is_literal(f) and _TextFromClause(f) or f
- for f in from_obj
+ for f in util.to_list(from_obj)
])
else:
self._froms = util.Set()
--- /dev/null
+from sqlalchemy import types as sqltypes
+from sqlalchemy.sql.expression import _Function, _literal_as_binds, ClauseList, _FigureVisitName
+from sqlalchemy.sql import operators
+
+class _GenericMeta(_FigureVisitName):
+ def __init__(cls, clsname, bases, dict):
+ cls.__visit_name__ = 'function'
+ type.__init__(cls, clsname, bases, dict)
+
+ def __call__(self, *args, **kwargs):
+ args = [_literal_as_binds(c) for c in args]
+ return type.__call__(self, *args, **kwargs)
+
+class GenericFunction(_Function):
+ __metaclass__ = _GenericMeta
+
+ def __init__(self, type_=None, group=True, args=(), **kwargs):
+ self.packagenames = []
+ self.oid_column = None
+ self.name = self.__class__.__name__
+ self._bind = kwargs.get('bind', None)
+ if group:
+ self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args).self_group()
+ else:
+ self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args)
+ self.type = sqltypes.to_instance(type_ or getattr(self, '__return_type__', None))
+
+class AnsiFunction(GenericFunction):
+ def __init__(self, **kwargs):
+ GenericFunction.__init__(self, **kwargs)
+
+
+class coalesce(GenericFunction):
+ def __init__(self, *args, **kwargs):
+ kwargs.setdefault('type_', _type_from_args(args))
+ GenericFunction.__init__(self, args=args, **kwargs)
+
+class concat(GenericFunction):
+ __return_type__ = sqltypes.String
+ def __init__(self, *args, **kwargs):
+ GenericFunction.__init__(self, args=args, **kwargs)
+
+class char_length(GenericFunction):
+ __return_type__ = sqltypes.Integer
+
+ def __init__(self, arg, **kwargs):
+ GenericFunction.__init__(self, args=[arg], **kwargs)
+
+class current_date(AnsiFunction):
+ __return_type__ = sqltypes.Date
+
+class current_time(AnsiFunction):
+ __return_type__ = sqltypes.Time
+
+class current_timestamp(AnsiFunction):
+ __return_type__ = sqltypes.DateTime
+
+class current_user(AnsiFunction):
+ __return_type__ = sqltypes.String
+
+class localtime(AnsiFunction):
+ __return_type__ = sqltypes.DateTime
+
+class localtimestamp(AnsiFunction):
+ __return_type__ = sqltypes.DateTime
+
+class session_user(AnsiFunction):
+ __return_type__ = sqltypes.String
+
+class user(AnsiFunction):
+ __return_type__ = sqltypes.String
+
+def _type_from_args(args):
+ for a in args:
+ if not isinstance(a.type, sqltypes.NullType):
+ return a.type
+ else:
+ return sqltypes.NullType
\ No newline at end of file
def test_function(self):
self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)")
- self.assert_compile(func.current_time(), "current_time")
+ self.assert_compile(func.current_time(), "CURRENT_TIME")
self.assert_compile(func.foo(), "foo")
m = MetaData()
def test_function(self):
self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)")
- self.assert_compile(func.current_time(), "current_time")
+ self.assert_compile(func.current_time(), "CURRENT_TIME")
self.assert_compile(func.foo(), "foo()")
m = MetaData()
):
for (lhs, rhs, res) in (
(5, User.id, ':users_id %s users.id'),
- (5, literal(6), ':literal %s :literal_1'),
+ (5, literal(6), ':param_1 %s :param_2'),
(User.id, 5, 'users.id %s :users_id'),
- (User.id, literal('b'), 'users.id %s :literal'),
+ (User.id, literal('b'), 'users.id %s :param_1'),
(User.id, User.id, 'users.id %s users.id'),
- (literal(5), 'b', ':literal %s :literal_1'),
- (literal(5), User.id, ':literal %s users.id'),
- (literal(5), literal(6), ':literal %s :literal_1'),
+ (literal(5), 'b', ':param_1 %s :param_2'),
+ (literal(5), User.id, ':param_1 %s users.id'),
+ (literal(5), literal(6), ':param_1 %s :param_2'),
):
self._test(py_op(lhs, rhs), res % sql_op)
(operator.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
('a', User.id, ':users_id', 'users.id'),
- ('a', literal('b'), ':literal_1', ':literal'), # note swap!
+ ('a', literal('b'), ':param_2', ':param_1'), # note swap!
(User.id, 'b', 'users.id', ':users_id'),
- (User.id, literal('b'), 'users.id', ':literal'),
+ (User.id, literal('b'), 'users.id', ':param_1'),
(User.id, User.id, 'users.id', 'users.id'),
- (literal('a'), 'b', ':literal', ':literal_1'),
- (literal('a'), User.id, ':literal', 'users.id'),
- (literal('a'), literal('b'), ':literal', ':literal_1'),
+ (literal('a'), 'b', ':param_1', ':param_2'),
+ (literal('a'), User.id, ':param_1', 'users.id'),
+ (literal('a'), literal('b'), ':param_1', ':param_2'),
):
# the compiled clause should match either (e.g.):
for (expr, compare) in (
(func.max(User.id), "max(users.id)"),
(User.id.desc(), "users.id DESC"),
- (between(5, User.id, Address.id), ":literal BETWEEN users.id AND addresses.id"),
+ (between(5, User.id, Address.id), ":param_1 BETWEEN users.id AND addresses.id"),
# this one would require adding compile() to InstrumentedScalarAttribute. do we want this ?
#(User.id, "users.id")
):
'sql.unicode',
# assorted round-trip tests
+ 'sql.functions',
'sql.query',
'sql.quote',
'sql.rowcount',
--- /dev/null
+import testbase
+import datetime
+from sqlalchemy import *
+from sqlalchemy import exceptions, sql
+from sqlalchemy.sql.compiler import BIND_TEMPLATES
+from sqlalchemy.engine import default
+from sqlalchemy import types as sqltypes
+from testlib import *
+
+# TODO: add a helper function to testlib for this
+from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql
+dialects = [x.dialect() for x in [sqlite, postgres, mysql, oracle, firebird, mssql]]
+
+class CompileTest(SQLCompileTest):
+ def test_compile(self):
+ for dialect in dialects:
+ bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
+ self.assert_compile(func.current_timestamp(), "CURRENT_TIMESTAMP", dialect=dialect)
+ self.assert_compile(func.localtime(), "LOCALTIME", dialect=dialect)
+ if isinstance(dialect, firebird.dialect):
+ self.assert_compile(func.nosuchfunction(), "nosuchfunction", dialect=dialect)
+ else:
+ self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect)
+ self.assert_compile(func.char_length('foo'), "char_length(%s)" % bindtemplate % {'name':'param_1', 'position':1}, dialect=dialect)
+
+ def test_constructor(self):
+ try:
+ func.current_timestamp('somearg')
+ assert False
+ except TypeError:
+ assert True
+
+ try:
+ func.char_length('a', 'b')
+ assert False
+ except TypeError:
+ assert True
+
+ try:
+ func.char_length()
+ assert False
+ except TypeError:
+ assert True
+
+ def test_typing(self):
+ assert isinstance(func.coalesce(datetime.date(2007, 10, 5), datetime.date(2005, 10, 15)).type, sqltypes.Date)
+
+ assert isinstance(func.coalesce(None, datetime.date(2005, 10, 15)).type, sqltypes.Date)
+
+ assert isinstance(func.concat("foo", "bar").type, sqltypes.String)
+
+class ExecuteTest(PersistTest):
+
+ def test_standalone_execute(self):
+ x = testbase.db.func.current_date().execute().scalar()
+ y = testbase.db.func.current_date().select().execute().scalar()
+ z = testbase.db.func.current_date().scalar()
+ assert (x == y == z) is True
+
+ # ansi func
+ x = testbase.db.func.current_date()
+ assert isinstance(x.type, Date)
+ assert isinstance(x.execute().scalar(), datetime.date)
+
+ def test_conn_execute(self):
+ conn = testbase.db.connect()
+ try:
+ x = conn.execute(func.current_date()).scalar()
+ y = conn.execute(func.current_date().select()).scalar()
+ z = conn.scalar(func.current_date())
+ finally:
+ conn.close()
+ assert (x == y == z) is True
+
+ def test_update(self):
+ """
+ Tests sending functions and SQL expressions to the VALUES and SET
+ clauses of INSERT/UPDATE instances, and that column-level defaults
+ get overridden.
+ """
+
+ meta = MetaData(testbase.db)
+ t = Table('t1', meta,
+ Column('id', Integer, Sequence('t1idseq', optional=True), primary_key=True),
+ Column('value', Integer)
+ )
+ t2 = Table('t2', meta,
+ Column('id', Integer, Sequence('t2idseq', optional=True), primary_key=True),
+ Column('value', Integer, default=7),
+ Column('stuff', String(20), onupdate="thisisstuff")
+ )
+ meta.create_all()
+ try:
+ t.insert(values=dict(value=func.length("one"))).execute()
+ assert t.select().execute().fetchone()['value'] == 3
+ t.update(values=dict(value=func.length("asfda"))).execute()
+ assert t.select().execute().fetchone()['value'] == 5
+
+ r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
+ id = r.last_inserted_ids()[0]
+ assert t.select(t.c.id==id).execute().fetchone()['value'] == 9
+ t.update(values={t.c.value:func.length("asdf")}).execute()
+ assert t.select().execute().fetchone()['value'] == 4
+ print "--------------------------"
+ t2.insert().execute()
+ t2.insert(values=dict(value=func.length("one"))).execute()
+ t2.insert(values=dict(value=func.length("asfda") + -19)).execute(stuff="hi")
+
+ res = exec_sorted(select([t2.c.value, t2.c.stuff]))
+ self.assertEquals(res, [(-14, 'hi'), (3, None), (7, None)])
+
+ t2.update(values=dict(value=func.length("asdsafasd"))).execute(stuff="some stuff")
+ assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(9,"some stuff"), (9,"some stuff"), (9,"some stuff")]
+
+ t2.delete().execute()
+
+ t2.insert(values=dict(value=func.length("one") + 8)).execute()
+ assert t2.select().execute().fetchone()['value'] == 11
+
+ t2.update(values=dict(value=func.length("asfda"))).execute()
+ assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff")
+
+ t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
+ print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
+ assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
+
+ finally:
+ meta.drop_all()
+
+ @testing.supported('postgres')
+ def test_as_from(self):
+ # TODO: shouldnt this work on oracle too ?
+ x = testbase.db.func.current_date().execute().scalar()
+ y = testbase.db.func.current_date().select().execute().scalar()
+ z = testbase.db.func.current_date().scalar()
+ w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar()
+
+ # construct a column-based FROM object out of a function, like in [ticket:172]
+ s = select([sql.column('date', type_=DateTime)], from_obj=[testbase.db.func.current_date()])
+ q = s.execute().fetchone()[s.c.date]
+ r = s.alias('datequery').select().scalar()
+
+ assert x == y == z == w == q == r
+
+def exec_sorted(statement, *args, **kw):
+ """Executes a statement and returns a sorted list plain tuple rows."""
+
+ return sorted([tuple(row)
+ for row in statement.execute(*args, **kw).fetchall()])
+
+if __name__ == '__main__':
+ testbase.main()
+
\ No newline at end of file
except exceptions.ArgumentError, e:
assert str(e).startswith('Not an executable clause: ')
- def test_functions(self):
- x = testbase.db.func.current_date().execute().scalar()
- y = testbase.db.func.current_date().select().execute().scalar()
- z = testbase.db.func.current_date().scalar()
- assert (x == y == z) is True
-
- x = testbase.db.func.current_date(type_=Date)
- assert isinstance(x.type, Date)
- assert isinstance(x.execute().scalar(), datetime.date)
-
- def test_conn_functions(self):
- conn = testbase.db.connect()
- try:
- x = conn.execute(func.current_date()).scalar()
- y = conn.execute(func.current_date().select()).scalar()
- z = conn.scalar(func.current_date())
- finally:
- conn.close()
- assert (x == y == z) is True
-
- def test_update_functions(self):
- """
- Tests sending functions and SQL expressions to the VALUES and SET
- clauses of INSERT/UPDATE instances, and that column-level defaults
- get overridden.
- """
-
- meta = MetaData(testbase.db)
- t = Table('t1', meta,
- Column('id', Integer, Sequence('t1idseq', optional=True), primary_key=True),
- Column('value', Integer)
- )
- t2 = Table('t2', meta,
- Column('id', Integer, Sequence('t2idseq', optional=True), primary_key=True),
- Column('value', Integer, default=7),
- Column('stuff', String(20), onupdate="thisisstuff")
- )
- meta.create_all()
- try:
- t.insert(values=dict(value=func.length("one"))).execute()
- assert t.select().execute().fetchone()['value'] == 3
- t.update(values=dict(value=func.length("asfda"))).execute()
- assert t.select().execute().fetchone()['value'] == 5
-
- r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
- id = r.last_inserted_ids()[0]
- assert t.select(t.c.id==id).execute().fetchone()['value'] == 9
- t.update(values={t.c.value:func.length("asdf")}).execute()
- assert t.select().execute().fetchone()['value'] == 4
- print "--------------------------"
- t2.insert().execute()
- t2.insert(values=dict(value=func.length("one"))).execute()
- t2.insert(values=dict(value=func.length("asfda") + -19)).execute(stuff="hi")
-
- res = exec_sorted(select([t2.c.value, t2.c.stuff]))
- self.assertEquals(res, [(-14, 'hi'), (3, None), (7, None)])
-
- t2.update(values=dict(value=func.length("asdsafasd"))).execute(stuff="some stuff")
- assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(9,"some stuff"), (9,"some stuff"), (9,"some stuff")]
-
- t2.delete().execute()
-
- t2.insert(values=dict(value=func.length("one") + 8)).execute()
- assert t2.select().execute().fetchone()['value'] == 11
-
- t2.update(values=dict(value=func.length("asfda"))).execute()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff")
-
- t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
- print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
-
- finally:
- meta.drop_all()
-
- @testing.supported('postgres')
- def test_functions_with_cols(self):
- # TODO: shouldnt this work on oracle too ?
- x = testbase.db.func.current_date().execute().scalar()
- y = testbase.db.func.current_date().select().execute().scalar()
- z = testbase.db.func.current_date().scalar()
- w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar()
-
- # construct a column-based FROM object out of a function, like in [ticket:172]
- s = select([sql.column('date', type_=DateTime)], from_obj=[testbase.db.func.current_date()])
- q = s.execute().fetchone()[s.c.date]
- r = s.alias('datequery').select().scalar()
-
- assert x == y == z == w == q == r
def test_column_order_with_simple_query(self):
def testexistsascolumnclause(self):
self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid)", params={'mytable_myid':5})
- self.assert_compile(select([table1, exists([1], from_obj=[table2])]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={})
+ self.assert_compile(select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={})
- self.assert_compile(select([table1, exists([1], from_obj=[table2]).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={})
+ self.assert_compile(select([table1, exists([1], from_obj=table2).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={})
def test_generative_exists(self):
self.assert_compile(
def testwheresubquery(self):
s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s')
self.assert_compile(
- select([users, s.c.street], from_obj=[s]),
+ select([users, s.c.street], from_obj=s),
"""SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""")
self.assert_compile(
s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s')
self.assert_compile(
- select([users, s.c.street], from_obj=[s]),
+ select([users, s.c.street], from_obj=s),
"""SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""")
# test constructing the outer query via append_column(), which occurs in the ORM's Query object
- s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=[table1])
+ s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=table1)
s.append_column(table1)
self.assert_compile(
s,
self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable")
# test expressions against scalar selects
- self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal AS anon_1")
- self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal AS anon_1")
- self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal AS anon_1")
+ self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :param_2 AS anon_1")
+ self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :param_2 AS anon_1")
+ self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :param_2 AS anon_1")
self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo")
a1 = table2.alias('t2alias')
s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True)
j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
- s2 = select([table1, s1], from_obj=[j1])
+ s2 = select([table1, s1], from_obj=j1)
self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
def testlabelcomparison(self):
x = func.lala(table1.c.myid).label('foo')
- self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :literal")
+ self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :param_1")
def testand(self):
self.assert_compile(
):
for (lhs, rhs, res) in (
(5, table1.c.myid, ':mytable_myid %s mytable.myid'),
- (5, literal(5), ':literal %s :literal_1'),
+ (5, literal(5), ':param_1 %s :param_2'),
(table1.c.myid, 'b', 'mytable.myid %s :mytable_myid'),
- (table1.c.myid, literal(2.7), 'mytable.myid %s :literal'),
+ (table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'),
(table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'),
- (literal(5), 8, ':literal %s :literal_1'),
- (literal(6), table1.c.myid, ':literal %s mytable.myid'),
- (literal(7), literal(5.5), ':literal %s :literal_1'),
+ (literal(5), 8, ':param_1 %s :param_2'),
+ (literal(6), table1.c.myid, ':param_1 %s mytable.myid'),
+ (literal(7), literal(5.5), ':param_1 %s :param_2'),
):
self.assert_compile(py_op(lhs, rhs), res % sql_op)
(operator.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
('a', table1.c.myid, ':mytable_myid', 'mytable.myid'),
- ('a', literal('b'), ':literal_1', ':literal'), # note swap!
+ ('a', literal('b'), ':param_2', ':param_1'), # note swap!
(table1.c.myid, 'b', 'mytable.myid', ':mytable_myid'),
- (table1.c.myid, literal('b'), 'mytable.myid', ':literal'),
+ (table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
(table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'),
- (literal('a'), 'b', ':literal', ':literal_1'),
- (literal('a'), table1.c.myid, ':literal', 'mytable.myid'),
- (literal('a'), literal('b'), ':literal', ':literal_1'),
+ (literal('a'), 'b', ':param_1', ':param_2'),
+ (literal('a'), table1.c.myid, ':param_1', 'mytable.myid'),
+ (literal('a'), literal('b'), ':param_1', ':param_2'),
):
# the compiled clause should match either (e.g.):
)
self.assert_compile(
- literal("a") + literal("b") * literal("c"), ":literal || :literal_1 * :literal_2"
+ literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3"
)
# test the op() function, also that its results are further usable in expressions
self.assert_compile(
table1.select(table1.c.myid.op('hoho')(12)==14),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :literal"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :param_1"
)
# test that clauses can be pickled (operators need to be module-level, etc.)
def testtextcolumns(self):
self.assert_compile(
- select(["column1", "column2"], from_obj=[table1]).alias('somealias').select(),
+ select(["column1", "column2"], from_obj=table1).alias('somealias').select(),
"SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias"
)
# test that use_labels doesnt interfere with literal columns
self.assert_compile(
- select(["column1", "column2", table1.c.myid], from_obj=[table1], use_labels=True),
+ select(["column1", "column2", table1.c.myid], from_obj=table1, use_labels=True),
"SELECT column1, column2, mytable.myid AS mytable_myid FROM mytable"
)
# test that use_labels doesnt interfere with literal columns that have textual labels
self.assert_compile(
- select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1], use_labels=True),
+ select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=table1, use_labels=True),
"SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable"
)
def testliteral(self):
self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]),
- "SELECT :literal || :literal_1 AS anon_1 FROM mytable")
+ "SELECT :param_2 || :param_3 AS anon_1 FROM mytable")
def testcalculatedcolumns(self):
value_tbl = table('values',
self.assert_compile(
select([value_tbl.c.id], (value_tbl.c.val2 -
value_tbl.c.val1)/value_tbl.c.val1 > 2.0),
- "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :literal"
+ "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :param_1"
)
self.assert_compile(
select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0),
- "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :literal"
+ "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :param_1"
)
def testfunction(self):
"""tests the generation of functions using the func keyword"""
# test an expression with a function
self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid,
- "lala(:lala, :lala_1, :literal, mytable.myid) * myothertable.otherid")
+ "lala(:lala, :lala_1, :param_1, mytable.myid) * myothertable.otherid")
# test it in a SELECT
self.assert_compile(select([func.count(table1.c.myid)]),
t = table('foo', column('id'))
s = select([t, literal('lala').label('hoho')])
- self.assert_compile(s, "SELECT foo.id, :literal AS hoho FROM foo")
+ self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
assert [str(c) for c in s.c] == ["id", "hoho"]
def testin(self):
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), literal('b')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :literal_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_(['a', literal('b')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal(1) + 'a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal + :literal_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 + :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') +'a', 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :mytable_myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') + literal('a'), literal('b')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :literal_2)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :param_3)")
self.assert_compile(select([table1], table1.c.myid.in_([1, literal(3) + 4])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal + :literal_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1 + :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') < 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal < :literal_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 < :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([table1.c.myid])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (mytable.myid)")
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid +'a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid + :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :mytable_myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal(1), 'a' + table1.c.myid])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid + mytable.myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid + mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([1, 2, 3])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1, :mytable_myid_2)")
self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) AS anon_1 \nFROM casttest")
# first test with Postgres engine
- check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(literal)s')
+ check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s')
# then the Oracle engine
- check_results(oracle.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':literal')
+ check_results(oracle.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':param_1')
# then the sqlite engine
check_results(sqlite.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?')
Column('date', Date))
self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date AND :dt_date_1", checkparams={'dt_date':datetime.date(2006,6,1), 'dt_date_1':datetime.date(2006,6,5)})
- self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :literal AND :literal_1", checkparams={'literal':datetime.date(2006,6,1), 'literal_1':datetime.date(2006,6,5)})
+ self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)})
def test_operator_precedence(self):
table = Table('op', metadata,
self.assert_compile(table.select((table.c.field + 5) == table.c.field),
"SELECT op.field FROM op WHERE op.field + :op_field = op.field")
self.assert_compile(table.select((table.c.field + 5) * 6),
- "SELECT op.field FROM op WHERE (op.field + :op_field) * :literal")
+ "SELECT op.field FROM op WHERE (op.field + :op_field) * :param_1")
self.assert_compile(table.select((table.c.field * 5) + 6),
- "SELECT op.field FROM op WHERE op.field * :op_field + :literal")
+ "SELECT op.field FROM op WHERE op.field * :op_field + :param_1")
self.assert_compile(table.select(5 + table.c.field.in_([5,6])),
- "SELECT op.field FROM op WHERE :literal + (op.field IN (:op_field, :op_field_1))")
+ "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:op_field, :op_field_1))")
self.assert_compile(table.select((5 + table.c.field).in_([5,6])),
- "SELECT op.field FROM op WHERE :op_field + op.field IN (:literal, :literal_1)")
+ "SELECT op.field FROM op WHERE :op_field + op.field IN (:param_1, :param_2)")
self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))),
"SELECT op.field FROM op WHERE NOT (op.field = :op_field AND op.field = :op_field_1)")
self.assert_compile(table.select(not_(table.c.field == 5)),
self.assert_compile(table.select(not_(table.c.field.between(5, 6))),
"SELECT op.field FROM op WHERE NOT (op.field BETWEEN :op_field AND :op_field_1)")
self.assert_compile(table.select(not_(table.c.field) == 5),
- "SELECT op.field FROM op WHERE (NOT op.field) = :literal")
+ "SELECT op.field FROM op WHERE (NOT op.field) = :param_1")
self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)),
- "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :literal AND :literal_1")
+ "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)),
- "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :literal AND :literal_1")
+ "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
class CRUDTest(SQLCompileTest):
def testinsert(self):
values = {
table1.c.name : table1.c.name + "lala",
table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
- }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :literal), name=(mytable.name || :mytable_name) "
- "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :literal_1 || mytable.name || :literal_2")
+ }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :mytable_name) "
+ "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :param_2 || mytable.name || :param_3")
def testcorrelatedupdate(self):
# test against a straight text subquery