visit_char_length_func = visit_length_func
def function_argspec(self, func, **kw):
- if func.clauses:
+ if func.clauses is not None and len(func.clauses):
return self.process(func.clause_expr)
else:
return ""
else:
colspec += " NULL"
- if not column.table:
+ if column.table is None:
raise exc.InvalidRequestError("mssql requires Table-bound columns in order to generate DDL")
seq_col = column.table._autoincrement_column
text = "UPDATE " + self.preparer.format_table(update_stmt.table) + \
" SET " + ', '.join(["%s=%s" % (self.preparer.format_column(c[0]), c[1]) for c in colparams])
- if update_stmt._whereclause:
+ if update_stmt._whereclause is not None:
text += " WHERE " + self.process(update_stmt._whereclause)
limit = update_stmt.kwargs.get('mysql_limit', None)
froms = select._get_display_froms(existingfroms)
whereclause = self._get_nonansi_join_whereclause(froms)
- if whereclause:
+ if whereclause is not None:
select = select.where(whereclause)
select._oracle_visit = True
return text
def get_select_precolumns(self, select):
- if select._distinct:
- if isinstance(select._distinct, bool):
+ if select._distinct is not False:
+ if select._distinct is True:
return "DISTINCT "
elif isinstance(select._distinct, (list, tuple)):
return "DISTINCT ON (" + ', '.join(
else:
mapper_cls = mapper
- if not table and 'inherits' not in mapper_args:
+ if table is None and 'inherits' not in mapper_args:
raise exceptions.InvalidRequestError("Class %r does not have a __table__ or __tablename__ "
"specified and does not inherit from an existing table-mapped class." % cls)
elif 'inherits' in mapper_args and not mapper_args.get('concrete', False):
inherited_mapper = class_mapper(mapper_args['inherits'], compile=False)
inherited_table = inherited_mapper.local_table
- if 'inherit_condition' not in mapper_args and table:
+ if 'inherit_condition' not in mapper_args and table is not None:
# figure out the inherit condition with relaxed rules
# about nonexistent tables, to allow for ForeignKeys to
# not-yet-defined tables (since we know for sure that our
mapper_args['inherits'].__table__, table,
ignore_nonexistent_tables=True)
- if not table:
+ if table is None:
# single table inheritance.
# ensure no table args
table_args = cls.__dict__.get('__table_args__')
self.primary_key_argument = primary_key
self.non_primary = non_primary
- if order_by:
+ if order_by is not False:
self.order_by = util.to_list(order_by)
else:
self.order_by = order_by
if self.concrete:
self.mapped_table = self.local_table
for mapper in self.iterate_to_root():
- if mapper.polymorphic_on:
+ if mapper.polymorphic_on is not None:
mapper._requires_row_aliasing = True
else:
- if not self.inherit_condition:
+ if self.inherit_condition is None:
# figure out inherit condition from our table to the immediate table
# of the inherited mapper, not its full table which could pull in other
# stuff we dont want (allows test/inheritance.InheritTest4 to pass)
if self.polymorphic_identity is not None:
self.polymorphic_map[self.polymorphic_identity] = self
- if not self.polymorphic_on:
+ if self.polymorphic_on is None:
for mapper in self.iterate_to_root():
# try to set up polymorphic on using correesponding_column(); else leave
# as None
- if mapper.polymorphic_on:
+ if mapper.polymorphic_on is not None:
self.polymorphic_on = self.mapped_table.corresponding_column(mapper.polymorphic_on)
break
else:
# do a special check for the "discriminiator" column, as it may only be present
# in the 'with_polymorphic' selectable but we need it for the base mapper
- if self.polymorphic_on and self.polymorphic_on not in self._columntoproperty:
+ if self.polymorphic_on is not None and self.polymorphic_on not in self._columntoproperty:
col = self.mapped_table.corresponding_column(self.polymorphic_on)
- if not col:
+ if col is None:
instrument = False
col = self.polymorphic_on
else:
mapped_column = []
for c in columns:
mc = self.mapped_table.corresponding_column(c)
- if not mc:
+ if mc is None:
mc = self.local_table.corresponding_column(c)
- if mc:
+ if mc is not None:
# if the column is in the local table but not the mapped table,
# this corresponds to adding a column after the fact to the local table.
self.mapped_table._reset_exported()
mc = self.mapped_table.corresponding_column(c)
- if not mc:
+ if mc is None:
raise sa_exc.ArgumentError("Column '%s' is not represented in mapper's table. "
"Use the `column_property()` function to force this column "
"to be mapped as a read-only attribute." % c)
id(self), self.class_.__name__)
def __str__(self):
- return "Mapper|" + self.class_.__name__ + "|" + \
- (self.local_table and self.local_table.description or str(self.local_table)) + \
- (self.non_primary and "|non-primary" or "")
+ if self.local_table is not None:
+ tabledesc = self.local_table.description
+ else:
+ tabledesc = None
+ return "Mapper|%s|%s%s" % (
+ self.class_.__name__,
+ tabledesc,
+ self.non_primary and "|non-primary" or ""
+ )
# informational / status
else:
mappers = []
- if selectable:
+ if selectable is not None:
tables = set(sqlutil.find_tables(selectable, include_aliases=True))
mappers = [m for m in mappers if m.local_table in tables]
def _single_table_criterion(self):
if self.single and \
self.inherits and \
- self.polymorphic_on and \
+ self.polymorphic_on is not None and \
self.polymorphic_identity is not None:
return self.polymorphic_on.in_(
m.polymorphic_identity
return self.mapped_table
spec, selectable = self.with_polymorphic
- if selectable:
+ if selectable is not None:
return selectable
else:
return self._selectable_from_mappers(self._mappers_from_spec(spec, selectable))
selectable = self.with_polymorphic[1]
mappers = self._mappers_from_spec(spec, selectable)
- if selectable:
+ if selectable is not None:
return mappers, selectable
else:
return mappers, self._selectable_from_mappers(mappers)
chain(*[list(mapper.iterate_properties) for mapper in [self] + mappers])
):
if getattr(c, '_is_polymorphic_discriminator', False) and \
- (not self.polymorphic_on or c.columns[0] is not self.polymorphic_on):
+ (self.polymorphic_on is None or c.columns[0] is not self.polymorphic_on):
continue
yield c
else:
result[binary.right] = util.column_set((binary.left,))
for mapper in self.base_mapper.polymorphic_iterator():
- if mapper.inherit_condition:
+ if mapper.inherit_condition is not None:
visitors.traverse(mapper.inherit_condition, {}, {'binary':visit_binary})
return result
# check for descriptors, either local or from
# an inherited class
if local:
- if self.class_.__dict__.get(assigned_name, None)\
+ if self.class_.__dict__.get(assigned_name, None) is not None\
and self._is_userland_descriptor(self.class_.__dict__[assigned_name]):
return True
else:
- if getattr(self.class_, assigned_name, None)\
+ if getattr(self.class_, assigned_name, None) is not None\
and self._is_userland_descriptor(getattr(self.class_, assigned_name)):
return True
def _canload(self, state, allow_subtypes):
s = self.primary_mapper()
- if self.polymorphic_on or allow_subtypes:
+ if self.polymorphic_on is not None or allow_subtypes:
return _state_mapper(state).isa(s)
else:
return _state_mapper(state) is s
for col in mapper._cols_by_table[table]:
if col is mapper.version_id_col:
params[col.key] = 1
- elif mapper.polymorphic_on and mapper.polymorphic_on.shares_lineage(col):
+ elif mapper.polymorphic_on is not None and mapper.polymorphic_on.shares_lineage(col):
if self._should_log_debug:
self._log_debug(
"Using polymorphic identity '%s' for insert column '%s'" %
history = attributes.get_state_history(state, prop.key, passive=True)
if history.added:
hasdata = True
- elif mapper.polymorphic_on and mapper.polymorphic_on.shares_lineage(col) and col not in pks:
+ elif mapper.polymorphic_on is not None and mapper.polymorphic_on.shares_lineage(col) and col not in pks:
pass
else:
if post_update_cols is not None and col not in post_update_cols:
for col in mapper._pks_by_table[table]:
clause.clauses.append(col == sql.bindparam(col._label, type_=col.type))
- if mapper.version_id_col and table.c.contains_column(mapper.version_id_col):
+ if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):
clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col._label, type_=col.type))
statement = table.update(clause)
postfetch_cols = resultproxy.postfetch_cols()
generated_cols = list(resultproxy.prefetch_cols())
- if self.polymorphic_on:
+ if self.polymorphic_on is not None:
po = table.corresponding_column(self.polymorphic_on)
- if po:
+ if po is not None:
generated_cols.append(po)
- if self.version_id_col:
+ if self.version_id_col is not None:
generated_cols.append(self.version_id_col)
for c in generated_cols:
delete.setdefault(connection, []).append(params)
for col in mapper._pks_by_table[table]:
params[col.key] = mapper._get_state_attr_by_column(state, col)
- if mapper.version_id_col and table.c.contains_column(mapper.version_id_col):
+ if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):
params[mapper.version_id_col.key] = mapper._get_state_attr_by_column(state, mapper.version_id_col)
for connection, del_objects in delete.iteritems():
clause = sql.and_()
for col in mapper._pks_by_table[table]:
clause.clauses.append(col == sql.bindparam(col.key, type_=col.type))
- if mapper.version_id_col and table.c.contains_column(mapper.version_id_col):
+ if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):
clause.clauses.append(
mapper.version_id_col ==
sql.bindparam(mapper.version_id_col.key, type_=mapper.version_id_col.type))
if polymorphic_from or refresh_state:
polymorphic_on = None
else:
- polymorphic_on = polymorphic_discriminator or self.polymorphic_on
+ if polymorphic_discriminator is not None:
+ polymorphic_on = polymorphic_discriminator
+ else:
+ polymorphic_on = self.polymorphic_on
polymorphic_instances = util.PopulateDict(self._configure_subclass_mapper(context, path, adapter))
version_id_col = self.version_id_col
if adapter:
pk_cols = [adapter.columns[c] for c in pk_cols]
- if polymorphic_on:
+ if polymorphic_on is not None:
polymorphic_on = adapter.columns[polymorphic_on]
- if version_id_col:
+ if version_id_col is not None:
version_id_col = adapter.columns[version_id_col]
identity_class = self._identity_class
if ret is not EXT_CONTINUE:
row = ret
- if polymorphic_on:
+ if polymorphic_on is not None:
discriminator = row[polymorphic_on]
if discriminator is not None:
_instance = polymorphic_instances[discriminator]
currentload = not isnew
loaded_instance = False
- if not currentload and version_id_col and context.version_check and \
+ if not currentload and version_id_col is not None and context.version_check and \
self._get_state_attr_by_column(state, self.version_id_col) != row[version_id_col]:
raise exc.ConcurrentModificationError(
"Instance '%s' version of %s does not match %s"
result = False
if mapper.inherits and not mapper.concrete:
statement = mapper._optimized_get_statement(state, attribute_names)
- if statement:
+ if statement is not None:
result = session.query(mapper).from_statement(statement)._get(None, only_load_props=attribute_names, refresh_state=state)
if result is False:
to_selectable = to_selectable.alias()
single_crit = target_mapper._single_table_criterion
- if single_crit:
+ if single_crit is not None:
if criterion is not None:
criterion = single_crit & criterion
else:
# annotate the *local* side of the join condition, in the case of pj + sj this
# is the full primaryjoin, in the case of just pj its the local side of
# the primaryjoin.
- if sj:
+ if sj is not None:
j = _orm_annotate(pj) & sj
else:
j = _orm_annotate(pj, exclude=self.property.remote_side)
- if criterion and target_adapter:
+ if criterion is not None and target_adapter:
# limit this adapter to annotated only?
criterion = target_adapter.traverse(criterion)
# only have the "joined left side" of what we return be subject to Query adaption. The right
# side of it is used for an exists() subquery and should not correlate or otherwise reach out
# to anything in the enclosing query.
- if criterion:
+ if criterion is not None:
criterion = criterion._annotate({'_halt_adapt': True})
crit = j & criterion
raise sa_exc.InvalidRequestError("'contains' not implemented for scalar attributes. Use ==")
clause = self.property._optimized_compare(other, adapt_source=self.adapter)
- if self.property.secondaryjoin:
+ if self.property.secondaryjoin is not None:
clause.negation_clause = self.__negated_contains_or_equals(other)
return clause
util.assert_arg_type(val, sql.ClauseElement, attr)
setattr(self, attr, _orm_deannotate(val))
- if self.order_by:
+ if self.order_by is not False and self.order_by is not None:
self.order_by = [expression._literal_as_column(x) for x in util.to_list(self.order_by)]
self._foreign_keys = util.column_set(expression._literal_as_column(x) for x in util.to_column_set(self._foreign_keys))
self.synchronize_pairs = eq_pairs
- if self.secondaryjoin:
+ if self.secondaryjoin is not None:
sq_pairs = criterion_as_pairs(self.secondaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=self.viewonly)
sq_pairs = [(l, r) for l, r in sq_pairs if (self._col_is_part_of_mappings(l) and self._col_is_part_of_mappings(r)) or r in self._foreign_keys]
else:
if self.viewonly:
eq_pairs = self.synchronize_pairs
- if self.secondaryjoin:
+ if self.secondaryjoin is not None:
eq_pairs += self.secondary_synchronize_pairs
else:
eq_pairs = criterion_as_pairs(self.primaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=True)
- if self.secondaryjoin:
+ if self.secondaryjoin is not None:
eq_pairs += criterion_as_pairs(self.secondaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=True)
eq_pairs = [(l, r) for l, r in eq_pairs if self._col_is_part_of_mappings(l) and self._col_is_part_of_mappings(r)]
else:
aliased = True
- aliased = aliased or bool(source_selectable)
+ aliased = aliased or (source_selectable is not None)
primaryjoin, secondaryjoin, secondary = self.primaryjoin, self.secondaryjoin, self.secondary
dest_mapper = of_type or self.mapper
single_crit = dest_mapper._single_table_criterion
- if single_crit:
- if secondaryjoin:
+ if single_crit is not None:
+ if secondaryjoin is not None:
secondaryjoin = secondaryjoin & single_crit
else:
primaryjoin = primaryjoin & single_crit
if aliased:
- if secondary:
+ if secondary is not None:
secondary = secondary.alias()
primary_aliasizer = ClauseAdapter(secondary)
- if dest_selectable:
+ if dest_selectable is not None:
secondary_aliasizer = ClauseAdapter(dest_selectable, equivalents=self.mapper._equivalent_columns).chain(primary_aliasizer)
else:
secondary_aliasizer = primary_aliasizer
- if source_selectable:
+ if source_selectable is not None:
primary_aliasizer = ClauseAdapter(secondary).chain(ClauseAdapter(source_selectable, equivalents=self.parent._equivalent_columns))
secondaryjoin = secondary_aliasizer.traverse(secondaryjoin)
else:
- if dest_selectable:
+ if dest_selectable is not None:
primary_aliasizer = ClauseAdapter(dest_selectable, exclude=self.local_side, equivalents=self.mapper._equivalent_columns)
- if source_selectable:
+ if source_selectable is not None:
primary_aliasizer.chain(ClauseAdapter(source_selectable, exclude=self.remote_side, equivalents=self.parent._equivalent_columns))
- elif source_selectable:
+ elif source_selectable is not None:
primary_aliasizer = ClauseAdapter(source_selectable, exclude=self.remote_side, equivalents=self.parent._equivalent_columns)
secondary_aliasizer = None
else:
target_adapter = None
+ if source_selectable is None:
+ source_selectable = self.parent.local_table
+
+ if dest_selectable is None:
+ dest_selectable = self.mapper.local_table
+
return (primaryjoin, secondaryjoin,
- (source_selectable or self.parent.local_table),
- (dest_selectable or self.mapper.local_table), secondary, target_adapter)
+ source_selectable,
+ dest_selectable, secondary, target_adapter)
def _get_join(self, parent, primary=True, secondary=True, polymorphic_parent=True):
"""deprecated. use primary_join_against(), secondary_join_against(), full_join_against()"""
mapper = prop.mapper.primary_mapper()
if mapper._get_property(self.key, raiseerr=False) is None:
- if prop.secondary:
+ if prop.secondary is not None:
pj = self.kwargs.pop('primaryjoin', prop.secondaryjoin)
sj = self.kwargs.pop('secondaryjoin', prop.primaryjoin)
else:
else:
search = None
- if search:
+ if search is not None:
alias = self._polymorphic_adapters.get(search, None)
if alias:
return alias.adapt_clause(element)
for adapter in adapters:
e = adapter(elem)
- if e:
+ if e is not None:
return e
return replace
if "_orm_adapt" in elem._annotations or "parententity" in elem._annotations:
for adapter in adapters:
e = adapter(elem)
- if e:
+ if e is not None:
return e
return replace
def _no_criterion_condition(self, meth):
if not self._enable_assertions:
return
- if self._criterion or self._statement or self._from_obj or \
+ if self._criterion is not None or self._statement is not None or self._from_obj or \
self._limit is not None or self._offset is not None or \
self._group_by:
raise sa_exc.InvalidRequestError("Query.%s() being called on a Query with existing criterion. " % meth)
def add_entity(self, entity, alias=None):
"""add a mapped entity to the list of result columns to be returned."""
- if alias:
+ if alias is not None:
entity = aliased(entity, alias)
self._entities = list(self._entities)
onclause = descriptor
if isinstance(onclause, interfaces.PropComparator):
- if not right_entity:
+ if right_entity is None:
right_entity = onclause.property.mapper
of_type = getattr(onclause, '_of_type', None)
if of_type:
if self._from_obj:
replace_clause_index, clause = sql_util.find_join_source(self._from_obj, left_selectable)
- if clause:
+ if clause is not None:
# the entire query's FROM clause is an alias of itself (i.e. from_self(), similar).
# if the left clause is that one, ensure it aliases to the left side.
if self._from_obj_alias and clause is self._from_obj[0]:
This results in an execution of the underlying query.
"""
- if self._statement:
+ if self._statement is not None:
ret = list(self)[0:1]
else:
ret = list(self[0:1])
'offset':self._offset,
'distinct':self._distinct,
'group_by':self._group_by or None,
- 'having':self._having or None
+ 'having':self._having
}
@property
if synchronize_session == 'evaluate':
try:
evaluator_compiler = evaluator.EvaluatorCompiler()
- eval_condition = evaluator_compiler.process(self.whereclause or expression._Null)
+ if self.whereclause is not None:
+ eval_condition = evaluator_compiler.process(self.whereclause)
+ else:
+ eval_condition = evaluator_compiler.process(expression._Null)
+
except evaluator.UnevaluatableError:
raise sa_exc.InvalidRequestError("Could not evaluate current criteria in Python. "
"Specify 'fetch' or False for the synchronize_session parameter.")
if synchronize_session == 'evaluate':
try:
evaluator_compiler = evaluator.EvaluatorCompiler()
- eval_condition = evaluator_compiler.process(self.whereclause or expression._Null)
+ if self.whereclause is not None:
+ eval_condition = evaluator_compiler.process(self.whereclause)
+ else:
+ eval_condition = evaluator_compiler.process(expression._Null)
+
value_evaluators = {}
for key,value in values.iteritems():
def _compile_context(self, labels=True):
context = QueryContext(self)
- if context.statement:
+ if context.statement is not None:
return context
if self._lockmode:
"""
for entity, (mapper, adapter, s, i, w) in self._mapper_adapter_map.iteritems():
single_crit = mapper._single_table_criterion
- if single_crit:
+ if single_crit is not None:
if adapter:
single_crit = adapter.traverse(single_crit)
single_crit = self._adapt_clause(single_crit, False, False)
column_collection=context.primary_columns
)
- if self._polymorphic_discriminator:
+ if self._polymorphic_discriminator is not None:
if adapter:
pd = adapter.columns[self._polymorphic_discriminator]
else:
class QueryContext(object):
def __init__(self, query):
- if query._statement:
+ if query._statement is not None:
if isinstance(query._statement, expression._SelectBaseMixin) and not query._statement.use_labels:
self.statement = query._statement.apply_labels()
else:
return self.__binds[c_mapper.base_mapper]
elif c_mapper.mapped_table in self.__binds:
return self.__binds[c_mapper.mapped_table]
- if clause:
+ if clause is not None:
for t in sql_util.find_tables(clause):
if t in self.__binds:
return self.__binds[t]
use_proxies=True,
equivalents=self.mapper._equivalent_columns
)
+
if self.use_get:
for col in self._equated_columns.keys():
if col in self.mapper._equivalent_columns:
o = state.obj() # strong ref
bindparam.value = lambda: mapper._get_committed_attr_by_column(o, bind_to_col[bindparam.key])
- if self.parent_property.secondary and alias_secondary:
+ if self.parent_property.secondary is not None and alias_secondary:
criterion = sql_util.ClauseAdapter(self.parent_property.secondary.alias()).traverse(criterion)
criterion = visitors.cloned_traverse(criterion, {}, {'bindparam':visit_bindparam})
lookup = util.column_dict()
equated_columns = util.column_dict()
- if reverse_direction and not prop.secondaryjoin:
+ if reverse_direction and prop.secondaryjoin is None:
for l, r in prop.local_remote_pairs:
_list = lookup.setdefault(r, [])
_list.append((r, l))
lazywhere = prop.primaryjoin
- if not prop.secondaryjoin or not reverse_direction:
+ if prop.secondaryjoin is None or not reverse_direction:
lazywhere = visitors.replacement_traverse(lazywhere, {}, col_to_bind)
if prop.secondaryjoin is not None:
entity_key, default_towrap = entity, entity.selectable
else:
index, clause = sql_util.find_join_source(context.from_clause, entity.selectable)
- if clause:
+ if clause is not None:
# join to an existing FROM clause on the query.
# key it to its list index in the eager_joins dict.
# Query._compile_context will adapt as needed and append to the
# send a hint to the Query as to where it may "splice" this join
eagerjoin.stop_on = entity.selectable
- if not self.parent_property.secondary and context.query._should_nest_selectable and not parentmapper:
+ if self.parent_property.secondary is None and context.query._should_nest_selectable and not parentmapper:
# for parentclause that is the non-eager end of the join,
# ensure all the parent cols in the primaryjoin are actually in the
# columns clause (i.e. are not deferred), so that aliasing applied by the Query propagates
class LoadEagerFromAliasOption(PropertyOption):
def __init__(self, key, alias=None):
super(LoadEagerFromAliasOption, self).__init__(key)
- if alias:
+ if alias is not None:
if not isinstance(alias, basestring):
m, alias, is_aliased_class = mapperutil._entity_info(alias)
self.alias = alias
pass
def process_query_property(self, query, paths, mappers):
- if self.alias:
+ if self.alias is not None:
if isinstance(self.alias, basestring):
mapper = mappers[-1]
(root_mapper, propname) = paths[-1][-2:]
def __init__(self, cls, alias=None, name=None):
self.__mapper = _class_to_mapper(cls)
self.__target = self.__mapper.class_
- alias = alias or self.__mapper._with_polymorphic_selectable.alias()
+ if alias is None:
+ alias = self.__mapper._with_polymorphic_selectable.alias()
self.__adapter = sql_util.ClauseAdapter(alias, equivalents=self.__mapper._equivalent_columns)
self.__alias = alias
# used to assign a name to the RowTuple object
if isinstance(onclause, basestring):
prop = left_mapper.get_property(onclause)
elif isinstance(onclause, attributes.QueryableAttribute):
- if not adapt_from:
+ if adapt_from is None:
adapt_from = onclause.__clause_element__()
prop = onclause.property
elif isinstance(onclause, MapperProperty):
dest_polymorphic=True,
of_type=right_mapper)
- if sj:
+ if sj is not None:
left = sql.join(left, secondary, pj, isouter)
onclause = sj
else:
[repr(self.name)] + [repr(self.type)] +
[repr(x) for x in self.foreign_keys if x is not None] +
[repr(x) for x in self.constraints] +
- [(self.table and "table=<%s>" % self.table.description or "")] +
+ [(self.table is not None and "table=<%s>" % self.table.description or "")] +
["%s=%s" % (k, repr(getattr(self, k))) for k in kwarg])
def _set_parent(self, table):
del self._table_events
def _on_table_attach(self, fn):
- if self.table:
+ if self.table is not None:
fn(self.table)
else:
self._table_events.add(fn)
key = colname
_column = table.c.get(colname, None)
- if not _column:
+ if _column is None:
raise exc.NoReferencedColumnError(
"Could not create ForeignKey '%s' on table '%s': "
"table '%s' has no column named '%s'" % (
@property
def bind(self):
"""Return the connectable associated with this default."""
- if getattr(self, 'column', None):
+ if getattr(self, 'column', None) is not None:
return self.column.table.bind
else:
return None
raise exc.ArgumentError(
"sqltext must be a string and will be used verbatim.")
self.sqltext = sqltext
- if table:
+ if table is not None:
self._set_parent(table)
def __visit_name__(self):
def visit_case(self, clause, **kwargs):
x = "CASE "
- if clause.value:
+ if clause.value is not None:
x += self.process(clause.value) + " "
for cond, result in clause.whens:
x += "WHEN " + self.process(cond) + " THEN " + self.process(result) + " "
- if clause.else_:
+ if clause.else_ is not None:
x += "ELSE " + self.process(clause.else_) + " "
x += "END"
return x
if isinstance(column, sql._Label):
return column
- if select and select.use_labels and column._label:
+ if select is not None and select.use_labels and column._label:
return _CompileLabel(column, column._label)
if \
if self.returning_precedes_values:
text += " " + self.returning_clause(update_stmt, update_stmt._returning)
- if update_stmt._whereclause:
+ if update_stmt._whereclause is not None:
text += " WHERE " + self.process(update_stmt._whereclause)
if self.returning and not self.returning_precedes_values:
if self.returning_precedes_values:
text += " " + self.returning_clause(delete_stmt, delete_stmt._returning)
- if delete_stmt._whereclause:
+ if delete_stmt._whereclause is not None:
text += " WHERE " + self.process(delete_stmt._whereclause)
if self.returning and not self.returning_precedes_values:
def _corresponding_column_or_error(fromclause, column, require_embedded=False):
c = fromclause.corresponding_column(column,
require_embedded=require_embedded)
- if not c:
+ if c is None:
raise exc.InvalidRequestError(
"Given column '%s', attached to table '%s', "
"failed to locate a corresponding column from table '%s'"
d = self.__dict__.copy()
d.pop('_is_clone_of', None)
return d
-
+
+ if util.jython:
+ def __hash__(self):
+ """Return a distinct hash code.
+
+ ClauseElements may have special equality comparisons which
+ makes us rely on them having unique hash codes for use in
+ hash-based collections. Stock __hash__ doesn't guarantee
+ unique values on platforms with moving GCs.
+ """
+ return id(self)
+
def _annotate(self, values):
"""return a copy of this ClauseElement with the given annotations
dictionary.
def __invert__(self):
return self._negate()
- if util.jython:
- def __hash__(self):
- """Return a distinct hash code.
-
- ClauseElements may have special equality comparisons which
- makes us rely on them having unique hash codes for use in
- hash-based collections. Stock __hash__ doesn't guarantee
- unique values on platforms with moving GCs.
- """
- return id(self)
+ def __nonzero__(self):
+ raise TypeError("Boolean value of this clause is not defined")
def _negate(self):
if hasattr(self, 'negation_clause'):
# column names in their exported columns collection
existing = self[key]
if not existing.shares_lineage(value):
- table = getattr(existing, 'table', None) and existing.table.description
util.warn(("Column %r on table %r being replaced by another "
"column with the same key. Consider use_labels "
- "for select() statements.") % (key, table))
+ "for select() statements.") % (key, getattr(existing, 'table', None)))
util.OrderedProperties.__setitem__(self, key, value)
def remove(self, column):
except TypeError:
pass
- if value:
+ if value is not None:
whenlist = [
(_literal_as_binds(c).self_group(), _literal_as_binds(r)) for (c, r) in whens
]
self.else_ = None
def _copy_internals(self, clone=_clone):
- if self.value:
+ if self.value is not None:
self.value = clone(self.value)
self.whens = [(clone(x), clone(y)) for x, y in self.whens]
- if self.else_:
+ if self.else_ is not None:
self.else_ = clone(self.else_)
def get_children(self, **kwargs):
- if self.value:
+ if self.value is not None:
yield self.value
for x, y in self.whens:
yield x
yield y
- if self.else_:
+ if self.else_ is not None:
yield self.else_
@property
self.modifiers = {}
else:
self.modifiers = modifiers
-
+
+ def __nonzero__(self):
+ try:
+ return self.operator(hash(self.left), hash(self.right))
+ except:
+ raise TypeError("Boolean value of this clause is not defined")
+
@property
def _from_objects(self):
return self.left._from_objects + self.right._from_objects
if self.is_literal:
return None
- elif self.table and self.table.named_with_column:
+ elif self.table is not None and self.table.named_with_column:
if getattr(self.table, 'schema', None):
label = self.table.schema + "_" + \
_escape_for_generated(self.table.name) + "_" + \
@property
def _from_objects(self):
- if self.table:
+ if self.table is not None:
return [self.table]
else:
return []
if len(clauses) == 1 and clauses[0] is None:
self._order_by_clause = ClauseList()
else:
- if getattr(self, '_order_by_clause', None):
+ if getattr(self, '_order_by_clause', None) is not None:
clauses = list(self._order_by_clause) + list(clauses)
self._order_by_clause = ClauseList(*clauses)
if len(clauses) == 1 and clauses[0] is None:
self._group_by_clause = ClauseList()
else:
- if getattr(self, '_group_by_clause', None):
+ if getattr(self, '_group_by_clause', None) is not None:
clauses = list(self._group_by_clause) + list(clauses)
self._group_by_clause = ClauseList(*clauses)
self._froms = util.OrderedSet()
if columns:
- self._raw_columns = [
- isinstance(c, _ScalarSelect) and
- c.self_group(against=operators.comma_op) or c
- for c in [_literal_as_column(c) for c in columns]
- ]
+ self._raw_columns = []
+ for c in columns:
+ c = _literal_as_column(c)
+ if isinstance(c, _ScalarSelect):
+ c = c.self_group(against=operators.comma_op)
+ self._raw_columns.append(c)
self._froms.update(_from_objects(*self._raw_columns))
else:
self._raw_columns = []
- if whereclause:
+ if whereclause is not None:
self._whereclause = _literal_as_text(whereclause)
self._froms.update(_from_objects(self._whereclause))
else:
self._whereclause = None
- if from_obj:
- self._froms.update(
- _is_literal(f) and _TextClause(f) or f
- for f in util.to_list(from_obj))
+ if from_obj is not None:
+ for f in util.to_list(from_obj):
+ if _is_literal(f):
+ self._froms.add(_TextClause(f))
+ else:
+ self._froms.add(f)
- if having:
+ if having is not None:
self._having = _literal_as_text(having)
else:
self._having = None
_ValuesBase.__init__(self, table, values)
self._bind = bind
self._returning = returning
- if whereclause:
+ if whereclause is not None:
self._whereclause = _literal_as_text(whereclause)
else:
self._whereclause = None
self.table = table
self._returning = returning
- if whereclause:
+ if whereclause is not None:
self._whereclause = _literal_as_text(whereclause)
else:
self._whereclause = None
else:
raise
- if col:
+ if col is not None:
crit.append(col == fk.parent)
constraints.add(fk.constraint)
if a is not b:
else:
raise
- if col:
+ if col is not None:
crit.append(col == fk.parent)
constraints.add(fk.constraint)
stack.append((right.left, right))
else:
right = adapter.traverse(right)
- if prevright:
+ if prevright is not None:
prevright.left = right
- if not ret:
+ if ret is None:
ret = right
return ret
def _corresponding_column(self, col, require_embedded, _seen=util.EMPTY_SET):
newcol = self.selectable.corresponding_column(col, require_embedded=require_embedded)
- if not newcol and col in self.equivalents and col not in _seen:
+ if newcol is None and col in self.equivalents and col not in _seen:
for equiv in self.equivalents[col]:
newcol = self._corresponding_column(equiv, require_embedded=require_embedded, _seen=_seen.union([col]))
- if newcol:
+ if newcol is not None:
return newcol
return newcol
def _locate_col(self, col):
c = self._corresponding_column(col, False)
- if not c:
+ if c is None:
c = self.adapt_clause(col)
# anonymize labels in case they have a hardcoded name
def replace(elem):
for v in self._visitor_iterator:
e = v.replace(elem)
- if e:
+ if e is not None:
return e
return replacement_traverse(obj, self.__traverse_options__, replace)
def clone(element):
newelem = replace(element)
- if newelem:
+ if newelem is not None:
stop_on.add(newelem)
return newelem
assert len(table.primary_key) == len(reflected_table.primary_key)
for c in table.primary_key:
- assert reflected_table.primary_key.columns[c.name]
+ assert reflected_table.primary_key.columns[c.name] is not None
def assert_types_base(self, c1, c2):
base_mro = sqltypes.TypeEngine.__mro__
))) == ITERATIONS + 7
# Test now(), today(), year(), month(), day()
- assert len(fulltable(Zoo.select(Zoo.c.Founded != None
- and Zoo.c.Founded < func.current_timestamp(_type=Date)))) == 3
+ assert len(fulltable(Zoo.select(and_(Zoo.c.Founded != None,
+ Zoo.c.Founded < func.current_timestamp(_type=Date))))) == 3
assert len(fulltable(Animal.select(Animal.c.LastEscape == func.current_timestamp(_type=Date)))) == 0
assert len(fulltable(Animal.select(func.date_part('year', Animal.c.LastEscape) == 2004))) == 1
assert len(fulltable(Animal.select(func.date_part('month', Animal.c.LastEscape) == 12))) == 1
table1 = Table("django_admin_log", meta, autoload=True)
table2 = Table("django_content_type", meta, autoload=True)
j = table1.join(table2)
- assert j.onclause == table1.c.content_type_id==table2.c.id
+ assert j.onclause.compare(table1.c.content_type_id==table2.c.id)
finally:
testing.db.execute("drop table django_admin_log")
testing.db.execute("drop table django_content_type")
assert list(a2.primary_key) == [a2.c.id]
assert list(u2.primary_key) == [u2.c.id]
- assert u2.join(a2).onclause == u2.c.id==a2.c.id
+ assert u2.join(a2).onclause.compare(u2.c.id==a2.c.id)
meta3 = MetaData(testing.db)
u3 = Table('users', meta3, autoload=True)
assert list(a3.primary_key) == [a3.c.id]
assert list(u3.primary_key) == [u3.c.id]
- assert u3.join(a3).onclause == u3.c.id==a3.c.id
+ assert u3.join(a3).onclause.compare(u3.c.id==a3.c.id)
finally:
meta.drop_all()
assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
- assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
+ assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id)
meta3 = MetaData(testing.db)
u3 = Table('users', meta3, autoload=True)
Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
autoload=True)
- assert u3.join(a3).onclause == u3.c.id==a3.c.user_id
+ assert u3.join(a3).onclause.compare(u3.c.id==a3.c.user_id)
meta4 = MetaData(testing.db)
u4 = Table('users', meta4,
u2 = Table('users', meta2, autoload=True)
s = sa.select([a2])
- assert s.c.user_id
+ assert s.c.user_id is not None
assert len(a2.foreign_keys) == 1
assert len(a2.c.user_id.foreign_keys) == 1
assert len(a2.constraints) == 2
assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
- assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
+ assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id)
meta2 = MetaData(testing.db)
u2 = Table('users', meta2,
autoload=True)
s = sa.select([a2])
- assert s.c.user_id
+ assert s.c.user_id is not None
assert len(a2.foreign_keys) == 1
assert len(a2.c.user_id.foreign_keys) == 1
assert len(a2.constraints) == 2
assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
- assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
+ assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id)
finally:
meta.drop_all()
assert r1.unique == True
assert r2.unique == False
assert r3.unique == False
- assert [t2.c.id] == r1.columns
- assert [t2.c.name, t2.c.id] == r2.columns
- assert [t2.c.name] == r3.columns
+ assert set([t2.c.id]) == set(r1.columns)
+ assert set([t2.c.name, t2.c.id]) == set(r2.columns)
+ assert set([t2.c.name]) == set(r3.columns)
finally:
m1.drop_all()
# the exclude_properties collection so that sibling classes
# don't cross-pollinate.
- assert Person.__table__.c.company_id
- assert Person.__table__.c.golf_swing
- assert Person.__table__.c.primary_language
- assert Engineer.primary_language
- assert Manager.golf_swing
+ assert Person.__table__.c.company_id is not None
+ assert Person.__table__.c.golf_swing is not None
+ assert Person.__table__.c.primary_language is not None
+ assert Engineer.primary_language is not None
+ assert Manager.golf_swing is not None
assert not hasattr(Person, 'primary_language')
assert not hasattr(Person, 'golf_swing')
assert not hasattr(Engineer, 'golf_swing')
id = Column(Integer, ForeignKey('people.id'), primary_key=True)
primary_language = Column(String(50))
- assert Person.__table__.c.golf_swing
+ assert Person.__table__.c.golf_swing is not None
assert not Person.__table__.c.has_key('primary_language')
- assert Engineer.__table__.c.primary_language
- assert Engineer.primary_language
- assert Manager.golf_swing
+ assert Engineer.__table__.c.primary_language is not None
+ assert Engineer.primary_language is not None
+ assert Manager.golf_swing is not None
assert not hasattr(Person, 'primary_language')
assert not hasattr(Person, 'golf_swing')
assert not hasattr(Engineer, 'golf_swing')
# gets joined normally with the extra column.
eq_(
- class_mapper(Sub).get_property('id').columns,
- [base.c.base_id, subtable.c.base_id]
+ set(class_mapper(Sub).get_property('id').columns),
+ set([base.c.base_id, subtable.c.base_id])
)
s1 = Sub()
'a1s': relation(A, secondary=c2a1, lazy=False),
'a2s': relation(A, secondary=c2a2, lazy=False)})
- assert create_session().query(C).with_labels().statement
+ assert create_session().query(C).with_labels().statement is not None
# TODO: seems like just a test for an ancient exception throw.
# how about some data/inserts/queries/assertions for this one
compile_mappers()
assert m.get_property('b_view').local_remote_pairs == \
m.get_property('b_plain').local_remote_pairs == \
- [(t1.c.id, t12.c.t1_id), (t12.c.t2_id, t2.c.id)]
+ [(t1.c.id, t12.c.t1_id), (t2.c.id, t12.c.t2_id)]
engine = engines.testing_engine(options={'implicit_returning':False})
result = engine.execute(sometable.insert(), name="somename")
- assert 'id' in result.postfetch_cols()
+
+ assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
result = engine.execute(sometable.insert(), name="someother")
- assert 'id' in result.postfetch_cols()
+ assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
sometable.insert().execute(
{'name':'name3'},
from sqlalchemy.sql.visitors import *
from sqlalchemy import util
from sqlalchemy.sql import util as sql_util
-
+from sqlalchemy.test.testing import eq_
class TraversalTest(TestBase, AssertsExecutionResults):
"""test ClauseVisitor's traversal, particularly its ability to copy and modify
def test_binary(self):
clause = t1.c.col2 == t2.c.col2
- assert str(clause) == CloningVisitor().traverse(clause)
+ eq_(str(clause), str(CloningVisitor().traverse(clause)))
def test_binary_anon_label_quirk(self):
t = table('t1', column('col1'))
(t1.c.col1, 'col1', 'mytable.col1', None),
(column('some wacky thing'), 'some wacky thing', '"some wacky thing"', '')
):
- s1 = select([col], from_obj=getattr(col, 'table', None) or table1)
+ if getattr(col, 'table', None) is not None:
+ t = col.table
+ else:
+ t = table1
+
+ s1 = select([col], from_obj=t)
assert s1.c.keys() == [key], s1.c.keys()
if label:
u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select([table1.c.col1, table1.c.col2, table1.c.col3]))
u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]))
- assert u.c.col1
- assert u.c.col2
- assert u.c.col3
+ assert u.c.col1 is not None
+ assert u.c.col2 is not None
+ assert u.c.col3 is not None
def test_alias_union(self):
# same as testunion, except its an alias of the union
l1 = select([func.max(table1.c.col1)]).label('foo')
s = select([l1])
- assert s.corresponding_column(l1).name == s.c.foo
+ eq_(s.corresponding_column(l1), s.c.foo)
s = select([table1.c.col1, l1])
- assert s.corresponding_column(l1).name == s.c.foo
+ eq_(s.corresponding_column(l1), s.c.foo)
def test_select_alias_labels(self):
a = table2.select(use_labels=True).alias('a')
d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer))
print list(a.join(b, a.c.x==b.c.id).primary_key)
- assert list(a.join(b, a.c.x==b.c.id).primary_key) == [b.c.id]
+ assert list(a.join(b, a.c.x==b.c.id).primary_key) == [a.c.id]
assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id]
assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id]
- assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [c.c.id]
+ assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [b.c.id]
assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]
- assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id]
+ assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [b.c.id]
assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]
assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]