- this branch changes query.values() to immediately return an iterator, adds a new "aliased" construct which will be the primary method to get at aliased columns when using values()
- tentative ORM versions of _join and _outerjoin are not yet public, would like to integrate with Query better (work continues in the branch)
- lots of fixes to expressions regarding cloning and correlation. Some apparent ORM bug-workarounds removed.
- to fix a recursion issue with anonymous identifiers, bind parameters generated against columns now just use the name of the column instead of the tablename_columnname label (plus the unique integer counter). this way expensive recursive schemes aren't needed for the anon identifier logic. This, as usual, impacted a ton of compiler unit tests which needed a search-n-replace for the new bind names.
# you say something like query.options(contains_alias('fooalias')) - the matching
# is done on strings
if isinstance(key, expression.ColumnElement):
- if key._label.lower() in props:
+ if key._label and key._label.lower() in props:
return props[key._label.lower()]
elif key.name.lower() in props:
return props[key.name.lower()]
from sqlalchemy.orm.properties import SynonymProperty, ComparableProperty, PropertyLoader, ColumnProperty, CompositeProperty, BackRef
from sqlalchemy.orm import mapper as mapperlib
from sqlalchemy.orm import strategies
-from sqlalchemy.orm.query import Query
+from sqlalchemy.orm.query import Query, aliased
from sqlalchemy.orm.util import polymorphic_union, create_row_adapter
from sqlalchemy.orm.session import Session as _Session
from sqlalchemy.orm.session import object_session, sessionmaker
'compile_mappers', 'class_mapper', 'object_mapper', 'sessionmaker',
'scoped_session', 'dynamic_loader', 'MapperExtension',
'polymorphic_union', 'comparable_property',
- 'create_session', 'synonym', 'contains_alias', 'Query',
+ 'create_session', 'synonym', 'contains_alias', 'Query', 'aliased',
'contains_eager', 'EXT_CONTINUE', 'EXT_STOP', 'EXT_PASS',
'object_session', 'PropComparator' ]
from sqlalchemy import sql, util, exceptions, logging
from sqlalchemy.sql import expression, visitors, operators, util as sqlutil
from sqlalchemy.orm import sync, attributes
-from sqlalchemy.orm.util import ExtensionCarrier, state_str, instance_str
from sqlalchemy.orm.interfaces import MapperProperty, EXT_CONTINUE, PropComparator
-
+from sqlalchemy.orm.util import has_identity, _state_has_identity, _is_mapped_class, has_mapper, \
+ _state_mapper, class_mapper, object_mapper, _class_to_mapper,\
+ ExtensionCarrier, state_str, instance_str
+
__all__ = ['Mapper', 'class_mapper', 'object_mapper', '_mapper_registry']
_mapper_registry = weakref.WeakKeyDictionary()
Mapper.logger = logging.class_logger(Mapper)
-def has_identity(object):
- return hasattr(object, '_instance_key')
-
-def _state_has_identity(state):
- return '_instance_key' in state.dict
-
-def has_mapper(object):
- """Return True if the given object has had a mapper association
- set up, either through loading, or via insertion in a session.
- """
-
- return hasattr(object, '_entity_name')
-
object_session = None
def _load_scalar_attributes(instance, attribute_names):
if session.query(mapper)._get(identity_key, refresh_instance=state, only_load_props=attribute_names) is None and shouldraise:
raise exceptions.InvalidRequestError("Could not refresh instance '%s'" % instance_str(instance))
-def _state_mapper(state, entity_name=None):
- return state.class_._class_state.mappers[state.dict.get('_entity_name', entity_name)]
-
-def object_mapper(object, entity_name=None, raiseerror=True):
- """Given an object, return the primary Mapper associated with the object instance.
-
- object
- The object instance.
-
- entity_name
- Entity name of the mapper to retrieve, if the given instance is
- transient. Otherwise uses the entity name already associated
- with the instance.
-
- raiseerror
- Defaults to True: raise an ``InvalidRequestError`` if no mapper can
- be located. If False, return None.
-
- """
-
- try:
- mapper = object.__class__._class_state.mappers[getattr(object, '_entity_name', entity_name)]
- except (KeyError, AttributeError):
- if raiseerror:
- raise exceptions.InvalidRequestError("Class '%s' entity name '%s' has no mapper associated with it" % (object.__class__.__name__, getattr(object, '_entity_name', entity_name)))
- else:
- return None
- return mapper
-
-def class_mapper(class_, entity_name=None, compile=True, raiseerror=True):
- """Given a class and optional entity_name, return the primary Mapper associated with the key.
-
- If no mapper can be located, raises ``InvalidRequestError``.
- """
-
- try:
- mapper = class_._class_state.mappers[entity_name]
- except (KeyError, AttributeError):
- if raiseerror:
- raise exceptions.InvalidRequestError("Class '%s' entity name '%s' has no mapper associated with it" % (class_.__name__, entity_name))
- else:
- return None
- if compile:
- return mapper.compile()
- else:
- return mapper
-
-def _class_to_mapper(class_or_mapper, entity_name=None, compile=True):
- if isinstance(class_or_mapper, type):
- return class_mapper(class_or_mapper, entity_name=entity_name, compile=compile)
- else:
- if compile:
- return class_or_mapper.compile()
- else:
- return class_or_mapper
return sql.and_(*clauses)
else:
return self.prop._optimized_compare(other)
-
- def _join_and_criterion(self, criterion=None, **kwargs):
- adapt_against = None
+ def _join_and_criterion(self, criterion=None, **kwargs):
if getattr(self, '_of_type', None):
target_mapper = self._of_type
- to_selectable = target_mapper.mapped_table
- adapt_against = to_selectable
+ to_selectable = target_mapper._with_polymorphic_selectable() #mapped_table
else:
- target_mapper = self.prop.mapper
to_selectable = None
- adapt_against = None
-
- if self.prop._is_self_referential():
- pj = self.prop.primary_join_against(self.prop.parent, None)
- sj = self.prop.secondary_join_against(self.prop.parent, toselectable=to_selectable)
-
- pac = PropertyAliasedClauses(self.prop, pj, sj)
- j = pac.primaryjoin
- if pac.secondaryjoin:
- j = j & pac.secondaryjoin
- else:
- j = self.prop.full_join_against(self.prop.parent, None, toselectable=to_selectable)
+
+ pj, sj, source, dest, target_adapter = self.prop._create_joins(dest_polymorphic=True, dest_selectable=to_selectable)
for k in kwargs:
crit = (getattr(self.prop.mapper.class_, k) == kwargs[k])
else:
criterion = criterion & crit
- if criterion:
- if adapt_against:
- criterion = ClauseAdapter(adapt_against).traverse(criterion)
- if self.prop._is_self_referential():
- criterion = pac.adapt_clause(criterion)
+ if sj:
+ j = pj & sj
+ else:
+ j = pj
+
+ if criterion and target_adapter:
+ criterion = target_adapter.traverse(criterion)
- return j, criterion, to_selectable
+ return j, criterion, dest
def any(self, criterion=None, **kwargs):
if not self.prop.uselist:
def _is_self_referential(self):
return self.mapper.common_parent(self.parent)
-
- def primary_join_against(self, mapper, selectable=None, toselectable=None):
- return self.__join_against(mapper, selectable, toselectable, True, False)
-
- def secondary_join_against(self, mapper, toselectable=None):
- return self.__join_against(mapper, None, toselectable, False, True)
-
- def full_join_against(self, mapper, selectable=None, toselectable=None):
- return self.__join_against(mapper, selectable, toselectable, True, True)
- def __join_against(self, frommapper, fromselectable, toselectable, primary, secondary):
- if fromselectable is None:
- fromselectable = frommapper.local_table
-
- parent_equivalents = frommapper._equivalent_columns
-
- if primary:
- primaryjoin = self.primaryjoin
-
- if fromselectable is not frommapper.local_table:
- if self.direction is ONETOMANY:
- primaryjoin = ClauseAdapter(fromselectable, exclude=self.foreign_keys, equivalents=parent_equivalents).traverse(primaryjoin)
- elif self.direction is MANYTOONE:
- primaryjoin = ClauseAdapter(fromselectable, include=self.foreign_keys, equivalents=parent_equivalents).traverse(primaryjoin)
- elif self.secondaryjoin:
- primaryjoin = ClauseAdapter(fromselectable, exclude=self.foreign_keys, equivalents=parent_equivalents).traverse(primaryjoin)
+ def _create_joins(self, source_polymorphic=False, source_selectable=None, dest_polymorphic=False, dest_selectable=None):
+ if source_selectable is None:
+ if source_polymorphic and self.parent.with_polymorphic:
+ source_selectable = self.parent._with_polymorphic_selectable()
+ else:
+ source_selectable = None
+ if dest_selectable is None:
+ if dest_polymorphic and self.mapper.with_polymorphic:
+ dest_selectable = self.mapper._with_polymorphic_selectable()
+ else:
+ dest_selectable = None
+ if self._is_self_referential():
+ if dest_selectable:
+ dest_selectable = dest_selectable.alias()
+ else:
+ dest_selectable = self.mapper.local_table.alias()
- if secondary:
- secondaryjoin = self.secondaryjoin
- return primaryjoin & secondaryjoin
+ primaryjoin = self.primaryjoin
+ if source_selectable:
+ if self.direction in (ONETOMANY, MANYTOMANY):
+ primaryjoin = ClauseAdapter(source_selectable, exclude=self.foreign_keys, equivalents=self.parent._equivalent_columns).traverse(primaryjoin)
else:
- return primaryjoin
- elif secondary:
- return self.secondaryjoin
- else:
- raise AssertionError("illegal condition")
+ primaryjoin = ClauseAdapter(source_selectable, include=self.foreign_keys, equivalents=self.parent._equivalent_columns).traverse(primaryjoin)
+
+ secondaryjoin = self.secondaryjoin
+ target_adapter = None
+ if dest_selectable:
+ if self.direction == ONETOMANY:
+ target_adapter = ClauseAdapter(dest_selectable, include=self.foreign_keys, equivalents=self.mapper._equivalent_columns)
+ elif self.direction == MANYTOMANY:
+ target_adapter = ClauseAdapter(dest_selectable, equivalents=self.mapper._equivalent_columns)
+ else:
+ target_adapter = ClauseAdapter(dest_selectable, exclude=self.foreign_keys, equivalents=self.mapper._equivalent_columns)
+ if secondaryjoin:
+ secondaryjoin = target_adapter.traverse(secondaryjoin)
+ else:
+ primaryjoin = target_adapter.traverse(primaryjoin)
+ target_adapter.include = target_adapter.exclude = None
+
+ return primaryjoin, secondaryjoin, source_selectable or self.parent.local_table, dest_selectable or self.mapper.local_table, target_adapter
- def get_join(self, parent, primary=True, secondary=True, polymorphic_parent=True):
+ def _get_join(self, parent, primary=True, secondary=True, polymorphic_parent=True):
"""deprecated. use primary_join_against(), secondary_join_against(), full_join_against()"""
+ pj, sj, source, dest, adapter = self._create_joins(source_polymorphic=polymorphic_parent)
+
if primary and secondary:
- return self.full_join_against(parent, parent.mapped_table)
+ return pj & sj
elif primary:
- return self.primary_join_against(parent, parent.mapped_table)
+ return pj
elif secondary:
- return self.secondary_join_against(parent)
+ return sj
else:
raise AssertionError("illegal condition")
-
+
+
def register_dependencies(self, uowcommit):
if not self.viewonly:
self._dependency_processor.register_dependencies(uowcommit)
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql import expression, visitors, operators
from sqlalchemy.orm import mapper, object_mapper
-from sqlalchemy.orm.mapper import _state_mapper, _class_to_mapper
+
+from sqlalchemy.orm.util import _state_mapper, _class_to_mapper, _is_mapped_class, _is_aliased_class
from sqlalchemy.orm import util as mapperutil
from sqlalchemy.orm import interfaces
+from sqlalchemy.orm import attributes
+from sqlalchemy.orm.util import AliasedClass
+
+aliased = AliasedClass
-__all__ = ['Query', 'QueryContext']
+__all__ = ['Query', 'QueryContext', 'aliased']
class Query(object):
"""
q = self._clone()
+ if not alias and _is_aliased_class(entity):
+ alias = entity.alias
+
if isinstance(entity, type):
entity = mapper.class_mapper(entity)
+
if alias is not None:
alias = mapperutil.AliasedClauses(alias)
"""
q = self._clone()
q._eager_loaders = util.Set()
- fromclause = q.compile()
+ fromclause = q.compile().correlate(None)
return Query(self.mapper, self.session).select_from(fromclause)
- def _values(self, *columns):
- """Turn this query into a 'columns only' query.
+ def values(self, *columns):
+ """Return an iterator yielding result tuples corresponding to the given list of columns"""
- The API for this method hasn't been decided yet and is subject to change.
-
- """
q = self.__no_entities('_values')
q._only_load_props = q._eager_loaders = util.Set()
-
+ q._no_filters = True
for column in columns:
- q._entities.append(self._add_column(column, None))
- return q
-
+ q._entities.append(self._add_column(column, None, False))
+ if not q._yield_per:
+ q = q.yield_per(10)
+ return iter(q)
+ _values = values
+
def add_column(self, column, id=None):
"""Add a SQL ColumnElement to the list of result columns to be returned.
"""
q = self._clone()
- q._entities = q._entities + [self._add_column(column, id)]
+ q._entities = q._entities + [self._add_column(column, id, True)]
return q
- def _add_column(self, column, id=None):
+ def _add_column(self, column, id, looks_for_aliases):
if isinstance(column, interfaces.PropComparator):
column = column.clause_element()
elif not isinstance(column, (sql.ColumnElement, basestring)):
raise exceptions.InvalidRequestError("Invalid column expression '%r'" % column)
- return _ColumnEntity(column=column, id=id)
+ return _ColumnEntity(column, id)
def options(self, *args):
"""Return a new Query object, applying the given list of
if isinstance(a, mapperutil.PropertyAliasedClauses):
q._alias_ids.setdefault(a.mapper, []).append(a)
q._alias_ids.setdefault(a.table, []).append(a)
- q._alias_ids.setdefault(a.alias, []).append(a)
a = a.parentclauses
else:
break
if not isinstance(keys, list):
keys = [keys]
+
for key in keys:
use_selectable = None
of_type = None
-
+ is_aliased_class = False
+
if isinstance(key, tuple):
key, use_selectable = key
if isinstance(key, interfaces.PropComparator):
prop = key.property
if getattr(key, '_of_type', None):
- if use_selectable:
- raise exceptions.InvalidRequestError("Can't specify use_selectable along with polymorphic property created via of_type().")
of_type = key._of_type
- use_selectable = key._of_type.mapped_table
+ if not use_selectable:
+ use_selectable = key._of_type.mapped_table
else:
prop = mapper.get_property(key, resolve_synonyms=True)
if use_selectable:
+ if _is_aliased_class(use_selectable):
+ use_selectable = use_selectable.alias
+ is_aliased_class = True
if not use_selectable.is_derived_from(prop.mapper.mapped_table):
raise exceptions.InvalidRequestError("Selectable '%s' is not derived from '%s'" % (use_selectable.description, prop.mapper.mapped_table.description))
if not isinstance(use_selectable, expression.Alias):
raise exceptions.InvalidRequestError("Self-referential query on '%s' property requires aliased=True argument." % str(prop))
if prop.table not in currenttables or create_aliases or use_selectable:
- if prop.secondary:
- if use_selectable or create_aliases:
- alias = mapperutil.PropertyAliasedClauses(prop,
- prop.primary_join_against(mapper, adapt_against),
- prop.secondary_join_against(mapper, toselectable=use_selectable),
- alias,
- alias=use_selectable
- )
- crit = alias.primaryjoin
- clause = clause.join(alias.secondary, crit, isouter=outerjoin).join(alias.alias, alias.secondaryjoin, isouter=outerjoin)
+
+ if use_selectable or create_aliases:
+ alias = mapperutil.PropertyAliasedClauses(prop,
+ prop.primaryjoin,
+ prop.secondaryjoin,
+ alias,
+ alias=use_selectable,
+ should_adapt=not is_aliased_class
+ )
+ crit = alias.primaryjoin
+ if prop.secondary:
+ clause = clause.join(alias.secondary, crit, isouter=outerjoin)
+ clause = clause.join(alias.alias, alias.secondaryjoin, isouter=outerjoin)
else:
- crit = prop.primary_join_against(mapper, adapt_against)
- clause = clause.join(prop.secondary, crit, isouter=outerjoin)
- clause = clause.join(prop.table, prop.secondary_join_against(mapper), isouter=outerjoin)
- else:
- if use_selectable or create_aliases:
- alias = mapperutil.PropertyAliasedClauses(prop,
- prop.primary_join_against(mapper, adapt_against, toselectable=use_selectable),
- None,
- alias,
- alias=use_selectable
- )
- crit = alias.primaryjoin
clause = clause.join(alias.alias, crit, isouter=outerjoin)
+ else:
+ assert not prop.mapper.with_polymorphic
+ pj, sj, source, dest, target_adapter = prop._create_joins(source_selectable=adapt_against)
+ if sj:
+ clause = clause.join(prop.secondary, pj, isouter=outerjoin)
+ clause = clause.join(prop.table, sj, isouter=outerjoin)
else:
- crit = prop.primary_join_against(mapper, adapt_against)
- clause = clause.join(prop.table, crit, isouter=outerjoin)
+ clause = clause.join(prop.table, pj, isouter=outerjoin)
+
elif not create_aliases and prop.secondary is not None and prop.secondary not in currenttables:
# TODO: this check is not strong enough for different paths to the same endpoint which
# does not use secondary tables
if use_selectable:
adapt_against = use_selectable
-
+
return (clause, mapper, alias)
context.runid = _new_runid()
entities = self._entities + [_QueryEntity.legacy_guess_type(mc) for mc in mappers_or_columns]
- should_unique = isinstance(entities[0], _PrimaryMapperEntity) and len(entities) == 1
+
+ if getattr(self, '_no_filters', False):
+ filter = None
+ as_instances = False
+ else:
+ as_instances = isinstance(entities[0], _PrimaryMapperEntity) and len(entities) == 1
+
+ if as_instances:
+ filter = util.OrderedIdentitySet
+ else:
+ filter = util.OrderedSet
+
process = [query_entity.row_processor(self, context) for query_entity in entities]
while True:
else:
fetch = cursor.fetchall()
- if not should_unique:
- rows = util.OrderedSet()
- for row in fetch:
- rows.add(tuple([proc(context, row) for proc in process]))
+ if as_instances:
+ rows = [process[0](context, row) for row in fetch]
else:
- rows = util.UniqueAppender([])
- for row in fetch:
- rows.append(process[0](context, row))
+ rows = [tuple([proc(context, row) for proc in process]) for row in fetch]
+
+ if filter:
+ rows = filter(rows)
if context.refresh_instance and context.only_load_props and context.refresh_instance in context.progress:
context.refresh_instance.commit(context.only_load_props)
if context.eager_joins:
eager_joins = local_adapter.traverse(context.eager_joins)
- statement.append_from(eager_joins, _copy_collection=False)
+ statement.append_from(eager_joins)
if context.order_by:
statement.append_order_by(*local_adapter.copy_and_process(context.order_by))
if context.eager_joins:
if adapter:
context.eager_joins = adapter.adapt_clause(context.eager_joins)
- statement.append_from(context.eager_joins, _copy_collection=False)
+ statement.append_from(context.eager_joins)
if context.eager_order_by:
if adapter:
for key in keys:
prop = mapper.get_property(key, resolve_synonyms=True)
if clause is None:
- clause = prop.get_join(mapper)
+ clause = prop._get_join(mapper)
else:
- clause &= prop.get_join(mapper)
+ clause &= prop._get_join(mapper)
mapper = prop.mapper
return clause
continue
context.exec_with_path(self.mapper, value.key, value.setup, context, only_load_props=query._only_load_props)
-
class _ColumnEntity(_QueryEntity):
"""entity column corresponding to Table or selectable columns."""
- def __init__(self, column, id=None):
+ def __init__(self, column, id):
+ if isinstance(column, basestring):
+ column = sql.literal_column(column)
+
if column and isinstance(column, sql.ColumnElement) and not hasattr(column, '_label'):
column = column.label(None)
self.column = column
self.alias_id = id
- self.__tables = None
+
+ def __resolve_expr_against_query_aliases(self, query, expr, context):
+ if not query._alias_ids:
+ return expr
+
+ if ('_ColumnEntity', expr) in context.attributes:
+ return context.attributes[('_ColumnEntity', expr)]
- def _tables(self):
- if not self.__tables:
- self.__tables = sql_util.find_tables(self.column, check_columns=True)
- return self.__tables
- _tables = property(_tables)
-
- def _get_entity_clauses(self, query):
if self.alias_id:
try:
- return query._alias_ids[self.alias_id][0]
+ aliases = query._alias_ids[self.alias_id][0]
except KeyError:
raise exceptions.InvalidRequestError("Query has no alias identified by '%s'" % self.alias_id)
- if isinstance(self.column, sql.ColumnElement):
- aliases = list(chain(*[query._alias_ids[t] for t in self._tables if t in query._alias_ids]))
- if len(aliases) > 1:
- raise exceptions.InvalidRequestError("Ambiguous join for entity '%s'; specify id=<someid> to query.join()/query.add_column()" % str(self.column))
- elif len(aliases) == 1:
- return aliases[0]
+ def _locate_aliased(element):
+ if element in query._alias_ids:
+ return aliases
+ else:
+ def _locate_aliased(element):
+ if element in query._alias_ids:
+ aliases = query._alias_ids[element]
+ if len(aliases) > 1:
+ raise exceptions.InvalidRequestError("Ambiguous join for entity '%s'; specify id=<someid> to query.join()/query.add_column(), or use the aliased() function to use explicit class aliases." % expr)
+ return aliases[0]
+ return None
- return None
-
+ class Adapter(visitors.ClauseVisitor):
+ def before_clone(self, element):
+ if isinstance(element, expression.FromClause):
+ alias = _locate_aliased(element)
+ if alias:
+ return alias.alias
+
+ if hasattr(element, 'table'):
+ alias = _locate_aliased(element.table)
+ if alias:
+ return alias.aliased_column(element)
+
+ return None
+
+ context.attributes[('_ColumnEntity', expr)] = ret = Adapter().traverse(expr, clone=True)
+ return ret
+
def row_processor(self, query, context):
- clauses = self._get_entity_clauses(query)
- if clauses:
- def proc(context, row):
- return clauses.row_decorator(row)[self.column]
- else:
- def proc(context, row):
- return row[self.column]
+ column = self.__resolve_expr_against_query_aliases(query, self.column, context)
+ def proc(context, row):
+ return row[column]
return proc
def setup_context(self, query, context):
- clauses = self._get_entity_clauses(query)
- if clauses:
- context.secondary_columns.append(clauses.aliased_column(self.column))
- else:
- context.secondary_columns.append(self.column)
+ column = self.__resolve_expr_against_query_aliases(query, self.column, context)
+ context.secondary_columns.append(column)
def __str__(self):
return str(self.column)
-
Query.logger = logging.class_logger(Query)
finally:
self.path = oldpath
+
+
_runid = 1L
_id_lock = util.threading.Lock()
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql.util import row_adapter as create_row_adapter
from sqlalchemy.sql import visitors
-from sqlalchemy.orm.interfaces import MapperExtension, EXT_CONTINUE
+from sqlalchemy.orm.interfaces import MapperExtension, EXT_CONTINUE, PropComparator
all_cascades = util.Set(["delete", "delete-orphan", "all", "merge",
"expunge", "save-update", "refresh-expire", "none"])
return self.methods.get(key, self._pass)
class AliasedClauses(object):
- """Creates aliases of a mapped tables for usage in ORM queries.
- """
+ """Creates aliases of a mapped tables for usage in ORM queries, and provides expression adaptation."""
- def __init__(self, alias, equivalents=None, chain_to=None):
+ def __init__(self, alias, equivalents=None, chain_to=None, should_adapt=True):
self.alias = alias
self.equivalents = equivalents
self.row_decorator = self._create_row_adapter()
- self.adapter = sql_util.ClauseAdapter(self.alias, equivalents=equivalents)
+ self.should_adapt = should_adapt
+ if should_adapt:
+ self.adapter = sql_util.ClauseAdapter(self.alias, equivalents=equivalents)
+ else:
+ self.adapter = visitors.NullVisitor()
+
if chain_to:
self.adapter.chain(chain_to.adapter)
def aliased_column(self, column):
-
+ if not self.should_adapt:
+ return column
+
conv = self.alias.corresponding_column(column)
if conv:
return conv
-
- aliased_column = column
- class ModifySubquery(visitors.ClauseVisitor):
- def visit_select(s, select):
- select._should_correlate = False
- select.append_correlation(self.alias)
- aliased_column = sql_util.ClauseAdapter(self.alias, equivalents=self.equivalents).chain(ModifySubquery()).traverse(aliased_column, clone=True)
- aliased_column = aliased_column.label(None)
+
+ # process column-level subqueries
+ aliased_column = sql_util.ClauseAdapter(self.alias, equivalents=self.equivalents).traverse(column, clone=True)
+
+ # add to row decorator explicitly
self.row_decorator({}).map[column] = aliased_column
return aliased_column
class PropertyAliasedClauses(AliasedClauses):
"""extends AliasedClauses to add support for primary/secondary joins on a relation()."""
- def __init__(self, prop, primaryjoin, secondaryjoin, parentclauses=None, alias=None):
+ def __init__(self, prop, primaryjoin, secondaryjoin, parentclauses=None, alias=None, should_adapt=True):
self.prop = prop
self.mapper = self.prop.mapper
self.table = self.prop.table
from_obj = self.mapper._with_polymorphic_selectable()
alias = from_obj.alias()
- super(PropertyAliasedClauses, self).__init__(alias, equivalents=self.mapper._equivalent_columns, chain_to=parentclauses)
+ super(PropertyAliasedClauses, self).__init__(alias, equivalents=self.mapper._equivalent_columns, chain_to=parentclauses, should_adapt=should_adapt)
if prop.secondary:
self.secondary = prop.secondary.alias()
else:
self.order_by = None
+class AliasedClass(object):
+ def __new__(cls, target):
+ from sqlalchemy.orm import attributes
+ mapper = _class_to_mapper(target)
+ alias = mapper.mapped_table.alias()
+ retcls = type(target.__name__ + "Alias", (cls,), {'alias':alias})
+ retcls._class_state = mapper._class_state
+ for prop in mapper.iterate_properties:
+ existing = mapper._class_state.attrs[prop.key]
+ setattr(retcls, prop.key, attributes.InstrumentedAttribute(existing.impl, comparator=AliasedComparator(alias, existing.comparator)))
+
+ return retcls
+
+ def __init__(self, alias):
+ self.alias = alias
+
+class AliasedComparator(PropComparator):
+ def __init__(self, alias, comparator):
+ self.alias = alias
+ self.comparator = comparator
+ self.adapter = sql_util.ClauseAdapter(alias)
+
+ def clause_element(self):
+ return self.adapter.traverse(self.comparator.clause_element(), clone=True)
+
+ def operate(self, op, *other, **kwargs):
+ return self.adapter.traverse(self.comparator.operate(op, *other, **kwargs), clone=True)
+
+ def reverse_operate(self, op, other, **kwargs):
+ return self.adapter.traverse(self.comparator.reverse_operate(op, *other, **kwargs), clone=True)
+
+from sqlalchemy.sql import expression
+_selectable = expression._selectable
+def _orm_selectable(selectable):
+ if _is_mapped_class(selectable):
+ if _is_aliased_class(selectable):
+ return selectable.alias
+ else:
+ return _class_to_mapper(selectable)._with_polymorphic_selectable()
+ else:
+ return _selectable(selectable)
+expression._selectable = _orm_selectable
+
+class _ORMJoin(expression.Join):
+ """future functionality."""
+
+ __visit_name__ = expression.Join.__visit_name__
+
+ def __init__(self, left, right, onclause=None, isouter=False):
+ if _is_mapped_class(left) or _is_mapped_class(right):
+ if hasattr(left, '_orm_mappers'):
+ left_mapper = left._orm_mappers[1]
+ adapt_from = left.right
+ else:
+ left_mapper = _class_to_mapper(left)
+ if _is_aliased_class(left):
+ adapt_from = left.alias
+ else:
+ adapt_from = None
+
+ right_mapper = _class_to_mapper(right)
+ self._orm_mappers = (left_mapper, right_mapper)
+
+ if isinstance(onclause, basestring):
+ prop = left_mapper.get_property(onclause)
+
+ if _is_aliased_class(right):
+ adapt_to = right.alias
+ else:
+ adapt_to = None
+
+ pj, sj, source, dest, target_adapter = prop._create_joins(source_selectable=adapt_from, dest_selectable=adapt_to, source_polymorphic=True, dest_polymorphic=True)
+
+ if sj:
+ left = sql.join(left, prop.secondary, onclause=pj)
+ onclause = sj
+ else:
+ onclause = pj
+ expression.Join.__init__(self, left, right, onclause, isouter)
+
+ def join(self, right, onclause=None, isouter=False):
+ return _ORMJoin(self, right, onclause, isouter)
+
+ def outerjoin(self, right, onclause=None):
+ return _ORMJoin(self, right, onclause, True)
+
+def _join(left, right, onclause=None):
+ """future functionality."""
+
+ return _ORMJoin(left, right, onclause, False)
+
+def _outerjoin(left, right, onclause=None):
+ """future functionality."""
+
+ return _ORMJoin(left, right, onclause, True)
+
+def has_identity(object):
+ return hasattr(object, '_instance_key')
+
+def _state_has_identity(state):
+ return '_instance_key' in state.dict
+
+def _is_mapped_class(cls):
+ return hasattr(cls, '_class_state')
+
+def _is_aliased_class(obj):
+ return isinstance(obj, type) and issubclass(obj, AliasedClass)
+
+def has_mapper(object):
+ """Return True if the given object has had a mapper association
+ set up, either through loading, or via insertion in a session.
+ """
+
+ return hasattr(object, '_entity_name')
+
+def _state_mapper(state, entity_name=None):
+ return state.class_._class_state.mappers[state.dict.get('_entity_name', entity_name)]
+
+def object_mapper(object, entity_name=None, raiseerror=True):
+ """Given an object, return the primary Mapper associated with the object instance.
+
+ object
+ The object instance.
+
+ entity_name
+ Entity name of the mapper to retrieve, if the given instance is
+ transient. Otherwise uses the entity name already associated
+ with the instance.
+
+ raiseerror
+ Defaults to True: raise an ``InvalidRequestError`` if no mapper can
+ be located. If False, return None.
+
+ """
+
+ try:
+ mapper = object.__class__._class_state.mappers[getattr(object, '_entity_name', entity_name)]
+ except (KeyError, AttributeError):
+ if raiseerror:
+ raise exceptions.InvalidRequestError("Class '%s' entity name '%s' has no mapper associated with it" % (object.__class__.__name__, getattr(object, '_entity_name', entity_name)))
+ else:
+ return None
+ return mapper
+
+def class_mapper(class_, entity_name=None, compile=True, raiseerror=True):
+ """Given a class and optional entity_name, return the primary Mapper associated with the key.
+
+ If no mapper can be located, raises ``InvalidRequestError``.
+ """
+
+ try:
+ mapper = class_._class_state.mappers[entity_name]
+ except (KeyError, AttributeError):
+ if raiseerror:
+ raise exceptions.InvalidRequestError("Class '%s' entity name '%s' has no mapper associated with it" % (class_.__name__, entity_name))
+ else:
+ return None
+ if compile:
+ return mapper.compile()
+ else:
+ return mapper
+
+def _class_to_mapper(class_or_mapper, entity_name=None, compile=True):
+ if isinstance(class_or_mapper, type):
+ return class_mapper(class_or_mapper, entity_name=entity_name, compile=compile)
+ else:
+ if compile:
+ return class_or_mapper.compile()
+ else:
+ return class_or_mapper
+
def instance_str(instance):
"""Return a string describing an instance."""
is otherwise internal to SQLAlchemy.
"""
-import string, re
+import string, re, itertools
from sqlalchemy import schema, engine, util, exceptions
from sqlalchemy.sql import operators, functions
from sqlalchemy.sql import expression as sql
BIND_PARAMS = re.compile(r'(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])', re.UNICODE)
BIND_PARAMS_ESC = re.compile(r'\x5c(:[\w\$]+)(?![:\w\$])', re.UNICODE)
-ANONYMOUS_LABEL = re.compile(r'{ANON (-?\d+) (.*?)}')
+ANONYMOUS_LABEL = re.compile(r'{ANON (-?\d+) ([^{}]+)}')
BIND_TEMPLATES = {
'pyformat':"%%(%(name)s)s",
def _truncated_identifier(self, ident_class, name):
if (ident_class, name) in self.generated_ids:
return self.generated_ids[(ident_class, name)]
-
+
anonname = ANONYMOUS_LABEL.sub(self._process_anon, name)
if len(anonname) > self.dialect.max_identifier_length:
truncname = anonname
self.generated_ids[(ident_class, name)] = truncname
return truncname
-
+
def _process_anon(self, match):
(ident, derived) = match.group(1,2)
+
key = ('anonymous', ident)
if key in self.generated_ids:
return self.generated_ids[key]
not isinstance(column.table, sql.Select):
return column.label(column.name)
elif not isinstance(column, (sql._UnaryExpression, sql._TextClause)) and (not hasattr(column, 'name') or isinstance(column, sql._Function)):
- return column.anon_label
+ return column.label(column.anon_label)
else:
return column
froms = select._get_display_froms(existingfroms)
- correlate_froms = util.Set()
- for f in froms:
- correlate_froms.add(f)
- correlate_froms.update(f._get_from_objects())
+ correlate_froms = util.Set(itertools.chain(*([froms] + [f._get_from_objects() for f in froms])))
# TODO: might want to propigate existing froms for select(select(select))
# where innermost select should correlate to outermost
"""
return _UnaryExpression(column, modifier=operators.asc_op)
-def outerjoin(left, right, onclause=None, **kwargs):
+def outerjoin(left, right, onclause=None):
"""Return an ``OUTER JOIN`` clause element.
The returned object is an instance of [sqlalchemy.sql.expression#Join].
methods on the resulting ``Join`` object.
"""
- return Join(left, right, onclause, isouter = True, **kwargs)
+ return Join(left, right, onclause, isouter=True)
-def join(left, right, onclause=None, **kwargs):
+def join(left, right, onclause=None, isouter=False):
"""Return a ``JOIN`` clause element (regular inner join).
The returned object is an instance of [sqlalchemy.sql.expression#Join].
methods on the resulting ``Join`` object.
"""
- return Join(left, right, onclause, **kwargs)
+ return Join(left, right, onclause, isouter)
def select(columns=None, whereclause=None, from_obj=[], **kwargs):
"""Returns a ``SELECT`` clause element.
return _Function(self.__names[-1], packagenames=self.__names[0:-1], *c, **o)
+# "func" global - i.e. func.count()
func = _FunctionGenerator()
+# "modifier" global - i.e. modifier.distinct
# TODO: use UnaryExpression for this instead ?
modifier = _FunctionGenerator(group=False)
def _clone(element):
return element._clone()
+def _expand_cloned(elements):
+ """expand the given set of ClauseElements to be the set of all 'cloned' predecessors."""
+
+ return itertools.chain(*[x._cloned_set for x in elements])
+
+def _cloned_intersection(a, b):
+ """return the intersection of sets a and b, counting
+ any overlap between 'cloned' predecessors.
+
+ The returned set is in terms of the enties present within 'a'.
+
+ """
+ all_overlap = util.Set(_expand_cloned(a)).intersection(_expand_cloned(b))
+ return a.intersection(
+ [
+ elem for elem in a if all_overlap.intersection(elem._cloned_set)
+ ]
+ )
+
def _compound_select(keyword, *selects, **kwargs):
return CompoundSelect(keyword, *selects, **kwargs)
else:
raise exceptions.ArgumentError("Object '%s' is not a Selectable and does not implement `__selectable__()`" % repr(element))
+
def is_column(col):
"""True if ``col`` is an instance of ``ColumnElement``."""
return isinstance(col, ColumnElement)
co = _ColumnClause(name, selectable, type_=getattr(self, 'type', None))
else:
name = str(self)
- co = _ColumnClause(self.anon_label.name, selectable, type_=getattr(self, 'type', None))
+ co = _ColumnClause(self.anon_label, selectable, type_=getattr(self, 'type', None))
co.proxies = [self]
selectable.columns[name]= co
"""
if not hasattr(self, '_ColumnElement__anon_label'):
- self.__anon_label = self.label(None)
+ self.__anon_label = "{ANON %d %s}" % (id(self), getattr(self, 'name', 'anon'))
return self.__anon_label
anon_label = property(anon_label)
col = list(self.columns)[0]
return select([func.count(col).label('tbl_row_count')], whereclause, from_obj=[self], **params)
- def select(self, whereclauses = None, **params):
+ def select(self, whereclause=None, **params):
"""return a SELECT of this ``FromClause``."""
- return select([self], whereclauses, **params)
+ return select([self], whereclause, **params)
- def join(self, right, *args, **kwargs):
+ def join(self, right, onclause=None, isouter=False):
"""return a join of this ``FromClause`` against another ``FromClause``."""
- return Join(self, right, *args, **kwargs)
+ return Join(self, right, onclause, isouter)
- def outerjoin(self, right, *args, **kwargs):
+ def outerjoin(self, right, onclause=None):
"""return an outer join of this ``FromClause`` against another ``FromClause``."""
- return Join(self, right, isouter=True, *args, **kwargs)
+ return Join(self, right, onclause, True)
def alias(self, name=None):
"""return an alias of this ``FromClause`` against another ``FromClause``."""
return getattr(self, 'name', self.__class__.__name__ + " object")
description = property(description)
- def _clone_from_clause(self):
+ def _reset_exported(self):
# delete all the "generated" collections of columns for a
# newly cloned FromClause, so that they will be re-derived
# from the item. this is because FromClause subclasses, when
def _copy_internals(self, clone=_clone):
_CalculatedClause._copy_internals(self, clone=clone)
- self._clone_from_clause()
+ self._reset_exported()
def get_children(self, **kwargs):
return _CalculatedClause.get_children(self, **kwargs)
s = select(*args, **kwargs).as_scalar().self_group()
_UnaryExpression.__init__(self, s, operator=operators.exists)
- def select(self, whereclauses = None, **params):
- return select([self], whereclauses, **params)
+ def select(self, whereclause=None, **params):
+ return select([self], whereclause, **params)
def correlate(self, fromclause):
e = self._clone()
off all ``FromClause`` subclasses.
"""
- def __init__(self, left, right, onclause=None, isouter = False):
+ def __init__(self, left, right, onclause=None, isouter=False):
self.left = _selectable(left)
self.right = _selectable(right).self_group()
if onclause is None:
- self.onclause = self._match_primaries(self.left, self.right)
+ self.onclause = self.__match_primaries(self.left, self.right)
else:
self.onclause = onclause
+
self.isouter = isouter
self.__folded_equivalents = None
self._oid_column = self.left.oid_column
def _copy_internals(self, clone=_clone):
- self._clone_from_clause()
+ self._reset_exported()
self.left = clone(self.left)
self.right = clone(self.right)
self.onclause = clone(self.onclause)
def get_children(self, **kwargs):
return self.left, self.right, self.onclause
- def _match_primaries(self, primary, secondary):
+ def __match_primaries(self, primary, secondary):
crit = []
constraints = util.Set()
for fk in secondary.foreign_keys:
else:
return and_(*crit)
- def _folded_equivalents(self, equivs=None):
- """Returns the column list of this Join with all equivalently-named,
- equated columns folded into one column, where 'equated' means they are
- equated to each other in the ON clause of this join.
-
- this method is used by select(fold_equivalents=True).
-
- The primary usage for this is when generating UNIONs so that
- each selectable can have distinctly-named columns without the need
- for use_labels=True.
- """
-
- if self.__folded_equivalents is not None:
- return self.__folded_equivalents
- if equivs is None:
- equivs = util.Set()
- class LocateEquivs(visitors.NoColumnVisitor):
- def visit_binary(self, binary):
- if binary.operator == operators.eq and binary.left.name == binary.right.name:
- equivs.add(binary.right)
- equivs.add(binary.left)
- LocateEquivs().traverse(self.onclause)
- collist = []
- if isinstance(self.left, Join):
- left = self.left._folded_equivalents(equivs)
- else:
- left = list(self.left.columns)
- if isinstance(self.right, Join):
- right = self.right._folded_equivalents(equivs)
- else:
- right = list(self.right.columns)
- used = util.Set()
- for c in left + right:
- if c in equivs:
- if c.name not in used:
- collist.append(c)
- used.add(c.name)
- else:
- collist.append(c)
- self.__folded_equivalents = collist
- return self.__folded_equivalents
- folded_equivalents = property(_folded_equivalents)
-
- def select(self, whereclause = None, fold_equivalents=False, **kwargs):
+ def select(self, whereclause=None, fold_equivalents=False, **kwargs):
"""Create a ``Select`` from this ``Join``.
whereclause
"""
if fold_equivalents:
- collist = self.folded_equivalents
+ global sql_util
+ if not sql_util:
+ from sqlalchemy.sql import util as sql_util
+ collist = sql_util.folded_equivalents(self)
else:
collist = [self.left, self.right]
self._oid_column = self.selectable.oid_column._make_proxy(self)
def _copy_internals(self, clone=_clone):
- self._clone_from_clause()
+ self._reset_exported()
self.selectable = _clone(self.selectable)
baseselectable = self.selectable
while isinstance(baseselectable, Alias):
return []
def _bind_param(self, obj):
- return _BindParamClause(self._label, obj, type_=self.type, unique=True)
+ return _BindParamClause(self.name, obj, type_=self.type, unique=True)
def _make_proxy(self, selectable, name = None):
# propigate the "is_literal" flag only if we are keeping our name,
col = list(self.columns)[0]
return select([func.count(col).label('tbl_row_count')], whereclause, from_obj=[self], **params)
- def join(self, right, *args, **kwargs):
- return Join(self, right, *args, **kwargs)
-
- def outerjoin(self, right, *args, **kwargs):
- return Join(self, right, isouter = True, *args, **kwargs)
-
- def alias(self, name=None):
- return Alias(self, name)
-
- def select(self, whereclause = None, **params):
- return select([self], whereclause, **params)
-
def insert(self, values=None, inline=False, **kwargs):
return insert(self, values=values, inline=inline, **kwargs)
is eligible to be used as a scalar expression.
The returned object is an instance of [sqlalchemy.sql.expression#_ScalarSelect].
- """
+ """
return _ScalarSelect(self)
def apply_labels(self):
name, such as "SELECT somecolumn AS tablename_somecolumn". This allows selectables which
contain multiple FROM clauses to produce a unique set of column names regardless of name conflicts
among the individual FROM clauses.
- """
+ """
s = self._generate()
s.use_labels = True
return s
with a label.
See also ``as_scalar()``.
- """
+ """
return self.as_scalar().label(name)
def supports_execution(self):
return s
def _generate(self):
- s = self._clone()
- s._clone_from_clause()
+ s = self.__class__.__new__(self.__class__)
+ s.__dict__ = self.__dict__.copy()
+ s._reset_exported()
return s
def limit(self, limit):
"""return a new selectable with the given list of ORDER BY criterion applied.
The criterion will be appended to any pre-existing ORDER BY criterion.
- """
+ """
s = self._generate()
s.append_order_by(*clauses)
return s
"""return a new selectable with the given list of GROUP BY criterion applied.
The criterion will be appended to any pre-existing GROUP BY criterion.
- """
+ """
s = self._generate()
s.append_group_by(*clauses)
return s
The criterion will be appended to any pre-existing ORDER BY criterion.
- Note that this mutates the Select construct such that derived attributes,
- such as the "primary_key", "oid_column", and child "froms" collection may
- be invalid if they have already been initialized. Consider the generative
- form of this method instead to prevent this issue.
"""
-
if len(clauses) == 1 and clauses[0] is None:
self._order_by_clause = ClauseList()
else:
The criterion will be appended to any pre-existing GROUP BY criterion.
- Note that this mutates the Select construct such that derived attributes,
- such as the "primary_key", "oid_column", and child "froms" collection may
- be invalid if they have already been initialized. Consider the generative
- form of this method instead to prevent this issue.
"""
-
if len(clauses) == 1 and clauses[0] is None:
self._group_by_clause = ClauseList()
else:
clauses = list(self._group_by_clause) + list(clauses)
self._group_by_clause = ClauseList(*clauses)
- def select(self, whereclauses = None, **params):
- """return a SELECT of this selectable.
-
- This has the effect of embeddeding this select into a subquery that is selected
- from.
- """
- return select([self], whereclauses, **params)
-
def _get_from_objects(self, is_where=False, **modifiers):
if is_where:
return []
self._oid_column = col
def _copy_internals(self, clone=_clone):
- self._clone_from_clause()
+ self._reset_exported()
self.selects = [clone(s) for s in self.selects]
if hasattr(self, '_col_map'):
del self._col_map
self._should_correlate = correlate
self._distinct = distinct
- # NOTE: the _generate()
- # operation creates a *shallow* copy of the object, so append_XXX() methods,
- # usually called via a generative method, create a copy of each collection
- # by default
-
- self.__correlate = util.Set()
- self._having = None
- self._prefixes = []
+ self._correlate = util.Set()
if columns:
self._raw_columns = [
]
else:
self._raw_columns = []
-
+
if from_obj:
self._froms = util.Set([
_is_literal(f) and _TextFromClause(f) or f
])
else:
self._froms = util.Set()
-
+
if whereclause:
self._whereclause = _literal_as_text(whereclause)
else:
rendered in the FROM clause of enclosing selects; this Select
may want to leave those absent if it is automatically
correlating.
+
"""
-
froms = util.OrderedSet()
for col in self._raw_columns:
toremove = itertools.chain(*[f._hide_froms for f in froms])
froms.difference_update(toremove)
- if len(froms) > 1 or self.__correlate:
- if self.__correlate:
- froms.difference_update(self.__correlate)
- if self._should_correlate and existing_froms is not None:
- froms.difference_update(existing_froms)
-
+ if len(froms) > 1 or self._correlate:
+ if self._correlate:
+ froms.difference_update(_cloned_intersection(froms, self._correlate))
+
+ if self._should_correlate and existing_froms:
+ froms.difference_update(_cloned_intersection(froms, existing_froms))
+
+ if not len(froms):
+ raise exceptions.InvalidRequestError("Select statement '%s' returned no FROM clauses due to auto-correlation; specify correlate(<tables>) to control correlation manually." % self)
+
return froms
froms = property(_get_display_froms, doc="""Return a list of all FromClause elements which will be applied to the FROM clause of the resulting statement.""")
This set is a superset of that returned by the ``froms`` property, which
is specifically for those FromClause elements that would actually be rendered.
+
"""
if hasattr(self, '_all_froms'):
return self._all_froms
- froms = util.Set()
- for col in self._raw_columns:
- for f in col._get_from_objects():
- froms.add(f)
+ froms = util.Set(
+ itertools.chain(*
+ [self._froms] +
+ [f._get_from_objects() for f in self._froms] +
+ [col._get_from_objects() for col in self._raw_columns]
+ )
+ )
- if self._whereclause is not None:
- for f in self._whereclause._get_from_objects(is_where=True):
- froms.add(f)
+ if self._whereclause:
+ froms.update(self._whereclause._get_from_objects(is_where=True))
- for elem in self._froms:
- froms.add(elem)
- for f in elem._get_from_objects():
- froms.add(f)
self._all_froms = froms
return froms
def inner_columns(self):
- """a collection of all ColumnElement expressions which would
+ """an iteratorof all ColumnElement expressions which would
be rendered into the columns clause of the resulting SELECT statement.
- """
+ """
for c in self._raw_columns:
if isinstance(c, Selectable):
for co in c.columns:
return False
def _copy_internals(self, clone=_clone):
- self._clone_from_clause()
- self._recorrelate_froms([(f, clone(f)) for f in self._froms])
+ self._reset_exported()
+ from_cloned = dict([(f, clone(f)) for f in self._froms.union(self._correlate)])
+ self._froms = util.Set([from_cloned[f] for f in self._froms])
+ self._correlate = util.Set([from_cloned[f] for f in self._correlate])
self._raw_columns = [clone(c) for c in self._raw_columns]
for attr in ('_whereclause', '_having', '_order_by_clause', '_group_by_clause'):
if getattr(self, attr) is not None:
list(self.locate_all_froms()) + \
[x for x in (self._whereclause, self._having, self._order_by_clause, self._group_by_clause) if x is not None]
- def _recorrelate_froms(self, froms):
- newcorrelate = util.Set()
- newfroms = util.Set()
- oldfroms = util.Set(self._froms)
- for old, new in froms:
- if old in self.__correlate:
- newcorrelate.add(new)
- self.__correlate.remove(old)
- if old in oldfroms:
- newfroms.add(new)
- oldfroms.remove(old)
- self.__correlate = self.__correlate.union(newcorrelate)
- self._froms = [f for f in oldfroms.union(newfroms)]
-
def column(self, column):
"""return a new select() construct with the given column expression added to its columns clause."""
s = self._generate()
- s.append_column(column)
+ column = _literal_as_column(column)
+
+ if isinstance(column, _ScalarSelect):
+ column = column.self_group(against=operators.comma_op)
+
+ s._raw_columns = s._raw_columns + [column]
+
return s
def where(self, whereclause):
columns clause, not using any commas."""
s = self._generate()
- s.append_prefix(clause)
+ clause = _literal_as_text(clause)
+ s._prefixes = s._prefixes + [clause]
return s
def select_from(self, fromclause):
FROM objects."""
s = self._generate()
- s.append_from(fromclause)
- return s
+ if _is_literal(fromclause):
+ fromclause = _TextFromClause(fromclause)
- def __dont_correlate(self):
- s = self._generate()
- s._should_correlate = False
+ s._froms = s._froms.union([fromclause])
return s
- def correlate(self, fromclause):
- """return a new select() construct which will correlate the given FROM clause to that
+ def correlate(self, *fromclauses):
+ """return a new select() construct which will correlate the given FROM clauses to that
of an enclosing select(), if a match is found.
By "match", the given fromclause must be present in this select's list of FROM objects
select() auto-correlates all of its FROM clauses to those of an embedded select when
compiled.
- If the fromclause is None, the select() will not correlate to anything.
+ If the fromclause is None, correlation is disabled for the returned select().
+
"""
-
s = self._generate()
s._should_correlate=False
- if fromclause is None:
- s.__correlate = util.Set()
+ if fromclauses == (None,):
+ s._correlate = util.Set()
else:
- s.append_correlation(fromclause)
+ s._correlate = s._correlate.union(fromclauses)
return s
- def append_correlation(self, fromclause, _copy_collection=True):
- """append the given correlation expression to this select() construct.
-
- Note that this mutates the Select construct such that derived attributes,
- such as the "primary_key", "oid_column", and child "froms" collection may
- be invalid if they have already been initialized. Consider the generative
- form of this method instead to prevent this issue.
- """
-
- if not _copy_collection:
- self.__correlate.add(fromclause)
- else:
- self.__correlate = util.Set(list(self.__correlate) + [fromclause])
-
- def append_column(self, column, _copy_collection=True):
- """append the given column expression to the columns clause of this select() construct.
+ def append_correlation(self, fromclause):
+ """append the given correlation expression to this select() construct."""
+
+ self._should_correlate=False
+ self._correlate.add(fromclause)
- Note that this mutates the Select construct such that derived attributes,
- such as the "primary_key", "oid_column", and child "froms" collection may
- be invalid if they have already been initialized. Consider the generative
- form of this method instead to prevent this issue.
- """
+ def append_column(self, column):
+ """append the given column expression to the columns clause of this select() construct."""
column = _literal_as_column(column)
if isinstance(column, _ScalarSelect):
column = column.self_group(against=operators.comma_op)
- if not _copy_collection:
- self._raw_columns.append(column)
- else:
- self._raw_columns = self._raw_columns + [column]
-
- def append_prefix(self, clause, _copy_collection=True):
- """append the given columns clause prefix expression to this select() construct.
+ self._raw_columns.append(column)
+ self._reset_exported()
- Note that this mutates the Select construct such that derived attributes,
- such as the "primary_key", "oid_column", and child "froms" collection may
- be invalid if they have already been initialized. Consider the generative
- form of this method instead to prevent this issue.
- """
+ def append_prefix(self, clause):
+ """append the given columns clause prefix expression to this select() construct."""
clause = _literal_as_text(clause)
- if not _copy_collection:
- self._prefixes.append(clause)
- else:
- self._prefixes = self._prefixes + [clause]
+ self._prefixes.append(clause)
def append_whereclause(self, whereclause):
"""append the given expression to this select() construct's WHERE criterion.
The expression will be joined to existing WHERE criterion via AND.
- Note that this mutates the Select construct such that derived attributes,
- such as the "primary_key", "oid_column", and child "froms" collection may
- be invalid if they have already been initialized. Consider the generative
- form of this method instead to prevent this issue.
"""
-
- if self._whereclause is not None:
+ if self._whereclause is not None:
self._whereclause = and_(self._whereclause, _literal_as_text(whereclause))
else:
self._whereclause = _literal_as_text(whereclause)
The expression will be joined to existing HAVING criterion via AND.
- Note that this mutates the Select construct such that derived attributes,
- such as the "primary_key", "oid_column", and child "froms" collection may
- be invalid if they have already been initialized. Consider the generative
- form of this method instead to prevent this issue.
"""
-
if self._having is not None:
self._having = and_(self._having, _literal_as_text(having))
else:
self._having = _literal_as_text(having)
- def append_from(self, fromclause, _copy_collection=True):
+ def append_from(self, fromclause):
"""append the given FromClause expression to this select() construct's FROM clause.
- Note that this mutates the Select construct such that derived attributes,
- such as the "primary_key", "oid_column", and child "froms" collection may
- be invalid if they have already been initialized. Consider the generative
- form of this method instead to prevent this issue.
"""
-
if _is_literal(fromclause):
fromclause = _TextFromClause(fromclause)
- if not _copy_collection:
- self._froms.add(fromclause)
- else:
- self._froms = util.Set(list(self._froms) + [fromclause])
+ self._froms.add(fromclause)
def __exportable_columns(self):
for column in self._raw_columns:
This produces an element that can be embedded in an expression. Note that
this method is called automatically as needed when constructing expressions.
- """
+ """
if isinstance(against, CompoundSelect):
return self
return _FromGrouping(self)
def _table_iterator(self):
return iter([self.table])
+ def _generate(self):
+ s = self.__class__.__new__(self.__class__)
+ s.__dict__ = self.__dict__.copy()
+ return s
+
def _process_colparams(self, parameters):
if parameters is None:
If multiple prefixes are supplied, they will be separated with
spaces.
"""
- gen = self._clone()
+ gen = self._generate()
clause = _literal_as_text(clause)
gen._prefixes = self._prefixes + [clause]
return gen
"""return a new update() construct with the given expression added to its WHERE clause, joined
to the existing clause via AND, if any."""
- s = self._clone()
+ s = self._generate()
if s._whereclause is not None:
s._whereclause = and_(s._whereclause, _literal_as_text(whereclause))
else:
"""return a new delete() construct with the given expression added to its WHERE clause, joined
to the existing clause via AND, if any."""
- s = self._clone()
+ s = self._generate()
if s._whereclause is not None:
s._whereclause = and_(s._whereclause, _literal_as_text(whereclause))
else:
pairs = []
visitors.traverse(expression, visit_binary=visit_binary)
return pairs
+
+def folded_equivalents(join, equivs=None):
+ """Returns the column list of the given Join with all equivalently-named,
+ equated columns folded into one column, where 'equated' means they are
+ equated to each other in the ON clause of this join.
+
+ This function is used by Join.select(fold_equivalents=True).
+ TODO: deprecate ?
+ """
+
+ if equivs is None:
+ equivs = util.Set()
+ def visit_binary(binary):
+ if binary.operator == operators.eq and binary.left.name == binary.right.name:
+ equivs.add(binary.right)
+ equivs.add(binary.left)
+ visitors.traverse(join.onclause, visit_binary=visit_binary)
+ collist = []
+ if isinstance(join.left, expression.Join):
+ left = folded_equivalents(join.left, equivs)
+ else:
+ left = list(join.left.columns)
+ if isinstance(join.right, expression.Join):
+ right = folded_equivalents(join.right, equivs)
+ else:
+ right = list(join.right.columns)
+ used = util.Set()
+ for c in left + right:
+ if c in equivs:
+ if c.name not in used:
+ collist.append(c)
+ used.add(c.name)
+ else:
+ collist.append(c)
+ return collist
+
class AliasedRow(object):
def __init__(self, row, map):
__traverse_options__ = {'column_collections':False}
-
+class NullVisitor(ClauseVisitor):
+ def traverse(self, obj, clone=False):
+ next = getattr(self, '_next', None)
+ if next:
+ return next.traverse(obj, clone=clone)
+ else:
+ return obj
+
def traverse(clause, **kwargs):
"""traverse the given clause, applying visit functions passed in as keyword arguments."""
for o in iterable:
self.add(o)
-
class UniqueAppender(object):
"""Only adds items to a collection once.
def test_update(self):
t = table('sometable', column('somecolumn'))
- self.assert_compile(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :sometable_somecolumn_1", dict(somecolumn=10))
+ self.assert_compile(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :somecolumn_1", dict(somecolumn=10))
def test_count(self):
t = table('sometable', column('somecolumn'))
def test_delete_schema(self):
metadata = MetaData()
tbl = Table('test', metadata, Column('id', Integer, primary_key=True), schema='paj')
- self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM paj.test WHERE paj.test.id = :paj_test_id_1")
+ self.assert_compile(tbl.delete(tbl.c.id == 1), "DELETE FROM paj.test WHERE paj.test.id = :id_1")
def test_union(self):
t1 = table('t1',
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
)
u = union(s1, s2, order_by=['col3', 'col4'])
- self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2_1, :t1_col2_2) "\
- "UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2_1, :t2_col2_2) ORDER BY col3, col4")
+ self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:col2_1, :col2_2) "\
+ "UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:col2_3, :col2_4) ORDER BY col3, col4")
self.assert_compile(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE "\
- "t1.col2 IN (:t1_col2_1, :t1_col2_2) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2_1, :t2_col2_2)) AS bar")
+ "t1.col2 IN (:col2_1, :col2_2) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:col2_3, :col2_4)) AS bar")
def test_function(self):
self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)")
self.assert_compile(query,
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
FROM mytable, myothertable WHERE \
-(mytable.name = :mytable_name_1 OR mytable.myid = :mytable_myid_1 OR \
-myothertable.othername != :myothertable_othername_1 OR EXISTS (select yay from foo where boo = lar)) \
+(mytable.name = :name_1 OR mytable.myid = :myid_1 OR \
+myothertable.othername != :othername_1 OR EXISTS (select yay from foo where boo = lar)) \
AND mytable.myid = myothertable.otherid(+)",
dialect=oracle.OracleDialect(use_ansi = False))
order_by(addresses.oid_column, address_types.oid_column)
self.assert_compile(s, "SELECT address_types_1.id, address_types_1.name, addresses.id, addresses.user_id, "
"addresses.address_type_id, addresses.email_address FROM addresses LEFT OUTER JOIN address_types address_types_1 "
- "ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :addresses_user_id_1 ORDER BY addresses.rowid, "
+ "ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :user_id_1 ORDER BY addresses.rowid, "
"address_types.rowid")
class SchemaReflectionTest(TestBase, AssertsCompiledSQL):
session.clear()
def query1():
- session = create_session(metadata.bind)
+ session = create_session(testing.db)
user = session.query(User).first()
return user.addresses.all()
def query2():
- session = create_session(metadata.bind)
+ session = create_session(testing.db)
return session.query(User).first().addresses.all()
def query3():
- session = create_session(metadata.bind)
+ session = create_session(testing.db)
user = session.query(User).first()
return session.query(User).first().addresses.all()
if testing.against('sqlite'):
self.assert_sql(testing.db, go, [
(
- "SELECT nodes.id AS nodes_id, nodes.parent_id AS nodes_parent_id, nodes.data AS nodes_data FROM nodes WHERE nodes.data = :nodes_data_1 ORDER BY nodes.oid LIMIT 1 OFFSET 0",
- {'nodes_data_1': 'n1'}
+ "SELECT nodes.id AS nodes_id, nodes.parent_id AS nodes_parent_id, nodes.data AS nodes_data FROM nodes WHERE nodes.data = :data_1 ORDER BY nodes.oid LIMIT 1 OFFSET 0",
+ {'data_1': 'n1'}
),
])
c1
)
+ if select_type != '':
+ self.assertEquals(
+ sess.query(Person).filter(Engineer.machines.any(Machine.name=="Commodore 64")).all(), [e2, e3]
+ )
+
+ self.assertEquals(
+ sess.query(Person).filter(Person.paperwork.any(Paperwork.description=="review #2")).all(), [m1]
+ )
+
if select_type == '':
+ # this tests that a hand-rolled criterion in the any() doesn't get clobbered by
+ # aliasing, when the mapper is not set up for polymorphic joins
self.assertEquals(
sess.query(Company).filter(Company.employees.any(and_(Engineer.primary_language=='cobol', people.c.person_id==engineers.c.person_id))).one(),
c2
)
+ else:
+ self.assertEquals(
+ sess.query(Company).filter(Company.employees.any(and_(Engineer.primary_language=='cobol'))).one(),
+ c2
+ )
+
def test_expire(self):
"""test that individual column refresh doesn't get tripped up by the select_table mapper"""
import testenv; testenv.configure_for_tests()
import operator
from sqlalchemy import *
-from sqlalchemy import exceptions
+from sqlalchemy import exceptions, util
from sqlalchemy.sql import compiler
from sqlalchemy.engine import default
from sqlalchemy.orm import *
+
from testlib import *
from testlib import engines
from testlib.fixtures import *
+from sqlalchemy.orm.util import _join as join, _outerjoin as outerjoin
+
class QueryTest(FixtureTest):
keep_mappers = True
keep_data = True
(operator.sub, '-'), (operator.div, '/'),
):
for (lhs, rhs, res) in (
- (5, User.id, ':users_id_1 %s users.id'),
+ (5, User.id, ':id_1 %s users.id'),
(5, literal(6), ':param_1 %s :param_2'),
- (User.id, 5, 'users.id %s :users_id_1'),
+ (User.id, 5, 'users.id %s :id_1'),
(User.id, literal('b'), 'users.id %s :param_1'),
(User.id, User.id, 'users.id %s users.id'),
(literal(5), 'b', ':param_1 %s :param_2'),
(operator.le, '<=', '>='),
(operator.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
- ('a', User.id, ':users_id_1', 'users.id'),
+ ('a', User.id, ':id_1', 'users.id'),
('a', literal('b'), ':param_2', ':param_1'), # note swap!
- (User.id, 'b', 'users.id', ':users_id_1'),
+ (User.id, 'b', 'users.id', ':id_1'),
(User.id, literal('b'), 'users.id', ':param_1'),
(User.id, User.id, 'users.id', 'users.id'),
(literal('a'), 'b', ':param_1', ':param_2'),
fwd_sql + "'\n or\n'" + rev_sql + "'")
def test_op(self):
- assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :users_name_1"
+ assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :name_1"
def test_in(self):
self._test(User.id.in_(['a', 'b']),
- "users.id IN (:users_id_1, :users_id_2)")
+ "users.id IN (:id_1, :id_2)")
def test_between(self):
self._test(User.id.between('a', 'b'),
- "users.id BETWEEN :users_id_1 AND :users_id_2")
+ "users.id BETWEEN :id_1 AND :id_2")
def test_clauses(self):
for (expr, compare) in (
assert [Keyword(name='red'), Keyword(name='small'), Keyword(name='square')] == k
-class JoinTest(QueryTest):
+class JoinTest(QueryTest, AssertsCompiledSQL):
def test_getjoinable_tables(self):
sess = create_session()
[]
)
+ orderalias = aliased(Order)
+ itemalias = aliased(Item)
+ self.assertEquals(
+ sess.query(User).join([('orders', orderalias), ('items', itemalias)]).filter(itemalias.description == 'item 4').all(),
+ [User(name='jack')]
+ )
+ self.assertEquals(
+ sess.query(User).join([('orders', orderalias), ('items', itemalias)]).filter(orderalias.user_id==9).filter(itemalias.description=='item 4').all(),
+ []
+ )
+ def test_aliased_classes(self):
+ sess = create_session()
+
+ (user7, user8, user9, user10) = sess.query(User).all()
+ (address1, address2, address3, address4, address5) = sess.query(Address).all()
+ expected = [(user7, address1),
+ (user8, address2),
+ (user8, address3),
+ (user8, address4),
+ (user9, address5),
+ (user10, None)]
+
+ q = sess.query(User)
+ AdAlias = aliased(Address)
+ q = q.add_entity(AdAlias).select_from(outerjoin(User, AdAlias))
+ l = q.all()
+ self.assertEquals(l, expected)
+
+ sess.clear()
+
+ q = sess.query(User).add_entity(AdAlias)
+ l = q.select_from(outerjoin(User, AdAlias)).filter(AdAlias.email_address=='ed@bettyboop.com').all()
+ self.assertEquals(l, [(user8, address3)])
+
+
+ l = q.select_from(outerjoin(User, AdAlias, 'addresses')).filter(AdAlias.email_address=='ed@bettyboop.com').all()
+ self.assertEquals(l, [(user8, address3)])
+
+ l = q.select_from(outerjoin(User, AdAlias, User.id==AdAlias.user_id)).filter(AdAlias.email_address=='ed@bettyboop.com').all()
+ self.assertEquals(l, [(user8, address3)])
+
+ def test_aliased_classes_m2m(self):
+ sess = create_session()
+
+ (order1, order2, order3, order4, order5) = sess.query(Order).all()
+ (item1, item2, item3, item4, item5) = sess.query(Item).all()
+ expected = [
+ (order1, item1),
+ (order1, item2),
+ (order1, item3),
+ (order2, item1),
+ (order2, item2),
+ (order2, item3),
+ (order3, item3),
+ (order3, item4),
+ (order3, item5),
+ (order4, item1),
+ (order4, item5),
+ (order5, item5),
+ ]
+ self.assert_compile(join(Order, Item, 'items'), "orders JOIN order_items ON orders.id = order_items.order_id JOIN items ON items.id = order_items.item_id")
+
+ q = sess.query(Order)
+ q = q.add_entity(Item).select_from(join(Order, Item, 'items'))
+ l = q.all()
+ self.assertEquals(l, expected)
+
+ IAlias = aliased(Item)
+ q = sess.query(Order, IAlias).select_from(join(Order, IAlias, 'items')).filter(IAlias.description=='item 3')
+ l = q.all()
+ self.assertEquals(l,
+ [
+ (order1, item3),
+ (order2, item3),
+ (order3, item3),
+ ]
+ )
def test_generative_join(self):
# test that alised_ids is copied
self.assert_sql_count(testing.db, go, 1)
sess.clear()
- def test_columns(self):
+ def test_values(self):
sess = create_session()
sel = users.select(User.id.in_([7, 8])).alias()
q = sess.query(User)
- q2 = q.select_from(sel)._values(User.name)
- self.assertEquals(q2.all(), [(u'jack',), (u'ed',)])
+ q2 = q.select_from(sel).values(User.name)
+ self.assertEquals(list(q2), [(u'jack',), (u'ed',)])
q = sess.query(User)
- q2 = q._values(User.name, User.name + " " + cast(User.id, String)).order_by(User.id)
- self.assertEquals(q2.all(), [(u'jack', u'jack 7'), (u'ed', u'ed 8'), (u'fred', u'fred 9'), (u'chuck', u'chuck 10')])
+ q2 = q.order_by(User.id).values(User.name, User.name + " " + cast(User.id, String))
+ self.assertEquals(list(q2), [(u'jack', u'jack 7'), (u'ed', u'ed 8'), (u'fred', u'fred 9'), (u'chuck', u'chuck 10')])
- q2 = q._values(User.name.like('%j%'), func.count(User.name.like('%j%'))).group_by([User.name.like('%j%')]).order_by(desc(User.name.like('%j%')))
- self.assertEquals(q2.all(), [(True, 1), (False, 3)])
+ q2 = q.group_by([User.name.like('%j%')]).order_by(desc(User.name.like('%j%'))).values(User.name.like('%j%'), func.count(User.name.like('%j%')))
+ self.assertEquals(list(q2), [(True, 1), (False, 3)])
- q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(User.id, Address.id)._values(User.name, Address.email_address)
- self.assertEquals(q2.all(), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), (u'fred', u'fred@fred.com')])
+ q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(User.id, Address.id).values(User.name, Address.email_address)
+ self.assertEquals(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), (u'fred', u'fred@fred.com')])
- q2 = q.join('addresses').filter(User.name.like('%e%'))._values(User.name, Address.email_address).order_by(desc(Address.email_address))[1:3]
- self.assertEquals(q2.all(), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@lala.com')])
+ q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(desc(Address.email_address))[1:3].values(User.name, Address.email_address)
+ self.assertEquals(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@lala.com')])
- q2 = q.join('addresses', aliased=True).filter(User.name.like('%e%'))._values(User.name, Address.email_address)
- self.assertEquals(q2.all(), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), (u'fred', u'fred@fred.com')])
+ q2 = q.join('addresses', aliased=True).filter(User.name.like('%e%')).values(User.name, Address.email_address)
+ self.assertEquals(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), (u'fred', u'fred@fred.com')])
- q2 = q._values(func.count(User.name))
- assert q2.one() == (4,)
+ q2 = q.values(func.count(User.name))
+ assert q2.next() == (4,)
u2 = users.alias()
- q2 = q.select_from(sel).filter(u2.c.id>1)._values(users.c.name, sel.c.name, u2.c.name).order_by([users.c.id, sel.c.id, u2.c.id])
- self.assertEquals(q2.all(), [(u'jack', u'jack', u'jack'), (u'jack', u'jack', u'ed'), (u'jack', u'jack', u'fred'), (u'jack', u'jack', u'chuck'), (u'ed', u'ed', u'jack'), (u'ed', u'ed', u'ed'), (u'ed', u'ed', u'fred'), (u'ed', u'ed', u'chuck')])
-
- q2 = q.select_from(sel).filter(users.c.id>1)._values(users.c.name, sel.c.name, User.name)
- self.assertEquals(q2.all(), [(u'jack', u'jack', u'jack'), (u'ed', u'ed', u'ed')])
+ q2 = q.select_from(sel).filter(u2.c.id>1).order_by([users.c.id, sel.c.id, u2.c.id]).values(users.c.name, sel.c.name, u2.c.name)
+ self.assertEquals(list(q2), [(u'jack', u'jack', u'jack'), (u'jack', u'jack', u'ed'), (u'jack', u'jack', u'fred'), (u'jack', u'jack', u'chuck'), (u'ed', u'ed', u'jack'), (u'ed', u'ed', u'ed'), (u'ed', u'ed', u'fred'), (u'ed', u'ed', u'chuck')])
+ q2 = q.select_from(sel).filter(users.c.id>1).values(users.c.name, sel.c.name, User.name)
+ self.assertEquals(list(q2), [(u'jack', u'jack', u'jack'), (u'ed', u'ed', u'ed')])
+
def test_multi_mappers(self):
test_session = create_session()
sess.clear()
q = sess.query(User, Address).join('addresses', aliased=aliased).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com')
- assert q.all() == [(user8, address3)]
+ self.assertEquals(list(util.OrderedSet(q.all())), [(user8, address3)])
sess.clear()
def test_aliased_multi_mappers(self):
(user7, user8, user9, user10) = sess.query(User).all()
(address1, address2, address3, address4, address5) = sess.query(Address).all()
- # note the result is a cartesian product
expected = [(user7, address1),
(user8, address2),
(user8, address3),
sess.clear()
self.assertRaises(exceptions.InvalidRequestError, sess.query(User).add_column, object())
-
+
+ def test_ambiguous_column(self):
+ sess = create_session()
+
+ q = sess.query(User).join('addresses', aliased=True).join('addresses', aliased=True).add_column(Address.id)
+ self.assertRaises(exceptions.InvalidRequestError, iter, q)
+
def test_multi_columns_2(self):
"""test aliased/nonalised joins with the usage of add_column()"""
sess = create_session()
(user9, 1, "Name:fred"),
(user10, 0, "Name:chuck")]
+ q = create_session().query(User).add_column(func.count(addresses.c.id))\
+ .add_column(("Name:" + users.c.name)).outerjoin('addresses', aliased=True)\
+ .group_by([c for c in users.c]).order_by(users.c.id)
+
+ assert q.all() == expected
+
# test with a straight statement
s = select([users, func.count(addresses.c.id).label('count'), ("Name:" + users.c.name).label('concat')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.id])
q = create_session().query(User)
node = sess.query(Node).filter_by(data='n122').join('parent', aliased=True).filter_by(data='n12').\
join('parent', aliased=True, from_joinpoint=True).filter_by(data='n1').first()
assert node.data == 'n122'
+
+ def test_explicit_join(self):
+ sess = create_session()
+
+ n1 = aliased(Node)
+ n2 = aliased(Node)
+
+ node = sess.query(Node).select_from(join(Node, n1, 'children')).filter(n1.data=='n122').first()
+ assert node.data=='n12'
+
+ node = sess.query(Node).select_from(join(Node, n1, 'children').join(n2, 'children')).\
+ filter(n2.data=='n122').first()
+ assert node.data=='n1'
+
+ # mix explicit and named onclauses
+ node = sess.query(Node).select_from(join(Node, n1, Node.id==n1.parent_id).join(n2, 'children')).\
+ filter(n2.data=='n122').first()
+ assert node.data=='n1'
+
+ node = sess.query(Node).select_from(join(Node, n1, 'parent').join(n2, 'parent')).\
+ filter(and_(Node.data=='n122', n1.data=='n12', n2.data=='n1')).first()
+ assert node.data == 'n122'
+ self.assertEquals(
+ list(sess.query(Node).select_from(join(Node, n1, 'parent').join(n2, 'parent')).\
+ filter(and_(Node.data=='n122', n1.data=='n12', n2.data=='n1')).values(Node.data, n1.data, n2.data)),
+ [('n122', 'n12', 'n1')])
+
def test_any(self):
sess = create_session()
self.assertEquals(sess.query(Node).filter(Node.children.contains(n4)).order_by(Node.data).all(), [Node(data='n1'), Node(data='n3')])
self.assertEquals(sess.query(Node).filter(not_(Node.children.contains(n4))).order_by(Node.data).all(), [Node(data='n2'), Node(data='n4'), Node(data='n5'), Node(data='n6'), Node(data='n7')])
+
+ def test_explicit_join(self):
+ sess = create_session()
+
+ n1 = aliased(Node)
+ self.assertEquals(
+ sess.query(Node).select_from(join(Node, n1, 'children')).filter(n1.data.in_(['n3', 'n7'])).all(),
+ [Node(data='n1'), Node(data='n2')]
+ )
class ExternalColumnsTest(QueryTest):
keep_mappers = False
pass
def test_external_columns_bad(self):
- """test that SA catches some common mis-configurations of external columns."""
- f = (users.c.id * 2)
- try:
- mapper(User, users, properties={
- 'concat': f,
- })
- class_mapper(User)
- except exceptions.ArgumentError, e:
- assert str(e) == "Column '%s' is not represented in mapper's table. Use the `column_property()` function to force this column to be mapped as a read-only attribute." % str(f)
- else:
- raise 'expected ArgumentError'
+
+ self.assertRaisesMessage(exceptions.ArgumentError, "not represented in mapper's table", mapper, User, users, properties={
+ 'concat': (users.c.id * 2),
+ })
clear_mappers()
- try:
- mapper(User, users, properties={
- 'count': column_property(select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users))
- })
- except exceptions.ArgumentError, e:
- assert str(e) == 'column_property() must be given a ColumnElement as its argument. Try .label() or .as_scalar() for Selectables to fix this.'
- else:
- raise 'expected ArgumentError'
+
+ self.assertRaisesMessage(exceptions.ArgumentError, "must be given a ColumnElement as its argument.", column_property,
+ select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users)
+ )
def test_external_columns_good(self):
"""test querying mappings that reference external columns or selectables."""
+
mapper(User, users, properties={
'concat': column_property((users.c.id * 2)),
'count': column_property(select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users).as_scalar())
})
sess = create_session()
+
+
l = sess.query(User).all()
assert [
User(id=7, concat=14, count=1),
Address(id=5, user=User(id=9, concat=18, count=1))
]
- assert address_result == sess.query(Address).all()
+ self.assertEquals(sess.query(Address).all(), address_result)
# run the eager version twice to test caching of aliased clauses
for x in range(2):
sess.clear()
def go():
- assert address_result == sess.query(Address).options(eagerload('user')).all()
+ self.assertEquals(sess.query(Address).options(eagerload('user')).all(), address_result)
self.assert_sql_count(testing.db, go, 1)
-
+
tuple_address_result = [(address, address.user) for address in address_result]
+
+ q =sess.query(Address).join('user', aliased=True, id='ualias').join('user', aliased=True).add_column(User.concat)
+ self.assertRaisesMessage(exceptions.InvalidRequestError, "Ambiguous", q.all)
+
+ self.assertEquals(sess.query(Address).join('user', aliased=True, id='ualias').add_entity(User, id='ualias').all(), tuple_address_result)
+
+ self.assertEquals(sess.query(Address).join('user', aliased=True, id='ualias').join('user', aliased=True).\
+ add_column(User.concat, id='ualias').add_column(User.count, id='ualias').all(),
+ [
+ (Address(id=1), 14, 1),
+ (Address(id=2), 16, 3),
+ (Address(id=3), 16, 3),
+ (Address(id=4), 16, 3),
+ (Address(id=5), 18, 1)
+ ]
+ )
- tuple_address_result == sess.query(Address).join('user').add_entity(User).all()
+ self.assertEquals(list(sess.query(Address).join('user').values(Address.id, User.id, User.concat, User.count)),
+ [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)]
+ )
- assert tuple_address_result == sess.query(Address).join('user', aliased=True, id='ualias').add_entity(User, id='ualias').all()
+ self.assertEquals(list(sess.query(Address).join('user', aliased=True).values(Address.id, User.id, User.concat, User.count)),
+ [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)]
+ )
+ ua = aliased(User)
+ self.assertEquals(list(sess.query(Address, ua).select_from(join(Address,ua, 'user')).values(Address.id, ua.id, ua.concat, ua.count)),
+ [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)]
+ )
if __name__ == '__main__':
self.assertRaises(exceptions.ArgumentError, case, [("x", "y")])
self.assert_compile(case([("x", "y")], value=t.c.col1), "CASE test.col1 WHEN :param_1 THEN :param_2 END")
- self.assert_compile(case([(t.c.col1==7, "y")], else_="z"), "CASE WHEN (test.col1 = :test_col1_1) THEN :param_1 ELSE :param_2 END")
+ self.assert_compile(case([(t.c.col1==7, "y")], else_="z"), "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END")
@testing.fails_on('maxdb')
clause = t1.c.col2 == t2.c.col2
assert str(clause) == ClauseVisitor().traverse(clause, clone=True)
+ def test_binary_anon_label_quirk(self):
+ t = table('t1', column('col1'))
+
+
+ f = t.c.col1 * 5
+ self.assert_compile(select([f]), "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1")
+
+ f.anon_label
+
+ a = t.alias()
+ f = sql_util.ClauseAdapter(a).traverse(f)
+
+ self.assert_compile(select([f]), "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1")
+
def test_join(self):
clause = t1.join(t2, t1.c.col2==t2.c.col2)
c1 = str(clause)
clause2 = Vis().traverse(clause, clone=True)
assert c1 == str(clause)
assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3))
-
+
def test_text(self):
clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')])
c1 = str(clause)
def visit_binary(self, binary):
if binary.left is t1.c.col3:
binary.left = t1.c.col1
- binary.right = bindparam("table1_col1", unique=True)
+ binary.right = bindparam("col1", unique=True)
s5 = Vis().traverse(s4, clone=True)
print str(s4)
print str(s5)
s2 = ClauseVisitor().traverse(s, clone=True).alias()
s3 = select([s], s.c.col2==s2.c.col2)
self.assert_compile(s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, "\
- "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_1) AS anon_1, "\
- "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_2) AS anon_2 "\
+ "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1, "\
+ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 "\
"WHERE anon_1.col2 = anon_2.col2")
@testing.emits_warning('.*replaced by another column with the same key')
def visit_select(self, select):
select.append_whereclause(t1.c.col2==7)
- self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2_1")
+ self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :col2_1")
class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
def setUpAll(self):
column("col3"),
)
+ def test_correlation_on_clone(self):
+ t1alias = t1.alias('t1alias')
+ t2alias = t2.alias('t2alias')
+ vis = sql_util.ClauseAdapter(t1alias)
+
+ s = select(['*'], from_obj=[t1alias, t2alias]).as_scalar()
+ assert t2alias in s._froms
+ assert t1alias in s._froms
+
+ self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
+ s = vis.traverse(s, clone=True)
+ assert t2alias not in s._froms # not present because it's been cloned
+ assert t1alias in s._froms # present because the adapter placed it there
+ # correlate list on "s" needs to take into account the full _cloned_set for each element in _froms when correlating
+ self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
+
+ s = select(['*'], from_obj=[t1alias, t2alias]).correlate(t2alias).as_scalar()
+ self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
+ s = vis.traverse(s, clone=True)
+ self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
+ s = ClauseVisitor().traverse(s, clone=True)
+ self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
+
+ s = select(['*']).where(t1.c.col1==t2.c.col1).as_scalar()
+ self.assert_compile(select([t1.c.col1, s]), "SELECT table1.col1, (SELECT * FROM table2 WHERE table1.col1 = table2.col1) AS anon_1 FROM table1")
+ vis = sql_util.ClauseAdapter(t1alias)
+ s = vis.traverse(s, clone=True)
+ self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias")
+ s = ClauseVisitor().traverse(s, clone=True)
+ self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias")
+ s = select(['*']).where(t1.c.col1==t2.c.col1).correlate(t1).as_scalar()
+ self.assert_compile(select([t1.c.col1, s]), "SELECT table1.col1, (SELECT * FROM table2 WHERE table1.col1 = table2.col1) AS anon_1 FROM table1")
+ vis = sql_util.ClauseAdapter(t1alias)
+ s = vis.traverse(s, clone=True)
+ self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias")
+ s = ClauseVisitor().traverse(s, clone=True)
+ self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias")
+
+
def test_table_to_alias(self):
t1alias = t1.alias('t1alias')
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 WHERE t1alias.col1 = table2.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = table2.col2")
+
s = select(['*'], from_obj=[t1]).alias('foo')
self.assert_compile(s.select(), "SELECT foo.* FROM (SELECT * FROM table1) AS foo")
self.assert_compile(vis.traverse(s.select(), clone=True), "SELECT foo.* FROM (SELECT * FROM table1 AS t1alias) AS foo")
def test_select(self):
self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3),
- "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1_1 ORDER BY table1.col3")
+ "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :col1_1 ORDER BY table1.col3")
self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3),
"SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\
self.assert_compile(x, "SELECT anon_1.this_is_the_primarykey_column AS anon_1_this_is_the_prim_1, anon_1.this_is_the_data_column AS anon_1_this_is_the_data_2 "
"FROM (SELECT some_large_named_table.this_is_the_primarykey_column AS this_is_the_primarykey_column, some_large_named_table.this_is_the_data_column AS this_is_the_data_column "
"FROM some_large_named_table "
- "WHERE some_large_named_table.this_is_the_primarykey_column = :some_large_named_table__1) AS anon_1", dialect=compile_dialect)
+ "WHERE some_large_named_table.this_is_the_primarykey_column = :this_is_the_primarykey__1) AS anon_1", dialect=compile_dialect)
print x.execute().fetchall()
import datetime, re, operator
from sqlalchemy import *
from sqlalchemy import exceptions, sql, util
-from sqlalchemy.sql import table, column
+from sqlalchemy.sql import table, column, compiler
from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql
from testlib import *
)
,
"SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable "\
- "WHERE mytable.name = :mytable_name_1) WHERE myid = :myid_1")
+ "WHERE mytable.name = :name_1) WHERE myid = :myid_1")
sq = select([table1])
self.assert_compile(
self.assert_compile(
sq.select(sq.c.myid == 7),
"SELECT sq.myid, sq.name, sq.description FROM \
-(SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :sq_myid_1"
+(SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :myid_1"
)
sq = select(
sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \
mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \
myothertable.othername AS myothertable_othername FROM mytable, myothertable \
-WHERE mytable.myid = :mytable_myid_1 AND myothertable.otherid = mytable.myid"
+WHERE mytable.myid = :myid_1 AND myothertable.otherid = mytable.myid"
self.assert_compile(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \
sq.myothertable_othername FROM (" + sqstring + ") AS sq")
def test_nested_uselabels(self):
"""test nested anonymous label generation. this
essentially tests the ANONYMOUS_LABEL regex.
- """
+ """
s1 = table1.select()
s2 = s1.alias()
s3 = select([s2], use_labels=True)
"anon_1.anon_2_description AS anon_1_anon_2_description FROM (SELECT anon_2.myid AS anon_2_myid, anon_2.name AS anon_2_name, "\
"anon_2.description AS anon_2_description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description "\
"AS description FROM mytable) AS anon_2) AS anon_1")
-
+
def test_dont_overcorrelate(self):
self.assert_compile(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""")
- def test_intentional_full_correlate(self):
- """test a subquery that has no FROM clause."""
-
+ def test_full_correlate(self):
+ # intentional
t = table('t', column('a'), column('b'))
s = select([t.c.a]).where(t.c.a==1).correlate(t).as_scalar()
s2 = select([t.c.a, s])
- self.assert_compile(s2, """SELECT t.a, (SELECT t.a WHERE t.a = :t_a_1) AS anon_1 FROM t""")
+ self.assert_compile(s2, """SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t""")
+
+ # unintentional
+ t2 = table('t2', column('c'), column('d'))
+ s = select([t.c.a]).where(t.c.a==t2.c.d).as_scalar()
+ s2 =select([t, t2, s])
+ self.assertRaises(exceptions.InvalidRequestError, str, s2)
+
+ # intentional again
+ s = s.correlate(t, t2)
+ s2 =select([t, t2, s])
+ self.assert_compile(s, "SELECT t.a WHERE t.a = t2.d")
def test_exists(self):
- self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid_1)", params={'mytable_myid':5})
+ self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)", params={'mytable_myid':5})
self.assert_compile(select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={})
self.assert_compile(
table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :mytable_name_1)"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :name_1)"
)
self.assert_compile(
)
self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE "
- "zips.zipcode = :zips_zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_2)) AS dist "
- "FROM places, zips WHERE zips.zipcode = :zips_zipcode_3 ORDER BY dist, places.nm")
+ "zips.zipcode = :zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zipcode_2)) AS dist "
+ "FROM places, zips WHERE zips.zipcode = :zipcode_3 ORDER BY dist, places.nm")
zalias = zips.alias('main_zip')
qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True)
def test_conjunctions(self):
self.assert_compile(
and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()"),
- "mytable.myid = :mytable_myid_1 AND mytable.name = :mytable_name_1 "\
- "AND myothertable.othername = :myothertable_othername_1 AND sysdate() = today()"
+ "mytable.myid = :myid_1 AND mytable.name = :name_1 "\
+ "AND myothertable.othername = :othername_1 AND sysdate() = today()"
)
self.assert_compile(
or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9),
"sysdate() = today()",
),
- "mytable.myid = :mytable_myid_1 AND (myothertable.othername = :myothertable_othername_1 OR "\
- "myothertable.othername = :myothertable_othername_2 OR myothertable.otherid = :myothertable_otherid_1) AND sysdate() = today()",
- checkparams = {'myothertable_othername_1': 'asdf', 'myothertable_othername_2':'foo', 'myothertable_otherid_1': 9, 'mytable_myid_1': 12}
+ "mytable.myid = :myid_1 AND (myothertable.othername = :othername_1 OR "\
+ "myothertable.othername = :othername_2 OR myothertable.otherid = :otherid_1) AND sysdate() = today()",
+ checkparams = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12}
)
def test_distinct(self):
(operator.sub, '-'), (operator.div, '/'),
):
for (lhs, rhs, res) in (
- (5, table1.c.myid, ':mytable_myid_1 %s mytable.myid'),
+ (5, table1.c.myid, ':myid_1 %s mytable.myid'),
(5, literal(5), ':param_1 %s :param_2'),
- (table1.c.myid, 'b', 'mytable.myid %s :mytable_myid_1'),
+ (table1.c.myid, 'b', 'mytable.myid %s :myid_1'),
(table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'),
(table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'),
(literal(5), 8, ':param_1 %s :param_2'),
(operator.le, '<=', '>='),
(operator.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
- ('a', table1.c.myid, ':mytable_myid_1', 'mytable.myid'),
+ ('a', table1.c.myid, ':myid_1', 'mytable.myid'),
('a', literal('b'), ':param_2', ':param_1'), # note swap!
- (table1.c.myid, 'b', 'mytable.myid', ':mytable_myid_1'),
+ (table1.c.myid, 'b', 'mytable.myid', ':myid_1'),
(table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
(table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'),
(literal('a'), 'b', ':param_1', ':param_2'),
self.assert_compile(
table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND mytable.name != :mytable_name_1"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND mytable.name != :name_1"
)
self.assert_compile(
table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND "\
- "NOT (mytable.name BETWEEN :mytable_name_1 AND :mytable_name_2)"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND "\
+ "NOT (mytable.name BETWEEN :name_1 AND :name_2)"
)
self.assert_compile(
table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND "\
- "NOT (mytable.name = :mytable_name_1 AND mytable.name = :mytable_name_2 AND mytable.name = :mytable_name_3)"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND "\
+ "NOT (mytable.name = :name_1 AND mytable.name = :name_2 AND mytable.name = :name_3)"
)
self.assert_compile(
table1.select((table1.c.myid != 12) & ~table1.c.name),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND NOT mytable.name"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND NOT mytable.name"
)
self.assert_compile(
# test the op() function, also that its results are further usable in expressions
self.assert_compile(
table1.select(table1.c.myid.op('hoho')(12)==14),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid_1) = :param_1"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :myid_1) = :param_1"
)
# test that clauses can be pickled (operators need to be module-level, etc.)
def test_like(self):
for expr, check, dialect in [
- (table1.c.myid.like('somstr'), "mytable.myid LIKE :mytable_myid_1", None),
- (~table1.c.myid.like('somstr'), "mytable.myid NOT LIKE :mytable_myid_1", None),
- (table1.c.myid.like('somstr', escape='\\'), "mytable.myid LIKE :mytable_myid_1 ESCAPE '\\'", None),
- (~table1.c.myid.like('somstr', escape='\\'), "mytable.myid NOT LIKE :mytable_myid_1 ESCAPE '\\'", None),
- (table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) LIKE lower(:mytable_myid_1) ESCAPE '\\'", None),
- (~table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) NOT LIKE lower(:mytable_myid_1) ESCAPE '\\'", None),
- (table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid ILIKE %(mytable_myid_1)s ESCAPE '\\'", postgres.PGDialect()),
- (~table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid NOT ILIKE %(mytable_myid_1)s ESCAPE '\\'", postgres.PGDialect()),
- (table1.c.name.ilike('%something%'), "lower(mytable.name) LIKE lower(:mytable_name_1)", None),
- (table1.c.name.ilike('%something%'), "mytable.name ILIKE %(mytable_name_1)s", postgres.PGDialect()),
- (~table1.c.name.ilike('%something%'), "lower(mytable.name) NOT LIKE lower(:mytable_name_1)", None),
- (~table1.c.name.ilike('%something%'), "mytable.name NOT ILIKE %(mytable_name_1)s", postgres.PGDialect()),
+ (table1.c.myid.like('somstr'), "mytable.myid LIKE :myid_1", None),
+ (~table1.c.myid.like('somstr'), "mytable.myid NOT LIKE :myid_1", None),
+ (table1.c.myid.like('somstr', escape='\\'), "mytable.myid LIKE :myid_1 ESCAPE '\\'", None),
+ (~table1.c.myid.like('somstr', escape='\\'), "mytable.myid NOT LIKE :myid_1 ESCAPE '\\'", None),
+ (table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) LIKE lower(:myid_1) ESCAPE '\\'", None),
+ (~table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) NOT LIKE lower(:myid_1) ESCAPE '\\'", None),
+ (table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid ILIKE %(myid_1)s ESCAPE '\\'", postgres.PGDialect()),
+ (~table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\'", postgres.PGDialect()),
+ (table1.c.name.ilike('%something%'), "lower(mytable.name) LIKE lower(:name_1)", None),
+ (table1.c.name.ilike('%something%'), "mytable.name ILIKE %(name_1)s", postgres.PGDialect()),
+ (~table1.c.name.ilike('%something%'), "lower(mytable.name) NOT LIKE lower(:name_1)", None),
+ (~table1.c.name.ilike('%something%'), "mytable.name NOT ILIKE %(name_1)s", postgres.PGDialect()),
]:
self.assert_compile(expr, check, dialect=dialect)
def test_composed_string_comparators(self):
self.assert_compile(
- table1.c.name.contains('jo'), "mytable.name LIKE '%%' || :mytable_name_1 || '%%'" , checkparams = {'mytable_name_1': u'jo'},
+ table1.c.name.contains('jo'), "mytable.name LIKE '%%' || :name_1 || '%%'" , checkparams = {'name_1': u'jo'},
)
self.assert_compile(
- table1.c.name.contains('jo'), "mytable.name LIKE concat(concat('%%', %s), '%%')" , checkparams = {'mytable_name_1': u'jo'},
+ table1.c.name.contains('jo'), "mytable.name LIKE concat(concat('%%', %s), '%%')" , checkparams = {'name_1': u'jo'},
dialect=mysql.dialect()
)
self.assert_compile(
- table1.c.name.contains('jo', escape='\\'), "mytable.name LIKE '%%' || :mytable_name_1 || '%%' ESCAPE '\\'" , checkparams = {'mytable_name_1': u'jo'},
+ table1.c.name.contains('jo', escape='\\'), "mytable.name LIKE '%%' || :name_1 || '%%' ESCAPE '\\'" , checkparams = {'name_1': u'jo'},
)
- self.assert_compile( table1.c.name.startswith('jo', escape='\\'), "mytable.name LIKE :mytable_name_1 || '%%' ESCAPE '\\'" )
- self.assert_compile( table1.c.name.endswith('jo', escape='\\'), "mytable.name LIKE '%%' || :mytable_name_1 ESCAPE '\\'" )
- self.assert_compile( table1.c.name.endswith('hn'), "mytable.name LIKE '%%' || :mytable_name_1", checkparams = {'mytable_name_1': u'hn'}, )
+ self.assert_compile( table1.c.name.startswith('jo', escape='\\'), "mytable.name LIKE :name_1 || '%%' ESCAPE '\\'" )
+ self.assert_compile( table1.c.name.endswith('jo', escape='\\'), "mytable.name LIKE '%%' || :name_1 ESCAPE '\\'" )
+ self.assert_compile( table1.c.name.endswith('hn'), "mytable.name LIKE '%%' || :name_1", checkparams = {'name_1': u'hn'}, )
self.assert_compile(
table1.c.name.endswith('hn'), "mytable.name LIKE concat('%%', %s)",
- checkparams = {'mytable_name_1': u'hn'}, dialect=mysql.dialect()
+ checkparams = {'name_1': u'hn'}, dialect=mysql.dialect()
)
self.assert_compile(
- table1.c.name.startswith(u"hi \xf6 \xf5"), "mytable.name LIKE :mytable_name_1 || '%%'",
- checkparams = {'mytable_name_1': u'hi \xf6 \xf5'},
+ table1.c.name.startswith(u"hi \xf6 \xf5"), "mytable.name LIKE :name_1 || '%%'",
+ checkparams = {'name_1': u'hi \xf6 \xf5'},
)
self.assert_compile(column('name').endswith(text("'foo'")), "name LIKE '%%' || 'foo'" )
self.assert_compile(column('name').endswith(literal_column("'foo'")), "name LIKE '%%' || 'foo'" )
def test_multiple_col_binds(self):
self.assert_compile(
select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')),
- "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2 OR mytable.myid = :mytable_myid_3"
+ "SELECT * FROM mytable WHERE mytable.myid = :myid_1 OR mytable.myid = :myid_2 OR mytable.myid = :myid_3"
)
def test_orderby_groupby(self):
)
def test_for_update(self):
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE")
+ self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE")
+ self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE NOWAIT", dialect=oracle.dialect())
+ self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT", dialect=oracle.dialect())
self.assert_compile(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect())
self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect())
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE", dialect=oracle.dialect())
+ self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", dialect=oracle.dialect())
def test_alias(self):
# test the alias for a table1. column names stay the same, table name "changes" to "foo".
t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \
(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \
myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
-WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid_1"
+WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
)
self.assert_compile(query,
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \
-WHERE mytable.name = :mytable_name_1 OR mytable.myid = :mytable_myid_1 OR \
-myothertable.othername != :myothertable_othername_1 OR \
+WHERE mytable.name = :name_1 OR mytable.myid = :myid_1 OR \
+myothertable.othername != :othername_1 OR \
EXISTS (select yay from foo where boo = lar)",
)
)
self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description \
-FROM mytable WHERE mytable.myid = :mytable_myid_1 UNION \
+FROM mytable WHERE mytable.myid = :myid_1 UNION \
SELECT mytable.myid, mytable.name, mytable.description \
-FROM mytable WHERE mytable.myid = :mytable_myid_2 ORDER BY mytable.myid")
+FROM mytable WHERE mytable.myid = :myid_2 ORDER BY mytable.myid")
u1 = union(
select([table1.c.myid, table1.c.name]),
)
,
"SELECT mytable.myid, mytable.name, max(mytable.description) AS max_1 FROM mytable \
-WHERE mytable.name = :mytable_name_1 GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \
-FROM mytable WHERE mytable.name = :mytable_name_2"
+WHERE mytable.name = :name_1 GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \
+FROM mytable WHERE mytable.name = :name_2"
)
self.assert_compile(
s2 = select([table1, s], table1.c.myid==s)
self.assert_compile(s2,
"SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable WHERE mytable.myid = "\
- ":mytable_myid_1) AS anon_1 FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid_1)")
+ ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)")
positional = s2.compile(dialect=sqlite.dialect())
pp = positional.get_params()
assert [pp[k] for k in positional.positiontup] == [12, 12]
# check that conflicts with "unique" params are caught
- s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('mytable_myid_1')))
- try:
- print str(s)
- assert False
- except exceptions.CompileError, err:
- assert str(err) == "Bind parameter 'mytable_myid_1' conflicts with unique bind parameter of the same name"
+ s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('myid_1')))
+ self.assertRaisesMessage(exceptions.CompileError, "conflicts with unique bind parameter of the same name", str, s)
- s = select([table1], or_(table1.c.myid==7, table1.c.myid==8, table1.c.myid==bindparam('mytable_myid_1')))
- try:
- str(s)
- assert False
- except exceptions.CompileError, err:
- assert str(err) == "Bind parameter 'mytable_myid_1' conflicts with unique bind parameter of the same name"
+ s = select([table1], or_(table1.c.myid==7, table1.c.myid==8, table1.c.myid==bindparam('myid_1')))
+ self.assertRaisesMessage(exceptions.CompileError, "conflicts with unique bind parameter of the same name", str, s)
def test_bind_as_col(self):
t = table('foo', column('id'))
def test_in(self):
self.assert_compile(select([table1], table1.c.myid.in_(['a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1)")
self.assert_compile(select([table1], ~table1.c.myid.in_(['a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid NOT IN (:mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid NOT IN (:myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_(['a', 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :myid_2)")
self.assert_compile(select([table1], table1.c.myid.in_(iter(['a', 'b']))),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :myid_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a')])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), literal('b')])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_(['a', literal('b')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :param_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :param_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal(1) + 'a'])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 + :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') +'a', 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') + literal('a'), literal('b')])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :param_3)")
self.assert_compile(select([table1], table1.c.myid.in_([1, literal(3) + 4])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :param_1 + :param_2)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :param_1 + :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') < 'b'])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 < :param_2)")
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_(['a', table1.c.myid])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, mytable.myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid +'a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal(1), 'a' + table1.c.myid])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid_1 + mytable.myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :myid_1 + mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([1, 2, 3])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2, :mytable_myid_3)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :myid_2, :myid_3)")
self.assert_compile(select([table1], table1.c.myid.in_(select([table2.c.otherid]))),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (SELECT myothertable.otherid FROM myothertable)")
)
)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable \
WHERE mytable.myid IN (\
-SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 \
-UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_2)")
+SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 \
+UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_2)")
# test that putting a select in an IN clause does not blow away its ORDER BY clause
self.assert_compile(
@testing.uses_deprecated('passing in_')
def test_in_deprecated_api(self):
self.assert_compile(select([table1], table1.c.myid.in_('abc')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_(1)),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_(1,2)),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :myid_2)")
self.assert_compile(select([table1], table1.c.myid.in_()),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
table = Table('dt', metadata,
Column('date', Date))
self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))),
- "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date_1 AND :dt_date_2", checkparams={'dt_date_1':datetime.date(2006,6,1), 'dt_date_2':datetime.date(2006,6,5)})
+ "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2", checkparams={'date_1':datetime.date(2006,6,1), 'date_2':datetime.date(2006,6,5)})
self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))),
"SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)})
table = Table('op', metadata,
Column('field', Integer))
self.assert_compile(table.select((table.c.field == 5) == None),
- "SELECT op.field FROM op WHERE (op.field = :op_field_1) IS NULL")
+ "SELECT op.field FROM op WHERE (op.field = :field_1) IS NULL")
self.assert_compile(table.select((table.c.field + 5) == table.c.field),
- "SELECT op.field FROM op WHERE op.field + :op_field_1 = op.field")
+ "SELECT op.field FROM op WHERE op.field + :field_1 = op.field")
self.assert_compile(table.select((table.c.field + 5) * 6),
- "SELECT op.field FROM op WHERE (op.field + :op_field_1) * :param_1")
+ "SELECT op.field FROM op WHERE (op.field + :field_1) * :param_1")
self.assert_compile(table.select((table.c.field * 5) + 6),
- "SELECT op.field FROM op WHERE op.field * :op_field_1 + :param_1")
+ "SELECT op.field FROM op WHERE op.field * :field_1 + :param_1")
self.assert_compile(table.select(5 + table.c.field.in_([5,6])),
- "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:op_field_1, :op_field_2))")
+ "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:field_1, :field_2))")
self.assert_compile(table.select((5 + table.c.field).in_([5,6])),
- "SELECT op.field FROM op WHERE :op_field_1 + op.field IN (:param_1, :param_2)")
+ "SELECT op.field FROM op WHERE :field_1 + op.field IN (:param_1, :param_2)")
self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))),
- "SELECT op.field FROM op WHERE NOT (op.field = :op_field_1 AND op.field = :op_field_2)")
+ "SELECT op.field FROM op WHERE NOT (op.field = :field_1 AND op.field = :field_2)")
self.assert_compile(table.select(not_(table.c.field == 5)),
- "SELECT op.field FROM op WHERE op.field != :op_field_1")
+ "SELECT op.field FROM op WHERE op.field != :field_1")
self.assert_compile(table.select(not_(table.c.field.between(5, 6))),
- "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :op_field_1 AND :op_field_2)")
+ "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :field_1 AND :field_2)")
self.assert_compile(table.select(not_(table.c.field) == 5),
"SELECT op.field FROM op WHERE (NOT op.field) = :param_1")
self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)),
for col, key, expr, label in (
(table1.c.name, 'name', 'mytable.name', None),
- (table1.c.myid==12, 'mytable.myid = :mytable_myid_1', 'mytable.myid = :mytable_myid_1', 'anon_1'),
+ (table1.c.myid==12, 'mytable.myid = :myid_1', 'mytable.myid = :myid_1', 'anon_1'),
(func.hoho(table1.c.myid), 'hoho(mytable.myid)', 'hoho(mytable.myid)', 'hoho_1'),
(cast(table1.c.name, SLNumeric), 'CAST(mytable.name AS NUMERIC(10, 2))', 'CAST(mytable.name AS NUMERIC(10, 2))', 'anon_1'),
(t1.c.col1, 'col1', 'mytable.col1', None),
self.assert_compile(table.insert(inline=True), "INSERT INTO sometable (foo) VALUES (foobar())", params={})
def test_update(self):
- self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid_1", params = {table1.c.name:'fred'})
- self.assert_compile(table1.update().where(table1.c.myid==7).values({table1.c.myid:5}), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :mytable_myid_1", checkparams={'myid':5, 'mytable_myid_1':7})
- self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid_1", params = {'name':'fred'})
+ self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", params = {table1.c.name:'fred'})
+ self.assert_compile(table1.update().where(table1.c.myid==7).values({table1.c.myid:5}), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", checkparams={'myid':5, 'myid_1':7})
+ self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", params = {'name':'fred'})
self.assert_compile(update(table1, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid")
self.assert_compile(update(table1, whereclause = table1.c.name == bindparam('crit'), values = {table1.c.name : 'hi'}), "UPDATE mytable SET name=:name WHERE mytable.name = :crit", params = {'crit' : 'notthere'}, checkparams={'crit':'notthere', 'name':'hi'})
- self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :mytable_myid_1", params = {'description':'test'}, checkparams={'description':'test', 'mytable_myid_1':12})
- self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :mytable_myid_1", params = {'mytable_myid_1': 12, 'myid': 9, 'description': 'test'})
- self.assert_compile(update(table1, table1.c.myid ==12), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :mytable_myid_1", params={'myid':18}, checkparams={'myid':18, 'mytable_myid_1':12})
+ self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :myid_1", params = {'description':'test'}, checkparams={'description':'test', 'myid_1':12})
+ self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :myid_1", params = {'myid_1': 12, 'myid': 9, 'description': 'test'})
+ self.assert_compile(update(table1, table1.c.myid ==12), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", params={'myid':18}, checkparams={'myid':18, 'myid_1':12})
s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'})
- c = s.compile(column_keys=['mytable_id', 'name'])
- self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name_1), description=:description WHERE mytable.myid = :mytable_myid_1", params = {'description':'test'})
+ c = s.compile(column_keys=['id', 'name'])
+ self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :name_1), description=:description WHERE mytable.myid = :myid_1", params = {'description':'test'})
self.assert_(str(s) == str(c))
self.assert_compile(update(table1,
values = {
table1.c.name : table1.c.name + "lala",
table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
- }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :mytable_name_1) "
+ }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :name_1) "
"WHERE mytable.myid = hoho(:hoho_1) AND mytable.name = :param_2 || mytable.name || :param_3")
def test_correlated_update(self):
# test against a regular constructed subquery
s = select([table2], table2.c.otherid == table1.c.myid)
u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s})
- self.assert_compile(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :mytable_name_1")
+ self.assert_compile(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :name_1")
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
u = update(table1, table1.c.name==s)
self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "\
- "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid_1)")
+ "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :otherid_1)")
# test one that is actually correlated...
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
"(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)")
def test_delete(self):
- self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid_1")
- self.assert_compile(table1.delete().where(table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid_1")
- self.assert_compile(table1.delete().where(table1.c.myid == 7).where(table1.c.name=='somename'), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid_1 AND mytable.name = :mytable_name_1")
+ self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :myid_1")
+ self.assert_compile(table1.delete().where(table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :myid_1")
+ self.assert_compile(table1.delete().where(table1.c.myid == 7).where(table1.c.name=='somename'), "DELETE FROM mytable WHERE mytable.myid = :myid_1 AND mytable.name = :name_1")
def test_correlated_delete(self):
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
u = delete(table1, table1.c.name==s)
self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = "\
- "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid_1)")
+ "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :otherid_1)")
# test one that is actually correlated...
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
self.assert_compile(table4.select(), "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable")
self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
"SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "\
- "remote_owner.remotetable.datatype_id = :remote_owner_remotetable_datatype_id_1 AND remote_owner.remotetable.value = :remote_owner_remotetable_value_1")
+ "remote_owner.remotetable.datatype_id = :datatype_id_1 AND remote_owner.remotetable.value = :value_1")
s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi'))
s.use_labels = True
self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "\
"AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "\
- "remote_owner.remotetable.datatype_id = :remote_owner_remotetable_datatype_id_1 AND remote_owner.remotetable.value = :remote_owner_remotetable_value_1")
+ "remote_owner.remotetable.datatype_id = :datatype_id_1 AND remote_owner.remotetable.value = :value_1")
def test_alias(self):
a = alias(table4, 'remtable')
self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable "\
- "WHERE remtable.datatype_id = :remtable_datatype_id_1")
+ "WHERE remtable.datatype_id = :datatype_id_1")
def test_update(self):
self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "\
- "WHERE remote_owner.remotetable.value = :remote_owner_remotetable_value_1")
+ "WHERE remote_owner.remotetable.value = :value_1")
def test_insert(self):
self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "\
b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True))
j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5))
- assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x_1", str(j)
+ assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j)
assert list(j.primary_key) == [a.c.id, b.c.x]
def test_onclause_direction(self):
from sqlalchemy import util
from testlib import *
+__all__ = ['keywords', 'addresses', 'Base', 'Keyword', 'FixtureTest', 'Dingaling', 'item_keywords',
+ 'dingalings', 'User', 'items', 'Fixtures', 'orders', 'install_fixture_data', 'Address', 'users',
+ 'order_items', 'Item', 'Order', 'fixtures']
+
_recursion_stack = util.Set()
class Base(object):
def __init__(self, **kwargs):