]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Split IOStream into BaseIOStream and IOStream.
authorBen Darnell <ben@bendarnell.com>
Sun, 16 Sep 2012 23:58:48 +0000 (16:58 -0700)
committerBen Darnell <ben@bendarnell.com>
Sun, 16 Sep 2012 23:58:48 +0000 (16:58 -0700)
IOStream the socket-specific code; BaseIOStream will be the basis
for non-socket-based streams (i.e. pipes)

tornado/iostream.py
website/sphinx/iostream.rst

index 8713d8c9ef599ff45dfa48fdf6c1e21be522ab23..94f00f9660cd9379244f17ecdbb72e6863147768 100644 (file)
 # License for the specific language governing permissions and limitations
 # under the License.
 
-"""A utility class to write to and read from a non-blocking socket."""
+"""Utility classes to write to and read from non-blocking files and sockets.
+
+Contents:
+
+* `BaseIOStream`: Generic interface for reading and writing.
+* `IOStream`: Implementation of BaseIOStream using non-blocking sockets.
+* `SSLIOStream`: SSL-aware version of IOStream.
+"""
 
 from __future__ import absolute_import, division, with_statement
 
@@ -36,55 +43,21 @@ except ImportError:
     ssl = None
 
 
-class IOStream(object):
-    r"""A utility class to write to and read from a non-blocking socket.
+class BaseIOStream(object):
+    """A utility class to write to and read from a non-blocking file or socket.
 
     We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
     All of the methods take callbacks (since writing and reading are
     non-blocking and asynchronous).
 
-    The socket parameter may either be connected or unconnected.  For
-    server operations the socket is the result of calling socket.accept().
-    For client operations the socket is created with socket.socket(),
-    and may either be connected before passing it to the IOStream or
-    connected with IOStream.connect.
-
     When a stream is closed due to an error, the IOStream's `error`
     attribute contains the exception object.
 
-    A very simple (and broken) HTTP client using this class::
-
-        from tornado import ioloop
-        from tornado import iostream
-        import socket
-
-        def send_request():
-            stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
-            stream.read_until("\r\n\r\n", on_headers)
-
-        def on_headers(data):
-            headers = {}
-            for line in data.split("\r\n"):
-               parts = line.split(":")
-               if len(parts) == 2:
-                   headers[parts[0].strip()] = parts[1].strip()
-            stream.read_bytes(int(headers["Content-Length"]), on_body)
-
-        def on_body(data):
-            print data
-            stream.close()
-            ioloop.IOLoop.instance().stop()
-
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-        stream = iostream.IOStream(s)
-        stream.connect(("friendfeed.com", 80), send_request)
-        ioloop.IOLoop.instance().start()
-
+    Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
+    `read_from_fd`, and optionally `get_fd_error`.
     """
-    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
+    def __init__(self, io_loop=None, max_buffer_size=104857600,
                  read_chunk_size=4096):
-        self.socket = socket
-        self.socket.setblocking(False)
         self.io_loop = io_loop or ioloop.IOLoop.instance()
         self.max_buffer_size = max_buffer_size
         self.read_chunk_size = read_chunk_size
@@ -105,40 +78,45 @@ class IOStream(object):
         self._connecting = False
         self._state = None
         self._pending_callbacks = 0
+        self._closed = False
 
-    def connect(self, address, callback=None):
-        """Connects the socket to a remote address without blocking.
+    def fileno(self):
+        """Returns the file descriptor for this stream."""
+        raise NotImplementedError()
 
-        May only be called if the socket passed to the constructor was
-        not previously connected.  The address parameter is in the
-        same format as for socket.connect, i.e. a (host, port) tuple.
-        If callback is specified, it will be called when the
-        connection is completed.
+    def close_fd(self):
+        """Closes the file underlying this stream.
 
-        Note that it is safe to call IOStream.write while the
-        connection is pending, in which case the data will be written
-        as soon as the connection is ready.  Calling IOStream read
-        methods before the socket is connected works on some platforms
-        but is non-portable.
+        ``close_fd`` is called by `BaseIOStream` and should not be called
+        elsewhere; other users should call `close` instead.
         """
