]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
asyncio: sync with Tulip
authorVictor Stinner <victor.stinner@gmail.com>
Tue, 29 Jul 2014 10:58:23 +0000 (12:58 +0200)
committerVictor Stinner <victor.stinner@gmail.com>
Tue, 29 Jul 2014 10:58:23 +0000 (12:58 +0200)
* _WaitHandleFuture.cancel() now notify IocpProactor through the overlapped
  object that the wait was cancelled.
* Optimize IocpProactor.wait_for_handle() gets the result if the wait is
  signaled immediatly.
* Enhance representation of Future and Future subclasses

  - Add "created at filename:lineno" in the representation
  - Add Future._repr_info() method which can be more easily overriden than
    Future.__repr__(). It should now be more easy to enhance Future
    representation without having to modify each subclass. For example,
    _OverlappedFuture and _WaitHandleFuture get the new "created at" information.
  - Use reprlib to format Future result, and function arguments when formatting a
    callback, to limit the length of the representation.

* Fix repr(_WaitHandleFuture)
* _WaitHandleFuture and _OverlappedFuture: hide frames of internal calls in the
  source traceback.
* Cleanup ProactorIocp._poll(): set the timeout to 0 after the first call to
  GetQueuedCompletionStatus()
* test_locks: close the temporary event loop and check the condition lock
* Remove workaround in test_futures, no more needed

Lib/asyncio/events.py
Lib/asyncio/futures.py
Lib/asyncio/tasks.py
Lib/asyncio/windows_events.py
Lib/test/test_asyncio/test_futures.py
Lib/test/test_asyncio/test_locks.py
Lib/test/test_asyncio/test_tasks.py

