from sqlalchemy import exceptions, logging, schema, sql, util
from sqlalchemy.sql import operators as sql_operators
+from sqlalchemy.sql import functions as sql_functions
from sqlalchemy.sql import compiler
from sqlalchemy.engine import base as engine_base, default
sql_operators.concat_op: lambda x, y: "concat(%s, %s)" % (x, y),
sql_operators.mod: '%%'
})
+ functions = compiler.DefaultCompiler.functions.copy()
+ functions.update ({
+ sql_functions.random: 'rand%(expr)s'
+ })
+
def visit_typeclause(self, typeclause):
type_ = typeclause.type.dialect_impl(self.dialect)
from sqlalchemy import types as sqltypes
-from sqlalchemy.sql.expression import _Function, _literal_as_binds, ClauseList, _FigureVisitName
+from sqlalchemy.sql.expression import _Function, _literal_as_binds, \
+ ClauseList, _FigureVisitName
from sqlalchemy.sql import operators
+
class _GenericMeta(_FigureVisitName):
def __init__(cls, clsname, bases, dict):
cls.__visit_name__ = 'function'
type.__init__(cls, clsname, bases, dict)
-
+
def __call__(self, *args, **kwargs):
args = [_literal_as_binds(c) for c in args]
return type.__call__(self, *args, **kwargs)
-
+
class GenericFunction(_Function):
__metaclass__ = _GenericMeta
self.name = self.__class__.__name__
self._bind = kwargs.get('bind', None)
if group:
- self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args).self_group()
+ self.clause_expr = ClauseList(
+ operator=operators.comma_op,
+ group_contents=True, *args).self_group()
else:
- self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args)
- self.type = sqltypes.to_instance(type_ or getattr(self, '__return_type__', None))
-
+ self.clause_expr = ClauseList(
+ operator=operators.comma_op,
+ group_contents=True, *args)
+ self.type = sqltypes.to_instance(
+ type_ or getattr(self, '__return_type__', None))
+
class AnsiFunction(GenericFunction):
def __init__(self, **kwargs):
GenericFunction.__init__(self, **kwargs)
-
+
class coalesce(GenericFunction):
def __init__(self, *args, **kwargs):
kwargs.setdefault('type_', _type_from_args(args))
class now(GenericFunction):
__return_type__ = sqltypes.DateTime
-
+
class concat(GenericFunction):
__return_type__ = sqltypes.String
def __init__(self, *args, **kwargs):
def __init__(self, arg, **kwargs):
GenericFunction.__init__(self, args=[arg], **kwargs)
-
+
+class random(GenericFunction):
+ def __init__(self, *args, **kwargs):
+ kwargs.setdefault('type_', None)
+ GenericFunction.__init__(self, args=args, **kwargs)
+
class current_date(AnsiFunction):
__return_type__ = sqltypes.Date
-
+
class current_time(AnsiFunction):
__return_type__ = sqltypes.Time
def test_generic_now(self):
assert isinstance(func.now().type, sqltypes.DateTime)
-
+
for ret, dialect in [
('CURRENT_TIMESTAMP', sqlite.dialect()),
('now()', postgres.dialect()),
('CURRENT_TIMESTAMP', oracle.dialect())
]:
self.assert_compile(func.now(), ret, dialect=dialect)
-
+
+ def test_generic_random(self):
+ assert func.random().type == sqltypes.NULLTYPE
+ assert isinstance(func.random(type_=Integer).type, Integer)
+
+ for ret, dialect in [
+ ('random()', sqlite.dialect()),
+ ('random()', postgres.dialect()),
+ ('rand()', mysql.dialect()),
+ ('random()', oracle.dialect())
+ ]:
+ self.assert_compile(func.random(), ret, dialect=dialect)
+
def test_constructor(self):
try:
func.current_timestamp('somearg')
'myothertable',
column('otherid', Integer),
)
-
+
# test an expression with a function
self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid,
"lala(:lala_1, :lala_2, :param_1, mytable.myid) * myothertable.otherid")