# the MIT License: http://www.opensource.org/licenses/mit-license.php
from .compat import callable, cmp, reduce, defaultdict, py25_dict, \
- threading, py3k, py3k_warning, jython, pypy, cpython, win32, set_types, buffer, \
- pickle, update_wrapper, partial, md5_hex, decode_slice, dottedgetter,\
- parse_qsl, any, contextmanager, namedtuple, next, WeakSet
+ threading, py3k, py3k_warning, jython, pypy, cpython, win32, set_types, \
+ buffer, pickle, update_wrapper, partial, md5_hex, decode_slice, \
+ dottedgetter, parse_qsl, any, contextmanager, namedtuple, next, WeakSet
from ._collections import KeyedTuple, ImmutableContainer, immutabledict, \
Properties, OrderedProperties, ImmutableProperties, OrderedDict, \
from .deprecations import warn_deprecated, warn_pending_deprecation, \
deprecated, pending_deprecation
-
except ImportError:
import pickle
+
# a controversial feature, required by MySQLdb currently
def buffer(x):
return x
# they're bringing it back in 3.2. brilliant !
def callable(fn):
return hasattr(fn, '__call__')
+
def cmp(a, b):
return (a > b) - (a < b)
for i, fname in enumerate(fieldnames):
setattr(tup, fname, tup[i])
return tup
- tuptype = type(typename, (tuple, ), {'__new__':__new__})
+ tuptype = type(typename, (tuple, ), {'__new__': __new__})
return tuptype
try:
from collections import defaultdict
except ImportError:
class defaultdict(dict):
+
def __init__(self, default_factory=None, *a, **kw):
if (default_factory is not None and
not hasattr(default_factory, '__call__')):
raise TypeError('first argument must be callable')
dict.__init__(self, *a, **kw)
self.default_factory = default_factory
+
def __getitem__(self, key):
try:
return dict.__getitem__(self, key)
except KeyError:
return self.__missing__(key)
+
def __missing__(self, key):
if self.default_factory is None:
raise KeyError(key)
self[key] = value = self.default_factory()
return value
+
def __reduce__(self):
if self.default_factory is None:
args = tuple()
else:
args = self.default_factory,
return type(self), args, None, None, self.iteritems()
+
def copy(self):
return self.__copy__()
+
def __copy__(self):
return type(self)(self.default_factory, self)
+
def __deepcopy__(self, memo):
import copy
return type(self)(self.default_factory,
copy.deepcopy(self.items()))
+
def __repr__(self):
return 'defaultdict(%s, %s)' % (self.default_factory,
dict.__repr__(self))
def add(self, other):
self._storage[other] = True
+
# find or create a dict implementation that supports __missing__
class _probe(dict):
def __missing__(self, key):
import md5
_md5 = md5.new
+
def md5_hex(x):
# Py3K
#x = x.encode('utf-8')
import decimal
-
callable, inspect_getfullargspec
from .. import exc
+
def _unique_symbols(used, *bases):
used = set(used)
for base in bases:
else:
raise NameError("exhausted namespace for symbol base %s" % base)
+
def decorator(target):
"""A signature-matching decorator factory."""
code = 'lambda %(args)s: %(target)s(%(fn)s, %(apply_kw)s)' % (
metadata)
- decorated = eval(code, {targ_name:target, fn_name:fn})
+ decorated = eval(code, {targ_name: target, fn_name: fn})
decorated.func_defaults = getattr(fn, 'im_func', fn).func_defaults
return update_wrapper(decorated, fn)
return update_wrapper(decorate, target)
+
class PluginLoader(object):
+
def __init__(self, group, auto_fn=None):
self.group = group
self.impls = {}
def load(self, name):
if name in self.impls:
- return self.impls[name]()
+ return self.impls[name]()
if self.auto_fn:
loader = self.auto_fn(name)
try:
from inspect import CO_VARKEYWORDS
+
def inspect_func_args(fn):
co = fn.func_code
nargs = co.co_argcount
args = list(names[:nargs])
has_kw = bool(co.co_flags & CO_VARKEYWORDS)
return args, has_kw
+
except ImportError:
def inspect_func_args(fn):
names, _, has_kw, _ = inspect.getargspec(fn)
return names, bool(has_kw)
+
def get_func_kwargs(func):
"""Return the set of legal kwargs for the given `func`.
return inspect.getargspec(func)[0]
+
def format_argspec_plus(fn, grouped=True):
"""Returns a dictionary of formatted, introspected function arguments.
return dict(args=args[1:-1], self_arg=self_arg,
apply_pos=apply_pos[1:-1], apply_kw=apply_kw[1:-1])
+
def format_argspec_init(method, grouped=True):
"""format_argspec_plus with considerations for typical __init__ methods
or 'self, *args, **kwargs')
return dict(self_arg='self', args=args, apply_pos=args, apply_kw=args)
+
def getargspec_init(method):
"""inspect.getargspec with considerations for typical __init__ methods
def unbound_method_to_callable(func_or_cls):
- """Adjust the incoming callable such that a 'self' argument is not required."""
+ """Adjust the incoming callable such that a 'self' argument is not
+ required.
+
+ """
if isinstance(func_or_cls, types.MethodType) and not func_or_cls.im_self:
return func_or_cls.im_func
else:
return func_or_cls
+
def generic_repr(obj, additional_kw=(), to_inspect=None):
"""Produce a __repr__() based on direct association of the __init__()
specification vs. same-named attributes present.
"""
if to_inspect is None:
to_inspect = obj
+
def genargs():
try:
- (args, vargs, vkw, defaults) = inspect.getargspec(to_inspect.__init__)
+ (args, vargs, vkw, defaults) = \
+ inspect.getargspec(to_inspect.__init__)
except TypeError:
return
return "%s(%s)" % (obj.__class__.__name__, ", ".join(genargs()))
+
class portable_instancemethod(object):
"""Turn an instancemethod into a (parent, name) pair
to produce a serializable callable.
def __call__(self, *arg, **kw):
return getattr(self.target, self.name)(*arg, **kw)
+
def class_hierarchy(cls):
"""Return an unordered sequence of all classes related to cls.
hier.add(s)
return list(hier)
+
def iterate_attributes(cls):
"""iterate all the keys and attributes associated
with a class, without using getattr().
yield (key, c.__dict__[key])
break
+
def monkeypatch_proxied_specials(into_cls, from_cls, skip=None, only=None,
name='self.proxy', from_instance=None):
"""Automates delegation of __specials__ for a proxying type."""
return getattr(meth1, 'im_func', meth1) is getattr(meth2, 'im_func', meth2)
# end Py2K
+
def as_interface(obj, cls=None, methods=None, required=None):
"""Ensure basic interface compliance for an instance or dict of callables.
def _reset(self, obj):
obj.__dict__.pop(self.__name__, None)
+
class memoized_instancemethod(object):
"""Decorate a method memoize its return value.
def __get__(self, obj, cls):
if obj is None:
return self
+
def oneshot(*args, **kw):
result = self.fget(obj, *args, **kw)
memo = lambda *a, **kw: result
memo.__doc__ = self.__doc__
obj.__dict__[self.__name__] = memo
return result
+
oneshot.__name__ = self.__name__
oneshot.__doc__ = self.__doc__
return oneshot
self.attributes.append(fn.__name__)
return memoized_instancemethod(fn)
+
class importlater(object):
"""Deferred import object.
self.__dict__[key] = attr
return attr
+
# from paste.deploy.converters
def asbool(obj):
if isinstance(obj, (str, unicode)):
raise ValueError("String is not true/false: %r" % obj)
return bool(obj)
+
def bool_or_str(*text):
"""Return a callable that will evaulate a string as
boolean, or one of a set of "alternate" string values.
return asbool(obj)
return bool_or_value
+
def asint(value):
"""Coerce to integer."""
return _next
+
def duck_type_collection(specimen, default=None):
"""Given an instance or class, guess if it is or is acting as one of
the basic collection types: list, set and dict. If the __emulates__
else:
return default
+
def assert_arg_type(arg, argtype, name):
if isinstance(arg, argtype):
return arg
else:
if isinstance(argtype, tuple):
raise exc.ArgumentError(
- "Argument '%s' is expected to be one of type %s, got '%s'" %
- (name, ' or '.join("'%s'" % a for a in argtype), type(arg)))
+ "Argument '%s' is expected to be one of type %s, got '%s'" %
+ (name, ' or '.join("'%s'" % a for a in argtype), type(arg)))
else:
raise exc.ArgumentError(
- "Argument '%s' is expected to be of type '%s', got '%s'" %
- (name, argtype, type(arg)))
+ "Argument '%s' is expected to be of type '%s', got '%s'" %
+ (name, argtype, type(arg)))
def dictlike_iteritems(dictlike):
def __get__(desc, self, cls):
return desc.fget(cls)
+
class hybridmethod(object):
"""Decorate a function as cls- or instance- level."""
def __init__(self, func, expr=None):
_creation_order = 1
+
+
def set_creation_order(instance):
"""Assign a '_creation_order' sequence to the given instance.
"""
global _creation_order
instance._creation_order = _creation_order
- _creation_order +=1
+ _creation_order += 1
+
def warn_exception(func, *args, **kwargs):
- """executes the given function, catches all exceptions and converts to a warning."""
+ """executes the given function, catches all exceptions and converts to
+ a warning.
+
+ """
try:
return func(*args, **kwargs)
except:
else:
warnings.warn(msg, stacklevel=stacklevel)
+
_SQLA_RE = re.compile(r'sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py')
_UNITTEST_RE = re.compile(r'unit(?:2|test2?/)')
+
+
def chop_traceback(tb, exclude_prefix=_UNITTEST_RE, exclude_suffix=_SQLA_RE):
"""Chop extraneous lines off beginning and end of a traceback.
start += 1
while start <= end and exclude_suffix.search(tb[end]):
end -= 1
- return tb[start:end+1]
+ return tb[start:end + 1]
NoneType = type(None)