else:
raise exc.ArgumentError("Only '='/'!=' operators can be used with NULL")
else:
- obj = self._check_literal(obj)
+ obj = self._check_literal(op, obj)
if reverse:
return _BinaryExpression(obj,
negate=negate, modifiers=kwargs)
def __operate(self, op, obj, reverse=False):
- obj = self._check_literal(obj)
+ obj = self._check_literal(op, obj)
if reverse:
left, right = obj, self
"in() function accepts either a list of non-selectable values, "
"or a selectable: %r" % o)
else:
- o = self._bind_param(o)
+ o = self._bind_param(op, o)
args.append(o)
if len(args) == 0:
# use __radd__ to force string concat behavior
return self.__compare(
operators.like_op,
- literal_column("'%'", type_=sqltypes.String).__radd__(self._check_literal(other)),
+ literal_column("'%'", type_=sqltypes.String).__radd__(self._check_literal(operators.like_op, other)),
escape=escape)
def endswith(self, other, escape=None):
return self.__compare(
operators.like_op,
- literal_column("'%'", type_=sqltypes.String) + self._check_literal(other),
+ literal_column("'%'", type_=sqltypes.String) + self._check_literal(operators.like_op, other),
escape=escape)
def contains(self, other, escape=None):
return self.__compare(
operators.like_op,
literal_column("'%'", type_=sqltypes.String) +
- self._check_literal(other) +
+ self._check_literal(operators.like_op, other) +
literal_column("'%'", type_=sqltypes.String),
escape=escape)
The allowed contents of ``other`` are database backend specific.
"""
- return self.__compare(operators.match_op, self._check_literal(other))
+ return self.__compare(operators.match_op, self._check_literal(operators.match_op, other))
def label(self, name):
"""Produce a column label, i.e. ``<columnname> AS <name>``.
return _BinaryExpression(
self,
ClauseList(
- self._check_literal(cleft),
- self._check_literal(cright),
+ self._check_literal(operators.and_, cleft),
+ self._check_literal(operators.and_, cright),
operator=operators.and_,
group=False),
operators.between_op)
"""
return lambda other: self.__operate(operator, other)
- def _bind_param(self, obj):
- return _BindParamClause(None, obj, _fallback_type=self.type, unique=True)
+ def _bind_param(self, operator, obj):
+ return _BindParamClause(None, obj, _compared_to_operator=operator, _compared_to_type=self.type, unique=True)
- def _check_literal(self, other):
- if isinstance(other, _BindParamClause) and isinstance(other.type, sqltypes.NullType):
+ def _check_literal(self, operator, other):
+ if isinstance(other, _BindParamClause) and \
+ isinstance(other.type, sqltypes.NullType):
other.type = self.type
return other
elif hasattr(other, '__clause_element__'):
return other.__clause_element__()
elif not isinstance(other, ClauseElement):
- return self._bind_param(other)
+ return self._bind_param(operator, other)
elif isinstance(other, (_SelectBaseMixin, Alias)):
return other.as_scalar()
else:
def __init__(self, key, value, type_=None, unique=False,
isoutparam=False, required=False,
- _fallback_type=None):
+ _compared_to_operator=None,
+ _compared_to_type=None):
"""Construct a _BindParamClause.
key
self.required = required
if type_ is None:
- self.type = sqltypes.type_map.get(type(value), _fallback_type or sqltypes.NULLTYPE)
- if _fallback_type and _fallback_type._type_affinity == self.type._type_affinity:
- self.type = _fallback_type
+ if _compared_to_type is not None:
+ self.type = _compared_to_type._coerce_compared_value(_compared_to_operator, value)
+ else:
+ self.type = sqltypes.NULLTYPE
elif isinstance(type_, type):
self.type = type_()
else:
def _select_iterable(self):
return (self, )
- def _bind_param(self, obj):
+ def _bind_param(self, operator, obj):
return _Tuple(*[
- _BindParamClause(None, o, _fallback_type=self.type, unique=True)
+ _BindParamClause(None, o, _compared_to_operator=operator, _compared_to_type=self.type, unique=True)
for o in obj
]).self_group()
def execute(self):
return self.select().execute()
- def _bind_param(self, obj):
- return _BindParamClause(None, obj, _fallback_type=self.type, unique=True)
+ def _bind_param(self, operator, obj):
+ return _BindParamClause(None, obj, _compared_to_operator=operator, _compared_to_type=self.type, unique=True)
class Function(FunctionElement):
FunctionElement.__init__(self, *clauses, **kw)
- def _bind_param(self, obj):
- return _BindParamClause(self.name, obj, _fallback_type=self.type, unique=True)
+ def _bind_param(self, operator, obj):
+ return _BindParamClause(self.name, obj, _compared_to_operator=operator, _compared_to_type=self.type, unique=True)
class _Cast(ColumnElement):
else:
return []
- def _bind_param(self, obj):
- return _BindParamClause(self.name, obj, _fallback_type=self.type, unique=True)
+ def _bind_param(self, operator, obj):
+ return _BindParamClause(self.name, obj, _compared_to_operator=operator, _compared_to_type=self.type, unique=True)
def _make_proxy(self, selectable, name=None, attach=True):
# propagate the "is_literal" flag only if we are keeping our name,
typ = t
else:
return self.__class__
+
+ def _coerce_compared_value(self, op, value):
+ _coerced_type = type_map.get(type(value), NULLTYPE)
+ if _coerced_type._type_affinity == self._type_affinity:
+ return self
+ else:
+ return _coerced_type
def _compare_type_affinity(self, other):
return self._type_affinity is other._type_affinity
# strips it off on the way out.
impl = types.Unicode
-
+
def process_bind_param(self, value, dialect):
return "PREFIX:" + value
given; in this case, the "impl" variable can reference
``TypeEngine`` as a placeholder.
+ Types that receive a Python type that isn't similar to the
+ ultimate type used may want to define the :meth:`TypeDecorator.coerce_compared_value`
+ method=. This is used to give the expression system a hint
+ when coercing Python objects
+ into bind parameters within expressions. Consider this expression::
+
+ mytable.c.somecol + datetime.date(2009, 5, 15)
+
+ Above, if "somecol" is an ``Integer`` variant, it makes sense that
+ we doing date arithmetic, where above is usually interpreted
+ by databases as adding a number of days to the given date.
+ The expression system does the right thing by not attempting to
+ coerce the "date()" value into an integer-oriented bind parameter.
+
+ However, suppose "somecol" is a ``TypeDecorator`` that is wrapping
+ an ``Integer``, and our ``TypeDecorator`` is actually storing dates
+ as an "epoch", i.e. a total number of days from a fixed starting
+ date. So in this case, we *do* want the expression system to wrap
+ the date() into our ``TypeDecorator`` type's system of coercing
+ dates into integers. So we would want to define::
+
+ class MyEpochType(types.TypeDecorator):
+ impl = types.Integer
+
+ epoch = datetime.date(1970, 1, 1)
+
+ def process_bind_param(self, value, dialect):
+ return (value - self.epoch).days
+
+ def process_result_value(self, value, dialect):
+ return self.epoch + timedelta(days=value)
+
+ def coerce_compared_value(self, op, value):
+ if isinstance(value, datetime.date):
+ return Date
+ else:
+ raise ValueError("Python date expected.")
+
The reason that type behavior is modified using class decoration
instead of subclassing is due to the way dialect specific types
are used. Such as with the example above, when using the mysql
return process
else:
return self.impl.result_processor(dialect, coltype)
+
+ def coerce_compared_value(self, op, value):
+ return self.impl._coerce_compared_value(op, value)
+ def _coerce_compared_value(self, op, value):
+ return self.coerce_compared_value(op, value)
+
def copy(self):
instance = self.__class__.__new__(self.__class__)
instance.__dict__.update(self.__dict__)
def is_mutable(self):
return self.impl.is_mutable()
+ def _adapt_expression(self, op, othertype):
+ return self.impl._adapt_expression(op, othertype)
+
+
+
class MutableType(object):
"""A mixin that marks a Type as holding a mutable object.
"""A mixin that marks a type as supporting 'concatenation', typically strings."""
def _adapt_expression(self, op, othertype):
- if op is operators.add and isinstance(othertype, (Concatenable, NullType)):
+ if op is operators.add and issubclass(othertype._type_affinity, (Concatenable, NullType)):
return operators.concat_op, self
else:
return op, self
import datetime, os, re
from sqlalchemy import *
from sqlalchemy import exc, types, util, schema
-from sqlalchemy.sql import operators, column
+from sqlalchemy.sql import operators, column, table
from sqlalchemy.test.testing import eq_
import sqlalchemy.engine.url as url
from sqlalchemy.databases import *
f = os.path.join(os.path.dirname(__file__), "..", name)
return open(f, mode='rb').read()
-class ExpressionTest(TestBase, AssertsExecutionResults):
+class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
@classmethod
def setup_class(cls):
- global test_table, meta, MyCustomType
+ global test_table, meta, MyCustomType, MyTypeDec
class MyCustomType(types.UserDefinedType):
def get_col_spec(self):
return process
def adapt_operator(self, op):
return {operators.add:operators.sub, operators.sub:operators.add}.get(op, op)
+
+ class MyTypeDec(types.TypeDecorator):
+ impl = String
+
+ def process_bind_param(self, value, dialect):
+ return "BIND_IN" + str(value)
+ def process_result_value(self, value, dialect):
+ return value + "BIND_OUT"
+
meta = MetaData(testing.db)
test_table = Table('test', meta,
Column('id', Integer, primary_key=True),
Column('data', String(30)),
Column('atimestamp', Date),
- Column('avalue', MyCustomType))
+ Column('avalue', MyCustomType),
+ Column('bvalue', MyTypeDec),
+ )
meta.create_all()
'id':1,
'data':'somedata',
'atimestamp':datetime.date(2007, 10, 15),
- 'avalue':25})
+ 'avalue':25, 'bvalue':'foo'})
@classmethod
def teardown_class(cls):
eq_(
test_table.select().execute().fetchall(),
- [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+ [(1, 'somedata', datetime.date(2007, 10, 15), 25, "BIND_INfooBIND_OUT")]
)
def test_bind_adapt(self):
eq_(
testing.db.execute(
- test_table.select().where(expr),
+ select([test_table.c.id, test_table.c.data, test_table.c.atimestamp])
+ .where(expr),
{"thedate":datetime.date(2007, 10, 15)}).fetchall(),
- [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+ [(1, 'somedata', datetime.date(2007, 10, 15))]
)
expr = test_table.c.avalue == bindparam("somevalue")
eq_(expr.right.type._type_affinity, MyCustomType)
-
+
eq_(
testing.db.execute(test_table.select().where(expr), {"somevalue":25}).fetchall(),
- [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+ [(1, 'somedata', datetime.date(2007, 10, 15), 25, 'BIND_INfooBIND_OUT')]
+ )
+
+ expr = test_table.c.bvalue == bindparam("somevalue")
+ eq_(expr.right.type._type_affinity, String)
+
+ eq_(
+ testing.db.execute(test_table.select().where(expr), {"somevalue":"foo"}).fetchall(),
+ [(1, 'somedata', datetime.date(2007, 10, 15), 25, 'BIND_INfooBIND_OUT')]
)
def test_literal_adapt(self):
# this one relies upon anonymous labeling to assemble result
# processing rules on the column.
assert testing.db.execute(select([expr])).scalar() == -15
-
+
+ def test_typedec_operator_adapt(self):
+ expr = test_table.c.bvalue + "hi"
+
+ assert expr.type.__class__ is String
+
+ eq_(
+ testing.db.execute(select([expr.label('foo')])).scalar(),
+ "BIND_INfooBIND_INhiBIND_OUT"
+ )
+
+ def test_typedec_righthand_coercion(self):
+ class MyTypeDec(types.TypeDecorator):
+ impl = String
+
+ def process_bind_param(self, value, dialect):
+ return "BIND_IN" + str(value)
+
+ def process_result_value(self, value, dialect):
+ return value + "BIND_OUT"
+
+ tab = table('test', column('bvalue', MyTypeDec))
+ expr = tab.c.bvalue + 6
+
+ self.assert_compile(
+ expr,
+ "test.bvalue || :bvalue_1",
+ use_default_dialect=True
+ )
+
+ assert expr.type.__class__ is String
+ eq_(
+ testing.db.execute(select([expr.label('foo')])).scalar(),
+ "BIND_INfooBIND_IN6BIND_OUT"
+ )
+
+
def test_bind_typing(self):
from sqlalchemy.sql import column