index bddd7e3649c3d2c5a168d59c180ed98a5bdca761..3c7a36d0763ffeb7f2d22ce93e0b1d6811b0dbdf 100644 (file)
@@ -10,11 +10,12 @@ __all__ = ['AbstractEventLoopPolicy',
 
 import functools
 import inspect
-import subprocess
-import traceback
-import threading
+import reprlib
 import socket
+import subprocess
 import sys
+import threading
+import traceback
 
 
 _PY34 = sys.version_info >= (3, 4)
@@ -36,8 +37,12 @@ def _get_function_source(func):
 
 
 def _format_args(args):
-    # function formatting ('hello',) as ('hello')
-    args_repr = repr(args)
+    """Format function arguments.
+
+    Special case for a single parameter: ('hello',) is formatted as ('hello').
+    """
+    # use reprlib to limit the length of the output
+    args_repr = reprlib.repr(args)
     if len(args) == 1 and args_repr.endswith(',)'):
         args_repr = args_repr[:-2] + ')'
     return args_repr
index 022fef76efe5fd75243f2e389c0a29fbbc41f90e..7998fbbcfbf21595238a4df6fa34e80faf58c6d0 100644 (file)
@@ -7,6 +7,7 @@ __all__ = ['CancelledError', 'TimeoutError',
 
 import concurrent.futures._base
 import logging
+import reprlib
 import sys
 import traceback
 
@@ -175,20 +176,25 @@ class Future:
                                             format_cb(cb[-1]))
         return 'cb=[%s]' % cb
 
-    def _format_result(self):
-        if self._state != _FINISHED:
-            return None
-        elif self._exception is not None:
-            return 'exception={!r}'.format(self._exception)
-        else:
-            return 'result={!r}'.format(self._result)
-
-    def __repr__(self):
+    def _repr_info(self):
         info = [self._state.lower()]
         if self._state == _FINISHED:
-            info.append(self._format_result())
+            if self._exception is not None:
+                info.append('exception={!r}'.format(self._exception))
+            else:
+                # use reprlib to limit the length of the output, especially
+                # for very long strings
+                result = reprlib.repr(self._result)
+                info.append('result={}'.format(result))
         if self._callbacks:
             info.append(self._format_callbacks())
+        if self._source_traceback:
+            frame = self._source_traceback[-1]
+            info.append('created at %s:%s' % (frame[0], frame[1]))
+        return info
+
+    def __repr__(self):
+        info = self._repr_info()
         return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
 
     # On Python 3.3 or older, objects with a destructor part of a reference
index 07952c9a64f6f9ae896a01081064f5b76866db66..92070162a79cadcfa43d291f9dc53926198bbe53 100644 (file)
@@ -92,30 +92,19 @@ class Task(futures.Future):
                 self._loop.call_exception_handler(context)
             futures.Future.__del__(self)
 
-    def __repr__(self):
-        info = []
+    def _repr_info(self):
+        info = super()._repr_info()
+
         if self._must_cancel:
-            info.append('cancelling')
-        else:
-            info.append(self._state.lower())
+            # replace status
+            info[0] = 'cancelling'
 
         coro = coroutines._format_coroutine(self._coro)
-        info.append('coro=<%s>' % coro)
-
-        if self._source_traceback:
-            frame = self._source_traceback[-1]
-            info.append('created at %s:%s' % (frame[0], frame[1]))
-
-        if self._state == futures._FINISHED:
-            info.append(self._format_result())
-
-        if self._callbacks:
-            info.append(self._format_callbacks())
+        info.insert(1, 'coro=<%s>' % coro)
 
         if self._fut_waiter is not None:
-            info.append('wait_for=%r' % self._fut_waiter)
-
-        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
+            info.insert(2, 'wait_for=%r' % self._fut_waiter)
+        return info
 
     def get_stack(self, *, limit=None):
         """Return the list of stack frames for this task's coroutine.
index 41be8da2a043a5856245a4838c3d29b01ddce70b..ec427d5c7052be311d88761d8edb071594249dda 100644 (file)
@@ -42,16 +42,12 @@ class _OverlappedFuture(futures.Future):
             del self._source_traceback[-1]
         self._ov = ov
 
-    def __repr__(self):
-        info = [self._state.lower()]
+    def _repr_info(self):
+        info = super()._repr_info()
         if self._ov is not None:
             state = 'pending' if self._ov.pending else 'completed'
-            info.append('overlapped=<%s, %#x>' % (state, self._ov.address))
-        if self._state == futures._FINISHED:
-            info.append(self._format_result())
-        if self._callbacks:
-            info.append(self._format_callbacks())
-        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
+            info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
+        return info
 
     def _cancel_overlapped(self):
         if self._ov is None:
@@ -85,8 +81,14 @@ class _OverlappedFuture(futures.Future):
 class _WaitHandleFuture(futures.Future):
     """Subclass of Future which represents a wait handle."""
 
-    def __init__(self, handle, wait_handle, *, loop=None):
+    def __init__(self, iocp, ov, handle, wait_handle, *, loop=None):
         super().__init__(loop=loop)
+        if self._source_traceback:
+            del self._source_traceback[-1]
+        # iocp and ov are only used by cancel() to notify IocpProactor
+        # that the wait was cancelled
+        self._iocp = iocp
+        self._ov = ov
         self._handle = handle
         self._wait_handle = wait_handle
 
@@ -95,19 +97,16 @@ class _WaitHandleFuture(futures.Future):
         return (_winapi.WaitForSingleObject(self._handle, 0) ==
                 _winapi.WAIT_OBJECT_0)
 
-    def __repr__(self):
-        info = [self._state.lower()]
+    def _repr_info(self):
+        info = super()._repr_info()
+        info.insert(1, 'handle=%#x' % self._handle)
         if self._wait_handle:
-            state = 'pending' if self._poll() else 'completed'
-            info.append('wait_handle=<%s, %#x>' % (state, self._wait_handle))
-        info.append('handle=<%#x>' % self._handle)
-        if self._state == futures._FINISHED:
-            info.append(self._format_result())
-        if self._callbacks:
-            info.append(self._format_callbacks())
-        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
-
-    def _unregister(self):
+            state = 'signaled' if self._poll() else 'waiting'
+            info.insert(1, 'wait_handle=<%s, %#x>'
+                           % (state, self._wait_handle))
+        return info
+
+    def _unregister_wait(self):
         if self._wait_handle is None:
             return
         try:
@@ -117,10 +116,25 @@ class _WaitHandleFuture(futures.Future):
                 raise
             # ERROR_IO_PENDING is not an error, the wait was unregistered
         self._wait_handle = None
+        self._iocp = None
+        self._ov = None
 
     def cancel(self):
-        self._unregister()
-        return super().cancel()
+        result = super().cancel()
+        if self._ov is not None:
+            # signal the cancellation to the overlapped object
+            _overlapped.PostQueuedCompletionStatus(self._iocp, True,
+                                                   0, self._ov.address)
+        self._unregister_wait()
+        return result
+
+    def set_exception(self, exception):
+        super().set_exception(exception)
+        self._unregister_wait()
+
+    def set_result(self, result):
+        super().set_result(result)
+        self._unregister_wait()
 
 
 class PipeServer(object):
@@ -405,7 +419,9 @@ class IocpProactor:
         ov = _overlapped.Overlapped(NULL)
         wh = _overlapped.RegisterWaitWithQueue(
             handle, self._iocp, ov.address, ms)
-        f = _WaitHandleFuture(handle, wh, loop=self._loop)
+        f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop)
+        if f._source_traceback:
+            del f._source_traceback[-1]
 
         def finish_wait_for_handle(trans, key, ov):
             # Note that this second wait means that we should only use
@@ -414,12 +430,17 @@ class IocpProactor:
             # or semaphores are not.  Also note if the handle is
             # signalled and then quickly reset, then we may return
             # False even though we have not timed out.
+            return f._poll()
+
+        if f._poll():
             try:
-                return f._poll()
-            finally:
-                f._unregister()
+                result = f._poll()
+            except OSError as exc:
+                f.set_exception(exc)
+            else:
+                f.set_result(result)
 
-        self._cache[ov.address] = (f, ov, None, finish_wait_for_handle)
+        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
         return f
 
     def _register_with_iocp(self, obj):
@@ -438,6 +459,8 @@ class IocpProactor:
         # operation when it completes.  The future's value is actually
         # the value returned by callback().
         f = _OverlappedFuture(ov, loop=self._loop)
+        if f._source_traceback:
+            del f._source_traceback[-1]
         if not ov.pending and not wait_for_post:
             # The operation has completed, so no need to postpone the
             # work.  We cannot take this short cut if we need the
@@ -484,10 +507,13 @@ class IocpProactor:
             ms = math.ceil(timeout * 1e3)
             if ms >= INFINITE:
                 raise ValueError("timeout too big")
+
         while True:
             status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
             if status is None:
                 return
+            ms = 0
+
             err, transferred, key, address = status
             try:
                 f, ov, obj, callback = self._cache.pop(address)
@@ -504,7 +530,6 @@ class IocpProactor:
                 # handle which should be closed to avoid a leak.
                 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                     _winapi.CloseHandle(key)
-                ms = 0
                 continue
 
             if obj in self._stopped_serving:
@@ -520,7 +545,6 @@ class IocpProactor:
                 else:
                     f.set_result(value)
                     self._results.append(f)
-            ms = 0
 
     def _stop_serving(self, obj):
         # obj is a socket or pipe handle.  It will be closed in
index 50e9414ab8fb35d4424a173f9e13519fb2b56bef..e5002bc825d51c1c9c2f9788c51d329101dfe3ff 100644 (file)
@@ -105,6 +105,15 @@ class FutureTests(test_utils.TestCase):
         self.assertEqual(next(g), ('C', 42))  # yield 'C', y.
 
     def test_future_repr(self):
+        self.loop.set_debug(True)
+        f_pending_debug = asyncio.Future(loop=self.loop)
+        frame = f_pending_debug._source_traceback[-1]
+        self.assertEqual(repr(f_pending_debug),
+                         '<Future pending created at %s:%s>'
+                         % (frame[0], frame[1]))
+        f_pending_debug.cancel()
+
+        self.loop.set_debug(False)
         f_pending = asyncio.Future(loop=self.loop)
         self.assertEqual(repr(f_pending), '<Future pending>')
         f_pending.cancel()
@@ -299,12 +308,6 @@ class FutureTests(test_utils.TestCase):
 
     @mock.patch('asyncio.base_events.logger')
     def test_future_exception_never_retrieved(self, m_log):
-        # FIXME: Python issue #21163, other tests may "leak" pending task which
-        # emit a warning when they are destroyed by the GC
-        support.gc_collect()
-        m_log.error.reset_mock()
-        # ---
-
         self.loop.set_debug(True)
 
         def memory_error():
@@ -324,7 +327,7 @@ class FutureTests(test_utils.TestCase):
         if sys.version_info >= (3, 4):
             frame = source_traceback[-1]
             regex = (r'^Future exception was never retrieved\n'
-                     r'future: <Future finished exception=MemoryError\(\)>\n'
+                     r'future: <Future finished exception=MemoryError\(\) created at {filename}:{lineno}>\n'
                      r'source_traceback: Object created at \(most recent call last\):\n'
                      r'  File'
                      r'.*\n'
index c4e74e333038dc883d172e3d8368c0c8db9f1eab..dda4577aeddd27ff3d20ad734b2f30177e9f3c16 100644 (file)
@@ -660,10 +660,13 @@ class ConditionTests(test_utils.TestCase):
         lock = asyncio.Lock(loop=self.loop)
         cond = asyncio.Condition(lock, loop=self.loop)
 
-        self.assertIs(lock._loop, cond._loop)
+        self.assertIs(cond._lock, lock)
+        self.assertIs(cond._loop, lock._loop)
 
     def test_ambiguous_loops(self):
         loop = self.new_test_loop()
+        self.addCleanup(loop.close)
+
         lock = asyncio.Lock(loop=self.loop)
         with self.assertRaises(ValueError):
             asyncio.Condition(lock, loop=loop)
index 7b93a0e210916131fbef520e1f0042939e11d752..95cba542410acd4d9ab6a2d7b55f587761b71774 100644 (file)
@@ -132,6 +132,8 @@ class TaskTests(test_utils.TestCase):
             asyncio.async('ok')
 
     def test_task_repr(self):
+        self.loop.set_debug(False)
+
         @asyncio.coroutine
         def notmuch():
             yield from []
@@ -189,6 +191,8 @@ class TaskTests(test_utils.TestCase):
                          "<Task finished %s result='abc'>" % coro)
 
     def test_task_repr_coro_decorator(self):
+        self.loop.set_debug(False)
+
         @asyncio.coroutine
         def notmuch():
             # notmuch() function doesn't use yield from: it will be wrapped by
@@ -252,6 +256,8 @@ class TaskTests(test_utils.TestCase):
         self.loop.run_until_complete(t)
 
     def test_task_repr_wait_for(self):
+        self.loop.set_debug(False)
+
         @asyncio.coroutine
         def wait_for(fut):
             return (yield from fut)