-        self._connecting = True
-        try:
-            self.socket.connect(address)
-        except socket.error, e:
-            # In non-blocking mode we expect connect() to raise an
-            # exception with EINPROGRESS or EWOULDBLOCK.
-            #
-            # On freebsd, other errors such as ECONNREFUSED may be
-            # returned immediately when attempting to connect to
-            # localhost, so handle them the same way as an error
-            # reported later in _handle_connect.
-            if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
-                gen_log.warning("Connect error on fd %d: %s",
-                                self.socket.fileno(), e)
-                self.close()
-                return
-        self._connect_callback = stack_context.wrap(callback)
-        self._add_io_state(self.io_loop.WRITE)
+        raise NotImplementedError()
+
+    def write_to_fd(self, data):
+        """Attempts to write ``data`` to the underlying file.
+
+        Returns the number of bytes written.
+        """
+        raise NotImplementedError()
+
+    def read_from_fd(self):
+        """Attempts to read from the underlying file.
+
+        Returns ``None`` if there was nothing to read (the socket returned
+        EWOULDBLOCK or equivalent), otherwise returns the data.  When possible,
+        should return no more than ``self.read_chunk_size`` bytes at a time.
+        """
+        raise NotImplementedError()
+
+    def get_fd_error(self):
+        """Returns information about any error on the underlying file.
+
+        This method is called after the IOLoop has signaled an error on the
+        file descriptor, and should return an Exception (such as `socket.error`
+        with additional information, or None if no such information is
+        available.
+        """
+        return None
 
     def read_until_regex(self, regex, callback):
         """Call callback when we read the given regex pattern."""
@@ -219,7 +197,7 @@ class IOStream(object):
 
     def close(self):
         """Close this stream."""
-        if self.socket is not None:
+        if not self.closed():
             if any(sys.exc_info()):
                 self.error = sys.exc_info()[1]
             if self._read_until_close:
@@ -229,14 +207,14 @@ class IOStream(object):
                 self._run_callback(callback,
                                    self._consume(self._read_buffer_size))
             if self._state is not None:
-                self.io_loop.remove_handler(self.socket.fileno())
+                self.io_loop.remove_handler(self.fileno())
                 self._state = None
-            self.socket.close()
-            self.socket = None
+            self.close_fd()
+            self._closed = True
         self._maybe_run_close_callback()
 
     def _maybe_run_close_callback(self):
-        if (self.socket is None and self._close_callback and
+        if (self.closed() and self._close_callback and
             self._pending_callbacks == 0):
             # if there are pending callbacks, don't run the close callback
             # until they're done (see _maybe_add_error_handler)
@@ -254,27 +232,25 @@ class IOStream(object):
 
     def closed(self):
         """Returns true if the stream has been closed."""
-        return self.socket is None
+        return self._closed
 
     def _handle_events(self, fd, events):
-        if not self.socket:
+        if self.closed():
             gen_log.warning("Got events for closed stream %d", fd)
             return
         try:
             if events & self.io_loop.READ:
                 self._handle_read()
-            if not self.socket:
+            if self.closed():
                 return
             if events & self.io_loop.WRITE:
                 if self._connecting:
                     self._handle_connect()
                 self._handle_write()
-            if not self.socket:
+            if self.closed():
                 return
             if events & self.io_loop.ERROR:
-                errno = self.socket.getsockopt(socket.SOL_SOCKET,
-                                               socket.SO_ERROR)
-                self.error = socket.error(errno, os.strerror(errno))
+                self.error = self.get_fd_error()
                 # We may have queued up a user callback in _handle_read or
                 # _handle_write, so don't close the IOStream until those
                 # callbacks have had a chance to run.
@@ -291,7 +267,7 @@ class IOStream(object):
                 assert self._state is not None, \
                     "shouldn't happen: _handle_events without self._state"
                 self._state = state
-                self.io_loop.update_handler(self.socket.fileno(), self._state)
+                self.io_loop.update_handler(self.fileno(), self._state)
         except Exception:
             gen_log.error("Uncaught exception, closing connection.",
                           exc_info=True)
@@ -393,24 +369,6 @@ class IOStream(object):
             return
         self._maybe_add_error_listener()
 
-    def _read_from_socket(self):
-        """Attempts to read from the socket.
-
-        Returns the data read or None if there is nothing to read.
-        May be overridden in subclasses.
-        """
-        try:
-            chunk = self.socket.recv(self.read_chunk_size)
-        except socket.error, e:
-            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
-                return None
-            else:
-                raise
-        if not chunk:
-            self.close()
-            return None
-        return chunk
-
     def _read_to_buffer(self):
         """Reads from the socket and appends the result to the read buffer.
 
@@ -419,11 +377,11 @@ class IOStream(object):
         error closes the socket and raises an exception.
         """
         try:
-            chunk = self._read_from_socket()
+            chunk = self.read_from_fd()
         except socket.error, e:
             # ssl.SSLError is a subclass of socket.error
             gen_log.warning("Read error on %d: %s",
-                            self.socket.fileno(), e)
+                            self.fileno(), e)
             self.close()
             raise
         if chunk is None:
