except ImportError:
_set_nonblocking = None
+class StreamClosedError(IOError):
+ pass
class BaseIOStream(object):
"""A utility class to write to and read from a non-blocking file or socket.
chunk = self.read_from_fd()
except (socket.error, IOError, OSError), e:
# ssl.SSLError is a subclass of socket.error
+ if e.args[0] == errno.ECONNRESET:
+ # Treat ECONNRESET as a connection close rather than
+ # an error to minimize log spam (the exception will
+ # be available on self.error for apps that care).
+ self.close()
+ return
gen_log.warning("Read error on %d: %s",
self.fileno(), e)
self.close()
def _check_closed(self):
if self.closed():
- raise IOError("Stream is closed")
+ raise StreamClosedError("Stream is closed")
def _maybe_add_error_listener(self):
if self._state is None and self._pending_callbacks == 0:
from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, ExpectLog
from tornado.test.util import unittest
from tornado.util import b, bytes_type
-from tornado.web import Application, RequestHandler
+from tornado.web import Application, RequestHandler, asynchronous
+import datetime
import os
import shutil
import socket
UnixSocketTest = unittest.skipIf(
not hasattr(socket, 'AF_UNIX') or sys.platform == 'cygwin',
"unix sockets not supported on this platform")
+
+class KeepAliveTest(AsyncHTTPTestCase):
+ """Tests various scenarios for HTTP 1.1 keep-alive support.
+
+ These tests don't use AsyncHTTPClient because we want to control
+ connection reuse and closing.
+ """
+ def get_app(self):
+ test = self
+
+ class HelloHandler(RequestHandler):
+ def get(self):
+ self.finish('Hello world')
+
+ class LargeHandler(RequestHandler):
+ def get(self):
+ # 512KB should be bigger than the socket buffers so it will
+ # be written out in chunks.
+ self.write(''.join(chr(i % 256) * 1024 for i in xrange(512)))
+
+ class FinishOnCloseHandler(RequestHandler):
+ @asynchronous
+ def get(self):
+ self.flush()
+
+ def on_connection_close(self):
+ # This is not very realistic, but finishing the request
+ # from the close callback has the right timing to mimic
+ # some errors seen in the wild.
+ self.finish('closed')
+
+ return Application([('/', HelloHandler),
+ ('/large', LargeHandler),
+ ('/finish_on_close', FinishOnCloseHandler)])
+
+ def setUp(self):
+ super(KeepAliveTest, self).setUp()
+ self.http_version = b('HTTP/1.1')
+
+ def tearDown(self):
+ # We just closed the client side of the socket; let the IOLoop run
+ # once to make sure the server side got the message.
+ self.io_loop.add_timeout(datetime.timedelta(seconds=0.001), self.stop)
+ self.wait()
+
+ if hasattr(self, 'stream'):
+ self.stream.close()
+ super(KeepAliveTest, self).tearDown()
+
+ # The next few methods are a crude manual http client
+ def connect(self):
+ self.stream = IOStream(socket.socket(), io_loop=self.io_loop)
+ self.stream.connect(('localhost', self.get_http_port()), self.stop)
+ self.wait()
+
+ def read_headers(self):
+ self.stream.read_until(b('\r\n'), self.stop)
+ first_line = self.wait()
+ self.assertTrue(first_line.startswith(self.http_version + b(' 200')), first_line)
+ self.stream.read_until(b('\r\n\r\n'), self.stop)
+ header_bytes = self.wait()
+ headers = HTTPHeaders.parse(header_bytes.decode('latin1'))
+ return headers
+
+ def read_response(self):
+ headers = self.read_headers()
+ self.stream.read_bytes(int(headers['Content-Length']), self.stop)
+ body = self.wait()
+ self.assertEqual(b('Hello world'), body)
+
+ def close(self):
+ self.stream.close()
+ del self.stream
+
+ def test_two_requests(self):
+ self.connect()
+ self.stream.write(b('GET / HTTP/1.1\r\n\r\n'))
+ self.read_response()
+ self.stream.write(b('GET / HTTP/1.1\r\n\r\n'))
+ self.read_response()
+ self.close()
+
+ def test_request_close(self):
+ self.connect()
+ self.stream.write(b('GET / HTTP/1.1\r\nConnection: close\r\n\r\n'))
+ self.read_response()
+ self.stream.read_until_close(callback=self.stop)
+ data = self.wait()
+ self.assertTrue(not data)
+ self.close()
+
+ # keepalive is supported for http 1.0 too, but it's opt-in
+ def test_http10(self):
+ self.http_version = b('HTTP/1.0')
+ self.connect()
+ self.stream.write(b('GET / HTTP/1.0\r\n\r\n'))
+ self.read_response()
+ self.stream.read_until_close(callback=self.stop)
+ data = self.wait()
+ self.assertTrue(not data)
+ self.close()
+
+ def test_http10_keepalive(self):
+ self.http_version = b('HTTP/1.0')
+ self.connect()
+ self.stream.write(b('GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n'))
+ self.read_response()
+ self.stream.write(b('GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n'))
+ self.read_response()
+ self.close()
+
+ def test_pipelined_requests(self):
+ self.connect()
+ self.stream.write(b('GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n'))
+ self.read_response()
+ self.read_response()
+ self.close()
+
+ def test_pipelined_cancel(self):
+ self.connect()
+ self.stream.write(b('GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n'))
+ # only read once
+ self.read_response()
+ self.close()
+
+ def test_cancel_during_download(self):
+ self.connect()
+ self.stream.write(b('GET /large HTTP/1.1\r\n\r\n'))
+ self.read_headers()
+ self.stream.read_bytes(1024, self.stop)
+ self.wait()
+ self.close()
+
+ def test_finish_while_closed(self):
+ self.connect()
+ self.stream.write(b('GET /finish_on_close HTTP/1.1\r\n\r\n'))
+ self.read_headers()
+ self.close()