from .engine import create_engine, engine_from_config
-__all__ = sorted(name for name, obj in list(locals().items())
+__all__ = sorted(name for name, obj in locals().items()
if not (name.startswith('_') or _inspect.ismodule(obj)))
__version__ = '0.8.2'
e, None, None, None, self)
else:
inputsizes = {}
- for key in list(self.compiled.bind_names.values()):
+ for key in self.compiled.bind_names.values():
typeengine = types[key]
dbtype = typeengine.dialect_impl(self.dialect).\
get_dbapi_type(self.dialect.dbapi)
def clear(self):
"""Clear all class level listeners"""
- for dispatcher in list(self._clslevel.values()):
+ for dispatcher in self._clslevel.values():
dispatcher[:] = []
def for_modify(self, obj):
iteritems = _iteritems
def values(self):
- return [self._get(member) for member in list(self.col.values())]
+ return [self._get(member) for member in self.col.values()]
def items(self):
return [(k, self._get(self.col[k])) for k in self]
def prepare(cls, engine):
"""Reflect all :class:`.Table` objects for all current
:class:`.DeferredReflection` subclasses"""
- to_map = [m for m in list(_MapperConfig.configs.values())
+ to_map = [m for m in _MapperConfig.configs.values()
if issubclass(m.cls, cls)]
for thingy in to_map:
cls._sa_decl_prepare(thingy.local_table, engine)
class_mapped = _declared_mapping_info(base) is not None
- for name, obj in list(vars(base).items()):
+ for name, obj in vars(base).items():
if name == '__mapper_args__':
if not mapper_args_fn and (
not class_mapped or
ret.doc = obj.__doc__
# apply inherited columns as we should
- for k, v in list(potential_columns.items()):
+ for k, v in potential_columns.items():
dict_[k] = v
if inherited_table_args and not tablename:
def changed(self):
"""Subclasses should call this method whenever change events occur."""
- for parent, key in list(self._parents.items()):
+ for parent, key in self._parents.items():
flag_modified(parent, key)
@classmethod
def changed(self):
"""Subclasses should call this method whenever change events occur."""
- for parent, key in list(self._parents.items()):
+ for parent, key in self._parents.items():
prop = object_mapper(parent).get_property(key)
for value, attr_name in zip(
self.collection_factory = typecallable
def __copy(self, item):
- return [y for y in list(collections.collection_adapter(item))]
+ return [y for y in collections.collection_adapter(item)]
def get_history(self, state, dict_, passive=PASSIVE_OFF):
current = self.get(state, dict_, passive=passive)
# search for _sa_instrument_role-decorated methods in
# method resolution order, assign to roles
for supercls in cls.__mro__:
- for name, method in list(vars(supercls).items()):
+ for name, method in vars(supercls).items():
if not util.callable(method):
continue
collection_type = util.duck_type_collection(cls)
if collection_type in __interfaces:
canned_roles, decorators = __interfaces[collection_type]
- for role, name in list(canned_roles.items()):
+ for role, name in canned_roles.items():
roles.setdefault(role, name)
# apply ABC auto-decoration to methods that need it
- for method, decorator in list(decorators.items()):
+ for method, decorator in decorators.items():
fn = getattr(cls, method, None)
if (fn and method not in methods and
not hasattr(fn, '_sa_instrumented')):
# apply ad-hoc instrumentation from decorators, class-level defaults
# and implicit role declarations
- for method_name, (before, argument, after) in list(methods.items()):
+ for method_name, (before, argument, after) in methods.items():
setattr(cls, method_name,
_instrument_membership_mutator(getattr(cls, method_name),
before, argument, after))
# intern the role map
- for role, method_name in list(roles.items()):
+ for role, method_name in roles.items():
setattr(cls, '_sa_%s' % role, getattr(cls, method_name))
setattr(cls, '_sa_instrumented', id(cls))
def update(self, __other=Unspecified, **kw):
if __other is not Unspecified:
if hasattr(__other, 'keys'):
- for key in list(__other.keys()):
+ for key in list(__other):
if (key not in self or
self[key] is not __other[key]):
self[key] = __other[key]
raise sa_exc.ArgumentError(
"Valid strategies for session synchronization "
"are %s" % (", ".join(sorted(repr(x)
- for x in list(lookup.keys())))))
+ for x in lookup))))
else:
return klass(*arg)
def close_all(cls):
"""Close *all* sessions in memory."""
- for sess in list(_sessions.values()):
+ for sess in _sessions.values():
sess.close()
@classmethod
if s.key:
del s.key
- for s, (oldkey, newkey) in list(self._key_switches.items()):
+ for s, (oldkey, newkey) in self._key_switches.items():
self.session.identity_map.discard(s)
s.key = oldkey
self.session.identity_map.replace(s)
session = Session() # invokes sessionmaker.__call__()
"""
- for k, v in list(self.kw.items()):
+ for k, v in self.kw.items():
local_kw.setdefault(k, v)
return self.class_(**local_kw)
return "%s(class_=%r%s)" % (
self.__class__.__name__,
self.class_.__name__,
- ", ".join("%s=%r" % (k, v) for k, v in list(self.kw.items()))
+ ", ".join("%s=%r" % (k, v) for k, v in self.kw.items())
)
_sessions = weakref.WeakValueDictionary()
against this set when a refresh operation occurs.
"""
- return set([k for k, v in list(self.callables.items()) if v is self])
+ return set([k for k, v in self.callables.items() if v is self])
def _instance_dict(self):
return None
)
if self.use_get:
- for col in list(self._equated_columns.keys()):
+ for col in list(self._equated_columns):
if col in self.mapper._equivalent_columns:
for c in self.mapper._equivalent_columns[col]:
self._equated_columns[c] = self._equated_columns[col]
for dep in convert[edge[1]]:
self.dependencies.add((edge[0], dep))
- return set([a for a in list(self.postsort_actions.values())
+ return set([a for a in self.postsort_actions.values()
if not a.disabled
]
).difference(cycles)
def __repr__(self):
return "%s(%s)" % (
self.__class__.__name__,
- ",".join(str(x) for x in list(self.__dict__.values()))
+ ",".join(str(x) for x in self.__dict__.values())
)
self._create_pool_mutex = threading.Lock()
def close(self):
- for key in list(self.pools.keys()):
+ for key in list(self.pools):
del self.pools[key]
def __del__(self):
if self.key in table._columns:
col = table._columns.get(self.key)
if col is not self:
- for fk in list(col.foreign_keys):
+ for fk in col.foreign_keys:
table.foreign_keys.remove(fk)
if fk.constraint in table.constraints:
# this might have been removed
remote_table = list(constraint._elements.values())[0].column.table
text += "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
', '.join(preparer.quote(f.parent.name, f.parent.quote)
- for f in list(constraint._elements.values())),
+ for f in constraint._elements.values()),
self.define_constraint_remote_table(
constraint, remote_table, preparer),
', '.join(preparer.quote(f.column.name, f.column.quote)
- for f in list(constraint._elements.values()))
+ for f in constraint._elements.values())
)
text += self.define_constraint_match(constraint)
text += self.define_constraint_cascades(constraint)
from .. import event, pool
import re
import warnings
-
+from .. import util
class ConnectionKiller(object):
"rollback/close connection: %s" % e)
def rollback_all(self):
- for rec in list(self.proxy_refs.keys()):
+ for rec in list(self.proxy_refs):
if rec is not None and rec.is_valid:
self._safe(rec.rollback)
def close_all(self):
- for rec in list(self.proxy_refs.keys()):
+ for rec in list(self.proxy_refs):
if rec is not None:
self._safe(rec._close)
self.conns = set()
- for rec in list(self.testing_engines.keys()):
+ for rec in list(self.testing_engines):
if rec is not config.db:
rec.dispose()
for conn in self.conns:
self._safe(conn.close)
self.conns = set()
- for rec in list(self.testing_engines.keys()):
+ for rec in list(self.testing_engines):
rec.dispose()
def assert_all_closed(self):
Callable = object()
NoAttribute = object()
-# start Py3K
- Natives = set([getattr(types, t)
- for t in dir(types) if not t.startswith('_')]). \
- union([type(t) if not isinstance(t, type)
- else t for t in list(__builtins__.values())]).\
+ if util.py2k:
+ Natives = set([getattr(types, t)
+ for t in dir(types) if not t.startswith('_')]).\
difference([getattr(types, t)
- for t in ('FunctionType', 'BuiltinFunctionType',
- 'MethodType', 'BuiltinMethodType',
- 'LambdaType', )])
-# end Py3K
-# start Py2K
-# Natives = set([getattr(types, t)
-# for t in dir(types) if not t.startswith('_')]). \
-# difference([getattr(types, t)
-# for t in ('FunctionType', 'BuiltinFunctionType',
-# 'MethodType', 'BuiltinMethodType',
-# 'LambdaType', 'UnboundMethodType',)])
-# end Py2K
+ for t in ('FunctionType', 'BuiltinFunctionType',
+ 'MethodType', 'BuiltinMethodType',
+ 'LambdaType', 'UnboundMethodType',)])
+ else:
+ Natives = set([getattr(types, t)
+ for t in dir(types) if not t.startswith('_')]).\
+ union([type(t) if not isinstance(t, type)
+ else t for t in __builtins__.values()]).\
+ difference([getattr(types, t)
+ for t in ('FunctionType', 'BuiltinFunctionType',
+ 'MethodType', 'BuiltinMethodType',
+ 'LambdaType', )])
def __init__(self):
self.buffer = deque()
a = self
b = other
- for attr in list(a.__dict__.keys()):
+ for attr in list(a.__dict__):
if attr.startswith('_'):
continue
value = getattr(a, attr)
def Table(*args, **kw):
"""A schema.Table wrapper/hook for dialect-specific tweaks."""
- test_opts = dict([(k, kw.pop(k)) for k in list(kw.keys())
+ test_opts = dict([(k, kw.pop(k)) for k in list(kw)
if k.startswith('test_')])
kw.update(table_options)
def Column(*args, **kw):
"""A schema.Column wrapper/hook for dialect-specific tweaks."""
- test_opts = dict([(k, kw.pop(k)) for k in list(kw.keys())
+ test_opts = dict([(k, kw.pop(k)) for k in list(kw)
if k.startswith('test_')])
if not config.requirements.foreign_key_ddl.enabled:
.. versionadded:: 0.8
"""
- return dict((key, self.__dict__[key]) for key in list(self.keys()))
+ return dict((key, self.__dict__[key]) for key in self.keys())
class ImmutableContainer(object):
def update(self, ____sequence=None, **kwargs):
if ____sequence is not None:
if hasattr(____sequence, 'keys'):
- for key in list(____sequence.keys()):
+ for key in ____sequence.keys():
self.__setitem__(key, ____sequence[key])
else:
for key, value in ____sequence:
return iter(list(self.keys()))
def items(self):
- return [(key, self[key]) for key in list(self.keys())]
+ return [(key, self[key]) for key in self.keys()]
def iteritems(self):
return iter(list(self.items()))
from sqlalchemy.testing import fixtures, AssertsExecutionResults, profiling
from sqlalchemy import testing
from sqlalchemy.testing import eq_
+from sqlalchemy.util import u
NUM_FIELDS = 10
NUM_RECORDS = 1000
def setup(self):
metadata.create_all()
- t.insert().execute([dict(('field%d' % fnum, 'value%d' % fnum)
+ t.insert().execute([dict(('field%d' % fnum, u('value%d' % fnum))
for fnum in range(NUM_FIELDS)) for r_num in
range(NUM_RECORDS)])
- t2.insert().execute([dict(('field%d' % fnum, 'value%d' % fnum)
+ t2.insert().execute([dict(('field%d' % fnum, u('value%d' % fnum))
for fnum in range(NUM_FIELDS)) for r_num in
range(NUM_RECORDS)])