decryption, usage of Postgis functions, etc.
[ticket:1534]
+ - [feature] The Core oeprator system now includes
+ the `getitem` operator, i.e. the bracket
+ operator in Python. This is used at first
+ to provide index and slice behavior to the
+ Postgresql ARRAY type, and also provides a hook
+ for end-user definition of custom __getitem__
+ schemes which can be applied at the type
+ level as well as within ORM-level custom
+ operator schemes.
+
+ Note that this change has the effect that
+ descriptor-based __getitem__ schemes used by
+ the ORM in conjunction with synonym() or other
+ "descriptor-wrapped" schemes will need
+ to start using a custom comparator in order
+ to maintain this behavior.
+
- [feature] Revised the rules used to determine
the operator precedence for the user-defined
operator, i.e. that granted using the ``op()``
performance of bind/result processing.
[ticket:2441]
+ - [feature] postgresql.ARRAY now supports
+ indexing and slicing. The Python [] operator
+ is available on all SQL expressions that are
+ of type ARRAY; integer or simple slices can be
+ passed. The slices can also be used on the
+ assignment side in the SET clause of an UPDATE
+ statement by passing them into Update.values();
+ see the docs for examples.
+
+ - [feature] Added new "array literal" construct
+ postgresql.array(). Basically a "tuple" that
+ renders as ARRAY[1,2,3].
+
- [feature] Added support for the Postgresql ONLY
keyword, which can appear corresponding to a
table in a SELECT, UPDATE, or DELETE statement.
MACADDR, NUMERIC, REAL, SMALLINT, TEXT, TIME, TIMESTAMP, \
UUID, VARCHAR
-Types which are specific to PostgreSQL, or have PostgreSQL-specific
+Types which are specific to PostgreSQL, or have PostgreSQL-specific
construction arguments, are as follows:
.. currentmodule:: sqlalchemy.dialects.postgresql
+.. autoclass:: array
+
.. autoclass:: ARRAY
:members: __init__
:show-inheritance:
from .base import \
INTEGER, BIGINT, SMALLINT, VARCHAR, CHAR, TEXT, NUMERIC, FLOAT, REAL, INET, \
CIDR, UUID, BIT, MACADDR, DOUBLE_PRECISION, TIMESTAMP, TIME,\
- DATE, BYTEA, BOOLEAN, INTERVAL, ARRAY, ENUM, dialect
+ DATE, BYTEA, BOOLEAN, INTERVAL, ARRAY, ENUM, dialect, array
__all__ = (
'INTEGER', 'BIGINT', 'SMALLINT', 'VARCHAR', 'CHAR', 'TEXT', 'NUMERIC', 'FLOAT', 'REAL', 'INET',
'CIDR', 'UUID', 'BIT', 'MACADDR', 'DOUBLE_PRECISION', 'TIMESTAMP', 'TIME',
-'DATE', 'BYTEA', 'BOOLEAN', 'INTERVAL', 'ARRAY', 'ENUM', 'dialect'
+'DATE', 'BYTEA', 'BOOLEAN', 'INTERVAL', 'ARRAY', 'ENUM', 'dialect', 'array'
)
from ... import sql, schema, exc, util
from ...engine import default, reflection
-from ...sql import compiler, expression, util as sql_util
+from ...sql import compiler, expression, util as sql_util, operators
from ... import types as sqltypes
try:
PGUuid = UUID
+class _Slice(expression.ColumnElement):
+ __visit_name__ = 'slice'
+ type = sqltypes.NULLTYPE
+ def __init__(self, slice_, source_comparator):
+ self.start = source_comparator._check_literal(
+ source_comparator.expr,
+ operators.getitem, slice_.start)
+ self.stop = source_comparator._check_literal(
+ source_comparator.expr,
+ operators.getitem, slice_.stop)
+
+class array(expression.Tuple):
+ """A Postgresql ARRAY literal.
+
+ This is used to produce ARRAY literals in SQL expressions, e.g.::
+
+ from sqlalchemy.dialects.postgresql import array
+ from sqlalchemy.dialects import postgresql
+ from sqlalchemy import select, func
+
+ stmt = select([
+ array([1,2]) + array([3,4,5])
+ ])
+
+ print stmt.compile(dialect=postgresql.dialect())
+
+ Produces the SQL::
+
+ SELECT ARRAY[%(param_1)s, %(param_2)s] ||
+ ARRAY[%(param_3)s, %(param_4)s, %(param_5)s]) AS anon_1
+
+ An instance of :class:`.array` will always have the datatype
+ :class:`.ARRAY`. The "inner" type of the array is inferred from
+ the values present, unless the "type_" keyword argument is passed::
+
+ array(['foo', 'bar'], type_=CHAR)
+
+ .. versionadded:: 0.8 Added the :class:`~.postgresql.array` literal type.
+
+ See also:
+
+ :class:`.postgresql.ARRAY`
+
+ """
+ __visit_name__ = 'array'
+
+ def __init__(self, clauses, **kw):
+ super(array, self).__init__(*clauses, **kw)
+ self.type = ARRAY(self.type)
+
+ def _bind_param(self, operator, obj):
+ return array(*[
+ expression.BindParameter(None, o, _compared_to_operator=operator,
+ _compared_to_type=self.type, unique=True)
+ for o in obj
+ ])
+
+ def self_group(self, against):
+ return self
+
class ARRAY(sqltypes.Concatenable, sqltypes.TypeEngine):
"""Postgresql ARRAY type.
Represents values as Python lists.
- The ARRAY type may not be supported on all DBAPIs.
+ An :class:`.ARRAY` type is constructed given the "type"
+ of element::
+
+ mytable = Table("mytable", metadata,
+ Column("data", ARRAY(Integer))
+ )
+
+ The above type represents an N-dimensional array,
+ meaning Postgresql will interpret values with any number
+ of dimensions automatically. To produce an INSERT
+ construct that passes in a 1-dimensional array of integers::
+
+ connection.execute(
+ mytable.insert(),
+ data=[1,2,3]
+ )
+
+ The :class:`.ARRAY` type can be constructed given a fixed number
+ of dimensions::
+
+ mytable = Table("mytable", metadata,
+ Column("data", ARRAY(Integer, dimensions=2))
+ )
+
+ This has the effect of the :class:`.ARRAY` type
+ specifying that number of bracketed blocks when a :class:`.Table`
+ is used in a CREATE TABLE statement, or when the type is used
+ within a :func:`.expression.cast` construct; it also causes
+ the bind parameter and result set processing of the type
+ to optimize itself to expect exactly that number of dimensions.
+ Note that Postgresql itself still allows N dimensions with such a type.
+
+ SQL expressions of type :class:`.ARRAY` have support for "index" and "slice"
+ behavior. The Python ``[]`` operator works normally here, given
+ integer indexes or slices. Note that Postgresql arrays default
+ to 1-based indexing. The operator produces binary expression
+ constructs which will produce the appropriate SQL, both for
+ SELECT statements::
+
+ select([mytable.c.data[5], mytable.c.data[2:7]])
+
+ as well as UPDATE statements when the :meth:`.Update.values` method
+ is used::
+
+ mytable.update().values({mytable.c.data[5]:7,
+ mytable.c.data[2:7]:[1,2,3]})
+
+ .. versionadded:: 0.8 Added support for index and slice operations
+ to the :class:`.ARRAY` type, including support for UPDATE
+ statements.
+
+ The :class:`.ARRAY` type may not be supported on all DBAPIs.
It is known to work on psycopg2 and not pg8000.
+ See also:
+
+ :class:`.postgresql.array` - produce a literal array value.
"""
__visit_name__ = 'ARRAY'
+ class Comparator(sqltypes.Concatenable.Comparator):
+ def __getitem__(self, index):
+ if isinstance(index, slice):
+ index = _Slice(index, self)
+ return_type = self.type
+ else:
+ return_type = self.type.item_type
+ return self._binary_operate(self.expr, operators.getitem, index,
+ result_type=return_type)
+
+ comparator_factory = Comparator
+
def __init__(self, item_type, as_tuple=False, dimensions=None):
"""Construct an ARRAY.
:param item_type: The data type of items of this array. Note that
dimensionality is irrelevant here, so multi-dimensional arrays like
``INTEGER[][]``, are constructed as ``ARRAY(Integer)``, not as
- ``ARRAY(ARRAY(Integer))`` or such. The type mapping figures out on
- the fly
+ ``ARRAY(ARRAY(Integer))`` or such.
:param as_tuple=False: Specify whether return results
should be converted to tuples from lists. DBAPIs such
class PGCompiler(compiler.SQLCompiler):
+ def visit_array(self, element, **kw):
+ return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
+
+ def visit_slice(self, element, **kw):
+ return "%s:%s" % (
+ self.process(element.start, **kw),
+ self.process(element.stop, **kw),
+ )
+
+ def visit_getitem_binary(self, binary, operator, **kw):
+ return "%s[%s]" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)
+ )
+
def visit_match_op_binary(self, binary, operator, **kw):
return "%s @@ to_tsquery(%s)" % (
- self.process(binary.left),
- self.process(binary.right))
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw))
def visit_ilike_op_binary(self, binary, operator, **kw):
escape = binary.modifiers.get("escape", None)
return '%s ILIKE %s' % \
- (self.process(binary.left), self.process(binary.right)) \
+ (self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and
(' ESCAPE ' + self.render_literal_value(escape, None))
or '')
def visit_notilike_op_binary(self, binary, operator, **kw):
escape = binary.modifiers.get("escape", None)
return '%s NOT ILIKE %s' % \
- (self.process(binary.left), self.process(binary.right)) \
+ (self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and
(' ESCAPE ' + self.render_literal_value(escape, None))
or '')
def limit_clause(self, select):
text = ""
if select._limit is not None:
- text += " \n LIMIT " + self.process(sql.literal(select._limit))
+ text += " \n LIMIT " + self.process(sql.literal(select._limit))
if select._offset is not None:
if select._limit is None:
text += " \n LIMIT ALL"
within_columns_clause=False,
**kw)
- def visit_column(self, column, add_to_result_map=None, **kwargs):
+ def visit_column(self, column, add_to_result_map=None,
+ include_table=True, **kwargs):
name = orig_name = column.name
if name is None:
raise exc.CompileError("Cannot compile Column object until "
name = self.preparer.quote(name, column.quote)
table = column.table
- if table is None or not table.named_with_column:
+ if table is None or not include_table or not table.named_with_column:
return name
else:
if table.schema:
text += table_text
text += ' SET '
- if extra_froms and self.render_table_with_column_in_update_from:
- text += ', '.join(
- self.visit_column(c[0]) +
- '=' + c[1] for c in colparams
- )
- else:
- text += ', '.join(
- self.preparer.quote(c[0].name, c[0].quote) +
+ include_table = extra_froms and \
+ self.render_table_with_column_in_update_from
+ text += ', '.join(
+ c[0]._compiler_dispatch(self,
+ include_table=include_table) +
'=' + c[1] for c in colparams
- )
+ )
if update_stmt._returning:
self.returning = update_stmt._returning
if not stmt.parameters or
key not in stmt.parameters)
+ # create a list of column assignment clauses as tuples
+ values = []
+
if stmt.parameters is not None:
for k, v in stmt.parameters.iteritems():
- parameters.setdefault(sql._column_as_key(k), v)
+ colkey = sql._column_as_key(k)
+ if colkey is not None:
+ parameters.setdefault(colkey, v)
+ else:
+ # a non-Column expression on the left side;
+ # add it to values() in an "as-is" state,
+ # coercing right side to bound param
+ if sql._is_literal(v):
+ v = self.process(sql.bindparam(None, v, type_=k.type))
+ else:
+ v = self.process(v.self_group())
+
+ values.append((k, v))
- # create a list of column assignment clauses as tuples
- values = []
need_pks = self.isinsert and \
not self.inline and \
return element
if hasattr(element, '__clause_element__'):
element = element.__clause_element__()
- return element.key
+ try:
+ return element.key
+ except AttributeError:
+ return None
def _clause_element_as_expr(element):
if hasattr(element, '__clause_element__'):
type_=sqltypes.BOOLEANTYPE,
negate=negate, modifiers=kwargs)
- def _binary_operate(self, expr, op, obj, reverse=False):
+ def _binary_operate(self, expr, op, obj, reverse=False, result_type=None):
obj = self._check_literal(expr, op, obj)
if reverse:
else:
left, right = expr, obj
- op, result_type = left.comparator._adapt_expression(op, right.comparator)
+ if result_type is None:
+ op, result_type = left.comparator._adapt_expression(
+ op, right.comparator)
return BinaryExpression(left, right, op, type_=result_type)
return self._boolean_compare(expr, op,
ClauseList(*args).self_group(against=op),
negate=negate_op)
+
+ def _unsupported_impl(self, expr, op, *arg, **kw):
+ raise NotImplementedError("Operator '%s' is not supported on "
+ "this expression" % op.__name__)
+
def _neg_impl(self, expr, op, **kw):
"""See :meth:`.ColumnOperators.__neg__`."""
return UnaryExpression(expr, operator=operators.neg)
"startswith_op": (_startswith_impl,),
"endswith_op": (_endswith_impl,),
"neg": (_neg_impl,),
+ "getitem": (_unsupported_impl,),
}
def __init__(self, *clauses, **kw):
clauses = [_literal_as_binds(c) for c in clauses]
+ self.type = kw.pop('type_', None)
+ if self.type is None:
+ self.type = _type_from_args(clauses)
super(Tuple, self).__init__(*clauses, **kw)
- self.type = _type_from_args(clauses)
@property
def _select_iterable(self):
"""Defines operators used in SQL expressions."""
from operator import (
- and_, or_, inv, add, mul, sub, mod, truediv, lt, le, ne, gt, ge, eq, neg
+ and_, or_, inv, add, mul, sub, mod, truediv, lt, le, ne, gt, ge, eq, neg,
+ getitem
)
# Py2K
"""
return self.operate(neg)
+ def __getitem__(self, index):
+ """Implement the [] operator.
+
+ This can be used by some database-specific types
+ such as Postgresql ARRAY and HSTORE.
+
+ """
+ return self.operate(getitem, index)
+
def concat(self, other):
"""Implement the 'concat' operator.
_associative = _commutative.union([concat_op, and_, or_])
+_natural_self_precedent = _associative.union([getitem])
+"""Operators where if we have (a op b) op c, we don't want to
+parenthesize (a op b).
+
+"""
_smallest = symbol('_smallest', canonical=-100)
_largest = symbol('_largest', canonical=100)
_PRECEDENCE = {
from_: 15,
+ getitem: 15,
mul: 7,
truediv: 7,
# Py2K
def is_precedent(operator, against):
- if operator is against and operator in _associative:
+ if operator is against and operator in _natural_self_precedent:
return False
else:
return (_PRECEDENCE.get(operator,
"""Base class for custom comparison operations defined at the
type level. See :attr:`.TypeEngine.comparator_factory`.
+ The public base class for :class:`.TypeEngine.Comparator`
+ is :class:`.ColumnOperators`.
+
"""
def __init__(self, expr):
dunders = [m for m in dir(from_cls)
if (m.startswith('__') and m.endswith('__') and
not hasattr(into_cls, m) and m not in skip)]
+
for method in dunders:
try:
fn = getattr(from_cls, method)
# coding: utf-8
-from test.lib.testing import eq_, assert_raises, assert_raises_message
+from test.lib.testing import eq_, assert_raises, assert_raises_message, is_
from test.lib import engines
import datetime
from sqlalchemy import *
self.assert_compile(x,
'''SELECT pg_table.col1, pg_table."variadic" FROM pg_table''')
+ def test_array(self):
+ c = Column('x', postgresql.ARRAY(Integer))
+
+ self.assert_compile(
+ cast(c, postgresql.ARRAY(Integer)),
+ "CAST(x AS INTEGER[])"
+ )
+ self.assert_compile(
+ c[5],
+ "x[%(x_1)s]",
+ checkparams={'x_1': 5}
+ )
+
+ self.assert_compile(
+ c[5:7],
+ "x[%(x_1)s:%(x_2)s]",
+ checkparams={'x_2': 7, 'x_1': 5}
+ )
+ self.assert_compile(
+ c[5:7][2:3],
+ "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]",
+ checkparams={'x_2': 7, 'x_1': 5, 'param_1':2, 'param_2':3}
+ )
+ self.assert_compile(
+ c[5:7][3],
+ "x[%(x_1)s:%(x_2)s][%(param_1)s]",
+ checkparams={'x_2': 7, 'x_1': 5, 'param_1':3}
+ )
+
+ def test_array_literal_type(self):
+ is_(postgresql.array([1, 2]).type._type_affinity, postgresql.ARRAY)
+ is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer)
+
+ is_(postgresql.array([1, 2], type_=String).
+ type.item_type._type_affinity, String)
+
+ def test_array_literal(self):
+ self.assert_compile(
+ func.array_dims(postgresql.array([1, 2]) +
+ postgresql.array([3, 4, 5])),
+ "array_dims(ARRAY[%(param_1)s, %(param_2)s] || "
+ "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])",
+ checkparams={'param_5': 5, 'param_4': 4, 'param_1': 1,
+ 'param_3': 3, 'param_2': 2}
+ )
+
+ def test_update_array_element(self):
+ m = MetaData()
+ t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
+ self.assert_compile(
+ t.update().values({t.c.data[5]: 1}),
+ "UPDATE t SET data[%(data_1)s]=%(param_1)s",
+ checkparams={'data_1': 5, 'param_1': 1}
+ )
+
+ def test_update_array_slice(self):
+ m = MetaData()
+ t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
+ self.assert_compile(
+ t.update().values({t.c.data[2:5]: 2}),
+ "UPDATE t SET data[%(data_1)s:%(data_2)s]=%(param_1)s",
+ checkparams={'param_1': 2, 'data_2': 5, 'data_1': 2}
+
+ )
+
def test_from_only(self):
m = MetaData()
tbl1 = Table('testtbl1', m, Column('id', Integer))
eq_(results[0]['intarr'], [1, 2, 3])
def test_array_concat(self):
- arrtable.insert().execute(intarr=[1, 2, 3], strarr=[u'abc',
- u'def'])
+ arrtable.insert().execute(intarr=[1, 2, 3],
+ strarr=[u'abc', u'def'])
results = select([arrtable.c.intarr + [4, 5,
6]]).execute().fetchall()
eq_(len(results), 1)
eq_(results[0]['strarr'], [u'm\xe4\xe4', u'm\xf6\xf6'])
eq_(results[1]['strarr'], [[u'm\xe4\xe4'], [u'm\xf6\xf6']])
+ def test_array_literal(self):
+ eq_(
+ testing.db.scalar(
+ select([
+ postgresql.array([1, 2]) + postgresql.array([3, 4, 5])
+ ])
+ ), [1,2,3,4,5]
+ )
+
+ def test_array_getitem_single_type(self):
+ is_(arrtable.c.intarr[1].type._type_affinity, Integer)
+ is_(arrtable.c.strarr[1].type._type_affinity, String)
+
+ def test_array_getitem_slice_type(self):
+ is_(arrtable.c.intarr[1:3].type._type_affinity, postgresql.ARRAY)
+ is_(arrtable.c.strarr[1:3].type._type_affinity, postgresql.ARRAY)
+
+ def test_array_getitem_single_exec(self):
+ with testing.db.connect() as conn:
+ conn.execute(
+ arrtable.insert(),
+ intarr=[4, 5, 6],
+ strarr=[u'abc', u'def']
+ )
+ eq_(
+ conn.scalar(select([arrtable.c.intarr[2]])),
+ 5
+ )
+ conn.execute(
+ arrtable.update().values({arrtable.c.intarr[2]: 7})
+ )
+ eq_(
+ conn.scalar(select([arrtable.c.intarr[2]])),
+ 7
+ )
+
+ def test_array_getitem_slice_exec(self):
+ with testing.db.connect() as conn:
+ conn.execute(
+ arrtable.insert(),
+ intarr=[4, 5, 6],
+ strarr=[u'abc', u'def']
+ )
+ eq_(
+ conn.scalar(select([arrtable.c.intarr[2:3]])),
+ [5, 6]
+ )
+ conn.execute(
+ arrtable.update().values({arrtable.c.intarr[2:3]: [7, 8]})
+ )
+ eq_(
+ conn.scalar(select([arrtable.c.intarr[2:3]])),
+ [7, 8]
+ )
+
@testing.provide_metadata
def test_tuple_flag(self):
metadata = self.metadata
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_update
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.5_sqlite_pysqlite_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.6_sqlite_pysqlite_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_mysql_mysqldb_cextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_mysql_mysqldb_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_postgresql_psycopg2_cextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_postgresql_psycopg2_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_sqlite_pysqlite_cextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_sqlite_pysqlite_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 3.2_postgresql_psycopg2_nocextensions 60
-test.aaa_profiling.test_compiler.CompileTest.test_update 3.2_sqlite_pysqlite_nocextensions 60
+test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_sqlite_pysqlite_cextensions 65
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.5_sqlite_pysqlite_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.6_sqlite_pysqlite_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_mysql_mysqldb_cextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_mysql_mysqldb_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_postgresql_psycopg2_cextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_postgresql_psycopg2_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_sqlite_pysqlite_cextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_sqlite_pysqlite_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 3.2_postgresql_psycopg2_nocextensions 122
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 3.2_sqlite_pysqlite_nocextensions 122
+test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_sqlite_pysqlite_cextensions 130
# TEST: test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity
def method1(self):
return "method1"
- def __getitem__(self, key):
- return 'value'
-
prop = myprop(lambda self:None)
Foo.foo = prop
assert Foo.foo is not prop
assert Foo.foo.attr == 'bar'
assert Foo.foo.method1() == 'method1'
- assert Foo.foo['bar'] == 'value'
def test_comparator(self):
class Comparator(PropComparator):
def method2(self, other):
return "method2"
- # TODO ?
- #def __getitem__(self, key):
- # return 'value'
+ def __getitem__(self, key):
+ return 'value'
def __eq__(self, other):
return column('foo') == func.upper(other)
eq_(Foo.foo.method1(), "method1")
eq_(Foo.foo.method2('x'), "method2")
assert Foo.foo.attr == 'bar'
- # TODO ?
- #assert Foo.foo['bar'] == 'value'
+ assert Foo.foo['bar'] == 'value'
eq_(
(Foo.foo == 'bar').__str__(),
"foo = upper(:upper_1)"
assert_col = []
class extendedproperty(property):
attribute = 123
- def __getitem__(self, key):
- return 'value'
class User(object):
def _get_name(self):
assert u in sess.dirty
eq_(User.uname.attribute, 123)
- eq_(User.uname['key'], 'value')
def test_synonym_of_synonym(self):
users, User = (self.tables.users,
def method1(self):
return "method1"
- def __getitem__(self, key):
- return 'value'
-
from sqlalchemy.orm.properties import ColumnProperty
class UCComparator(ColumnProperty.Comparator):
__hash__ = None
assert u2 is u3
eq_(User.uc_name.attribute, 123)
- eq_(User.uc_name['key'], 'value')
sess.rollback()
def test_comparable_column(self):
checkparams={'date_1':datetime.date(2006,6,1),
'date_2':datetime.date(2006,6,5)})
- def test_operator_precedence(self):
- table = Table('op', metadata,
- Column('field', Integer))
- self.assert_compile(table.select((table.c.field == 5) == None),
- "SELECT op.field FROM op WHERE (op.field = :field_1) IS NULL")
- self.assert_compile(table.select((table.c.field + 5) == table.c.field),
- "SELECT op.field FROM op WHERE op.field + :field_1 = op.field")
- self.assert_compile(table.select((table.c.field + 5) * 6),
- "SELECT op.field FROM op WHERE (op.field + :field_1) * :param_1")
- self.assert_compile(table.select((table.c.field * 5) + 6),
- "SELECT op.field FROM op WHERE op.field * :field_1 + :param_1")
- self.assert_compile(table.select(5 + table.c.field.in_([5,6])),
- "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:field_1, :field_2))")
- self.assert_compile(table.select((5 + table.c.field).in_([5,6])),
- "SELECT op.field FROM op WHERE :field_1 + op.field IN (:param_1, :param_2)")
- self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))),
- "SELECT op.field FROM op WHERE NOT (op.field = :field_1 AND op.field = :field_2)")
- self.assert_compile(table.select(not_(table.c.field == 5)),
- "SELECT op.field FROM op WHERE op.field != :field_1")
- self.assert_compile(table.select(not_(table.c.field.between(5, 6))),
- "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :field_1 AND :field_2)")
- self.assert_compile(table.select(not_(table.c.field) == 5),
- "SELECT op.field FROM op WHERE (NOT op.field) = :param_1")
- self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)),
- "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
- self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)),
- "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
-
- def test_associativity(self):
- f = column('f')
- self.assert_compile( f - f, "f - f" )
- self.assert_compile( f - f - f, "(f - f) - f" )
-
- self.assert_compile( (f - f) - f, "(f - f) - f" )
- self.assert_compile( (f - f).label('foo') - f, "(f - f) - f" )
-
- self.assert_compile( f - (f - f), "f - (f - f)" )
- self.assert_compile( f - (f - f).label('foo'), "f - (f - f)" )
-
- # because - less precedent than /
- self.assert_compile( f / (f - f), "f / (f - f)" )
- self.assert_compile( f / (f - f).label('foo'), "f / (f - f)" )
-
- self.assert_compile( f / f - f, "f / f - f" )
- self.assert_compile( (f / f) - f, "f / f - f" )
- self.assert_compile( (f / f).label('foo') - f, "f / f - f" )
-
- # because / more precedent than -
- self.assert_compile( f - (f / f), "f - f / f" )
- self.assert_compile( f - (f / f).label('foo'), "f - f / f" )
- self.assert_compile( f - f / f, "f - f / f" )
- self.assert_compile( (f - f) / f, "(f - f) / f" )
-
- self.assert_compile( ((f - f) / f) - f, "(f - f) / f - f")
- self.assert_compile( (f - f) / (f - f), "(f - f) / (f - f)")
-
- # higher precedence
- self.assert_compile( (f / f) - (f / f), "f / f - f / f")
-
- self.assert_compile( (f / f) - (f - f), "f / f - (f - f)")
- self.assert_compile( (f / f) / (f - f), "(f / f) / (f - f)")
- self.assert_compile( f / (f / (f - f)), "f / (f / (f - f))")
-
def test_delayed_col_naming(self):
my_str = Column(String)
"mytable WHERE mytable.myid = :myid_1",
params = {table1.c.name:'fred'})
+ def test_update_to_expression(self):
+ """test update from an expression.
+
+ this logic is triggered currently by a left side that doesn't
+ have a key. The current supported use case is updating the index
+ of a Postgresql ARRAY type.
+
+ """
+ expr = func.foo(table1.c.myid)
+ assert not hasattr(expr, "key")
+ self.assert_compile(
+ table1.update().values({expr: 'bar'}),
+ "UPDATE mytable SET foo(myid)=:param_1"
+ )
+
def test_correlated_update(self):
# test against a straight text subquery
u = update(table1, values = {
def test_plus(self):
self._do_operate_test(operators.add)
+ def test_no_getitem(self):
+ assert_raises_message(
+ NotImplementedError,
+ "Operator 'getitem' is not supported on this expression",
+ self._do_operate_test, operators.getitem
+ )
+ assert_raises_message(
+ NotImplementedError,
+ "Operator 'getitem' is not supported on this expression",
+ lambda: column('left')[3]
+ )
+
def test_in(self):
left = column('left')
assert left.comparator.operate(operators.in_op, [1, 2, 3]).compare(
def _assert_not_add_override(self, expr):
assert not hasattr(expr, "foob")
+from sqlalchemy import and_, not_, between
+
+class OperatorPrecedenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_operator_precedence(self):
+ # TODO: clean up /break up
+ metadata = MetaData()
+ table = Table('op', metadata,
+ Column('field', Integer))
+ self.assert_compile(table.select((table.c.field == 5) == None),
+ "SELECT op.field FROM op WHERE (op.field = :field_1) IS NULL")
+ self.assert_compile(table.select((table.c.field + 5) == table.c.field),
+ "SELECT op.field FROM op WHERE op.field + :field_1 = op.field")
+ self.assert_compile(table.select((table.c.field + 5) * 6),
+ "SELECT op.field FROM op WHERE (op.field + :field_1) * :param_1")
+ self.assert_compile(table.select((table.c.field * 5) + 6),
+ "SELECT op.field FROM op WHERE op.field * :field_1 + :param_1")
+ self.assert_compile(table.select(5 + table.c.field.in_([5, 6])),
+ "SELECT op.field FROM op WHERE :param_1 + "
+ "(op.field IN (:field_1, :field_2))")
+ self.assert_compile(table.select((5 + table.c.field).in_([5, 6])),
+ "SELECT op.field FROM op WHERE :field_1 + op.field "
+ "IN (:param_1, :param_2)")
+ self.assert_compile(table.select(not_(and_(table.c.field == 5,
+ table.c.field == 7))),
+ "SELECT op.field FROM op WHERE NOT "
+ "(op.field = :field_1 AND op.field = :field_2)")
+ self.assert_compile(table.select(not_(table.c.field == 5)),
+ "SELECT op.field FROM op WHERE op.field != :field_1")
+ self.assert_compile(table.select(not_(table.c.field.between(5, 6))),
+ "SELECT op.field FROM op WHERE NOT "
+ "(op.field BETWEEN :field_1 AND :field_2)")
+ self.assert_compile(table.select(not_(table.c.field) == 5),
+ "SELECT op.field FROM op WHERE (NOT op.field) = :param_1")
+ self.assert_compile(table.select((table.c.field == table.c.field).\
+ between(False, True)),
+ "SELECT op.field FROM op WHERE (op.field = op.field) "
+ "BETWEEN :param_1 AND :param_2")
+ self.assert_compile(table.select(
+ between((table.c.field == table.c.field), False, True)),
+ "SELECT op.field FROM op WHERE (op.field = op.field) "
+ "BETWEEN :param_1 AND :param_2")
+
+class OperatorAssociativityTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_associativity(self):
+ # TODO: clean up /break up
+ f = column('f')
+ self.assert_compile(f - f, "f - f")
+ self.assert_compile(f - f - f, "(f - f) - f")
+
+ self.assert_compile((f - f) - f, "(f - f) - f")
+ self.assert_compile((f - f).label('foo') - f, "(f - f) - f")
+
+ self.assert_compile(f - (f - f), "f - (f - f)")
+ self.assert_compile(f - (f - f).label('foo'), "f - (f - f)")
+
+ # because - less precedent than /
+ self.assert_compile(f / (f - f), "f / (f - f)")
+ self.assert_compile(f / (f - f).label('foo'), "f / (f - f)")
+
+ self.assert_compile(f / f - f, "f / f - f")
+ self.assert_compile((f / f) - f, "f / f - f")
+ self.assert_compile((f / f).label('foo') - f, "f / f - f")
+
+ # because / more precedent than -
+ self.assert_compile(f - (f / f), "f - f / f")
+ self.assert_compile(f - (f / f).label('foo'), "f - f / f")
+ self.assert_compile(f - f / f, "f - f / f")
+ self.assert_compile((f - f) / f, "(f - f) / f")
+
+ self.assert_compile(((f - f) / f) - f, "(f - f) / f - f")
+ self.assert_compile((f - f) / (f - f), "(f - f) / (f - f)")
+
+ # higher precedence
+ self.assert_compile((f / f) - (f / f), "f / f - f / f")
+
+ self.assert_compile((f / f) - (f - f), "f / f - (f - f)")
+ self.assert_compile((f / f) / (f - f), "(f / f) / (f - f)")
+ self.assert_compile(f / (f / (f - f)), "f / (f / (f - f))")
+
+