To specify a specific named sequence to be used for primary key generation,
use the :func:`~sqlalchemy.schema.Sequence` construct::
- Table('sometable', metadata,
+ Table('sometable', metadata,
Column('id', Integer, Sequence('some_id_seq'), primary_key=True)
)
and ``SERIALIZABLE``::
engine = create_engine(
- "postgresql+pg8000://scott:tiger@localhost/test",
+ "postgresql+pg8000://scott:tiger@localhost/test",
isolation_level="READ UNCOMMITTED"
)
remote table matches that of the referencing table, and the "schema" argument
was explicitly stated on the referencing table.
-The best practice here is to not use the ``schema`` argument
+The best practice here is to not use the ``schema`` argument
on :class:`.Table` for any schemas that are present in ``search_path``.
``search_path`` defaults to "public", but care should be taken
to inspect the actual value using::
were also in the ``search_path`` could make an incorrect assumption
if the schemas were explicitly stated on each :class:`.Table`.
-Background on PG's ``search_path`` is at:
+Background on PG's ``search_path`` is at:
http://www.postgresql.org/docs/9.0/static/ddl-schemas.html#DDL-SCHEMAS-PATH
INSERT/UPDATE...RETURNING
Partial Indexes
^^^^^^^^^^^^^^^^
-Partial indexes add criterion to the index definition so that the index is
+Partial indexes add criterion to the index definition so that the index is
applied to a subset of rows. These can be specified on :class:`.Index`
using the ``postgresql_where`` keyword argument::
The :class:`.Index` construct allows these to be specified via the ``postgresql_ops``
keyword argument::
- Index('my_index', my_table.c.id, my_table.c.data,
+ Index('my_index', my_table.c.id, my_table.c.data,
postgresql_ops={
- 'data': 'text_pattern_ops',
+ 'data': 'text_pattern_ops',
'id': 'int4_ops'
- })
+ })
.. versionadded:: 0.7.2
``postgresql_ops`` keyword argument to :class:`.Index` construct.
:param dimensions: if non-None, the ARRAY will assume a fixed
number of dimensions. This will cause the DDL emitted for this
ARRAY to include the exact number of bracket clauses ``[]``,
- and will also optimize the performance of the type overall.
+ and will also optimize the performance of the type overall.
Note that PG arrays are always implicitly "non-dimensioned",
meaning they can store any number of dimensions no matter how
they were declared.
else:
return collection(
self._proc_array(
- x, itemproc,
- dim - 1 if dim is not None else None,
- collection)
+ x, itemproc,
+ dim - 1 if dim is not None else None,
+ collection)
for x in arr
)
return value
else:
return self._proc_array(
- value,
- item_proc,
- self.dimensions,
+ value,
+ item_proc,
+ self.dimensions,
list)
return process
return value
else:
return self._proc_array(
- value,
- item_proc,
- self.dimensions,
+ value,
+ item_proc,
+ self.dimensions,
tuple if self.as_tuple else list)
return process
class ENUM(sqltypes.Enum):
"""Postgresql ENUM type.
-
+
This is a subclass of :class:`.types.Enum` which includes
support for PG's ``CREATE TYPE``.
-
- :class:`~.postgresql.ENUM` is used automatically when
+
+ :class:`~.postgresql.ENUM` is used automatically when
using the :class:`.types.Enum` type on PG assuming
- the ``native_enum`` is left as ``True``. However, the
+ the ``native_enum`` is left as ``True``. However, the
:class:`~.postgresql.ENUM` class can also be instantiated
directly in order to access some additional Postgresql-specific
- options, namely finer control over whether or not
+ options, namely finer control over whether or not
``CREATE TYPE`` should be emitted.
-
- Note that both :class:`.types.Enum` as well as
+
+ Note that both :class:`.types.Enum` as well as
:class:`~.postgresql.ENUM` feature create/drop
methods; the base :class:`.types.Enum` type ultimately
delegates to the :meth:`~.postgresql.ENUM.create` and
:meth:`~.postgresql.ENUM.drop` methods present here.
-
+
"""
def __init__(self, *enums, **kw):
"""Construct an :class:`~.postgresql.ENUM`.
-
+
Arguments are the same as that of
:class:`.types.Enum`, but also including
the following parameters.
-
- :param create_type: Defaults to True.
- Indicates that ``CREATE TYPE`` should be
- emitted, after optionally checking for the
- presence of the type, when the parent
+
+ :param create_type: Defaults to True.
+ Indicates that ``CREATE TYPE`` should be
+ emitted, after optionally checking for the
+ presence of the type, when the parent
table is being created; and additionally
that ``DROP TYPE`` is called when the table
is dropped. When ``False``, no check
are called directly.
Setting to ``False`` is helpful
when invoking a creation scheme to a SQL file
- without access to the actual database -
+ without access to the actual database -
the :meth:`~.postgresql.ENUM.create` and
:meth:`~.postgresql.ENUM.drop` methods can
be used to emit SQL to a target bind.
super(ENUM, self).__init__(*enums, **kw)
def create(self, bind=None, checkfirst=True):
- """Emit ``CREATE TYPE`` for this
+ """Emit ``CREATE TYPE`` for this
:class:`~.postgresql.ENUM`.
-
+
If the underlying dialect does not support
Postgresql CREATE TYPE, no action is taken.
-
+
:param bind: a connectable :class:`.Engine`,
:class:`.Connection`, or similar object to emit
SQL.
- :param checkfirst: if ``True``, a query against
+ :param checkfirst: if ``True``, a query against
the PG catalog will be first performed to see
if the type does not exist already before
creating.
-
+
"""
if not bind.dialect.supports_native_enum:
return
bind.execute(CreateEnumType(self))
def drop(self, bind=None, checkfirst=True):
- """Emit ``DROP TYPE`` for this
+ """Emit ``DROP TYPE`` for this
:class:`~.postgresql.ENUM`.
-
+
If the underlying dialect does not support
Postgresql DROP TYPE, no action is taken.
-
+
:param bind: a connectable :class:`.Engine`,
:class:`.Connection`, or similar object to emit
SQL.
- :param checkfirst: if ``True``, a query against
+ :param checkfirst: if ``True``, a query against
the PG catalog will be first performed to see
if the type actually exists before dropping.
-
+
"""
if not bind.dialect.supports_native_enum:
return
def _check_for_name_in_memos(self, checkfirst, kw):
"""Look in the 'ddl runner' for 'memos', then
note our name in that collection.
-
+
This to ensure a particular named enum is operated
upon only once within any kind of create/drop
sequence without relying upon "checkfirst".
def visit_match_op(self, binary, **kw):
return "%s @@ to_tsquery(%s)" % (
- self.process(binary.left),
+ self.process(binary.left),
self.process(binary.right))
def visit_ilike_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
return '%s ILIKE %s' % \
(self.process(binary.left), self.process(binary.right)) \
- + (escape and
+ + (escape and
(' ESCAPE ' + self.render_literal_value(escape, None))
or '')
escape = binary.modifiers.get("escape", None)
return '%s NOT ILIKE %s' % \
(self.process(binary.left), self.process(binary.right)) \
- + (escape and
+ + (escape and
(' ESCAPE ' + self.render_literal_value(escape, None))
or '')
columns = [
self.process(
- self.label_select_column(None, c, asfrom=False),
- within_columns_clause=True,
- result_map=self.result_map)
+ self.label_select_column(None, c, asfrom=False),
+ within_columns_clause=True,
+ result_map=self.result_map)
for c in expression._select_iterables(returning_cols)
]
affinity = None
casts = {
- sqltypes.Date:'date',
- sqltypes.DateTime:'timestamp',
+ sqltypes.Date:'date',
+ sqltypes.DateTime:'timestamp',
sqltypes.Interval:'interval', sqltypes.Time:'time'
}
cast = casts.get(affinity, None)
if isinstance(extract.expr, sql.ColumnElement) and cast is not None:
- expr = extract.expr.op('::')(sql.literal_column(cast))
+ expr = extract.expr.op('::', precedence=100)(
+ sql.literal_column(cast))
else:
expr = extract.expr
return "EXTRACT(%s FROM %s)" % (
column is column.table._autoincrement_column and \
not isinstance(impl_type, sqltypes.SmallInteger) and \
(
- column.default is None or
+ column.default is None or
(
isinstance(column.default, schema.Sequence) and
column.default.optional
text += "(%s)" \
% (
', '.join([
- preparer.format_column(c) +
+ preparer.format_column(c) +
(c.key in ops and (' ' + ops[c.key]) or '')
for c in index.columns])
)
def visit_TIMESTAMP(self, type_):
return "TIMESTAMP%s %s" % (
- getattr(type_, 'precision', None) and "(%d)" %
+ getattr(type_, 'precision', None) and "(%d)" %
type_.precision or "",
(type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
)
def visit_TIME(self, type_):
return "TIME%s %s" % (
- getattr(type_, 'precision', None) and "(%d)" %
+ getattr(type_, 'precision', None) and "(%d)" %
type_.precision or "",
(type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
)
return "BYTEA"
def visit_ARRAY(self, type_):
- return self.process(type_.item_type) + ('[]' * (type_.dimensions
- if type_.dimensions
+ return self.process(type_.item_type) + ('[]' * (type_.dimensions
+ if type_.dimensions
is not None else 1))
return self._execute_scalar("select %s" %
column.server_default.arg, column.type)
- elif (column.default is None or
+ elif (column.default is None or
(column.default.is_sequence and
column.default.optional)):
- # execute the sequence associated with a SERIAL primary
+ # execute the sequence associated with a SERIAL primary
# key column. for non-primary-key SERIAL, the ID just
# generates server side.
try:
seq_name = column._postgresql_seq_name
except AttributeError:
- tab = column.table.name
- col = column.name
- tab = tab[0:29 + max(0, (29 - len(col)))]
- col = col[0:29 + max(0, (29 - len(tab)))]
+ tab = column.table.name
+ col = column.name
+ tab = tab[0:29 + max(0, (29 - len(col)))]
+ col = col[0:29 + max(0, (29 - len(tab)))]
column._postgresql_seq_name = seq_name = "%s_%s_seq" % (tab, col)
sch = column.table.schema
else:
return None
- _isolation_lookup = set(['SERIALIZABLE',
+ _isolation_lookup = set(['SERIALIZABLE',
'READ UNCOMMITTED', 'READ COMMITTED', 'REPEATABLE READ'])
def set_isolation_level(self, connection, level):
if level not in self._isolation_lookup:
raise exc.ArgumentError(
"Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s" %
+ "Valid isolation levels for %s are %s" %
(level, self.name, ", ".join(self._isolation_lookup))
- )
+ )
cursor = connection.cursor()
cursor.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION "
def do_prepare_twophase(self, connection, xid):
connection.execute("PREPARE TRANSACTION '%s'" % xid)
- def do_rollback_twophase(self, connection, xid,
+ def do_rollback_twophase(self, connection, xid,
is_prepared=True, recover=False):
if is_prepared:
if recover:
- #FIXME: ugly hack to get out of transaction
+ #FIXME: ugly hack to get out of transaction
# context when committing recoverable transactions
- # Must find out a way how to make the dbapi not
+ # Must find out a way how to make the dbapi not
# open a transaction.
connection.execute("ROLLBACK")
connection.execute("ROLLBACK PREPARED '%s'" % xid)
else:
self.do_rollback(connection.connection)
- def do_commit_twophase(self, connection, xid,
+ def do_commit_twophase(self, connection, xid,
is_prepared=True, recover=False):
if is_prepared:
if recover:
"n.oid=c.relnamespace where n.nspname=:schema and "
"relname=:name",
bindparams=[
- sql.bindparam('name',
+ sql.bindparam('name',
unicode(table_name), type_=sqltypes.Unicode),
- sql.bindparam('schema',
- unicode(schema), type_=sqltypes.Unicode)]
+ sql.bindparam('schema',
+ unicode(schema), type_=sqltypes.Unicode)]
)
)
return bool(cursor.first())
bindparams=[
sql.bindparam('name', unicode(sequence_name),
type_=sqltypes.Unicode)
- ]
+ ]
)
)
else:
bindparams=[
sql.bindparam('name', unicode(sequence_name),
type_=sqltypes.Unicode),
- sql.bindparam('schema',
+ sql.bindparam('schema',
unicode(schema), type_=sqltypes.Unicode)
]
)
SELECT relname
FROM pg_class c
WHERE relkind = 'v'
- AND '%(schema)s' = (select nspname from pg_namespace n
+ AND '%(schema)s' = (select nspname from pg_namespace n
where n.oid = c.relnamespace)
""" % dict(schema=current_schema)
# Py3K
#view_names = [row[0] for row in connection.execute(s)]
# Py2K
- view_names = [row[0].decode(self.encoding)
+ view_names = [row[0].decode(self.encoding)
for row in connection.execute(s)]
# end Py2K
return view_names
SQL_COLS = """
SELECT a.attname,
pg_catalog.format_type(a.atttypid, a.atttypmod),
- (SELECT substring(pg_catalog.pg_get_expr(d.adbin, d.adrelid)
- for 128)
+ (SELECT substring(pg_catalog.pg_get_expr(d.adbin, d.adrelid)
+ for 128)
FROM pg_catalog.pg_attrdef d
- WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
+ WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
AND a.atthasdef)
AS DEFAULT,
a.attnotnull, a.attnum, a.attrelid as table_oid
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum
"""
- s = sql.text(SQL_COLS,
- bindparams=[sql.bindparam('table_oid', type_=sqltypes.Integer)],
+ s = sql.text(SQL_COLS,
+ bindparams=[sql.bindparam('table_oid', type_=sqltypes.Integer)],
typemap={'attname':sqltypes.Unicode, 'default':sqltypes.Unicode}
)
c = connection.execute(s, table_oid=table_oid)
# format columns
columns = []
for name, format_type, default, notnull, attnum, table_oid in rows:
- ## strip (5) from character varying(5), timestamp(5)
+ ## strip (5) from character varying(5), timestamp(5)
# with time zone, etc
attype = re.sub(r'\([\d,]+\)', '', format_type)
args = (53, )
elif attype == 'integer':
args = ()
- elif attype in ('timestamp with time zone',
+ elif attype in ('timestamp with time zone',
'time with time zone'):
kwargs['timezone'] = True
if charlen:
kwargs['precision'] = int(charlen)
args = ()
- elif attype in ('timestamp without time zone',
+ elif attype in ('timestamp without time zone',
'time without time zone', 'time'):
kwargs['timezone'] = False
if charlen:
# A table can't override whether the domain is nullable.
nullable = domain['nullable']
if domain['default'] and not default:
- # It can, however, override the default
+ # It can, however, override the default
# value, but can't set it to null.
default = domain['default']
continue
sch = schema
if '.' not in match.group(2) and sch is not None:
# unconditionally quote the schema name. this could
- # later be enhanced to obey quoting rules /
+ # later be enhanced to obey quoting rules /
# "quote schema"
default = match.group(1) + \
('"%s"' % sch) + '.' + \
PK_SQL = """
SELECT a.attname
- FROM
+ FROM
pg_class t
join pg_index ix on t.oid = ix.indrelid
- join pg_attribute a
+ join pg_attribute a
on t.oid=a.attrelid and a.attnum=ANY(ix.indkey)
WHERE
t.oid = :table_oid and
info_cache=kw.get('info_cache'))
FK_SQL = """
- SELECT r.conname,
+ SELECT r.conname,
pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
n.nspname as conschema
FROM pg_catalog.pg_constraint r,
pg_namespace n,
pg_class c
- WHERE r.conrelid = :table AND
+ WHERE r.conrelid = :table AND
r.contype = 'f' AND
c.oid = confrelid AND
n.oid = c.relnamespace
'(?:(.*?)\.)?(.*?)\((.*?)\)', condef).groups()
constrained_columns, referred_schema, \
referred_table, referred_columns = m
- constrained_columns = [preparer._unquote_identifier(x)
+ constrained_columns = [preparer._unquote_identifier(x)
for x in re.split(r'\s*,\s*', constrained_columns)]
if referred_schema:
# and an explicit schema was given for the referencing table.
referred_schema = schema
referred_table = preparer._unquote_identifier(referred_table)
- referred_columns = [preparer._unquote_identifier(x)
+ referred_columns = [preparer._unquote_identifier(x)
for x in re.split(r'\s*,\s', referred_columns)]
fkey_d = {
'name' : conname,
ix.indisunique, ix.indexprs, ix.indpred,
a.attname
FROM
- pg_class t
+ pg_class t
join pg_index ix on t.oid = ix.indrelid
join pg_class i on i.oid=ix.indexrelid
- left outer join
- pg_attribute a
+ left outer join
+ pg_attribute a
on t.oid=a.attrelid and a.attnum=ANY(ix.indkey)
WHERE
t.relkind = 'r'
SQL_ENUMS = """
SELECT t.typname as "name",
-- no enum defaults in 8.4 at least
- -- t.typdefault as "default",
+ -- t.typdefault as "default",
pg_catalog.pg_type_is_visible(t.oid) as "visible",
n.nspname as "schema",
e.enumlabel as "label"
name = "%s.%s" % (domain['schema'], domain['name'])
domains[name] = {
- 'attype':attype,
- 'nullable': domain['nullable'],
+ 'attype':attype,
+ 'nullable': domain['nullable'],
'default': domain['default']
}
from ..util import symbol
+
class Operators(object):
"""Base of comparison and logical operators.
-
+
Implements base methods :meth:`operate` and :meth:`reverse_operate`,
as well as :meth:`__and__`, :meth:`__or__`, :meth:`__invert__`.
-
+
Usually is used via its most common subclass
:class:`.ColumnOperators`.
-
+
"""
def __and__(self, other):
"""Implement the ``&`` operator.
-
+
When used with SQL expressions, results in an
AND operation, equivalent to
:func:`~.expression.and_`, that is::
-
+
a & b
-
+
is equivalent to::
-
+
from sqlalchemy import and_
and_(a, b)
operator precedence; the ``&`` operator has the highest precedence.
The operands should be enclosed in parenthesis if they contain
further sub expressions::
-
+
(a == 2) & (b == 4)
"""
def __or__(self, other):
"""Implement the ``|`` operator.
-
+
When used with SQL expressions, results in an
OR operation, equivalent to
:func:`~.expression.or_`, that is::
-
+
a | b
-
+
is equivalent to::
-
+
from sqlalchemy import or_
or_(a, b)
operator precedence; the ``|`` operator has the highest precedence.
The operands should be enclosed in parenthesis if they contain
further sub expressions::
-
+
(a == 2) | (b == 4)
"""
def __invert__(self):
"""Implement the ``~`` operator.
-
- When used with SQL expressions, results in a
- NOT operation, equivalent to
+
+ When used with SQL expressions, results in a
+ NOT operation, equivalent to
:func:`~.expression.not_`, that is::
-
+
~a
-
+
is equivalent to::
-
+
from sqlalchemy import not_
not_(a)
"""
return self.operate(inv)
- def op(self, opstring):
+ def op(self, opstring, precedence=0):
"""produce a generic operator function.
e.g.::
somecolumn * 5
- :param operator: a string which will be output as the infix operator
- between this :class:`.ClauseElement` and the expression passed to the
- generated function.
-
This function can also be used to make bitwise operators explicit. For
example::
somecolumn.op('&')(0xff)
- is a bitwise AND of the value in somecolumn.
+ is a bitwise AND of the value in ``somecolumn``.
+
+ :param operator: a string which will be output as the infix operator
+ between this :class:`.ClauseElement` and the expression passed to the
+ generated function.
+
+ :param precedence: precedence to apply to the operator, when
+ parenthesizing expressions. A lower number will cause the expression
+ to be parenthesized when applied against another operator with
+ higher precedence. The default value of ``0`` is lower than all
+ operators except for the comma (``,``) and ``AS`` operators.
+ A value of 100 will be higher or equal to all operators, and -100
+ will be lower than or equal to all operators.
+
+ .. versionadded:: 0.8 - added the 'precedence' argument.
"""
- def _op(b):
- return self.operate(op, opstring, b)
- return _op
+ operator = custom_op(opstring, precedence)
+
+ def against(other):
+ return operator(self, other)
+ return against
def operate(self, op, *other, **kwargs):
"""Operate on an argument.
-
+
This is the lowest level of operation, raises
:class:`NotImplementedError` by default.
-
- Overriding this on a subclass can allow common
- behavior to be applied to all operations.
+
+ Overriding this on a subclass can allow common
+ behavior to be applied to all operations.
For example, overriding :class:`.ColumnOperators`
- to apply ``func.lower()`` to the left and right
+ to apply ``func.lower()`` to the left and right
side::
-
+
class MyComparator(ColumnOperators):
def operate(self, op, other):
return op(func.lower(self), func.lower(other))
be a single scalar for most operations.
:param \**kwargs: modifiers. These may be passed by special
operators such as :meth:`ColumnOperators.contains`.
-
-
+
+
"""
raise NotImplementedError(str(op))
def reverse_operate(self, op, other, **kwargs):
"""Reverse operate on an argument.
-
+
Usage is the same as :meth:`operate`.
-
+
"""
raise NotImplementedError(str(op))
+
+class custom_op(object):
+ __name__ = 'custom_op'
+
+ def __init__(self, opstring, precedence=0):
+ self.opstring = opstring
+ self.precedence = precedence
+
+ def __call__(self, left, right, **kw):
+ return left.operate(self, right, **kw)
+
+
class ColumnOperators(Operators):
"""Defines comparison and math operations.
-
+
By default all methods call down to
:meth:`Operators.operate` or :meth:`Operators.reverse_operate`
- passing in the appropriate operator function from the
+ passing in the appropriate operator function from the
Python builtin ``operator`` module or
- a SQLAlchemy-specific operator function from
+ a SQLAlchemy-specific operator function from
:mod:`sqlalchemy.expression.operators`. For example
the ``__eq__`` function::
-
+
def __eq__(self, other):
return self.operate(operators.eq, other)
Where ``operators.eq`` is essentially::
-
+
def eq(a, b):
return a == b
-
+
A SQLAlchemy construct like :class:`.ColumnElement` ultimately
overrides :meth:`.Operators.operate` and others
- to return further :class:`.ClauseElement` constructs,
+ to return further :class:`.ClauseElement` constructs,
so that the ``==`` operation above is replaced by a clause
construct.
-
+
The docstrings here will describe column-oriented
behavior of each operator. For ORM-based operators
on related objects and collections, see :class:`.RelationshipProperty.Comparator`.
-
+
"""
timetuple = None
def __lt__(self, other):
"""Implement the ``<`` operator.
-
+
In a column context, produces the clause ``a < b``.
-
+
"""
return self.operate(lt, other)
def __le__(self, other):
"""Implement the ``<=`` operator.
-
+
In a column context, produces the clause ``a <= b``.
-
+
"""
return self.operate(le, other)
def __eq__(self, other):
"""Implement the ``==`` operator.
-
+
In a column context, produces the clause ``a = b``.
If the target is ``None``, produces ``a IS NULL``.
In a column context, produces the clause ``a != b``.
If the target is ``None``, produces ``a IS NOT NULL``.
-
+
"""
return self.operate(ne, other)
def __gt__(self, other):
"""Implement the ``>`` operator.
-
+
In a column context, produces the clause ``a > b``.
-
+
"""
return self.operate(gt, other)
def __ge__(self, other):
"""Implement the ``>=`` operator.
-
+
In a column context, produces the clause ``a >= b``.
-
+
"""
return self.operate(ge, other)
def __neg__(self):
"""Implement the ``-`` operator.
-
+
In a column context, produces the clause ``-a``.
-
+
"""
return self.operate(neg)
def concat(self, other):
"""Implement the 'concat' operator.
-
+
In a column context, produces the clause ``a || b``,
or uses the ``concat()`` operator on MySQL.
-
+
"""
return self.operate(concat_op, other)
def like(self, other, escape=None):
"""Implement the ``like`` operator.
-
+
In a column context, produces the clause ``a LIKE other``.
-
+
"""
return self.operate(like_op, other, escape=escape)
def ilike(self, other, escape=None):
"""Implement the ``ilike`` operator.
-
+
In a column context, produces the clause ``a ILIKE other``.
-
+
"""
return self.operate(ilike_op, other, escape=escape)
def in_(self, other):
"""Implement the ``in`` operator.
-
+
In a column context, produces the clause ``a IN other``.
"other" may be a tuple/list of column expressions,
or a :func:`~.expression.select` construct.
-
+
"""
return self.operate(in_op, other)
"""Implement the ``startwith`` operator.
In a column context, produces the clause ``LIKE '<other>%'``
-
+
"""
return self.operate(startswith_op, other, **kwargs)
def endswith(self, other, **kwargs):
"""Implement the 'endswith' operator.
-
+
In a column context, produces the clause ``LIKE '%<other>'``
-
+
"""
return self.operate(endswith_op, other, **kwargs)
def contains(self, other, **kwargs):
"""Implement the 'contains' operator.
-
+
In a column context, produces the clause ``LIKE '%<other>%'``
-
+
"""
return self.operate(contains_op, other, **kwargs)
def match(self, other, **kwargs):
"""Implements the 'match' operator.
-
- In a column context, this produces a MATCH clause, i.e.
- ``MATCH '<other>'``. The allowed contents of ``other``
+
+ In a column context, this produces a MATCH clause, i.e.
+ ``MATCH '<other>'``. The allowed contents of ``other``
are database backend specific.
"""
"""Implement the ``+`` operator in reverse.
See :meth:`__add__`.
-
+
"""
return self.reverse_operate(add, other)
"""Implement the ``-`` operator in reverse.
See :meth:`__sub__`.
-
+
"""
return self.reverse_operate(sub, other)
"""Implement the ``*`` operator in reverse.
See :meth:`__mul__`.
-
+
"""
return self.reverse_operate(mul, other)
"""Implement the ``/`` operator in reverse.
See :meth:`__div__`.
-
+
"""
return self.reverse_operate(div, other)
def __add__(self, other):
"""Implement the ``+`` operator.
-
+
In a column context, produces the clause ``a + b``
if the parent object has non-string affinity.
- If the parent object has a string affinity,
+ If the parent object has a string affinity,
produces the concatenation operator, ``a || b`` -
see :meth:`concat`.
-
+
"""
return self.operate(add, other)
def __sub__(self, other):
"""Implement the ``-`` operator.
-
+
In a column context, produces the clause ``a - b``.
-
+
"""
return self.operate(sub, other)
def __mul__(self, other):
"""Implement the ``*`` operator.
-
+
In a column context, produces the clause ``a * b``.
-
+
"""
return self.operate(mul, other)
def __div__(self, other):
"""Implement the ``/`` operator.
-
+
In a column context, produces the clause ``a / b``.
-
+
"""
return self.operate(div, other)
def __mod__(self, other):
"""Implement the ``%`` operator.
-
+
In a column context, produces the clause ``a % b``.
-
+
"""
return self.operate(mod, other)
def __truediv__(self, other):
"""Implement the ``//`` operator.
-
+
In a column context, produces the clause ``a / b``.
-
+
"""
return self.operate(truediv, other)
def __rtruediv__(self, other):
"""Implement the ``//`` operator in reverse.
-
+
See :meth:`__truediv__`.
-
+
"""
return self.reverse_operate(truediv, other)
return op in _commutative
def is_ordering_modifier(op):
- return op in (asc_op, desc_op,
+ return op in (asc_op, desc_op,
nullsfirst_op, nullslast_op)
_associative = _commutative.union([concat_op, and_, or_])
-_smallest = symbol('_smallest')
-_largest = symbol('_largest')
+_smallest = symbol('_smallest', canonical=-100)
+_largest = symbol('_largest', canonical=100)
_PRECEDENCE = {
from_: 15,
collate: 7,
as_: -1,
exists: 0,
- _smallest: -1000,
- _largest: 1000
+ _smallest: _smallest,
+ _largest: _largest
}
+
def is_precedent(operator, against):
if operator is against and operator in _associative:
return False
else:
- return (_PRECEDENCE.get(operator, _PRECEDENCE[_smallest]) <=
- _PRECEDENCE.get(against, _PRECEDENCE[_largest]))
+ return (_PRECEDENCE.get(operator,
+ getattr(operator, 'precedence', _smallest)) <=
+ _PRECEDENCE.get(against,
+ getattr(against, 'precedence', _largest)))
assert not hasattr(select([table1.c.myid]).as_scalar(), 'columns')
def test_table_select(self):
- self.assert_compile(table1.select(),
+ self.assert_compile(table1.select(),
"SELECT mytable.myid, mytable.name, "
"mytable.description FROM mytable")
- self.assert_compile(select([table1, table2]),
+ self.assert_compile(select([table1, table2]),
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername FROM mytable, "
"myothertable")
def test_limit_offset(self):
for lim, offset, exp, params in [
- (5, 10, "LIMIT :param_1 OFFSET :param_2",
+ (5, 10, "LIMIT :param_1 OFFSET :param_2",
{'param_1':5, 'param_2':10}),
(None, 10, "LIMIT -1 OFFSET :param_1", {'param_1':10}),
(5, None, "LIMIT :param_1", {'param_1':5}),
- (0, 0, "LIMIT :param_1 OFFSET :param_2",
+ (0, 0, "LIMIT :param_1 OFFSET :param_2",
{'param_1':0, 'param_2':0}),
]:
self.assert_compile(
"myothertable.otherid = mytable.myid"
self.assert_compile(
- sq.select(),
+ sq.select(),
"SELECT sq.mytable_myid, sq.mytable_name, "
"sq.mytable_description, sq.myothertable_otherid, "
"sq.myothertable_othername FROM (%s) AS sq" % sqstring)
).alias('sq2')
self.assert_compile(
- sq2.select(),
+ sq2.select(),
"SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, "
"sq2.sq_mytable_description, sq2.sq_myothertable_otherid, "
"sq2.sq_myothertable_othername FROM (SELECT sq.mytable_myid AS "
def test_select_from_clauselist(self):
self.assert_compile(
- select([ClauseList(column('a'), column('b'))]).select_from('sometable'),
+ select([ClauseList(column('a'), column('b'))]).select_from('sometable'),
'SELECT a, b FROM sometable'
)
)
def test_dupe_columns(self):
- """test that deduping is performed against clause
+ """test that deduping is performed against clause
element identity, not rendered result."""
self.assert_compile(
, dialect=default.DefaultDialect()
)
- # using alternate keys.
+ # using alternate keys.
a, b, c = Column('a', Integer, key='b'), \
Column('b', Integer), \
Column('c', Integer, key='a')
def test_exists(self):
s = select([table1.c.myid]).where(table1.c.myid==5)
- self.assert_compile(exists(s),
+ self.assert_compile(exists(s),
"EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
)
- self.assert_compile(exists(s.as_scalar()),
+ self.assert_compile(exists(s.as_scalar()),
"EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
)
':param_1')
self.assert_compile(
- label('bar', column('foo', type_=String))+ 'foo',
+ label('bar', column('foo', type_=String))+ 'foo',
'foo || :param_1')
)
self.assert_compile(
- and_(table1.c.myid == 12, table1.c.name=='asdf',
+ and_(table1.c.myid == 12, table1.c.name=='asdf',
table2.c.othername == 'foo', "sysdate() = today()"),
"mytable.myid = :myid_1 AND mytable.name = :name_1 "\
"AND myothertable.othername = :othername_1 AND sysdate() = today()"
self.assert_compile(
and_(
table1.c.myid == 12,
- or_(table2.c.othername=='asdf',
+ or_(table2.c.othername=='asdf',
table2.c.othername == 'foo', table2.c.otherid == 9),
"sysdate() = today()",
),
'mytable.myid = :myid_1 AND (myothertable.othername = '
':othername_1 OR myothertable.othername = :othername_2 OR '
'myothertable.otherid = :otherid_1) AND sysdate() = '
- 'today()',
+ 'today()',
checkparams = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12}
)
t = table('t', column('x'))
self.assert_compile(
- select([t]).where(and_(t.c.x==5,
+ select([t]).where(and_(t.c.x==5,
or_(and_(or_(t.c.x==7))))),
"SELECT t.x FROM t WHERE t.x = :x_1 AND t.x = :x_2"
)
self.assert_compile(
- select([t]).where(and_(or_(t.c.x==12,
+ select([t]).where(and_(or_(t.c.x==12,
and_(or_(t.c.x==8))))),
"SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2"
)
self.assert_compile(
- select([t]).where(and_(or_(or_(t.c.x==12),
+ select([t]).where(and_(or_(or_(t.c.x==12),
and_(or_(), or_(and_(t.c.x==8)), and_())))),
"SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2"
)
def test_distinct(self):
self.assert_compile(
- select([table1.c.myid.distinct()]),
+ select([table1.c.myid.distinct()]),
"SELECT DISTINCT mytable.myid FROM mytable"
)
self.assert_compile(
- select([distinct(table1.c.myid)]),
+ select([distinct(table1.c.myid)]),
"SELECT DISTINCT mytable.myid FROM mytable"
)
self.assert_compile(
- select([table1.c.myid]).distinct(),
+ select([table1.c.myid]).distinct(),
"SELECT DISTINCT mytable.myid FROM mytable"
)
self.assert_compile(
- select([func.count(table1.c.myid.distinct())]),
+ select([func.count(table1.c.myid.distinct())]),
"SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable"
)
self.assert_compile(
- select([func.count(distinct(table1.c.myid))]),
+ select([func.count(distinct(table1.c.myid))]),
"SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable"
)
- def test_operators(self):
+ def test_math_operators(self):
for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
- (operator.sub, '-'),
+ (operator.sub, '-'),
# Py3K
#(operator.truediv, '/'),
# Py2K
):
self.assert_compile(py_op(lhs, rhs), res % sql_op)
+ def test_comparison_operators(self):
dt = datetime.datetime.today()
# exercise comparison operators
for (py_op, fwd_op, rev_op) in ((operator.lt, '<', '>'),
"\n'" + compiled + "'\n does not match\n'" +
fwd_sql + "'\n or\n'" + rev_sql + "'")
+ def test_negate_operators_1(self):
for (py_op, op) in (
(operator.neg, '-'),
(operator.inv, 'NOT '),
sql = "%s%s" % (op, sql)
eq_(compiled, sql)
+ def test_negate_operators_2(self):
self.assert_compile(
table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable WHERE mytable.myid != :myid_1 AND mytable.name != :name_1"
)
+ def test_negate_operators_3(self):
self.assert_compile(
- table1.select((table1.c.myid != 12) &
+ table1.select((table1.c.myid != 12) &
~(table1.c.name.between('jack','john'))),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable WHERE mytable.myid != :myid_1 AND "\
"NOT (mytable.name BETWEEN :name_1 AND :name_2)"
)
+ def test_negate_operators_4(self):
self.assert_compile(
- table1.select((table1.c.myid != 12) &
+ table1.select((table1.c.myid != 12) &
~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable WHERE mytable.myid != :myid_1 AND "\
"AND mytable.name = :name_3)"
)
+ def test_negate_operators_5(self):
self.assert_compile(
table1.select((table1.c.myid != 12) & ~table1.c.name),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable WHERE mytable.myid != :myid_1 AND NOT mytable.name"
)
+ def test_commutative_operators(self):
self.assert_compile(
literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3"
)
- # test the op() function, also that its results are further usable in expressions
+ def test_op_operators(self):
self.assert_compile(
- table1.select(table1.c.myid.op('hoho')(12)==14),
+ table1.select(table1.c.myid.op('hoho')(12) == 14),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable WHERE (mytable.myid hoho :myid_1) = :param_1"
)
- # test that clauses can be pickled (operators need to be module-level, etc.)
+ def test_op_operators_comma_precedence(self):
+ self.assert_compile(
+ func.foo(table1.c.myid.op('hoho')(12)),
+ "foo(mytable.myid hoho :myid_1)"
+ )
+
+ def test_op_operators_comparison_precedence(self):
+ self.assert_compile(
+ table1.c.myid.op('hoho')(12) == 5,
+ "(mytable.myid hoho :myid_1) = :param_1"
+ )
+
+ def test_op_operators_custom_precedence(self):
+ op1 = table1.c.myid.op('hoho', precedence=5)
+ op2 = op1(5).op('lala', precedence=4)(4)
+ op3 = op1(5).op('lala', precedence=6)(4)
+
+ self.assert_compile(op2, "mytable.myid hoho :myid_1 lala :param_1")
+ self.assert_compile(op3, "(mytable.myid hoho :myid_1) lala :param_1")
+
+ def test_pickle_operators(self):
clause = (table1.c.myid == 12) & table1.c.myid.between(15, 20) & \
table1.c.myid.like('hoho')
- assert str(clause) == str(util.pickle.loads(util.pickle.dumps(clause)))
+ eq_(str(clause), str(util.pickle.loads(util.pickle.dumps(clause))))
def test_like(self):
for expr, check, dialect in [
(
- table1.c.myid.like('somstr'),
+ table1.c.myid.like('somstr'),
"mytable.myid LIKE :myid_1", None),
(
- ~table1.c.myid.like('somstr'),
+ ~table1.c.myid.like('somstr'),
"mytable.myid NOT LIKE :myid_1", None),
(
- table1.c.myid.like('somstr', escape='\\'),
- "mytable.myid LIKE :myid_1 ESCAPE '\\'",
+ table1.c.myid.like('somstr', escape='\\'),
+ "mytable.myid LIKE :myid_1 ESCAPE '\\'",
None),
(
- ~table1.c.myid.like('somstr', escape='\\'),
- "mytable.myid NOT LIKE :myid_1 ESCAPE '\\'",
+ ~table1.c.myid.like('somstr', escape='\\'),
+ "mytable.myid NOT LIKE :myid_1 ESCAPE '\\'",
None),
(
- table1.c.myid.ilike('somstr', escape='\\'),
- "lower(mytable.myid) LIKE lower(:myid_1) ESCAPE '\\'",
+ table1.c.myid.ilike('somstr', escape='\\'),
+ "lower(mytable.myid) LIKE lower(:myid_1) ESCAPE '\\'",
None),
(
- ~table1.c.myid.ilike('somstr', escape='\\'),
- "lower(mytable.myid) NOT LIKE lower(:myid_1) ESCAPE '\\'",
+ ~table1.c.myid.ilike('somstr', escape='\\'),
+ "lower(mytable.myid) NOT LIKE lower(:myid_1) ESCAPE '\\'",
None),
(
- table1.c.myid.ilike('somstr', escape='\\'),
- "mytable.myid ILIKE %(myid_1)s ESCAPE '\\\\'",
+ table1.c.myid.ilike('somstr', escape='\\'),
+ "mytable.myid ILIKE %(myid_1)s ESCAPE '\\\\'",
postgresql.PGDialect()),
(
- ~table1.c.myid.ilike('somstr', escape='\\'),
- "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\\\'",
+ ~table1.c.myid.ilike('somstr', escape='\\'),
+ "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\\\'",
postgresql.PGDialect()),
(
- table1.c.name.ilike('%something%'),
+ table1.c.name.ilike('%something%'),
"lower(mytable.name) LIKE lower(:name_1)", None),
(
- table1.c.name.ilike('%something%'),
+ table1.c.name.ilike('%something%'),
"mytable.name ILIKE %(name_1)s", postgresql.PGDialect()),
(
- ~table1.c.name.ilike('%something%'),
+ ~table1.c.name.ilike('%something%'),
"lower(mytable.name) NOT LIKE lower(:name_1)", None),
(
- ~table1.c.name.ilike('%something%'),
- "mytable.name NOT ILIKE %(name_1)s",
+ ~table1.c.name.ilike('%something%'),
+ "mytable.name NOT ILIKE %(name_1)s",
postgresql.PGDialect()),
]:
self.assert_compile(expr, check, dialect=dialect)
def test_match(self):
for expr, check, dialect in [
- (table1.c.myid.match('somstr'),
+ (table1.c.myid.match('somstr'),
"mytable.myid MATCH ?", sqlite.SQLiteDialect()),
- (table1.c.myid.match('somstr'),
- "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)",
+ (table1.c.myid.match('somstr'),
+ "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)",
mysql.dialect()),
- (table1.c.myid.match('somstr'),
- "CONTAINS (mytable.myid, :myid_1)",
+ (table1.c.myid.match('somstr'),
+ "CONTAINS (mytable.myid, :myid_1)",
mssql.dialect()),
- (table1.c.myid.match('somstr'),
- "mytable.myid @@ to_tsquery(%(myid_1)s)",
+ (table1.c.myid.match('somstr'),
+ "mytable.myid @@ to_tsquery(%(myid_1)s)",
postgresql.dialect()),
- (table1.c.myid.match('somstr'),
- "CONTAINS (mytable.myid, :myid_1)",
+ (table1.c.myid.match('somstr'),
+ "CONTAINS (mytable.myid, :myid_1)",
oracle.dialect()),
]:
self.assert_compile(expr, check, dialect=dialect)
def test_composed_string_comparators(self):
self.assert_compile(
- table1.c.name.contains('jo'),
- "mytable.name LIKE '%%' || :name_1 || '%%'" ,
+ table1.c.name.contains('jo'),
+ "mytable.name LIKE '%%' || :name_1 || '%%'" ,
checkparams = {'name_1': u'jo'},
)
self.assert_compile(
- table1.c.name.contains('jo'),
- "mytable.name LIKE concat(concat('%%', %s), '%%')" ,
+ table1.c.name.contains('jo'),
+ "mytable.name LIKE concat(concat('%%', %s), '%%')" ,
checkparams = {'name_1': u'jo'},
dialect=mysql.dialect()
)
self.assert_compile(
- table1.c.name.contains('jo', escape='\\'),
- "mytable.name LIKE '%%' || :name_1 || '%%' ESCAPE '\\'" ,
+ table1.c.name.contains('jo', escape='\\'),
+ "mytable.name LIKE '%%' || :name_1 || '%%' ESCAPE '\\'" ,
checkparams = {'name_1': u'jo'},
)
self.assert_compile(
- table1.c.name.startswith('jo', escape='\\'),
+ table1.c.name.startswith('jo', escape='\\'),
"mytable.name LIKE :name_1 || '%%' ESCAPE '\\'" )
self.assert_compile(
- table1.c.name.endswith('jo', escape='\\'),
+ table1.c.name.endswith('jo', escape='\\'),
"mytable.name LIKE '%%' || :name_1 ESCAPE '\\'" )
self.assert_compile(
- table1.c.name.endswith('hn'),
- "mytable.name LIKE '%%' || :name_1",
+ table1.c.name.endswith('hn'),
+ "mytable.name LIKE '%%' || :name_1",
checkparams = {'name_1': u'hn'}, )
self.assert_compile(
- table1.c.name.endswith('hn'),
+ table1.c.name.endswith('hn'),
"mytable.name LIKE concat('%%', %s)",
checkparams = {'name_1': u'hn'}, dialect=mysql.dialect()
)
self.assert_compile(
- table1.c.name.startswith(u"hi \xf6 \xf5"),
+ table1.c.name.startswith(u"hi \xf6 \xf5"),
"mytable.name LIKE :name_1 || '%%'",
checkparams = {'name_1': u'hi \xf6 \xf5'},
)
self.assert_compile(
- column('name').endswith(text("'foo'")),
+ column('name').endswith(text("'foo'")),
"name LIKE '%%' || 'foo'" )
self.assert_compile(
- column('name').endswith(literal_column("'foo'")),
+ column('name').endswith(literal_column("'foo'")),
"name LIKE '%%' || 'foo'" )
self.assert_compile(
- column('name').startswith(text("'foo'")),
+ column('name').startswith(text("'foo'")),
"name LIKE 'foo' || '%%'" )
self.assert_compile(
column('name').startswith(text("'foo'")),
"name LIKE concat('foo', '%%')", dialect=mysql.dialect())
self.assert_compile(
- column('name').startswith(literal_column("'foo'")),
+ column('name').startswith(literal_column("'foo'")),
"name LIKE 'foo' || '%%'" )
self.assert_compile(
- column('name').startswith(literal_column("'foo'")),
+ column('name').startswith(literal_column("'foo'")),
"name LIKE concat('foo', '%%')", dialect=mysql.dialect())
def test_multiple_col_binds(self):
self.assert_compile(
select(
- [table2.c.othername, func.count(table2.c.otherid)],
+ [table2.c.othername, func.count(table2.c.otherid)],
group_by = [table2.c.othername]),
"SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
"FROM myothertable GROUP BY myothertable.othername"
)
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)],
- group_by = [table2.c.othername],
+ select([table2.c.othername, func.count(table2.c.otherid)],
+ group_by = [table2.c.othername],
order_by = [table2.c.othername]),
"SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
"FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
"SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
"FROM mytable AS mytable_1")
- # create a select for a join of two tables. use_labels
- # means the column names will have labels tablename_columnname,
+ # create a select for a join of two tables. use_labels
+ # means the column names will have labels tablename_columnname,
# which become the column keys accessible off the Selectable object.
- # also, only use one column from the second table and all columns
+ # also, only use one column from the second table and all columns
# from the first table1.
q = select(
- [table1, table2.c.otherid],
+ [table1, table2.c.otherid],
table1.c.myid == table2.c.otherid, use_labels = True
)
- # make an alias of the "selectable". column names
+ # make an alias of the "selectable". column names
# stay the same (i.e. the labels), table name "changes" to "t2view".
a = alias(q, 't2view')
[u"foobar(a)", u"pk_foo_bar(syslaal)"],
u"a = 12",
from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"]
- ),
+ ),
"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
"left outer join lala on foobar.foo = lala.foo WHERE a = 12"
)
def test_binds_in_text(self):
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee",
+ text("select * from foo where lala=:bar and hoho=:whee",
bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
"select * from foo where lala=:bar and hoho=:whee",
checkparams={'bar':4, 'whee': 7},
dialect = postgresql.dialect()
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee",
+ text("select * from foo where lala=:bar and hoho=:whee",
bindparams=[bindparam('bar',4), bindparam('whee',7)]),
"select * from foo where lala=%(bar)s and hoho=%(whee)s",
checkparams={'bar':4, 'whee': 7},
dialect = sqlite.dialect()
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee",
+ text("select * from foo where lala=:bar and hoho=:whee",
bindparams=[bindparam('bar',4), bindparam('whee',7)]),
"select * from foo where lala=? and hoho=?",
checkparams={'bar':4, 'whee':7},
# test Text embedded within select_from(), using binds
generate_series = text(
- "generate_series(:x, :y, :z) as s(a)",
+ "generate_series(:x, :y, :z) as s(a)",
bindparams=[bindparam('x'), bindparam('y'), bindparam('z')]
)
(func.current_date() + literal_column("s.a")).label("dates")
]).select_from(generate_series)
self.assert_compile(
- s,
- "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
+ s,
+ "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
checkparams={'y': None, 'x': None, 'z': None}
)
self.assert_compile(
- s.params(x=5, y=6, z=7),
- "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
+ s.params(x=5, y=6, z=7),
+ "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
checkparams={'y': 6, 'x': 5, 'z': 7}
)
@testing.emits_warning('.*empty sequence.*')
def test_render_binds_as_literal(self):
- """test a compiler that renders binds inline into
+ """test a compiler that renders binds inline into
SQL in the columns clause."""
dialect = default.DefaultDialect()
expr = select([table1.c.name]).\
order_by(table1.c.name.collate('latin1_german2_ci'))
- self.assert_compile(expr,
+ self.assert_compile(expr,
"SELECT mytable.name FROM mytable ORDER BY "
"mytable.name COLLATE latin1_german2_ci")
self.assert_compile(
select(
- [join(join(table1, table2, table1.c.myid == table2.c.otherid),
+ [join(join(table1, table2, table1.c.myid == table2.c.otherid),
table3, table1.c.myid == table3.c.userid)]
),
"SELECT mytable.myid, mytable.name, mytable.description, "
)
self.assert_compile(
select([table1, table2, table3],
- from_obj = [outerjoin(table1,
+ from_obj = [outerjoin(table1,
join(table2, table3, table2.c.otherid == table3.c.userid),
table1.c.myid==table2.c.otherid)]
)
)
x = union(x, select([table1]))
self.assert_compile(x, "(SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable UNION SELECT mytable.myid, mytable.name, "
+ "FROM mytable UNION SELECT mytable.myid, mytable.name, "
"mytable.description FROM mytable) UNION SELECT mytable.myid,"
" mytable.name, mytable.description FROM mytable")
self.assert_compile(
union(
select([table1.c.myid, table1.c.name, func.max(table1.c.description)],
- table1.c.name=='name2',
+ table1.c.name=='name2',
group_by=[table1.c.myid, table1.c.name]),
table1.select(table1.c.name=='name1')
),
s = select([column('foo'), column('bar')])
# ORDER BY's even though not supported by all DB's, are rendered if requested
- self.assert_compile(union(s.order_by("foo"), s.order_by("bar")),
+ self.assert_compile(union(s.order_by("foo"), s.order_by("bar")),
"SELECT foo, bar ORDER BY foo UNION SELECT foo, bar ORDER BY bar"
)
# self_group() is honored
self.assert_compile(
- union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()),
+ union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()),
"(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, bar ORDER BY bar LIMIT :param_1)",
{'param_1':10}
{'mytablename':5}, {'mytablename':5}, [5]
),
(
- select([table1], or_(table1.c.myid==bindparam('myid'),
+ select([table1], or_(table1.c.myid==bindparam('myid'),
table2.c.otherid==bindparam('myid'))),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable, myothertable WHERE mytable.myid = :myid "
{'myid':5}, {'myid':5}, [5,5]
),
(
- select([table1], or_(table1.c.myid==bindparam('myid', unique=True),
+ select([table1], or_(table1.c.myid==bindparam('myid', unique=True),
table2.c.otherid==bindparam('myid', unique=True))),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
{}, {'test':None}, [None]
),
(
- select([table1], or_(table1.c.myid==bindparam('myid'),
+ select([table1], or_(table1.c.myid==bindparam('myid'),
table2.c.otherid==bindparam('myotherid'))).\
params({'myid':8, 'myotherid':7}),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
{'myid':5}, {'myid':5, 'myotherid':7}, [5,7]
),
(
- select([table1], or_(table1.c.myid==bindparam('myid', value=7, unique=True),
+ select([table1], or_(table1.c.myid==bindparam('myid', value=7, unique=True),
table2.c.otherid==bindparam('myid', value=8, unique=True))),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
assert [pp[k] for k in positional.positiontup] == expected_test_params_list
# check that params() doesnt modify original statement
- s = select([table1], or_(table1.c.myid==bindparam('myid'),
+ s = select([table1], or_(table1.c.myid==bindparam('myid'),
table2.c.otherid==bindparam('myotherid')))
s2 = s.params({'myid':8, 'myotherid':7})
s3 = s2.params({'myid':9})
assert [pp[k] for k in positional.positiontup] == [12, 12]
# check that conflicts with "unique" params are caught
- s = select([table1], or_(table1.c.myid==7,
+ s = select([table1], or_(table1.c.myid==7,
table1.c.myid==bindparam('myid_1')))
- assert_raises_message(exc.CompileError,
+ assert_raises_message(exc.CompileError,
"conflicts with unique bind parameter "
- "of the same name",
+ "of the same name",
str, s)
- s = select([table1], or_(table1.c.myid==7, table1.c.myid==8,
+ s = select([table1], or_(table1.c.myid==7, table1.c.myid==8,
table1.c.myid==bindparam('myid_1')))
- assert_raises_message(exc.CompileError,
+ assert_raises_message(exc.CompileError,
"conflicts with unique bind parameter "
- "of the same name",
+ "of the same name",
str, s)
def test_binds_no_hash_collision(self):
)
def test_bind_params_missing(self):
- assert_raises_message(exc.InvalidRequestError,
+ assert_raises_message(exc.InvalidRequestError,
r"A value is required for bind parameter 'x'",
select([table1]).where(
and_(
- table1.c.myid==bindparam("x", required=True),
+ table1.c.myid==bindparam("x", required=True),
table1.c.name==bindparam("y", required=True)
)
).compile().construct_params,
params=dict(y=5)
)
- assert_raises_message(exc.InvalidRequestError,
+ assert_raises_message(exc.InvalidRequestError,
r"A value is required for bind parameter 'x'",
select([table1]).where(
table1.c.myid==bindparam("x", required=True)
).compile().construct_params
)
- assert_raises_message(exc.InvalidRequestError,
+ assert_raises_message(exc.InvalidRequestError,
r"A value is required for bind parameter 'x', "
"in parameter group 2",
select([table1]).where(
and_(
- table1.c.myid==bindparam("x", required=True),
+ table1.c.myid==bindparam("x", required=True),
table1.c.name==bindparam("y", required=True)
)
).compile().construct_params,
_group_number=2
)
- assert_raises_message(exc.InvalidRequestError,
+ assert_raises_message(exc.InvalidRequestError,
r"A value is required for bind parameter 'x', "
"in parameter group 2",
select([table1]).where(
)
def check_results(dialect, expected_results, literal):
- eq_(len(expected_results), 5,
+ eq_(len(expected_results), 5,
'Incorrect number of expected results')
- eq_(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)),
+ eq_(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)),
'CAST(casttest.v1 AS %s)' % expected_results[0])
- eq_(str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)),
+ eq_(str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)),
'CAST(casttest.v1 AS %s)' % expected_results[1])
- eq_(str(cast(tbl.c.ts, Date).compile(dialect=dialect)),
+ eq_(str(cast(tbl.c.ts, Date).compile(dialect=dialect)),
'CAST(casttest.ts AS %s)' % expected_results[2])
- eq_(str(cast(1234, Text).compile(dialect=dialect)),
+ eq_(str(cast(1234, Text).compile(dialect=dialect)),
'CAST(%s AS %s)' % (literal, expected_results[3]))
- eq_(str(cast('test', String(20)).compile(dialect=dialect)),
+ eq_(str(cast('test', String(20)).compile(dialect=dialect)),
'CAST(%s AS %s)' %(literal, expected_results[4]))
# fixme: shoving all of this dialect-specific stuff in one test
# is now officialy completely ridiculous AND non-obviously omits
# coverage on other dialects.
sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect)
if isinstance(dialect, type(mysql.dialect())):
- eq_(str(sel),
+ eq_(str(sel),
"SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, "
"CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest")
else:
- eq_(str(sel),
+ eq_(str(sel),
"SELECT casttest.id, casttest.v1, casttest.v2, "
"casttest.ts, CAST(casttest.v1 AS NUMERIC) AS "
"anon_1 \nFROM casttest")
"AS anon_1 FROM mytable"
)
- # this tests that _from_objects
+ # this tests that _from_objects
# concantenates OK
self.assert_compile(
select([column("x") + over(func.foo())]),
table = Table('dt', metadata,
Column('date', Date))
self.assert_compile(
- table.select(table.c.date.between(datetime.date(2006,6,1),
+ table.select(table.c.date.between(datetime.date(2006,6,1),
datetime.date(2006,6,5))),
- "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
- checkparams={'date_1':datetime.date(2006,6,1),
+ "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
+ checkparams={'date_1':datetime.date(2006,6,1),
'date_2':datetime.date(2006,6,5)})
self.assert_compile(
table.select(sql.between(table.c.date, datetime.date(2006,6,1),
datetime.date(2006,6,5))),
- "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
- checkparams={'date_1':datetime.date(2006,6,1),
+ "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
+ checkparams={'date_1':datetime.date(2006,6,1),
'date_2':datetime.date(2006,6,5)})
def test_operator_precedence(self):
s1 = select([s1])
if label:
- self.assert_compile(s1,
- "SELECT %s FROM (SELECT %s AS %s FROM mytable)" %
+ self.assert_compile(s1,
+ "SELECT %s FROM (SELECT %s AS %s FROM mytable)" %
(label, expr, label))
elif col.table is not None:
# sqlite rule labels subquery columns
- self.assert_compile(s1,
- "SELECT %s FROM (SELECT %s AS %s FROM mytable)" %
+ self.assert_compile(s1,
+ "SELECT %s FROM (SELECT %s AS %s FROM mytable)" %
(key,expr, key))
else:
- self.assert_compile(s1,
- "SELECT %s FROM (SELECT %s FROM mytable)" %
+ self.assert_compile(s1,
+ "SELECT %s FROM (SELECT %s FROM mytable)" %
(expr,expr))
def test_hints(self):
s4 = select([table3]).select_from(
table3.join(
- subs4,
+ subs4,
subs4.c.othername==table3.c.otherstuff
)
).\
]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid))
s5 = select([table3]).select_from(
table3.join(
- subs5,
+ subs5,
subs5.c.othername==table3.c.otherstuff
)
).\
sybase.dialect()
for stmt, dialect, expected in [
- (s, mysql_d,
+ (s, mysql_d,
"SELECT mytable.myid FROM mytable test hint mytable"),
- (s, oracle_d,
+ (s, oracle_d,
"SELECT /*+ test hint mytable */ mytable.myid FROM mytable"),
- (s, sybase_d,
+ (s, sybase_d,
"SELECT mytable.myid FROM mytable test hint mytable"),
- (s2, mysql_d,
+ (s2, mysql_d,
"SELECT mytable.myid FROM mytable"),
- (s2, oracle_d,
+ (s2, oracle_d,
"SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable"),
- (s2, sybase_d,
+ (s2, sybase_d,
"SELECT mytable.myid FROM mytable WITH HINT INDEX idx"),
- (s3, mysql_d,
+ (s3, mysql_d,
"SELECT mytable_1.myid FROM mytable AS mytable_1 "
"index(mytable_1 hint)"),
- (s3, oracle_d,
+ (s3, oracle_d,
"SELECT /*+ index(mytable_1 hint) */ mytable_1.myid FROM "
"mytable mytable_1"),
- (s3, sybase_d,
+ (s3, sybase_d,
"SELECT mytable_1.myid FROM mytable AS mytable_1 "
"index(mytable_1 hint)"),
- (s4, mysql_d,
+ (s4, mysql_d,
"SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable "
"hint3 INNER JOIN (SELECT mytable.myid, mytable.name, "
"mytable.description, myothertable.otherid, "
"myothertable.othername FROM mytable hint1 INNER "
"JOIN myothertable ON mytable.myid = myothertable.otherid) "
"ON othername = thirdtable.otherstuff"),
- (s4, sybase_d,
+ (s4, sybase_d,
"SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable "
"hint3 JOIN (SELECT mytable.myid, mytable.name, "
"mytable.description, myothertable.otherid, "
"myothertable.othername FROM mytable hint1 "
"JOIN myothertable ON mytable.myid = myothertable.otherid) "
"ON othername = thirdtable.otherstuff"),
- (s4, oracle_d,
+ (s4, oracle_d,
"SELECT /*+ hint3 */ thirdtable.userid, thirdtable.otherstuff "
"FROM thirdtable JOIN (SELECT /*+ hint1 */ mytable.myid,"
" mytable.name, mytable.description, myothertable.otherid,"
" mytable.myid = myothertable.otherid) ON othername ="
" thirdtable.otherstuff"),
# TODO: figure out dictionary ordering solution here
-# (s5, oracle_d,
+# (s5, oracle_d,
# "SELECT /*+ hint3 */ /*+ hint1 */ thirdtable.userid, "
# "thirdtable.otherstuff "
# "FROM thirdtable JOIN (SELECT mytable.myid,"
# " myothertable.othername FROM mytable JOIN myothertable ON"
# " mytable.myid = myothertable.otherid) ON othername ="
# " thirdtable.otherstuff"),
- (s6, oracle_d,
+ (s6, oracle_d,
"""SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 """
"""FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1"""),
- (s7, oracle_d,
+ (s7, oracle_d,
"""SELECT /*+ SomeName idx1 */ "SomeName".col1 FROM """
""""QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1"""),
]:
def test_insert(self):
# generic insert, will create bind params for all columns
- self.assert_compile(insert(table1),
+ self.assert_compile(insert(table1),
"INSERT INTO mytable (myid, name, description) "
"VALUES (:myid, :name, :description)")
# cols provided literally
self.assert_compile(
insert(table1, {
- table1.c.myid : bindparam('userid'),
+ table1.c.myid : bindparam('userid'),
table1.c.name : bindparam('username')}),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
)
self.assert_compile(
- insert(table1, values=dict(myid=func.lala())),
+ insert(table1, values=dict(myid=func.lala())),
"INSERT INTO mytable (myid) VALUES (lala())")
def test_inline_insert(self):
Column('id', Integer, primary_key=True),
Column('foo', Integer, default=func.foobar()))
self.assert_compile(
- table.insert(values={}, inline=True),
+ table.insert(values={}, inline=True),
"INSERT INTO sometable (foo) VALUES (foobar())")
self.assert_compile(
- table.insert(inline=True),
+ table.insert(inline=True),
"INSERT INTO sometable (foo) VALUES (foobar())", params={})
def test_update(self):
self.assert_compile(
- update(table1, table1.c.myid == 7),
- "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
+ update(table1, table1.c.myid == 7),
+ "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
params = {table1.c.name:'fred'})
self.assert_compile(
table1.update().where(table1.c.myid==7).
- values({table1.c.myid:5}),
- "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
+ values({table1.c.myid:5}),
+ "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
checkparams={'myid':5, 'myid_1':7})
self.assert_compile(
- update(table1, table1.c.myid == 7),
- "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
+ update(table1, table1.c.myid == 7),
+ "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
params = {'name':'fred'})
self.assert_compile(
- update(table1, values = {table1.c.name : table1.c.myid}),
+ update(table1, values = {table1.c.name : table1.c.myid}),
"UPDATE mytable SET name=mytable.myid")
self.assert_compile(
- update(table1,
- whereclause = table1.c.name == bindparam('crit'),
- values = {table1.c.name : 'hi'}),
- "UPDATE mytable SET name=:name WHERE mytable.name = :crit",
- params = {'crit' : 'notthere'},
+ update(table1,
+ whereclause = table1.c.name == bindparam('crit'),
+ values = {table1.c.name : 'hi'}),
+ "UPDATE mytable SET name=:name WHERE mytable.name = :crit",
+ params = {'crit' : 'notthere'},
checkparams={'crit':'notthere', 'name':'hi'})
self.assert_compile(
- update(table1, table1.c.myid == 12,
- values = {table1.c.name : table1.c.myid}),
+ update(table1, table1.c.myid == 12,
+ values = {table1.c.name : table1.c.myid}),
"UPDATE mytable SET name=mytable.myid, description="
- ":description WHERE mytable.myid = :myid_1",
- params = {'description':'test'},
+ ":description WHERE mytable.myid = :myid_1",
+ params = {'description':'test'},
checkparams={'description':'test', 'myid_1':12})
self.assert_compile(
- update(table1, table1.c.myid == 12,
- values = {table1.c.myid : 9}),
+ update(table1, table1.c.myid == 12,
+ values = {table1.c.myid : 9}),
"UPDATE mytable SET myid=:myid, description=:description "
- "WHERE mytable.myid = :myid_1",
+ "WHERE mytable.myid = :myid_1",
params = {'myid_1': 12, 'myid': 9, 'description': 'test'})
self.assert_compile(
- update(table1, table1.c.myid ==12),
- "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
+ update(table1, table1.c.myid ==12),
+ "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
params={'myid':18}, checkparams={'myid':18, 'myid_1':12})
s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'})
c = s.compile(column_keys=['id', 'name'])
self.assert_compile(
- update(table1, table1.c.myid == 12,
+ update(table1, table1.c.myid == 12,
values = {table1.c.name : table1.c.myid}
- ).values({table1.c.name:table1.c.name + 'foo'}),
+ ).values({table1.c.name:table1.c.name + 'foo'}),
"UPDATE mytable SET name=(mytable.name || :name_1), "
- "description=:description WHERE mytable.myid = :myid_1",
+ "description=:description WHERE mytable.myid = :myid_1",
params = {'description':'test'})
eq_(str(s), str(c))
def test_correlated_update(self):
# test against a straight text subquery
u = update(table1, values = {
- table1.c.name :
+ table1.c.name :
text("(select name from mytable where id=mytable.id)")})
- self.assert_compile(u,
+ self.assert_compile(u,
"UPDATE mytable SET name=(select name from mytable "
"where id=mytable.id)")
mt = table1.alias()
u = update(table1, values = {
- table1.c.name :
+ table1.c.name :
select([mt.c.name], mt.c.myid==table1.c.myid)
})
- self.assert_compile(u,
+ self.assert_compile(u,
"UPDATE mytable SET name=(SELECT mytable_1.name FROM "
"mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)")
# test against a regular constructed subquery
s = select([table2], table2.c.otherid == table1.c.myid)
u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s})
- self.assert_compile(u,
+ self.assert_compile(u,
"UPDATE mytable SET name=(SELECT myothertable.otherid, "
"myothertable.othername FROM myothertable WHERE "
"myothertable.otherid = mytable.myid) WHERE mytable.name = :name_1")
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
u = update(table1, table1.c.name==s)
- self.assert_compile(u,
+ self.assert_compile(u,
"UPDATE mytable SET myid=:myid, name=:name, "
"description=:description WHERE mytable.name = "
"(SELECT myothertable.othername FROM myothertable "
# test one that is actually correlated...
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
u = table1.update(table1.c.name==s)
- self.assert_compile(u,
+ self.assert_compile(u,
"UPDATE mytable SET myid=:myid, name=:name, "
"description=:description WHERE mytable.name = "
"(SELECT myothertable.othername FROM myothertable "
def test_delete(self):
self.assert_compile(
- delete(table1, table1.c.myid == 7),
+ delete(table1, table1.c.myid == 7),
"DELETE FROM mytable WHERE mytable.myid = :myid_1")
self.assert_compile(
- table1.delete().where(table1.c.myid == 7),
+ table1.delete().where(table1.c.myid == 7),
"DELETE FROM mytable WHERE mytable.myid = :myid_1")
self.assert_compile(
table1.delete().where(table1.c.myid == 7).\
- where(table1.c.name=='somename'),
+ where(table1.c.name=='somename'),
"DELETE FROM mytable WHERE mytable.myid = :myid_1 "
"AND mytable.name = :name_1")
# test one that is actually correlated...
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
u = table1.delete(table1.c.name==s)
- self.assert_compile(u,
+ self.assert_compile(u,
"DELETE FROM mytable WHERE mytable.name = (SELECT "
"myothertable.othername FROM myothertable WHERE "
"myothertable.otherid = mytable.myid)")
def test_binds_that_match_columns(self):
- """test bind params named after column names
+ """test bind params named after column names
replace the normal SET/VALUES generation."""
t = table('foo', column('x'), column('y'))
assert_raises(exc.CompileError, u.values(x=7).compile, column_keys=['x', 'y'])
assert_raises(exc.CompileError, u.compile, column_keys=['x', 'y'])
- self.assert_compile(u.values(x=3 + bindparam('x')),
+ self.assert_compile(u.values(x=3 + bindparam('x')),
"UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x")
- self.assert_compile(u.values(x=3 + bindparam('x')),
+ self.assert_compile(u.values(x=3 + bindparam('x')),
"UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x",
params={'x':1})
- self.assert_compile(u.values(x=3 + bindparam('x')),
+ self.assert_compile(u.values(x=3 + bindparam('x')),
"UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x",
params={'x':1, 'y':2})
i = t.insert().values(x=3 + bindparam('x'))
self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x))")
- self.assert_compile(i,
+ self.assert_compile(i,
"INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)",
params={'x':1, 'y':2})
Column('col2', Integer, default=select([func.coalesce(func.max(foo.c.id))])),
)
- self.assert_compile(t.insert(inline=True, values={}),
+ self.assert_compile(t.insert(inline=True, values={}),
"INSERT INTO test (col1, col2) VALUES (foo(:foo_1), "
"(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM "
"foo))")
Column('col3', String(30))
)
- self.assert_compile(t.update(inline=True, values={'col3':'foo'}),
+ self.assert_compile(t.update(inline=True, values={'col3':'foo'}),
"UPDATE test SET col1=foo(:foo_1), col2=(SELECT "
"coalesce(max(foo.id)) AS coalesce_1 FROM foo), "
"col3=:col3")
__dialect__ = 'default'
def test_select(self):
- self.assert_compile(table4.select(),
+ self.assert_compile(table4.select(),
"SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
" remote_owner.remotetable.value FROM remote_owner.remotetable")
"remote_owner.remotetable.value = :value_1")
# multi-part schema name
- self.assert_compile(table5.select(),
+ self.assert_compile(table5.select(),
'SELECT "dbo.remote_owner".remotetable.rem_id, '
'"dbo.remote_owner".remotetable.datatype_id, "dbo.remote_owner".remotetable.value '
'FROM "dbo.remote_owner".remotetable'
)
# multi-part schema name labels - convert '.' to '_'
- self.assert_compile(table5.select(use_labels=True),
+ self.assert_compile(table5.select(use_labels=True),
'SELECT "dbo.remote_owner".remotetable.rem_id AS'
' dbo_remote_owner_remotetable_rem_id, "dbo.remote_owner".remotetable.datatype_id'
' AS dbo_remote_owner_remotetable_datatype_id,'
def test_alias(self):
a = alias(table4, 'remtable')
- self.assert_compile(a.select(a.c.datatype_id==7),
+ self.assert_compile(a.select(a.c.datatype_id==7),
"SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM"
" remote_owner.remotetable AS remtable "
"WHERE remtable.datatype_id = :datatype_id_1")
def test_update(self):
self.assert_compile(
- table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}),
+ table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}),
"UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "
"WHERE remote_owner.remotetable.value = :value_1")
def test_insert(self):
- self.assert_compile(table4.insert(values=(2, 5, 'test')),
+ self.assert_compile(table4.insert(values=(2, 5, 'test')),
"INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "
"(:rem_id, :datatype_id, :value)")