]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Lockless implementation of add_callback
authorAntoine Pitrou <antoine@python.org>
Thu, 3 Nov 2016 09:48:55 +0000 (10:48 +0100)
committerAntoine Pitrou <antoine@python.org>
Thu, 3 Nov 2016 09:48:55 +0000 (10:48 +0100)
Fixes #1874.

tornado/ioloop.py
tornado/platform/common.py
tornado/platform/interface.py
tornado/platform/posix.py

index d61831766586cebe2c7a3e228caf73816050cae1..c69da9e95fb8bcb6febf8657d10ed67e75b70035 100644 (file)
@@ -28,6 +28,7 @@ In addition to I/O events, the `IOLoop` can also schedule time-based events.
 
 from __future__ import absolute_import, division, print_function, with_statement
 
+import collections
 import datetime
 import errno
 import functools
@@ -693,8 +694,7 @@ class PollIOLoop(IOLoop):
         self.time_func = time_func or time.time
         self._handlers = {}
         self._events = {}
-        self._callbacks = []
-        self._callback_lock = threading.Lock()
+        self._callbacks = collections.deque()
         self._timeouts = []
         self._cancellations = 0
         self._running = False
@@ -712,8 +712,8 @@ class PollIOLoop(IOLoop):
                          self.READ)
 
     def close(self, all_fds=False):
-        with self._callback_lock:
-            self._closing = True
+        self._waker.mark_closing()
+        self._closing = True
         self.remove_handler(self._waker.fileno())
         if all_fds:
             for fd, handler in self._handlers.values():
@@ -798,12 +798,6 @@ class PollIOLoop(IOLoop):
 
         try:
             while True:
-                # Prevent IO event starvation by delaying new callbacks
-                # to the next iteration of the event loop.
-                with self._callback_lock:
-                    callbacks = self._callbacks
-                    self._callbacks = []
-
                 # Add any timeouts that have come due to the callback list.
                 # Do not run anything until we have determined which ones
                 # are ready, so timeouts that call add_timeout cannot
@@ -831,14 +825,17 @@ class PollIOLoop(IOLoop):
                                           if x.callback is not None]
                         heapq.heapify(self._timeouts)
 
-                for callback in callbacks:
-                    self._run_callback(callback)
+                # Prevent IO event starvation by delaying new callbacks
+                # to the next iteration of the event loop.
+                n = len(self._callbacks)
+                for i in range(n):
+                    self._run_callback(self._callbacks.popleft())
                 for timeout in due_timeouts:
                     if timeout.callback is not None:
                         self._run_callback(timeout.callback)
                 # Closures may be holding on to a lot of memory, so allow
                 # them to be freed before we go into our poll wait.
-                callbacks = callback = due_timeouts = timeout = None
+                due_timeouts = timeout = None
 
                 if self._callbacks:
                     # If any callbacks or timeouts called add_callback,
@@ -934,36 +931,20 @@ class PollIOLoop(IOLoop):
         self._cancellations += 1
 
     def add_callback(self, callback, *args, **kwargs):
+        if self._closing:
+            return
+        # Blindly insert into self._callbacks. This is safe even
+        # from signal handlers because deque.append is atomic.
+        self._callbacks.append(functools.partial(
+            stack_context.wrap(callback), *args, **kwargs))
         if thread.get_ident() != self._thread_ident:
-            # If we're not on the IOLoop's thread, we need to synchronize
-            # with other threads, or waking logic will induce a race.
-            with self._callback_lock:
-                if self._closing:
-                    return
-                list_empty = not self._callbacks
-                self._callbacks.append(functools.partial(
-                    stack_context.wrap(callback), *args, **kwargs))
-                if list_empty:
-                    # If we're not in the IOLoop's thread, and we added the
-                    # first callback to an empty list, we may need to wake it
-                    # up (it may wake up on its own, but an occasional extra
-                    # wake is harmless).  Waking up a polling IOLoop is
-                    # relatively expensive, so we try to avoid it when we can.
-                    self._waker.wake()
+            # This will write one byte but Waker.consume() reads many
+            # at once, so it's ok to write even when not strictly
+            # necessary.
+            self._waker.wake()
         else:
-            if self._closing:
-                return
-            # If we're on the IOLoop's thread, we don't need the lock,
-            # since we don't need to wake anyone, just add the
-            # callback. Blindly insert into self._callbacks. This is
-            # safe even from signal handlers because the GIL makes
-            # list.append atomic. One subtlety is that if the signal
-            # is interrupting another thread holding the
-            # _callback_lock block in IOLoop.start, we may modify
-            # either the old or new version of self._callbacks, but
-            # either way will work.
-            self._callbacks.append(functools.partial(
-                stack_context.wrap(callback), *args, **kwargs))
+            # If we're on the IOLoop's thread, we don't need to wake anyone.
+            pass
 
     def add_callback_from_signal(self, callback, *args, **kwargs):
         with stack_context.NullContext():
index b409a903f39c3dbaa77a6c6717e9185ce35bc8e8..3c3f68a51906b2b02c6d7cd5f1635a250f73a462 100644 (file)
@@ -18,6 +18,7 @@ class Waker(interface.Waker):
         # Based on Zope select_trigger.py:
         # https://github.com/zopefoundation/Zope/blob/master/src/ZServer/medusa/thread/select_trigger.py
 
+        self.closing = False
         self.writer = socket.socket()
         # Disable buffering -- pulling the trigger sends 1 byte,
         # and we want that sent immediately, to wake up ASAP.
@@ -73,6 +74,10 @@ class Waker(interface.Waker):
         return self.writer.fileno()
 
     def wake(self):
+        if self.closing:
+            # Avoid issue #875 (race condition when closing the fd in another
+            # thread).
+            return
         try:
             self.writer.send(b"x")
         except (IOError, socket.error):
@@ -90,3 +95,6 @@ class Waker(interface.Waker):
     def close(self):
         self.reader.close()
         self.writer.close()
+
+    def mark_closing(self):
+        self.closing = True
index cc062391175fee2e0346a868eecf3b8b0c248066..b133b33950a45983e5c181f809ba87a4bb6ccb39 100644 (file)
@@ -62,5 +62,10 @@ class Waker(object):
         """Closes the waker's file descriptor(s)."""
         raise NotImplementedError()
 
+    def mark_closing(self):
+        """Mark the waker as closing."""
+        raise NotImplementedError()
+
+
 def monotonic_time():
     raise NotImplementedError()
index 41a5794c63af29e397abbab4e5a2588007b2c3d5..e01f46f54cbc332fe19b0991195f88654c47b624 100644 (file)
@@ -36,6 +36,7 @@ def _set_nonblocking(fd):
 
 class Waker(interface.Waker):
     def __init__(self):
+        self.closing = False
         r, w = os.pipe()
         _set_nonblocking(r)
         _set_nonblocking(w)
@@ -51,6 +52,10 @@ class Waker(interface.Waker):
         return self.writer.fileno()
 
     def wake(self):
+        if self.closing:
+            # Avoid issue #875 (race condition when closing the fd in another
+            # thread).
+            return
         try:
             self.writer.write(b"x")
         except IOError:
@@ -68,3 +73,6 @@ class Waker(interface.Waker):
     def close(self):
         self.reader.close()
         self.writer.close()
+
+    def mark_closing(self):
+        self.closing = True