def _coerce_config(configuration, prefix):
"""Convert configuration values to expected types."""
- options = dict([(key[len(prefix):], configuration[key])
- for key in configuration if key.startswith(prefix)])
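+ # keep only keys under `prefix`, stripped of the prefix itself
+ # (e.g. a hypothetical 'sqlalchemy.pool_timeout' -> 'pool_timeout')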
+ options = dict((key[len(prefix):], configuration[key])
+ for key in configuration
+ if key.startswith(prefix))
for option, type_ in (
('convert_unicode', bool),
('pool_timeout', int),
def __eq__(self, other):
return ((other is self) or
- (other == tuple([self.__parent._get_col(self.__row, key)
- for key in xrange(len(self.__row))])))
+ (other == tuple(self.__parent._get_col(self.__row, key)
+ for key in xrange(len(self.__row)))))
def __ne__(self, other):
return not self.__eq__(other)
self.__props[o] = rec
if self.__echo:
- self.context.engine.logger.debug("Col " + repr(tuple([x[0] for x in metadata])))
+ self.context.engine.logger.debug(
+ "Col " + repr(tuple(x[0] for x in metadata)))
def _create_key_cache(self):
# local copies to avoid circular ref against 'self'
# so we use an exception catch to reduce conditionals in _get_col
if isinstance(key, slice):
indices = key.indices(len(row))
- return tuple([self._get_col(row, i) for i in xrange(*indices)])
+ return tuple(self._get_col(row, i) for i in xrange(*indices))
else:
raise
# so we use an exception catch to reduce conditionals in _get_col
if isinstance(key, slice):
indices = key.indices(len(row))
- return tuple([self._get_col(row, i) for i in xrange(*indices)])
+ return tuple(self._get_col(row, i) for i in xrange(*indices))
else:
raise
# compiled clauseelement. process bind params, process table defaults,
# track collections used by ResultProxy to target and process results
- self.processors = dict([
+ self.processors = dict(
(key, value) for key, value in
- [(
- compiled.bind_names[bindparam],
- bindparam.bind_processor(self.dialect)
- ) for bindparam in compiled.bind_names]
- if value is not None
- ])
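+ # pair each bind name with its processor, skipping types
+ # whose bind_processor() returns None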
+ ((compiled.bind_names[bindparam],
+ bindparam.bind_processor(self.dialect))
+ for bindparam in compiled.bind_names)
+ if value is not None)
self.result_map = compiled.result_map
# that doesnt specify positional (because of execute_text())
if not isinstance(d, dict):
return d
- return dict([(k.encode(self.dialect.encoding), d[k]) for k in d])
+ return dict((k.encode(self.dialect.encoding), d[k]) for k in d)
return [proc(d) for d in params] or [{}]
def __convert_compiled_params(self, compiled_parameters):
from the bind parameter's ``TypeEngine`` objects.
"""
- types = dict([
+ types = dict(
(self.compiled.bind_names[bindparam], bindparam.type)
- for bindparam in self.compiled.bind_names
- ])
+ for bindparam in self.compiled.bind_names)
if self.dialect.positional:
inputsizes = []
"Invalid argument(s) %s sent to create_engine(), "
"using configuration %s/%s/%s. Please check that the "
"keyword arguments are appropriate for this combination "
- "of components." % (','.join(["'%s'" % k for k in kwargs]),
+ "of components." % (','.join("'%s'" % k for k in kwargs),
dialect.__class__.__name__,
pool.__class__.__name__,
engineclass.__name__))
if self.query:
keys = self.query.keys()
keys.sort()
- s += '?' + "&".join(["%s=%s" % (k, self.query[k]) for k in keys])
+ s += '?' + "&".join("%s=%s" % (k, self.query[k]) for k in keys)
return s
def __eq__(self, other):
components['database'] = tokens[0]
query = (len(tokens) > 1 and dict(cgi.parse_qsl(tokens[1]))) or None
if query is not None:
- query = dict([(k.encode('ascii'), query[k]) for k in query])
+ query = dict((k.encode('ascii'), query[k]) for k in query)
else:
query = None
components['query'] = query
self.connection_invalidated = connection_invalidated
def __str__(self):
- return ' '.join([SQLAlchemyError.__str__(self),
- repr(self.statement), repr(self.params)])
+ return ' '.join((SQLAlchemyError.__str__(self),
+ repr(self.statement), repr(self.params)))
# As of 0.4, SQLError is now DBAPIError.
if isinstance(selectable, sql.Alias):
return _selectable_name(selectable.element)
elif isinstance(selectable, sql.Select):
- return ''.join([_selectable_name(s) for s in selectable.froms])
+ return ''.join(_selectable_name(s) for s in selectable.froms)
elif isinstance(selectable, schema.Table):
return selectable.name.capitalize()
else:
def unmodified(self):
"""a set of keys which have no uncommitted changes"""
- return util.Set([
- key for key in self.manager.keys() if
- key not in self.committed_state
- or (key in self.manager.mutable_attributes and not self.manager[key].impl.check_mutable_modified(self))
- ])
+ return util.Set(
+ key for key in self.manager.keys()
+ if (key not in self.committed_state or
+ (key in self.manager.mutable_attributes and
+ not self.manager[key].impl.check_mutable_modified(self))))
+
unmodified = property(unmodified)
def unloaded(self):
was never populated or modified.
"""
- return util.Set([
- key for key in self.manager.keys() if
- key not in self.committed_state and key not in self.dict
- ])
+ return util.Set(
+ key for key in self.manager.keys()
+ if key not in self.committed_state and key not in self.dict)
+
unloaded = property(unloaded)
def expire_attributes(self, attribute_names):
def keyfunc(value):
state = instance_state(value)
m = _state_mapper(state)
- return tuple([m._get_state_attr_by_column(state, c)
- for c in mapping_spec])
+ return tuple(m._get_state_attr_by_column(state, c)
+ for c in mapping_spec)
return lambda: MappedCollection(keyfunc)
def attribute_mapped_collection(attr_name):
self._process_key_switches(deplist, uowcommit)
def _process_key_switches(self, deplist, uowcommit):
- switchers = util.Set([s for s in deplist if self._pks_changed(uowcommit, s)])
+ switchers = util.Set(s for s in deplist if self._pks_changed(uowcommit, s))
if switchers:
# yes, we're doing a linear search right now through the UOW. only
# takes effect when primary key values have actually changed.
self._inheriting_mappers = []
def polymorphic_iterator(self):
- return iter([self])
+ return iter((self,))
def _register_dependencies(self, uowcommit):
pass
class UnevaluatableError(Exception):
pass
-_straight_ops = Set([getattr(operators, op) for op in [
- 'add', 'mul', 'sub', 'div', 'mod', 'truediv', 'lt', 'le', 'ne', 'gt', 'ge', 'eq'
-]])
+_straight_ops = Set(getattr(operators, op)
+ for op in ('add', 'mul', 'sub', 'div', 'mod', 'truediv',
+ 'lt', 'le', 'ne', 'gt', 'ge', 'eq'))
-_notimplemented_ops = Set([getattr(operators, op) for op in [
- 'like_op', 'notlike_op', 'ilike_op', 'notilike_op', 'between_op', 'in_op', 'notin_op',
- 'endswith_op', 'concat_op',
-]])
+_notimplemented_ops = Set(getattr(operators, op)
+ for op in ('like_op', 'notlike_op', 'ilike_op',
+ 'notilike_op', 'between_op', 'in_op',
+ 'notin_op', 'endswith_op', 'concat_op'))
class EvaluatorCompiler(object):
def process(self, clause):
See PropertyLoader for the related instance implementation.
"""
- return iter([])
+ return iter(())
def set_parent(self, parent):
self.parent = parent
self._cols_by_table = {}
all_cols = util.Set(chain(*[col.proxy_set for col in self._columntoproperty]))
- pk_cols = util.Set([c for c in all_cols if c.primary_key])
+ pk_cols = util.Set(c for c in all_cols if c.primary_key)
# identify primary key columns which are also mapped by this mapper.
for t in util.Set(self.tables + [self.mapped_table]):
# ordering is important since it determines the ordering of mapper.primary_key (and therefore query.get())
self._pks_by_table[t] = util.OrderedSet(t.primary_key).intersection(pk_cols)
self._cols_by_table[t] = util.OrderedSet(t.c).intersection(all_cols)
-
- # determine cols that aren't expressed within our tables;
- # mark these as "read only" properties which are refreshed upon
- # INSERT/UPDATE
- self._readonly_props = util.Set([
- self._columntoproperty[col] for col in self._columntoproperty if
- not hasattr(col, 'table') or col.table not in self._cols_by_table
- ])
-
+
+ # determine cols that aren't expressed within our tables; mark these
+ # as "read only" properties which are refreshed upon INSERT/UPDATE
+ self._readonly_props = util.Set(
+ self._columntoproperty[col]
+ for col in self._columntoproperty
+ if not hasattr(col, 'table') or col.table not in self._cols_by_table)
+
# if explicit PK argument sent, add those columns to the primary key mappings
if self.primary_key_argument:
for k in self.primary_key_argument:
else:
# determine primary key from argument or mapped_table pks - reduce to the minimal set of columns
if self.primary_key_argument:
- primary_key = sqlutil.reduce_columns([self.mapped_table.corresponding_column(c) for c in self.primary_key_argument])
+ primary_key = sqlutil.reduce_columns(
+ self.mapped_table.corresponding_column(c)
+ for c in self.primary_key_argument)
else:
- primary_key = sqlutil.reduce_columns(self._pks_by_table[self.mapped_table])
+ primary_key = sqlutil.reduce_columns(
+ self._pks_by_table[self.mapped_table])
if len(primary_key) == 0:
raise sa_exc.ArgumentError("Mapper %s could not assemble any primary key columns for mapped table '%s'" % (self, self.mapped_table.description))
else:
self.secondary_synchronize_pairs = None
- self._foreign_keys = util.Set([r for l, r in self.synchronize_pairs])
+ self._foreign_keys = util.Set(r for l, r in self.synchronize_pairs)
if self.secondary_synchronize_pairs:
- self._foreign_keys.update([r for l, r in self.secondary_synchronize_pairs])
+ self._foreign_keys.update(r for l, r in self.secondary_synchronize_pairs)
def _determine_direction(self):
if self.secondaryjoin is not None:
if log.is_info_enabled(self.logger):
self.logger.info(str(self) + " setup primary join %s" % self.primaryjoin)
self.logger.info(str(self) + " setup secondary join %s" % self.secondaryjoin)
- self.logger.info(str(self) + " synchronize pairs [%s]" % ",".join(["(%s => %s)" % (l, r) for l, r in self.synchronize_pairs]))
- self.logger.info(str(self) + " secondary synchronize pairs [%s]" % ",".join(["(%s => %s)" % (l, r) for l, r in self.secondary_synchronize_pairs or []]))
- self.logger.info(str(self) + " local/remote pairs [%s]" % ",".join(["(%s / %s)" % (l, r) for l, r in self.local_remote_pairs]))
+ self.logger.info(str(self) + " synchronize pairs [%s]" % ",".join("(%s => %s)" % (l, r) for l, r in self.synchronize_pairs))
+ self.logger.info(str(self) + " secondary synchronize pairs [%s]" % ",".join(("(%s => %s)" % (l, r) for l, r in self.secondary_synchronize_pairs or [])))
+ self.logger.info(str(self) + " local/remote pairs [%s]" % ",".join("(%s / %s)" % (l, r) for l, r in self.local_remote_pairs))
self.logger.info(str(self) + " relation direction %s" % self.direction)
if self.uselist is None and self.direction is MANYTOONE:
query_from_parent = classmethod(util.deprecated(None, False)(query_from_parent))
def correlate(self, *args):
- self._correlate = self._correlate.union([_orm_selectable(s) for s in args])
+ self._correlate = self._correlate.union(_orm_selectable(s) for s in args)
correlate = _generative()(correlate)
def autoflush(self, setting):
(process, labels) = zip(*[query_entity.row_processor(self, context, custom_rows) for query_entity in self._entities])
if not single_entity:
- labels = dict([(label, property(util.itemgetter(i))) for i, label in enumerate(labels) if label])
+ labels = dict((label, property(util.itemgetter(i)))
+ for i, label in enumerate(labels)
+ if label)
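+ # each surviving label becomes an itemgetter property on the
+ # RowTuple subclass below, giving named access to row columns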
rowtuple = type.__new__(type, "RowTuple", (tuple,), labels)
rowtuple.keys = labels.keys
elif single_entity:
rows = [process[0](context, row) for row in fetch]
else:
- rows = [rowtuple([proc(context, row) for proc in process]) for row in fetch]
+ rows = [rowtuple(proc(context, row) for proc in process)
+ for row in fetch]
if filter:
rows = filter(rows)
try:
params[_get_params[primary_key].key] = ident[i]
except IndexError:
- raise sa_exc.InvalidRequestError("Could not find enough values to formulate primary key for query.get(); primary key columns are %s" % ', '.join(["'%s'" % str(c) for c in q.mapper.primary_key]))
+ raise sa_exc.InvalidRequestError(
+ "Could not find enough values to formulate primary key for "
+ "query.get(); primary key columns are %s" %
+ ', '.join("'%s'" % c for c in q.mapper.primary_key))
q._params = params
if lockmode is not None:
"""
for entity, (mapper, adapter, s, i, w) in self._mapper_adapter_map.iteritems():
if mapper.single and mapper.inherits and mapper.polymorphic_on and mapper.polymorphic_identity is not None:
- crit = mapper.polymorphic_on.in_([m.polymorphic_identity for m in mapper.polymorphic_iterator()])
+ crit = mapper.polymorphic_on.in_(
+ m.polymorphic_identity
+ for m in mapper.polymorphic_iterator())
if adapter:
crit = adapter.traverse(crit)
crit = self._adapt_clause(crit, False, False)
self.column = column
self.entity_name = None
self.froms = util.Set()
- self.entities = util.OrderedSet([
- elem._annotations['parententity'] for elem in visitors.iterate(column, {})
- if 'parententity' in elem._annotations
- ])
+ self.entities = util.OrderedSet(
+ elem._annotations['parententity']
+ for elem in visitors.iterate(column, {})
+ if 'parententity' in elem._annotations)
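+ # distinct parent entities for this column, in encounter order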
if self.entities:
self.entity_zero = list(self.entities)[0]
from sqlalchemy.orm import mapper
- extension_args = dict([(arg, kwargs.pop(arg))
- for arg in get_cls_kwargs(_ScopedExt)
- if arg in kwargs])
+ extension_args = dict((arg, kwargs.pop(arg))
+ for arg in get_cls_kwargs(_ScopedExt)
+ if arg in kwargs)
kwargs['extension'] = extension = to_list(kwargs.get('extension', []))
if extension_args:
secondaryjoin = visitors.replacement_traverse(secondaryjoin, {}, col_to_bind)
lazywhere = sql.and_(lazywhere, secondaryjoin)
- bind_to_col = dict([(binds[col].key, col) for col in binds])
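+ # map each generated bind param key back to its source column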
+ bind_to_col = dict((binds[col].key, col) for col in binds)
return (lazywhere, bind_to_col, equated_columns)
_create_lazy_clause = classmethod(_create_lazy_clause)
ret.append(task)
if self._should_log_debug:
- self.logger.debug("Dependent tuples:\n" + "\n".join(["(%s->%s)" % (d[0].class_.__name__, d[1].class_.__name__) for d in self.dependencies]))
+ self.logger.debug("Dependent tuples:\n" + "\n".join(
+ "(%s->%s)" % (d[0].class_.__name__, d[1].class_.__name__)
+ for d in self.dependencies))
self.logger.debug("Dependency sort:\n"+ str(ret))
return ret
from sqlalchemy.orm.interfaces import MapperExtension, EXT_CONTINUE, PropComparator, MapperProperty
from sqlalchemy.orm import attributes, exc
-all_cascades = util.FrozenSet(["delete", "delete-orphan", "all", "merge",
- "expunge", "save-update", "refresh-expire", "none"])
+all_cascades = util.FrozenSet(("delete", "delete-orphan", "all", "merge",
+ "expunge", "save-update", "refresh-expire",
+ "none"))
_INSTRUMENTOR = ('mapper', 'instrumentor')
"""Keeps track of the options sent to relation().cascade"""
def __init__(self, arg=""):
- values = util.Set([c.strip() for c in arg.split(',')])
+ values = util.Set(c.strip() for c in arg.split(','))
self.delete_orphan = "delete-orphan" in values
self.delete = "delete" in values or "all" in values
self.save_update = "save-update" in values or "all" in values
[sql.literal_column("'%s'" % type).label(typecolname)],
from_obj=[table]))
else:
- result.append(sql.select([col(name, table) for name in colnames], from_obj=[table]))
+ result.append(sql.select([col(name, table) for name in colnames],
+ from_obj=[table]))
return sql.union_all(*result).alias(aliasname)
def identity_key(*args, **kwargs):
"""
- interface = util.Set([method for method in dir(MapperExtension)
- if not method.startswith('_')])
+ interface = util.Set(method for method in dir(MapperExtension)
+ if not method.startswith('_'))
def __init__(self, extensions=None):
self.methods = {}
def __extra_kwargs(self, **kwargs):
# validate remaining kwargs that they all specify DB prefixes
- if len([k for k in kwargs if not re.match(r'^(?:%s)_' % '|'.join(databases.__all__), k)]):
- raise TypeError("Invalid argument(s) for Table: %s" % repr(kwargs.keys()))
+ if len([k for k in kwargs
+ if not re.match(r'^(?:%s)_' % '|'.join(databases.__all__), k)]):
+ raise TypeError(
+ "Invalid argument(s) for Table: %s" % repr(kwargs.keys()))
self.kwargs.update(kwargs)
def __post_init(self, *args, **kwargs):
if kwargs:
raise exc.ArgumentError(
'Unknown PrimaryKeyConstraint argument(s): %s' %
- ', '.join([repr(x) for x in kwargs.keys()]))
+ ', '.join(repr(x) for x in kwargs.keys()))
super(PrimaryKeyConstraint, self).__init__(**constraint_args)
self.__colnames = list(columns)
if kwargs:
raise exc.ArgumentError(
'Unknown UniqueConstraint argument(s): %s' %
- ', '.join([repr(x) for x in kwargs.keys()]))
+ ', '.join(repr(x) for x in kwargs.keys()))
super(UniqueConstraint, self).__init__(**constraint_args)
self.__colnames = list(columns)
def __repr__(self):
return 'Index("%s", %s%s)' % (self.name,
- ', '.join([repr(c)
- for c in self.columns]),
+ ', '.join(repr(c) for c in self.columns),
(self.unique and ', unique=True') or '')
class MetaData(SchemaItem):
sep = ', '
else:
sep = " " + self.operator_string(clauselist.operator) + " "
- return sep.join([s for s in [self.process(c) for c in clauselist.clauses] if s is not None])
+ return sep.join(s for s in (self.process(c) for c in clauselist.clauses)
+ if s is not None)
def visit_calculatedclause(self, clause, **kwargs):
return self.process(clause.clause_expr)
stack_entry['is_subquery'] = True
self.stack.append(stack_entry)
- text = string.join([self.process(c, asfrom=asfrom, parens=False) for c in cs.selects], " " + cs.keyword + " ")
+ text = (" " + cs.keyword + " ").join(
+ self.process(c, asfrom=asfrom, parens=False)
+ for c in cs.selects)
group_by = self.process(cs._group_by_clause, asfrom=asfrom)
if group_by:
text += " GROUP BY " + group_by
if froms:
text += " \nFROM "
- text += string.join(from_strings, ', ')
+ text += ', '.join(from_strings)
else:
text += self.default_from()
return (insert + " INTO %s (%s) VALUES (%s)" %
(preparer.format_table(insert_stmt.table),
- ', '.join([preparer.quote(c[0].name, c[0].quote)
- for c in colparams]),
- ', '.join([c[1] for c in colparams])))
+ ', '.join(preparer.quote(c[0].name, c[0].quote)
+ for c in colparams),
+ ', '.join(c[1] for c in colparams)))
def visit_update(self, update_stmt):
self.stack.append({'from':util.Set([update_stmt.table])})
self.isupdate = True
colparams = self._get_colparams(update_stmt)
- text = "UPDATE " + self.preparer.format_table(update_stmt.table) + " SET " + string.join(["%s=%s" % (self.preparer.quote(c[0].name, c[0].quote), c[1]) for c in colparams], ', ')
+ text = ' '.join((
+ "UPDATE",
+ self.preparer.format_table(update_stmt.table),
+ "SET",
+ ', '.join(self.preparer.quote(c[0].name, c[0].quote) + '=' + c[1]
+ for c in colparams)
+ ))
if update_stmt._whereclause:
text += " WHERE " + self.process(update_stmt._whereclause)
if self.column_keys is None:
parameters = {}
else:
- parameters = dict([(getattr(key, 'key', key), None) for key in self.column_keys])
+ parameters = dict((getattr(key, 'key', key), None)
+ for key in self.column_keys)
if stmt.parameters is not None:
for k, v in stmt.parameters.iteritems():
if column.default is not None:
self.traverse_single(column.default)
- self.append("\n" + " ".join(['CREATE'] + table._prefixes + ['TABLE', self.preparer.format_table(table), "("]))
-
+ self.append("\n" + " ".join(['CREATE'] +
+ table._prefixes +
+ ['TABLE',
+ self.preparer.format_table(table),
+ "("]))
separator = "\n"
# if only one primary key, specify it along with the column
if constraint.name is not None:
self.append("CONSTRAINT %s " % self.preparer.format_constraint(constraint))
self.append("PRIMARY KEY ")
- self.append("(%s)" % ', '.join([self.preparer.quote(c.name, c.quote) for c in constraint]))
+ self.append("(%s)" % ', '.join(self.preparer.quote(c.name, c.quote)
+ for c in constraint))
self.define_constraint_deferrability(constraint)
def visit_foreign_key_constraint(self, constraint):
preparer.format_constraint(constraint))
table = list(constraint.elements)[0].column.table
self.append("FOREIGN KEY(%s) REFERENCES %s (%s)" % (
- ', '.join([preparer.quote(f.parent.name, f.parent.quote) for f in constraint.elements]),
+ ', '.join(preparer.quote(f.parent.name, f.parent.quote)
+ for f in constraint.elements),
preparer.format_table(table),
- ', '.join([preparer.quote(f.column.name, f.column.quote) for f in constraint.elements])
+ ', '.join(preparer.quote(f.column.name, f.column.quote)
+ for f in constraint.elements)
))
if constraint.ondelete is not None:
self.append(" ON DELETE %s" % constraint.ondelete)
if constraint.name is not None:
self.append("CONSTRAINT %s " %
self.preparer.format_constraint(constraint))
- self.append(" UNIQUE (%s)" % (', '.join([self.preparer.quote(c.name, c.quote) for c in constraint])))
+ self.append(" UNIQUE (%s)" % (', '.join(self.preparer.quote(c.name, c.quote) for c in constraint)))
self.define_constraint_deferrability(constraint)
def define_constraint_deferrability(self, constraint):
self.append("INDEX %s ON %s (%s)" \
% (preparer.quote(self._validate_identifier(index.name, True), index.quote),
preparer.format_table(index.table),
- string.join([preparer.quote(c.name, c.quote) for c in index.columns], ', ')))
+ ', '.join(preparer.quote(c.name, c.quote)
+ for c in index.columns)))
self.execute()
def base_columns(self):
if not hasattr(self, '_base_columns'):
- self._base_columns = util.Set([c for c in self.proxy_set if not hasattr(c, 'proxies')])
+ self._base_columns = util.Set(c for c in self.proxy_set
+ if not hasattr(c, 'proxies'))
return self._base_columns
base_columns = property(base_columns)
type = property(type)
def _copy_internals(self, clone=_clone):
- self.bindparams = dict([(b.key, clone(b)) for b in self.bindparams.values()])
+ self.bindparams = dict((b.key, clone(b))
+ for b in self.bindparams.values())
def get_children(self, **kwargs):
return self.bindparams.values()
global sql_util
if not sql_util:
from sqlalchemy.sql import util as sql_util
- self._primary_key.extend(sql_util.reduce_columns([c for c in columns if c.primary_key], self.onclause))
- self._columns.update([(col._label, col) for col in columns])
+ self._primary_key.extend(sql_util.reduce_columns(
+ (c for c in columns if c.primary_key), self.onclause))
+ self._columns.update((col._label, col) for col in columns)
self._foreign_keys.update(itertools.chain(*[col.foreign_keys for col in columns]))
self._oid_column = self.left.oid_column
self._whereclause = None
if from_obj:
- self._froms.update([
+ self._froms.update(
_is_literal(f) and _TextClause(f) or f
- for f in util.to_list(from_obj)
- ])
+ for f in util.to_list(from_obj))
if having:
self._having = _literal_as_text(having)
def _copy_internals(self, clone=_clone):
self._reset_exported()
- from_cloned = dict([(f, clone(f)) for f in self._froms.union(self._correlate)])
- self._froms = util.Set([from_cloned[f] for f in self._froms])
- self._correlate = util.Set([from_cloned[f] for f in self._correlate])
+ from_cloned = dict((f, clone(f))
+ for f in self._froms.union(self._correlate))
+ self._froms = util.Set(from_cloned[f] for f in self._froms)
+ self._correlate = util.Set(from_cloned[f] for f in self._correlate)
self._raw_columns = [clone(c) for c in self._raw_columns]
for attr in ('_whereclause', '_having', '_order_by_clause', '_group_by_clause'):
if getattr(self, attr) is not None:
str(self.item) + \
(self.cycles is not None and (" (cycles: " + repr([x for x in self.cycles]) + ")") or "") + \
"\n" + \
- ''.join([str(n) for n in self.children])
+ ''.join(str(n) for n in self.children)
def __repr__(self):
return "%s" % (str(self.item))
yield child
def __len__(self):
- return sum([len(x) for x in self.parent_to_children.values()])
+ return sum(len(x) for x in self.parent_to_children.values())
def __iter__(self):
for parent, children in self.parent_to_children.iteritems():
traverse(parent)
# sets are not hashable, so uniquify with id
- unique_cycles = dict([(id(s), s) for s in cycles.values()]).values()
+ unique_cycles = dict((id(s), s) for s in cycles.values()).values()
for cycle in unique_cycles:
edgecollection = [edge for edge in edges
if edge[0] in cycle and edge[1] in cycle]
def __repr__(self):
return "%s(%s)" % (
self.__class__.__name__,
- ", ".join(["%s=%r" % (k, getattr(self, k, None))
- for k in inspect.getargspec(self.__init__)[0][1:]]))
+ ", ".join("%s=%r" % (k, getattr(self, k, None))
+ for k in inspect.getargspec(self.__init__)[0][1:]))
class TypeEngine(AbstractType):
def dialect_impl(self, dialect, **kwargs):
yield key, getter(key)
return iterator()
elif hasattr(dictlike, 'keys'):
- return iter([(key, getter(key)) for key in dictlike.keys()])
+ return iter((key, getter(key)) for key in dictlike.keys())
else:
raise TypeError(
"Object '%r' is not dict-like" % dictlike)
return arg
else:
if isinstance(argtype, tuple):
- raise exc.ArgumentError("Argument '%s' is expected to be one of type %s, got '%s'" % (name, ' or '.join(["'%s'" % str(a) for a in argtype]), str(type(arg))))
+ raise exc.ArgumentError(
+ "Argument '%s' is expected to be one of type %s, got '%s'" %
+ (name, ' or '.join("'%s'" % a for a in argtype), type(arg)))
else:
raise exc.ArgumentError("Argument '%s' is expected to be of type '%s', got '%s'" % (name, str(argtype), str(type(arg))))
def intersection(self, other):
other = Set(other)
- return self.__class__([a for a in self if a in other])
+ return self.__class__(a for a in self if a in other)
__and__ = intersection
def symmetric_difference(self, other):
other = Set(other)
- result = self.__class__([a for a in self if a not in other])
- result.update([a for a in other if a not in self])
+ result = self.__class__(a for a in self if a not in other)
+ result.update(a for a in other if a not in self)
return result
__xor__ = symmetric_difference
def difference(self, other):
other = Set(other)
- return self.__class__([a for a in self if a not in other])
+ return self.__class__(a for a in self if a not in other)
__sub__ = difference
return cmp(tuple(self), tuple(other))
def __iter__(self):
- return iter([arg() for arg in self.args])
+ return iter(arg() for arg in self.args)
class _symbol(object):
def __init__(self, name):
from ez_setup import use_setuptools
use_setuptools()
+import os
+import sys
+from os import path
from setuptools import setup, find_packages
from distutils.command.build_py import build_py as _build_py
from setuptools.command.sdist import sdist as _sdist
-import os
-from os import path
+
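+# generator expressions, used throughout this release, require 2.4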
+if sys.version_info < (2, 4):
+ raise Exception("SQLAlchemy requires Python 2.4 or higher.")
v = open(path.join(path.dirname(__file__), 'VERSION'))
VERSION = v.readline().strip()
Column('c1', Integer, primary_key=True),
Column('c2', String(30)))
- @profiling.function_call_count(67, {'2.3': 44, '2.4': 42})
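+ # expected counts raised after the listcomp -> genexp conversions
+ # (genexps add per-item iterator-protocol calls)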
+ @profiling.function_call_count(72, {'2.3': 44, '2.4': 46})
def test_insert(self):
t1.insert().compile()