self.READ)
def close(self, all_fds=False):
- self._waker.mark_closing()
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
import errno
import socket
+import time
from tornado.platform import interface
+def try_close(f):
+ # Avoid issue #875 (race condition when using the file in another
+ # thread).
+ for i in range(10):
+ try:
+ f.close()
+ except IOError:
+ # Yield to another thread
+ time.sleep(1e-3)
+ else:
+ break
+ # Try a last time and let raise
+ f.close()
+
+
class Waker(interface.Waker):
"""Create an OS independent asynchronous pipe.
# Based on Zope select_trigger.py:
# https://github.com/zopefoundation/Zope/blob/master/src/ZServer/medusa/thread/select_trigger.py
- self.closing = False
self.writer = socket.socket()
# Disable buffering -- pulling the trigger sends 1 byte,
# and we want that sent immediately, to wake up ASAP.
return self.writer.fileno()
def wake(self):
- if self.closing:
- # Avoid issue #875 (race condition when closing the fd in another
- # thread).
- return
try:
self.writer.send(b"x")
except (IOError, socket.error):
def close(self):
self.reader.close()
- self.writer.close()
-
- def mark_closing(self):
- self.closing = True
+ try_close(self.writer)
"""Closes the waker's file descriptor(s)."""
raise NotImplementedError()
- def mark_closing(self):
- """Mark the waker as closing."""
- raise NotImplementedError()
-
-
def monotonic_time():
raise NotImplementedError()
import fcntl
import os
-from tornado.platform import interface
+from tornado.platform import common, interface
def set_close_exec(fd):
class Waker(interface.Waker):
def __init__(self):
- self.closing = False
r, w = os.pipe()
_set_nonblocking(r)
_set_nonblocking(w)
return self.writer.fileno()
def wake(self):
- if self.closing:
- # Avoid issue #875 (race condition when closing the fd in another
- # thread).
- return
try:
self.writer.write(b"x")
except IOError:
def close(self):
self.reader.close()
- self.writer.close()
-
- def mark_closing(self):
- self.closing = True
+ common.try_close(self.writer)