@@ -496,24 +454,6 @@ class IOStream(object):
                     _double_prefix(self._read_buffer)
         return False
 
-    def _handle_connect(self):
-        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
-        if err != 0:
-            self.error = socket.error(err, os.strerror(err))
-            # IOLoop implementations may vary: some of them return
-            # an error state before the socket becomes writable, so
-            # in that case a connection failure would be handled by the
-            # error path in _handle_events instead of here.
-            gen_log.warning("Connect error on fd %d: %s",
-                            self.socket.fileno(), errno.errorcode[err])
-            self.close()
-            return
-        if self._connect_callback is not None:
-            callback = self._connect_callback
-            self._connect_callback = None
-            self._run_callback(callback)
-        self._connecting = False
-
     def _handle_write(self):
         while self._write_buffer:
             try:
@@ -524,7 +464,7 @@ class IOStream(object):
                     # process.  Therefore we must not call socket.send
                     # with more than 128KB at a time.
                     _merge_prefix(self._write_buffer, 128 * 1024)
-                num_bytes = self.socket.send(self._write_buffer[0])
+                num_bytes = self.write_to_fd(self._write_buffer[0])
                 if num_bytes == 0:
                     # With OpenSSL, if we couldn't write the entire buffer,
                     # the very same string object must be used on the
@@ -545,7 +485,7 @@ class IOStream(object):
                     break
                 else:
                     gen_log.warning("Write error on %d: %s",
-                                    self.socket.fileno(), e)
+                                    self.fileno(), e)
                     self.close()
                     return
         if not self._write_buffer and self._write_callback:
@@ -561,12 +501,12 @@ class IOStream(object):
         return self._read_buffer.popleft()
 
     def _check_closed(self):
-        if not self.socket:
+        if self.closed():
             raise IOError("Stream is closed")
 
     def _maybe_add_error_listener(self):
         if self._state is None and self._pending_callbacks == 0:
-            if self.socket is None:
+            if self.closed():
                 self._maybe_run_close_callback()
             else:
                 self._add_io_state(ioloop.IOLoop.READ)
