# License for the specific language governing permissions and limitations
# under the License.
-"""A utility class to write to and read from a non-blocking socket."""
+"""Utility classes to write to and read from non-blocking files and sockets.
+
+Contents:
+
+* `BaseIOStream`: Generic interface for reading and writing.
+* `IOStream`: Implementation of BaseIOStream using non-blocking sockets.
+* `SSLIOStream`: SSL-aware version of IOStream.
+"""
from __future__ import absolute_import, division, with_statement
ssl = None
-class IOStream(object):
- r"""A utility class to write to and read from a non-blocking socket.
+class BaseIOStream(object):
+ """A utility class to write to and read from a non-blocking file or socket.
We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
All of the methods take callbacks (since writing and reading are
non-blocking and asynchronous).
- The socket parameter may either be connected or unconnected. For
- server operations the socket is the result of calling socket.accept().
- For client operations the socket is created with socket.socket(),
- and may either be connected before passing it to the IOStream or
- connected with IOStream.connect.
-
When a stream is closed due to an error, the IOStream's `error`
attribute contains the exception object.
- A very simple (and broken) HTTP client using this class::
-
- from tornado import ioloop
- from tornado import iostream
- import socket
-
- def send_request():
- stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
- stream.read_until("\r\n\r\n", on_headers)
-
- def on_headers(data):
- headers = {}
- for line in data.split("\r\n"):
- parts = line.split(":")
- if len(parts) == 2:
- headers[parts[0].strip()] = parts[1].strip()
- stream.read_bytes(int(headers["Content-Length"]), on_body)
-
- def on_body(data):
- print data
- stream.close()
- ioloop.IOLoop.instance().stop()
-
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- stream = iostream.IOStream(s)
- stream.connect(("friendfeed.com", 80), send_request)
- ioloop.IOLoop.instance().start()
-
+ Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
+ `read_from_fd`, and optionally `get_fd_error`.
"""
- def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
+ def __init__(self, io_loop=None, max_buffer_size=104857600,
read_chunk_size=4096):
- self.socket = socket
- self.socket.setblocking(False)
self.io_loop = io_loop or ioloop.IOLoop.instance()
self.max_buffer_size = max_buffer_size
self.read_chunk_size = read_chunk_size
self._connecting = False
self._state = None
self._pending_callbacks = 0
+ self._closed = False
- def connect(self, address, callback=None):
- """Connects the socket to a remote address without blocking.
+ def fileno(self):
+ """Returns the file descriptor for this stream."""
+ raise NotImplementedError()
- May only be called if the socket passed to the constructor was
- not previously connected. The address parameter is in the
- same format as for socket.connect, i.e. a (host, port) tuple.
- If callback is specified, it will be called when the
- connection is completed.
+ def close_fd(self):
+ """Closes the file underlying this stream.
- Note that it is safe to call IOStream.write while the
- connection is pending, in which case the data will be written
- as soon as the connection is ready. Calling IOStream read
- methods before the socket is connected works on some platforms
- but is non-portable.
+ ``close_fd`` is called by `BaseIOStream` and should not be called
+ elsewhere; other users should call `close` instead.
"""
- self._connecting = True
- try:
- self.socket.connect(address)
- except socket.error, e:
- # In non-blocking mode we expect connect() to raise an
- # exception with EINPROGRESS or EWOULDBLOCK.
- #
- # On freebsd, other errors such as ECONNREFUSED may be
- # returned immediately when attempting to connect to
- # localhost, so handle them the same way as an error
- # reported later in _handle_connect.
- if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
- gen_log.warning("Connect error on fd %d: %s",
- self.socket.fileno(), e)
- self.close()
- return
- self._connect_callback = stack_context.wrap(callback)
- self._add_io_state(self.io_loop.WRITE)
+ raise NotImplementedError()
+
+ def write_to_fd(self, data):
+ """Attempts to write ``data`` to the underlying file.
+
+ Returns the number of bytes written.
+ """
+ raise NotImplementedError()
+
+ def read_from_fd(self):
+ """Attempts to read from the underlying file.
+
+ Returns ``None`` if there was nothing to read (the socket returned
+ EWOULDBLOCK or equivalent), otherwise returns the data. When possible,
+ should return no more than ``self.read_chunk_size`` bytes at a time.
+ """
+ raise NotImplementedError()
+
+ def get_fd_error(self):
+ """Returns information about any error on the underlying file.
+
+ This method is called after the IOLoop has signaled an error on the
+ file descriptor, and should return an Exception (such as `socket.error`
+ with additional information, or None if no such information is
+ available.
+ """
+ return None
def read_until_regex(self, regex, callback):
"""Call callback when we read the given regex pattern."""
def close(self):
"""Close this stream."""
- if self.socket is not None:
+ if not self.closed():
if any(sys.exc_info()):
self.error = sys.exc_info()[1]
if self._read_until_close:
self._run_callback(callback,
self._consume(self._read_buffer_size))
if self._state is not None:
- self.io_loop.remove_handler(self.socket.fileno())
+ self.io_loop.remove_handler(self.fileno())
self._state = None
- self.socket.close()
- self.socket = None
+ self.close_fd()
+ self._closed = True
self._maybe_run_close_callback()
def _maybe_run_close_callback(self):
- if (self.socket is None and self._close_callback and
+ if (self.closed() and self._close_callback and
self._pending_callbacks == 0):
# if there are pending callbacks, don't run the close callback
# until they're done (see _maybe_add_error_handler)
def closed(self):
"""Returns true if the stream has been closed."""
- return self.socket is None
+ return self._closed
def _handle_events(self, fd, events):
- if not self.socket:
+ if self.closed():
gen_log.warning("Got events for closed stream %d", fd)
return
try:
if events & self.io_loop.READ:
self._handle_read()
- if not self.socket:
+ if self.closed():
return
if events & self.io_loop.WRITE:
if self._connecting:
self._handle_connect()
self._handle_write()
- if not self.socket:
+ if self.closed():
return
if events & self.io_loop.ERROR:
- errno = self.socket.getsockopt(socket.SOL_SOCKET,
- socket.SO_ERROR)
- self.error = socket.error(errno, os.strerror(errno))
+ self.error = self.get_fd_error()
# We may have queued up a user callback in _handle_read or
# _handle_write, so don't close the IOStream until those
# callbacks have had a chance to run.
assert self._state is not None, \
"shouldn't happen: _handle_events without self._state"
self._state = state
- self.io_loop.update_handler(self.socket.fileno(), self._state)
+ self.io_loop.update_handler(self.fileno(), self._state)
except Exception:
gen_log.error("Uncaught exception, closing connection.",
exc_info=True)
return
self._maybe_add_error_listener()
- def _read_from_socket(self):
- """Attempts to read from the socket.
-
- Returns the data read or None if there is nothing to read.
- May be overridden in subclasses.
- """
- try:
- chunk = self.socket.recv(self.read_chunk_size)
- except socket.error, e:
- if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
- return None
- else:
- raise
- if not chunk:
- self.close()
- return None
- return chunk
-
def _read_to_buffer(self):
"""Reads from the socket and appends the result to the read buffer.
error closes the socket and raises an exception.
"""
try:
- chunk = self._read_from_socket()
+ chunk = self.read_from_fd()
except socket.error, e:
# ssl.SSLError is a subclass of socket.error
gen_log.warning("Read error on %d: %s",
- self.socket.fileno(), e)
+ self.fileno(), e)
self.close()
raise
if chunk is None:
_double_prefix(self._read_buffer)
return False
- def _handle_connect(self):
- err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
- if err != 0:
- self.error = socket.error(err, os.strerror(err))
- # IOLoop implementations may vary: some of them return
- # an error state before the socket becomes writable, so
- # in that case a connection failure would be handled by the
- # error path in _handle_events instead of here.
- gen_log.warning("Connect error on fd %d: %s",
- self.socket.fileno(), errno.errorcode[err])
- self.close()
- return
- if self._connect_callback is not None:
- callback = self._connect_callback
- self._connect_callback = None
- self._run_callback(callback)
- self._connecting = False
-
def _handle_write(self):
while self._write_buffer:
try:
# process. Therefore we must not call socket.send
# with more than 128KB at a time.
_merge_prefix(self._write_buffer, 128 * 1024)
- num_bytes = self.socket.send(self._write_buffer[0])
+ num_bytes = self.write_to_fd(self._write_buffer[0])
if num_bytes == 0:
# With OpenSSL, if we couldn't write the entire buffer,
# the very same string object must be used on the
break
else:
gen_log.warning("Write error on %d: %s",
- self.socket.fileno(), e)
+ self.fileno(), e)
self.close()
return
if not self._write_buffer and self._write_callback:
return self._read_buffer.popleft()
def _check_closed(self):
- if not self.socket:
+ if self.closed():
raise IOError("Stream is closed")
def _maybe_add_error_listener(self):
if self._state is None and self._pending_callbacks == 0:
- if self.socket is None:
+ if self.closed():
self._maybe_run_close_callback()
else:
self._add_io_state(ioloop.IOLoop.READ)
(since the write callback is optional so we can have a
fast-path write with no `_run_callback`)
"""
- if self.socket is None:
+ if self.closed():
# connection has been closed, so there can be no future events
return
if self._state is None:
self._state = ioloop.IOLoop.ERROR | state
with stack_context.NullContext():
self.io_loop.add_handler(
- self.socket.fileno(), self._handle_events, self._state)
+ self.fileno(), self._handle_events, self._state)
elif not self._state & state:
self._state = self._state | state
- self.io_loop.update_handler(self.socket.fileno(), self._state)
+ self.io_loop.update_handler(self.fileno(), self._state)
+
+
+class IOStream(BaseIOStream):
+ r"""Socket-based IOStream implementation.
+
+ This class supports the read and write methods from `BaseIOStream`
+ plus a `connect` method.
+
+ The socket parameter may either be connected or unconnected. For
+ server operations the socket is the result of calling socket.accept().
+ For client operations the socket is created with socket.socket(),
+ and may either be connected before passing it to the IOStream or
+ connected with IOStream.connect.
+
+ A very simple (and broken) HTTP client using this class::
+
+ from tornado import ioloop
+ from tornado import iostream
+ import socket
+
+ def send_request():
+ stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
+ stream.read_until("\r\n\r\n", on_headers)
+
+ def on_headers(data):
+ headers = {}
+ for line in data.split("\r\n"):
+ parts = line.split(":")
+ if len(parts) == 2:
+ headers[parts[0].strip()] = parts[1].strip()
+ stream.read_bytes(int(headers["Content-Length"]), on_body)
+
+ def on_body(data):
+ print data
+ stream.close()
+ ioloop.IOLoop.instance().stop()
+
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ stream = iostream.IOStream(s)
+ stream.connect(("friendfeed.com", 80), send_request)
+ ioloop.IOLoop.instance().start()
+ """
+ def __init__(self, socket, *args, **kwargs):
+ self.socket = socket
+ self.socket.setblocking(False)
+ super(IOStream, self).__init__(*args, **kwargs)
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def close_fd(self):
+ self.socket.close()
+ self.socket = None
+
+ def get_fd_error(self):
+ errno = self.socket.getsockopt(socket.SOL_SOCKET,
+ socket.SO_ERROR)
+ return socket.error(errno, os.strerror(errno))
+
+ def read_from_fd(self):
+ try:
+ chunk = self.socket.recv(self.read_chunk_size)
+ except socket.error, e:
+ if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
+ return None
+ else:
+ raise
+ if not chunk:
+ self.close()
+ return None
+ return chunk
+
+ def write_to_fd(self, data):
+ return self.socket.send(data)
+
+ def connect(self, address, callback=None):
+ """Connects the socket to a remote address without blocking.
+
+ May only be called if the socket passed to the constructor was
+ not previously connected. The address parameter is in the
+ same format as for socket.connect, i.e. a (host, port) tuple.
+ If callback is specified, it will be called when the
+ connection is completed.
+
+ Note that it is safe to call IOStream.write while the
+ connection is pending, in which case the data will be written
+ as soon as the connection is ready. Calling IOStream read
+ methods before the socket is connected works on some platforms
+ but is non-portable.
+ """
+ self._connecting = True
+ try:
+ self.socket.connect(address)
+ except socket.error, e:
+ # In non-blocking mode we expect connect() to raise an
+ # exception with EINPROGRESS or EWOULDBLOCK.
+ #
+ # On freebsd, other errors such as ECONNREFUSED may be
+ # returned immediately when attempting to connect to
+ # localhost, so handle them the same way as an error
+ # reported later in _handle_connect.
+ if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
+ gen_log.warning("Connect error on fd %d: %s",
+ self.socket.fileno(), e)
+ self.close()
+ return
+ self._connect_callback = stack_context.wrap(callback)
+ self._add_io_state(self.io_loop.WRITE)
+
+ def _handle_connect(self):
+ err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ if err != 0:
+ self.error = socket.error(err, os.strerror(err))
+ # IOLoop implementations may vary: some of them return
+ # an error state before the socket becomes writable, so
+ # in that case a connection failure would be handled by the
+ # error path in _handle_events instead of here.
+ gen_log.warning("Connect error on fd %d: %s",
+ self.socket.fileno(), errno.errorcode[err])
+ self.close()
+ return
+ if self._connect_callback is not None:
+ callback = self._connect_callback
+ self._connect_callback = None
+ self._run_callback(callback)
+ self._connecting = False
class SSLIOStream(IOStream):
**self._ssl_options)
super(SSLIOStream, self)._handle_connect()
- def _read_from_socket(self):
+ def read_from_fd(self):
if self._ssl_accepting:
# If the handshake hasn't finished yet, there can't be anything
# to read (attempting to read may or may not raise an exception