# This method is more useful to debug a single function call.
- def runcall(self, func, *args, **kwds):
+ def runcall(*args, **kwds):
"""Debug a single function call.
Return the result of the function call.
"""
+ if len(args) >= 2:
+ self, func, *args = args
+ elif not args:
+ raise TypeError("descriptor 'runcall' of 'Bdb' object "
+ "needs an argument")
+ elif 'func' in kwds:
+ func = kwds.pop('func')
+ self, *args = args
+ else:
+ raise TypeError('runcall expected at least 1 positional argument, '
+ 'got %d' % (len(args)-1))
+
self.reset()
sys.settrace(self.trace_dispatch)
res = None
return self
# This method is more useful to profile a single function call.
- def runcall(self, func, *args, **kw):
+ def runcall(*args, **kw):
+ if len(args) >= 2:
+ self, func, *args = args
+ elif not args:
+ raise TypeError("descriptor 'runcall' of 'Profile' object "
+ "needs an argument")
+ elif 'func' in kw:
+ func = kw.pop('func')
+ self, *args = args
+ else:
+ raise TypeError('runcall expected at least 1 positional argument, '
+ 'got %d' % (len(args)-1))
+
self.enable()
try:
return func(*args, **kw)
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
- def submit(self, fn, *args, **kwargs):
+ def submit(*args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
Returns:
A Future representing the given call.
"""
+ if len(args) >= 2:
+ pass
+ elif not args:
+ raise TypeError("descriptor 'submit' of 'Executor' object "
+ "needs an argument")
+ elif 'fn' not in kwargs:
+ raise TypeError('submit expected at least 1 positional argument, '
+ 'got %d' % (len(args)-1))
+
raise NotImplementedError()
def map(self, fn, *iterables, timeout=None, chunksize=1):
p.start()
self._processes[p.pid] = p
- def submit(self, fn, *args, **kwargs):
+ def submit(*args, **kwargs):
+ if len(args) >= 2:
+ self, fn, *args = args
+ elif not args:
+ raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object "
+ "needs an argument")
+ elif 'fn' in kwargs:
+ fn = kwargs.pop('fn')
+ self, *args = args
+ else:
+ raise TypeError('submit expected at least 1 positional argument, '
+ 'got %d' % (len(args)-1))
+
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool(self._broken)
self._initializer = initializer
self._initargs = initargs
- def submit(self, fn, *args, **kwargs):
+ def submit(*args, **kwargs):
+ if len(args) >= 2:
+ self, fn, *args = args
+ elif not args:
+ raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
+ "needs an argument")
+ elif 'fn' in kwargs:
+ fn = kwargs.pop('fn')
+ self, *args = args
+ else:
+ raise TypeError('submit expected at least 1 positional argument, '
+ 'got %d' % (len(args)-1))
+
with self._shutdown_lock:
if self._broken:
raise BrokenThreadPool(self._broken)
return _exit_wrapper
@staticmethod
- def _create_cb_wrapper(callback, *args, **kwds):
+ def _create_cb_wrapper(*args, **kwds):
+ callback, *args = args
def _exit_wrapper(exc_type, exc, tb):
callback(*args, **kwds)
return _exit_wrapper
self._push_cm_exit(cm, _exit)
return result
- def callback(self, callback, *args, **kwds):
+ def callback(*args, **kwds):
"""Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
"""
+ if len(args) >= 2:
+ self, callback, *args = args
+ elif not args:
+ raise TypeError("descriptor 'callback' of '_BaseExitStack' object "
+ "needs an argument")
+ elif 'callback' in kwds:
+ callback = kwds.pop('callback')
+ self, *args = args
+ else:
+ raise TypeError('callback expected at least 1 positional argument, '
+ 'got %d' % (len(args)-1))
+
_exit_wrapper = self._create_cb_wrapper(callback, *args, **kwds)
# We changed the signature, so using @wraps is not appropriate, but
return _exit_wrapper
@staticmethod
- def _create_async_cb_wrapper(callback, *args, **kwds):
+ def _create_async_cb_wrapper(*args, **kwds):
+ callback, *args = args
async def _exit_wrapper(exc_type, exc, tb):
await callback(*args, **kwds)
return _exit_wrapper
self._push_async_cm_exit(exit, exit_method)
return exit # Allow use as a decorator
- def push_async_callback(self, callback, *args, **kwds):
+ def push_async_callback(*args, **kwds):
"""Registers an arbitrary coroutine function and arguments.
Cannot suppress exceptions.
"""
+ if len(args) >= 2:
+ self, callback, *args = args
+ elif not args:
+ raise TypeError("descriptor 'push_async_callback' of "
+ "'AsyncExitStack' object needs an argument")
+ elif 'callback' in kwds:
+ callback = kwds.pop('callback')
+ self, *args = args
+ else:
+ raise TypeError('push_async_callback expected at least 1 '
+ 'positional argument, got %d' % (len(args)-1))
+
_exit_wrapper = self._create_async_cb_wrapper(callback, *args, **kwds)
# We changed the signature, so using @wraps is not appropriate, but
# raises an exception, wrapper() will restore the terminal to a sane state so
# you can read the resulting traceback.
-def wrapper(func, *args, **kwds):
+def wrapper(*args, **kwds):
"""Wrapper function that initializes curses and calls another function,
restoring normal keyboard/screen behavior on error.
The callable object 'func' is then passed the main window 'stdscr'
wrapper().
"""
+ if args:
+ func, *args = args
+ elif 'func' in kwds:
+ func = kwds.pop('func')
+ else:
+ raise TypeError('wrapper expected at least 1 positional argument, '
+ 'got %d' % len(args))
+
try:
# Initialize curses
stdscr = initscr()
callables as instance methods.
"""
- def __init__(self, func, *args, **keywords):
+ def __init__(*args, **keywords):
+ if len(args) >= 2:
+ self, func, *args = args
+ elif not args:
+ raise TypeError("descriptor '__init__' of partialmethod "
+ "needs an argument")
+ elif 'func' in keywords:
+ func = keywords.pop('func')
+ self, *args = args
+ else:
+ raise TypeError("type 'partialmethod' takes at least one argument, "
+ "got %d" % (len(args)-1))
+ args = tuple(args)
+
if not callable(func) and not hasattr(func, "__get__"):
raise TypeError("{!r} is not callable or a descriptor"
.format(func))
finally:
self.stop_event.set()
- def create(self, c, typeid, *args, **kwds):
+ def create(*args, **kwds):
'''
Create a new shared object and return its id
'''
+ if len(args) >= 3:
+ self, c, typeid, *args = args
+ elif not args:
+ raise TypeError("descriptor 'create' of 'Server' object "
+ "needs an argument")
+ else:
+ if 'typeid' not in kwds:
+ raise TypeError('create expected at least 2 positional '
+ 'arguments, got %d' % (len(args)-1))
+ typeid = kwds.pop('typeid')
+ if len(args) >= 2:
+ self, c, *args = args
+ else:
+ if 'c' not in kwds:
+ raise TypeError('create expected at least 2 positional '
+ 'arguments, got %d' % (len(args)-1))
+ c = kwds.pop('c')
+ self, *args = args
+ args = tuple(args)
+
with self.mutex:
callable, exposed, method_to_typeid, proxytype = \
self.registry[typeid]
util.info('manager serving at %r', server.address)
server.serve_forever()
- def _create(self, typeid, *args, **kwds):
+ def _create(*args, **kwds):
'''
Create a new shared object; return the token and exposed tuple
'''
+ self, typeid, *args = args
+ args = tuple(args)
+
assert self._state.value == State.STARTED, 'server not yet started'
conn = self._Client(self._address, authkey=self._authkey)
try:
return self
# This method is more useful to profile a single function call.
- def runcall(self, func, *args, **kw):
+ def runcall(*args, **kw):
+ if len(args) >= 2:
+ self, func, *args = args
+ elif not args:
+ raise TypeError("descriptor 'runcall' of 'Profile' object "
+ "needs an argument")
+ elif 'func' in kw:
+ func = kw.pop('func')
+ self, *args = args
+ else:
+ raise TypeError('runcall expected at least 1 positional argument, '
+ 'got %d' % (len(args)-1))
+
self.set_cmd(repr(func))
sys.setprofile(self.dispatcher)
try:
def mul(x, y):
return x * y
+def capture(*args, **kwargs):
+ return args, kwargs
+
def sleep_and_raise(t):
time.sleep(t)
raise Exception('this is an exception')
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEqual(16, future.result())
+ future = self.executor.submit(capture, 1, self=2, fn=3)
+ self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
+ future = self.executor.submit(fn=capture, arg=1)
+ self.assertEqual(future.result(), ((), {'arg': 1}))
+ with self.assertRaises(TypeError):
+ self.executor.submit(arg=1)
def test_map(self):
self.assertEqual(
((), dict(example=1)),
((1,), dict(example=1)),
((1,2), dict(example=1)),
+ ((1,2), dict(self=3, callback=4)),
]
result = []
def _exit(*args, **kwds):
self.assertIsNone(wrapper[1].__doc__, _exit.__doc__)
self.assertEqual(result, expected)
+ result = []
+ with self.exit_stack() as stack:
+ with self.assertRaises(TypeError):
+ stack.callback(arg=1)
+ with self.assertRaises(TypeError):
+ self.exit_stack.callback(arg=2)
+ stack.callback(callback=_exit, arg=3)
+ self.assertEqual(result, [((), {'arg': 3})])
+
def test_push(self):
exc_raised = ZeroDivisionError
def _expect_exc(exc_type, exc, exc_tb):
self.assertEqual(result, expected)
+ result = []
+ async with AsyncExitStack() as stack:
+ with self.assertRaises(TypeError):
+ stack.push_async_callback(arg=1)
+ with self.assertRaises(TypeError):
+ self.exit_stack.push_async_callback(arg=2)
+ stack.push_async_callback(callback=_exit, arg=3)
+ self.assertEqual(result, [((), {'arg': 3})])
+
@_async_test
async def test_async_push(self):
exc_raised = ZeroDivisionError
positional = functools.partialmethod(capture, 1)
keywords = functools.partialmethod(capture, a=2)
both = functools.partialmethod(capture, 3, b=4)
+ spec_keywords = functools.partialmethod(capture, self=1, func=2)
nested = functools.partialmethod(positional, 5)
self.assertEqual(self.A.both(self.a, 5, c=6), ((self.a, 3, 5), {'b': 4, 'c': 6}))
+ self.assertEqual(self.a.spec_keywords(), ((self.a,), {'self': 1, 'func': 2}))
+
def test_nested(self):
self.assertEqual(self.a.nested(), ((self.a, 1, 5), {}))
self.assertEqual(self.a.nested(6), ((self.a, 1, 5, 6), {}))
with self.assertRaises(TypeError):
class B(object):
method = functools.partialmethod(None, 1)
+ with self.assertRaises(TypeError):
+ class B:
+ method = functools.partialmethod()
+ class B:
+ method = functools.partialmethod(func=capture, a=1)
+ b = B()
+ self.assertEqual(b.method(2, x=3), ((b, 2), {'a': 1, 'x': 3}))
def test_repr(self):
self.assertEqual(repr(vars(self.A)['both']),
def traced_doubler(num):
return num * 2
+def traced_capturer(*args, **kwargs):
+ return args, kwargs
+
def traced_caller_list_comprehension():
k = 10
mylist = [traced_doubler(i) for i in range(k)]
}
self.assertEqual(self.tracer.results().calledfuncs, expected)
+ def test_arg_errors(self):
+ res = self.tracer.runfunc(traced_capturer, 1, 2, self=3, func=4)
+ self.assertEqual(res, ((1, 2), {'self': 3, 'func': 4}))
+ res = self.tracer.runfunc(func=traced_capturer, arg=1)
+ self.assertEqual(res, ((), {'arg': 1}))
+ with self.assertRaises(TypeError):
+ self.tracer.runfunc()
+
def test_loop_caller_importing(self):
self.tracer.runfunc(traced_func_importing_caller, 1)
self.assertEqual(f.alive, False)
self.assertEqual(res, [199])
+ def test_arg_errors(self):
+ def fin(*args, **kwargs):
+ res.append((args, kwargs))
+
+ a = self.A()
+
+ res = []
+ f = weakref.finalize(a, fin, 1, 2, func=3, obj=4)
+ self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4}))
+ f()
+ self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})])
+
+ res = []
+ f = weakref.finalize(a, func=fin, arg=1)
+ self.assertEqual(f.peek(), (a, fin, (), {'arg': 1}))
+ f()
+ self.assertEqual(res, [((), {'arg': 1})])
+
+ res = []
+ f = weakref.finalize(obj=a, func=fin, arg=1)
+ self.assertEqual(f.peek(), (a, fin, (), {'arg': 1}))
+ f()
+ self.assertEqual(res, [((), {'arg': 1})])
+
+ self.assertRaises(TypeError, weakref.finalize, a)
+ self.assertRaises(TypeError, weakref.finalize)
+
def test_order(self):
a = self.A()
res = []
sys.settrace(None)
threading.settrace(None)
- def runfunc(self, func, *args, **kw):
+ def runfunc(*args, **kw):
+ if len(args) >= 2:
+ self, func, *args = args
+ elif not args:
+ raise TypeError("descriptor 'runfunc' of 'Trace' object "
+ "needs an argument")
+ elif 'func' in kw:
+ func = kw.pop('func')
+ self, *args = args
+ else:
+ raise TypeError('runfunc expected at least 1 positional argument, '
+ 'got %d' % (len(args)-1))
+
result = None
if not self.donothing:
sys.settrace(self.globaltrace)
"""
self._type_equality_funcs[typeobj] = function
- def addCleanup(self, function, *args, **kwargs):
+ def addCleanup(*args, **kwargs):
"""Add a function, with arguments, to be called when the test is
completed. Functions added are called on a LIFO basis and are
called after tearDown on test failure or success.
Cleanup items are called even if setUp fails (unlike tearDown)."""
+ if len(args) >= 2:
+ self, function, *args = args
+ elif not args:
+ raise TypeError("descriptor 'addCleanup' of 'TestCase' object "
+ "needs an argument")
+ elif 'function' in kwargs:
+ function = kwargs.pop('function')
+ self, *args = args
+ else:
+ raise TypeError('addCleanup expected at least 1 positional '
+ 'argument, got %d' % (len(args)-1))
+ args = tuple(args)
+
self._cleanups.append((function, args, kwargs))
def setUp(self):
class _Info:
__slots__ = ("weakref", "func", "args", "kwargs", "atexit", "index")
- def __init__(self, obj, func, *args, **kwargs):
+ def __init__(*args, **kwargs):
+ if len(args) >= 3:
+ self, obj, func, *args = args
+ elif not args:
+ raise TypeError("descriptor '__init__' of 'finalize' object "
+ "needs an argument")
+ else:
+ if 'func' not in kwargs:
+ raise TypeError('finalize expected at least 2 positional '
+ 'arguments, got %d' % (len(args)-1))
+ func = kwargs.pop('func')
+ if len(args) >= 2:
+ self, obj, *args = args
+ else:
+ if 'obj' not in kwargs:
+ raise TypeError('finalize expected at least 2 positional '
+ 'arguments, got %d' % (len(args)-1))
+ obj = kwargs.pop('obj')
+ self, *args = args
+ args = tuple(args)
+
if not self._registered_with_atexit:
# We may register the exit function more than once because
# of a thread race, but that is harmless
--- /dev/null
+Arbitrary keyword arguments (even with names "self" and "func") can now be
+passed to some functions which should accept arbitrary keyword arguments and
+pass them to other function (for example partialmethod(), TestCase.addCleanup()
+and Profile.runcall()) if the required arguments are passed as positional
+arguments.