@@ -592,17 +532,143 @@ class IOStream(object):
         (since the write callback is optional so we can have a
         fast-path write with no `_run_callback`)
         """
-        if self.socket is None:
+        if self.closed():
             # connection has been closed, so there can be no future events
             return
         if self._state is None:
             self._state = ioloop.IOLoop.ERROR | state
             with stack_context.NullContext():
                 self.io_loop.add_handler(
-                    self.socket.fileno(), self._handle_events, self._state)
+                    self.fileno(), self._handle_events, self._state)
         elif not self._state & state:
             self._state = self._state | state
-            self.io_loop.update_handler(self.socket.fileno(), self._state)
+            self.io_loop.update_handler(self.fileno(), self._state)
+
+
+class IOStream(BaseIOStream):
+    r"""Socket-based IOStream implementation.
+
+    This class supports the read and write methods from `BaseIOStream`
+    plus a `connect` method.
+
+    The socket parameter may either be connected or unconnected.  For
+    server operations the socket is the result of calling socket.accept().
+    For client operations the socket is created with socket.socket(),
+    and may either be connected before passing it to the IOStream or
+    connected with IOStream.connect.
+
+    A very simple (and broken) HTTP client using this class::
+
+        from tornado import ioloop
+        from tornado import iostream
+        import socket
+
+        def send_request():
+            stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
+            stream.read_until("\r\n\r\n", on_headers)
+
+        def on_headers(data):
+            headers = {}
+            for line in data.split("\r\n"):
+               parts = line.split(":")
+               if len(parts) == 2:
+                   headers[parts[0].strip()] = parts[1].strip()
+            stream.read_bytes(int(headers["Content-Length"]), on_body)
+
+        def on_body(data):
+            print data
+            stream.close()
+            ioloop.IOLoop.instance().stop()
+
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+        stream = iostream.IOStream(s)
+        stream.connect(("friendfeed.com", 80), send_request)
+        ioloop.IOLoop.instance().start()
+    """
+    def __init__(self, socket, *args, **kwargs):
+        self.socket = socket
+        self.socket.setblocking(False)
+        super(IOStream, self).__init__(*args, **kwargs)
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    def close_fd(self):
+        self.socket.close()
+        self.socket = None
+
+    def get_fd_error(self):
+        errno = self.socket.getsockopt(socket.SOL_SOCKET,
+                                       socket.SO_ERROR)
+        return socket.error(errno, os.strerror(errno))
+
+    def read_from_fd(self):
+        try:
+            chunk = self.socket.recv(self.read_chunk_size)
+        except socket.error, e:
+            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
+                return None
+            else:
+                raise
+        if not chunk:
+            self.close()
+            return None
+        return chunk
+
+    def write_to_fd(self, data):
+        return self.socket.send(data)
+
+    def connect(self, address, callback=None):
+        """Connects the socket to a remote address without blocking.
+
+        May only be called if the socket passed to the constructor was
+        not previously connected.  The address parameter is in the
+        same format as for socket.connect, i.e. a (host, port) tuple.
+        If callback is specified, it will be called when the
+        connection is completed.
+
+        Note that it is safe to call IOStream.write while the
+        connection is pending, in which case the data will be written
+        as soon as the connection is ready.  Calling IOStream read
+        methods before the socket is connected works on some platforms
+        but is non-portable.
+        """
+        self._connecting = True
+        try:
+            self.socket.connect(address)
+        except socket.error, e:
+            # In non-blocking mode we expect connect() to raise an
+            # exception with EINPROGRESS or EWOULDBLOCK.
+            #
+            # On freebsd, other errors such as ECONNREFUSED may be
+            # returned immediately when attempting to connect to
+            # localhost, so handle them the same way as an error
+            # reported later in _handle_connect.
+            if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
+                gen_log.warning("Connect error on fd %d: %s",
+                                self.socket.fileno(), e)
+                self.close()
+                return
+        self._connect_callback = stack_context.wrap(callback)
+        self._add_io_state(self.io_loop.WRITE)
+
+    def _handle_connect(self):
+        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+        if err != 0:
+            self.error = socket.error(err, os.strerror(err))
+            # IOLoop implementations may vary: some of them return
+            # an error state before the socket becomes writable, so
+            # in that case a connection failure would be handled by the
+            # error path in _handle_events instead of here.
+            gen_log.warning("Connect error on fd %d: %s",
+                            self.socket.fileno(), errno.errorcode[err])
+            self.close()
+            return
+        if self._connect_callback is not None:
+            callback = self._connect_callback
+            self._connect_callback = None
+            self._run_callback(callback)
+        self._connecting = False
 
 
 class SSLIOStream(IOStream):
@@ -700,7 +766,7 @@ class SSLIOStream(IOStream):
                                       **self._ssl_options)
         super(SSLIOStream, self)._handle_connect()
 
-    def _read_from_socket(self):
+    def read_from_fd(self):
         if self._ssl_accepting:
             # If the handshake hasn't finished yet, there can't be anything
             # to read (attempting to read may or may not raise an exception
index 28de480b32cae07e246c77bd3a426a8d4cbdfdeb..d37a3793c78675255a1b4e5924d5497b91c04675 100644 (file)
@@ -2,4 +2,40 @@
 =====================================================================
 
 .. automodule:: tornado.iostream
-   :members:
+
+   Base class
+   ----------
+
+   .. autoclass:: BaseIOStream
+
+   Main interface
+   ^^^^^^^^^^^^^^
+
+   .. automethod:: BaseIOStream.write
+   .. automethod:: BaseIOStream.read_bytes
+   .. automethod:: BaseIOStream.read_until
+   .. automethod:: BaseIOStream.read_until_regex
+   .. automethod:: BaseIOStream.read_until_close
+   .. automethod:: BaseIOStream.close
+   .. automethod:: BaseIOStream.set_close_callback
+   .. automethod:: BaseIOStream.closed
+   .. automethod:: BaseIOStream.reading
+   .. automethod:: BaseIOStream.writing
+
+   Methods for subclasses
+   ^^^^^^^^^^^^^^^^^^^^^^
+
+   .. automethod:: BaseIOStream.fileno
+   .. automethod:: BaseIOStream.close_fd
+   .. automethod:: BaseIOStream.write_to_fd
+   .. automethod:: BaseIOStream.read_from_fd
+   .. automethod:: BaseIOStream.get_fd_error
+
+   Implementations
+   ---------------
+
+   .. autoclass:: IOStream
+      :members:
+
+   .. autoclass:: SSLIOStream
+      :members: