From: Antoine Pitrou Date: Thu, 3 Nov 2016 09:48:55 +0000 (+0100) Subject: Lockless implementation of add_callback X-Git-Tag: v4.5.0~61^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=29b1429bda629e568ba429d3dcbf250c6722d7ee;p=thirdparty%2Ftornado.git Lockless implementation of add_callback Fixes #1874. --- diff --git a/tornado/ioloop.py b/tornado/ioloop.py index d61831766..c69da9e95 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -28,6 +28,7 @@ In addition to I/O events, the `IOLoop` can also schedule time-based events. from __future__ import absolute_import, division, print_function, with_statement +import collections import datetime import errno import functools @@ -693,8 +694,7 @@ class PollIOLoop(IOLoop): self.time_func = time_func or time.time self._handlers = {} self._events = {} - self._callbacks = [] - self._callback_lock = threading.Lock() + self._callbacks = collections.deque() self._timeouts = [] self._cancellations = 0 self._running = False @@ -712,8 +712,8 @@ class PollIOLoop(IOLoop): self.READ) def close(self, all_fds=False): - with self._callback_lock: - self._closing = True + self._waker.mark_closing() + self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: for fd, handler in self._handlers.values(): @@ -798,12 +798,6 @@ class PollIOLoop(IOLoop): try: while True: - # Prevent IO event starvation by delaying new callbacks - # to the next iteration of the event loop. - with self._callback_lock: - callbacks = self._callbacks - self._callbacks = [] - # Add any timeouts that have come due to the callback list. # Do not run anything until we have determined which ones # are ready, so timeouts that call add_timeout cannot @@ -831,14 +825,17 @@ class PollIOLoop(IOLoop): if x.callback is not None] heapq.heapify(self._timeouts) - for callback in callbacks: - self._run_callback(callback) + # Prevent IO event starvation by delaying new callbacks + # to the next iteration of the event loop. + n = len(self._callbacks) + for i in range(n): + self._run_callback(self._callbacks.popleft()) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # Closures may be holding on to a lot of memory, so allow # them to be freed before we go into our poll wait. - callbacks = callback = due_timeouts = timeout = None + due_timeouts = timeout = None if self._callbacks: # If any callbacks or timeouts called add_callback, @@ -934,36 +931,20 @@ class PollIOLoop(IOLoop): self._cancellations += 1 def add_callback(self, callback, *args, **kwargs): + if self._closing: + return + # Blindly insert into self._callbacks. This is safe even + # from signal handlers because deque.append is atomic. + self._callbacks.append(functools.partial( + stack_context.wrap(callback), *args, **kwargs)) if thread.get_ident() != self._thread_ident: - # If we're not on the IOLoop's thread, we need to synchronize - # with other threads, or waking logic will induce a race. - with self._callback_lock: - if self._closing: - return - list_empty = not self._callbacks - self._callbacks.append(functools.partial( - stack_context.wrap(callback), *args, **kwargs)) - if list_empty: - # If we're not in the IOLoop's thread, and we added the - # first callback to an empty list, we may need to wake it - # up (it may wake up on its own, but an occasional extra - # wake is harmless). Waking up a polling IOLoop is - # relatively expensive, so we try to avoid it when we can. - self._waker.wake() + # This will write one byte but Waker.consume() reads many + # at once, so it's ok to write even when not strictly + # necessary. + self._waker.wake() else: - if self._closing: - return - # If we're on the IOLoop's thread, we don't need the lock, - # since we don't need to wake anyone, just add the - # callback. Blindly insert into self._callbacks. This is - # safe even from signal handlers because the GIL makes - # list.append atomic. One subtlety is that if the signal - # is interrupting another thread holding the - # _callback_lock block in IOLoop.start, we may modify - # either the old or new version of self._callbacks, but - # either way will work. - self._callbacks.append(functools.partial( - stack_context.wrap(callback), *args, **kwargs)) + # If we're on the IOLoop's thread, we don't need to wake anyone. + pass def add_callback_from_signal(self, callback, *args, **kwargs): with stack_context.NullContext(): diff --git a/tornado/platform/common.py b/tornado/platform/common.py index b409a903f..3c3f68a51 100644 --- a/tornado/platform/common.py +++ b/tornado/platform/common.py @@ -18,6 +18,7 @@ class Waker(interface.Waker): # Based on Zope select_trigger.py: # https://github.com/zopefoundation/Zope/blob/master/src/ZServer/medusa/thread/select_trigger.py + self.closing = False self.writer = socket.socket() # Disable buffering -- pulling the trigger sends 1 byte, # and we want that sent immediately, to wake up ASAP. @@ -73,6 +74,10 @@ class Waker(interface.Waker): return self.writer.fileno() def wake(self): + if self.closing: + # Avoid issue #875 (race condition when closing the fd in another + # thread). + return try: self.writer.send(b"x") except (IOError, socket.error): @@ -90,3 +95,6 @@ class Waker(interface.Waker): def close(self): self.reader.close() self.writer.close() + + def mark_closing(self): + self.closing = True diff --git a/tornado/platform/interface.py b/tornado/platform/interface.py index cc0623911..b133b3395 100644 --- a/tornado/platform/interface.py +++ b/tornado/platform/interface.py @@ -62,5 +62,10 @@ class Waker(object): """Closes the waker's file descriptor(s).""" raise NotImplementedError() + def mark_closing(self): + """Mark the waker as closing.""" + raise NotImplementedError() + + def monotonic_time(): raise NotImplementedError() diff --git a/tornado/platform/posix.py b/tornado/platform/posix.py index 41a5794c6..e01f46f54 100644 --- a/tornado/platform/posix.py +++ b/tornado/platform/posix.py @@ -36,6 +36,7 @@ def _set_nonblocking(fd): class Waker(interface.Waker): def __init__(self): + self.closing = False r, w = os.pipe() _set_nonblocking(r) _set_nonblocking(w) @@ -51,6 +52,10 @@ class Waker(interface.Waker): return self.writer.fileno() def wake(self): + if self.closing: + # Avoid issue #875 (race condition when closing the fd in another + # thread). + return try: self.writer.write(b"x") except IOError: @@ -68,3 +73,6 @@ class Waker(interface.Waker): def close(self): self.reader.close() self.writer.close() + + def mark_closing(self): + self.closing = True