setups, such as the one demonstrated in the ORM documentation
at :ref:`post_update`.
- :param comparator_factory: a :class:`.operators.ColumnOperators` subclass
- which will produce custom operator behavior.
-
- .. versionadded: 0.8 support for pluggable operators in
- core column expressions.
-
:param default: A scalar, Python callable, or
:class:`~sqlalchemy.sql.expression.ClauseElement` representing the
*default value* for this column, which will be invoked upon insert
no_type = type_ is None
- super(Column, self).__init__(name, None, type_,
- comparator_factory=
- kwargs.pop('comparator_factory', None))
+ super(Column, self).__init__(name, None, type_)
self.key = kwargs.pop('key', name)
self.primary_key = kwargs.pop('primary_key', False)
self.nullable = kwargs.pop('nullable', not self.primary_key)
name=self.name,
type_=self.type,
key = self.key,
- comparator_factory = self.comparator_factory,
primary_key = self.primary_key,
nullable = self.nullable,
unique = self.unique,
key = key if key else name if name else self.key,
primary_key = self.primary_key,
nullable = self.nullable,
- comparator_factory = self.comparator_factory,
quote=self.quote, _proxies=[self], *fk)
except TypeError, e:
# Py3K
def __operate(self, expr, op, obj, reverse=False):
obj = self._check_literal(expr, op, obj)
- comparator_factory = None
+
if reverse:
left, right = obj, expr
else:
if left.type is None:
op, result_type = sqltypes.NULLTYPE._adapt_expression(op,
right.type)
- result_type = sqltypes.to_instance(result_type)
- if right.type._compare_type_affinity(result_type):
- comparator_factory = right.comparator_factory
elif right.type is None:
op, result_type = left.type._adapt_expression(op,
sqltypes.NULLTYPE)
- result_type = sqltypes.to_instance(result_type)
- if left.type._compare_type_affinity(result_type):
- comparator_factory = left.comparator_factory
else:
op, result_type = left.type._adapt_expression(op, right.type)
- result_type = sqltypes.to_instance(result_type)
- if left.type._compare_type_affinity(result_type):
- comparator_factory = left.comparator_factory
- elif right.type._compare_type_affinity(result_type):
- comparator_factory = right.comparator_factory
- return BinaryExpression(left, right, op, type_=result_type,
- comparator_factory=comparator_factory)
+ return BinaryExpression(left, right, op, type_=result_type)
def __scalar(self, expr, op, fn, **kw):
return fn(expr)
__visit_name__ = 'column'
primary_key = False
foreign_keys = []
+ type = None
quote = None
_label = None
_key_label = None
_alt_names = ()
- comparator = None
-
- class Comparator(operators.ColumnOperators):
- def __init__(self, expr):
- self.expr = expr
-
- def operate(self, op, *other, **kwargs):
- return _DEFAULT_COMPARATOR.operate(self.expr, op, *other, **kwargs)
-
- def reverse_operate(self, op, other, **kwargs):
- return _DEFAULT_COMPARATOR.reverse_operate(self.expr, op, other,
- **kwargs)
+ @util.memoized_property
+ def comparator(self):
+ if self.type is None:
+ return None
+ elif self.type.comparator_factory is not None:
+ return self.type.comparator_factory(self)
+ else:
+ return None
def __getattr__(self, key):
if self.comparator is None:
__visit_name__ = 'binary'
def __init__(self, left, right, operator, type_=None,
- negate=None, modifiers=None, comparator_factory=None):
+ negate=None, modifiers=None):
# allow compatibility with libraries that
# refer to BinaryExpression directly and pass strings
if isinstance(operator, basestring):
self.type = sqltypes.to_instance(type_)
self.negate = negate
- self.comparator_factory = comparator_factory
- if comparator_factory is not None:
- self.comparator = comparator_factory(self)
-
if modifiers is None:
self.modifiers = {}
else:
:func:`literal_column()` function is usually used to create such a
:class:`.ColumnClause`.
- :param comparator_factory: a :class:`.operators.ColumnOperators` subclass
- which will produce custom operator behavior.
-
- .. versionadded: 0.8 support for pluggable operators in
- core column expressions.
"""
__visit_name__ = 'column'
_memoized_property = util.group_expirable_memoized_property()
- def __init__(self, text, selectable=None, type_=None, is_literal=False,
- comparator_factory=None):
+ def __init__(self, text, selectable=None, type_=None, is_literal=False):
self.key = self.name = text
self.table = selectable
self.type = sqltypes.to_instance(type_)
self.is_literal = is_literal
- self.comparator_factory = comparator_factory
- if comparator_factory:
- self.comparator = comparator_factory(self)
def _compare_name_for_result(self, other):
if self.table is not None and hasattr(other, 'proxy_set'):
from . import exc, schema, util, processors, events, event
from .sql import operators
+from .sql.expression import _DEFAULT_COMPARATOR
from .util import pickle
from .util.compat import decimal
from .sql.visitors import Visitable
class TypeEngine(AbstractType):
"""Base for built-in types."""
+ class Comparator(operators.ColumnOperators):
+ def __init__(self, expr):
+ self.expr = expr
+
+ def operate(self, op, *other, **kwargs):
+ return _DEFAULT_COMPARATOR.operate(self.expr, op, *other, **kwargs)
+
+ def reverse_operate(self, op, other, **kwargs):
+ return _DEFAULT_COMPARATOR.reverse_operate(self.expr, op, other,
+ **kwargs)
+
+ comparator_factory = None
+ """A :class:`.TypeEngine.Comparator` class which will apply
+ to operations performed by owning :class:`.ColumnElement` objects.
+
+ """
+
def copy_value(self, value):
return value
"type being decorated")
self.impl = to_instance(self.__class__.impl, *args, **kwargs)
+ @property
+ def comparator_factory(self):
+ return self.impl.comparator_factory
def _gen_dialect_impl(self, dialect):
"""
return self.impl.compare_values(x, y)
def _adapt_expression(self, op, othertype):
- """
- #todo
- """
- op, typ =self.impl._adapt_expression(op, othertype)
- if typ is self.impl:
+ op, typ = self.impl._adapt_expression(op, othertype)
+ typ = to_instance(typ)
+ if typ._compare_type_affinity(self.impl):
return op, self
else:
return op, typ
othertype = othertype._type_affinity
return op, \
self._expression_adaptations.get(op, self._blank_dict).\
- get(othertype, self)
+ get(othertype, NULLTYPE)
class String(Concatenable, TypeEngine):
"""The base for all string and character types.
return {
operators.add:{
Date:Date,
- Integer:Integer,
+ Integer:self.__class__,
Numeric:Numeric,
},
operators.mul:{
Interval:Interval,
- Integer:Integer,
+ Integer:self.__class__,
Numeric:Numeric,
},
# Py2K
operators.div:{
- Integer:Integer,
+ Integer:self.__class__,
Numeric:Numeric,
},
# end Py2K
operators.truediv:{
- Integer:Integer,
+ Integer:self.__class__,
Numeric:Numeric,
},
operators.sub:{
- Integer:Integer,
+ Integer:self.__class__,
Numeric:Numeric,
},
}
return {
operators.mul:{
Interval:Interval,
- Numeric:Numeric,
- Integer:Numeric,
+ Numeric:self.__class__,
+ Integer:self.__class__,
},
# Py2K
operators.div:{
- Numeric:Numeric,
- Integer:Numeric,
+ Numeric:self.__class__,
+ Integer:self.__class__,
},
# end Py2K
operators.truediv:{
- Numeric:Numeric,
- Integer:Numeric,
+ Numeric:self.__class__,
+ Integer:self.__class__,
},
operators.add:{
- Numeric:Numeric,
- Integer:Numeric,
+ Numeric:self.__class__,
+ Integer:self.__class__,
},
operators.sub:{
- Numeric:Numeric,
- Integer:Numeric,
+ Numeric:self.__class__,
+ Integer:self.__class__,
}
}
return {
operators.mul:{
Interval:Interval,
- Numeric:Float,
+ Numeric:self.__class__,
},
# Py2K
operators.div:{
- Numeric:Float,
+ Numeric:self.__class__,
},
# end Py2K
operators.truediv:{
- Numeric:Float,
+ Numeric:self.__class__,
},
operators.add:{
- Numeric:Float,
+ Numeric:self.__class__,
},
operators.sub:{
- Numeric:Float,
+ Numeric:self.__class__,
}
}
def _expression_adaptations(self):
return {
operators.add:{
- Interval:DateTime,
+ Interval:self.__class__,
},
operators.sub:{
- Interval:DateTime,
+ Interval:self.__class__,
DateTime:Interval,
},
}
def _expression_adaptations(self):
return {
operators.add:{
- Integer:Date,
+ Integer:self.__class__,
Interval:DateTime,
Time:DateTime,
},
operators.sub:{
# date - integer = date
- Integer:Date,
+ Integer:self.__class__,
# date - date = integer.
Date:Integer,
return {
operators.add:{
Date:DateTime,
- Interval:Time
+ Interval:self.__class__
},
operators.sub:{
Time:Interval,
- Interval:Time,
+ Interval:self.__class__,
},
}
return {
operators.add:{
Date:DateTime,
- Interval:Interval,
+ Interval:self.__class__,
DateTime:DateTime,
Time:Time,
},
operators.sub:{
- Interval:Interval
+ Interval:self.__class__
},
operators.mul:{
- Numeric:Interval
+ Numeric:self.__class__
},
operators.truediv: {
- Numeric:Interval
+ Numeric:self.__class__
},
# Py2K
operators.div: {
- Numeric:Interval
+ Numeric:self.__class__
}
# end Py2K
}
ClauseList, Grouping, _DefaultColumnComparator
from sqlalchemy.sql import operators
from sqlalchemy.schema import Column, Table, MetaData
-from sqlalchemy.types import Integer
+from sqlalchemy.types import Integer, TypeEngine, TypeDecorator
class DefaultColumnComparatorTest(fixtures.TestBase):
collate(left, right)
)
-class CustomComparatorTest(fixtures.TestBase):
- def _add_override_factory(self):
- class MyComparator(Column.Comparator):
- def __init__(self, expr):
- self.expr = expr
+class _CustomComparatorTests(object):
+ def test_override_builtin(self):
+ c1 = Column('foo', self._add_override_factory())
+ self._assert_add_override(c1)
+
+ def test_column_proxy(self):
+ t = Table('t', MetaData(),
+ Column('foo', self._add_override_factory())
+ )
+ proxied = t.select().c.foo
+ self._assert_add_override(proxied)
- def __add__(self, other):
- return self.expr.op("goofy")(other)
- return MyComparator
+ def test_alias_proxy(self):
+ t = Table('t', MetaData(),
+ Column('foo', self._add_override_factory())
+ )
+ proxied = t.alias().c.foo
+ self._assert_add_override(proxied)
+
+ def test_binary_propagate(self):
+ c1 = Column('foo', self._add_override_factory())
+ self._assert_add_override(c1 - 6)
+
+ def test_reverse_binary_propagate(self):
+ c1 = Column('foo', self._add_override_factory())
+ self._assert_add_override(6 - c1)
+
+ def test_binary_multi_propagate(self):
+ c1 = Column('foo', self._add_override_factory(True))
+ self._assert_add_override((c1 - 6) + 5)
+
+ def test_no_binary_multi_propagate_wo_adapt(self):
+ c1 = Column('foo', self._add_override_factory())
+ self._assert_not_add_override((c1 - 6) + 5)
+
+ def test_no_boolean_propagate(self):
+ c1 = Column('foo', self._add_override_factory())
+ self._assert_not_add_override(c1 == 56)
def _assert_add_override(self, expr):
assert (expr + 5).compare(
expr.op("goofy")(5)
)
- def test_override_builtin(self):
- c1 = Column('foo', Integer,
- comparator_factory=self._add_override_factory())
- self._assert_add_override(c1)
+class CustomComparatorTest(_CustomComparatorTests, fixtures.TestBase):
+ def _add_override_factory(self, include_adapt=False):
- def test_column_proxy(self):
- t = Table('t', MetaData(),
- Column('foo', Integer,
- comparator_factory=self._add_override_factory()))
- proxied = t.select().c.foo
- self._assert_add_override(proxied)
+ class MyInteger(Integer):
+ class comparator_factory(TypeEngine.Comparator):
+ def __init__(self, expr):
+ self.expr = expr
- def test_binary_propagate(self):
- c1 = Column('foo', Integer,
- comparator_factory=self._add_override_factory())
+ def __add__(self, other):
+ return self.expr.op("goofy")(other)
- self._assert_add_override(c1 - 6)
+ if include_adapt:
+ def _adapt_expression(self, op, othertype):
+ if op.__name__ == 'custom_op':
+ return op, self
+ else:
+ return super(MyInteger, self)._adapt_expression(
+ op, othertype)
- def test_binary_multi_propagate(self):
- c1 = Column('foo', Integer,
- comparator_factory=self._add_override_factory())
- self._assert_add_override((c1 - 6) + 5)
+ return MyInteger
- def test_no_boolean_propagate(self):
- c1 = Column('foo', Integer,
- comparator_factory=self._add_override_factory())
- self._assert_not_add_override(c1 == 56)
+class TypeDecoratorComparatorTest(_CustomComparatorTests, fixtures.TestBase):
+ def _add_override_factory(self, include_adapt=False):
+
+ class MyInteger(TypeDecorator):
+ impl = Integer
+
+ class comparator_factory(TypeEngine.Comparator):
+ def __init__(self, expr):
+ self.expr = expr
+
+ def __add__(self, other):
+ return self.expr.op("goofy")(other)
+
+ if include_adapt:
+ def _adapt_expression(self, op, othertype):
+ if op.__name__ == 'custom_op':
+ return op, self
+ else:
+ return super(MyInteger, self)._adapt_expression(
+ op, othertype)
+
+ return MyInteger
+
+
+class CustomEmbeddedinTypeDecoratorTest(_CustomComparatorTests, fixtures.TestBase):
+ def _add_override_factory(self, include_adapt=False):
+ class MyInteger(Integer):
+ class comparator_factory(TypeEngine.Comparator):
+ def __init__(self, expr):
+ self.expr = expr
+
+ def __add__(self, other):
+ return self.expr.op("goofy")(other)
+
+ if include_adapt:
+ def _adapt_expression(self, op, othertype):
+ if op.__name__ == 'custom_op':
+ return op, self
+ else:
+ return super(MyInteger, self)._adapt_expression(
+ op, othertype)
+
+ class MyDecInteger(TypeDecorator):
+ impl = MyInteger
+
+ return MyDecInteger
+
+class NewOperatorTest(_CustomComparatorTests, fixtures.TestBase):
+ def _add_override_factory(self, include_adapt=False):
+ class MyInteger(Integer):
+ class comparator_factory(TypeEngine.Comparator):
+ def __init__(self, expr):
+ self.expr = expr
+
+ def foob(self, other):
+ return self.expr.op("foob")(other)
+
+ if include_adapt:
+ def _adapt_expression(self, op, othertype):
+ if op.__name__ == 'custom_op':
+ return op, self
+ else:
+ return super(MyInteger, self)._adapt_expression(
+ op, othertype)
+
+ return MyInteger
+
+ def _assert_add_override(self, expr):
+ assert (expr.foob(5)).compare(
+ expr.op("foob")(5)
+ )
+
+ def _assert_not_add_override(self, expr):
+ assert not hasattr(expr, "foob")
+ def test_no_binary_multi_propagate_wo_adapt(self):
+ pass
\ No newline at end of file