from __future__ import absolute_import, division, print_function
-from tornado import netutil
+from tornado import gen, netutil
from tornado.concurrent import Future
from tornado.escape import json_decode, json_encode, utf8, _unicode, recursive_unicode, native_str
from tornado.http1connection import HTTP1Connection
from tornado.httpserver import HTTPServer
from tornado.httputil import HTTPHeaders, HTTPMessageDelegate, HTTPServerConnectionDelegate, ResponseStartLine # noqa: E501
from tornado.iostream import IOStream
+from tornado.locks import Event
from tornado.log import gen_log
from tornado.netutil import ssl_options_to_context
from tornado.simple_httpclient import SimpleAsyncHTTPClient
newline=newline)
self.assertEqual(response, b'Hello world')
+ @gen_test
def test_100_continue(self):
# Run through a 100-continue interaction by hand:
# When given Expect: 100-continue, we get a 100 response after the
# headers, and then the real response after the body.
stream = IOStream(socket.socket())
- stream.connect(("127.0.0.1", self.get_http_port()), callback=self.stop)
- self.wait()
- stream.write(b"\r\n".join([b"POST /hello HTTP/1.1",
- b"Content-Length: 1024",
- b"Expect: 100-continue",
- b"Connection: close",
- b"\r\n"]), callback=self.stop)
- self.wait()
- stream.read_until(b"\r\n\r\n", self.stop)
- data = self.wait()
+ yield stream.connect(("127.0.0.1", self.get_http_port()))
+ yield stream.write(b"\r\n".join([
+ b"POST /hello HTTP/1.1",
+ b"Content-Length: 1024",
+ b"Expect: 100-continue",
+ b"Connection: close",
+ b"\r\n"]))
+ data = yield stream.read_until(b"\r\n\r\n")
self.assertTrue(data.startswith(b"HTTP/1.1 100 "), data)
stream.write(b"a" * 1024)
- stream.read_until(b"\r\n", self.stop)
- first_line = self.wait()
+ first_line = yield stream.read_until(b"\r\n")
self.assertTrue(first_line.startswith(b"HTTP/1.1 200"), first_line)
- stream.read_until(b"\r\n\r\n", self.stop)
- header_data = self.wait()
+ header_data = yield stream.read_until(b"\r\n\r\n")
headers = HTTPHeaders.parse(native_str(header_data.decode('latin1')))
- stream.read_bytes(int(headers["Content-Length"]), self.stop)
- body = self.wait()
+ body = yield stream.read_bytes(int(headers["Content-Length"]))
self.assertEqual(body, b"Got 1024 bytes in POST")
stream.close()
start_line, headers, response = self.wait()
self.assertEqual(json_decode(response), {u'foo': [u'bar']})
+ @gen_test
def test_invalid_content_length(self):
with ExpectLog(gen_log, '.*Only integer Content-Length is allowed'):
self.stream.write(b"""\
bar
""".replace(b"\n", b"\r\n"))
- self.stream.read_until_close(self.stop)
- self.wait()
+ yield self.stream.read_until_close()
class XHeaderTest(HandlerBaseTestCase):
shutil.rmtree(self.tmpdir)
super(UnixSocketTest, self).tearDown()
+ @gen_test
def test_unix_socket(self):
self.stream.write(b"GET /hello HTTP/1.0\r\n\r\n")
- self.stream.read_until(b"\r\n", self.stop)
- response = self.wait()
+ response = yield self.stream.read_until(b"\r\n")
self.assertEqual(response, b"HTTP/1.1 200 OK\r\n")
- self.stream.read_until(b"\r\n\r\n", self.stop)
- headers = HTTPHeaders.parse(self.wait().decode('latin1'))
- self.stream.read_bytes(int(headers["Content-Length"]), self.stop)
- body = self.wait()
+ header_data = yield self.stream.read_until(b"\r\n\r\n")
+ headers = HTTPHeaders.parse(header_data.decode('latin1'))
+ body = yield self.stream.read_bytes(int(headers["Content-Length"]))
self.assertEqual(body, b"Hello world")
+ @gen_test
def test_unix_socket_bad_request(self):
# Unix sockets don't have remote addresses so they just return an
# empty string.
with ExpectLog(gen_log, "Malformed HTTP message from"):
self.stream.write(b"garbage\r\n\r\n")
- self.stream.read_until_close(self.stop)
- response = self.wait()
+ response = yield self.stream.read_until_close()
self.assertEqual(response, b"HTTP/1.1 400 Bad Request\r\n\r\n")
super(KeepAliveTest, self).tearDown()
# The next few methods are a crude manual http client
+ @gen.coroutine
def connect(self):
self.stream = IOStream(socket.socket())
- self.stream.connect(('127.0.0.1', self.get_http_port()), self.stop)
- self.wait()
+ yield self.stream.connect(('127.0.0.1', self.get_http_port()))
+ @gen.coroutine
def read_headers(self):
- self.stream.read_until(b'\r\n', self.stop)
- first_line = self.wait()
+ first_line = yield self.stream.read_until(b'\r\n')
self.assertTrue(first_line.startswith(b'HTTP/1.1 200'), first_line)
- self.stream.read_until(b'\r\n\r\n', self.stop)
- header_bytes = self.wait()
+ header_bytes = yield self.stream.read_until(b'\r\n\r\n')
headers = HTTPHeaders.parse(header_bytes.decode('latin1'))
- return headers
+ raise gen.Return(headers)
+ @gen.coroutine
def read_response(self):
- self.headers = self.read_headers()
- self.stream.read_bytes(int(self.headers['Content-Length']), self.stop)
- body = self.wait()
+ self.headers = yield self.read_headers()
+ body = yield self.stream.read_bytes(int(self.headers['Content-Length']))
self.assertEqual(b'Hello world', body)
def close(self):
self.stream.close()
del self.stream
+ @gen_test
def test_two_requests(self):
- self.connect()
+ yield self.connect()
self.stream.write(b'GET / HTTP/1.1\r\n\r\n')
- self.read_response()
+ yield self.read_response()
self.stream.write(b'GET / HTTP/1.1\r\n\r\n')
- self.read_response()
+ yield self.read_response()
self.close()
+ @gen_test
def test_request_close(self):
- self.connect()
+ yield self.connect()
self.stream.write(b'GET / HTTP/1.1\r\nConnection: close\r\n\r\n')
- self.read_response()
- self.stream.read_until_close(callback=self.stop)
- data = self.wait()
+ yield self.read_response()
+ data = yield self.stream.read_until_close()
self.assertTrue(not data)
self.assertEqual(self.headers['Connection'], 'close')
self.close()
# keepalive is supported for http 1.0 too, but it's opt-in
+ @gen_test
def test_http10(self):
self.http_version = b'HTTP/1.0'
- self.connect()
+ yield self.connect()
self.stream.write(b'GET / HTTP/1.0\r\n\r\n')
- self.read_response()
- self.stream.read_until_close(callback=self.stop)
- data = self.wait()
+ yield self.read_response()
+ data = yield self.stream.read_until_close()
self.assertTrue(not data)
self.assertTrue('Connection' not in self.headers)
self.close()
+ @gen_test
def test_http10_keepalive(self):
self.http_version = b'HTTP/1.0'
- self.connect()
+ yield self.connect()
self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
- self.read_response()
+ yield self.read_response()
self.assertEqual(self.headers['Connection'], 'Keep-Alive')
self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
- self.read_response()
+ yield self.read_response()
self.assertEqual(self.headers['Connection'], 'Keep-Alive')
self.close()
+ @gen_test
def test_http10_keepalive_extra_crlf(self):
self.http_version = b'HTTP/1.0'
- self.connect()
+ yield self.connect()
self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n\r\n')
- self.read_response()
+ yield self.read_response()
self.assertEqual(self.headers['Connection'], 'Keep-Alive')
self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
- self.read_response()
+ yield self.read_response()
self.assertEqual(self.headers['Connection'], 'Keep-Alive')
self.close()
+ @gen_test
def test_pipelined_requests(self):
- self.connect()
+ yield self.connect()
self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')
- self.read_response()
- self.read_response()
+ yield self.read_response()
+ yield self.read_response()
self.close()
+ @gen_test
def test_pipelined_cancel(self):
- self.connect()
+ yield self.connect()
self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')
# only read once
- self.read_response()
+ yield self.read_response()
self.close()
+ @gen_test
def test_cancel_during_download(self):
- self.connect()
+ yield self.connect()
self.stream.write(b'GET /large HTTP/1.1\r\n\r\n')
- self.read_headers()
- self.stream.read_bytes(1024, self.stop)
- self.wait()
+ yield self.read_headers()
+ yield self.stream.read_bytes(1024)
self.close()
+ @gen_test
def test_finish_while_closed(self):
- self.connect()
+ yield self.connect()
self.stream.write(b'GET /finish_on_close HTTP/1.1\r\n\r\n')
- self.read_headers()
+ yield self.read_headers()
self.close()
+ @gen_test
def test_keepalive_chunked(self):
self.http_version = b'HTTP/1.0'
- self.connect()
+ yield self.connect()
self.stream.write(b'POST / HTTP/1.0\r\n'
b'Connection: keep-alive\r\n'
b'Transfer-Encoding: chunked\r\n'
b'\r\n'
b'0\r\n'
b'\r\n')
- self.read_response()
+ yield self.read_response()
self.assertEqual(self.headers['Connection'], 'Keep-Alive')
self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
- self.read_response()
+ yield self.read_response()
self.assertEqual(self.headers['Connection'], 'Keep-Alive')
self.close()
for stream in self.streams:
stream.close()
+ @gen.coroutine
def connect(self):
stream = IOStream(socket.socket())
- stream.connect(('127.0.0.1', self.get_http_port()), self.stop)
- self.wait()
+ yield stream.connect(('127.0.0.1', self.get_http_port()))
self.streams.append(stream)
- return stream
+ raise gen.Return(stream)
+ @gen_test
def test_unused_connection(self):
- stream = self.connect()
- stream.set_close_callback(self.stop)
- self.wait()
+ stream = yield self.connect()
+ event = Event()
+ stream.set_close_callback(event.set)
+ yield event.wait()
+ @gen_test
def test_idle_after_use(self):
- stream = self.connect()
- stream.set_close_callback(lambda: self.stop("closed"))
+ stream = yield self.connect()
+ event = Event()
+ stream.set_close_callback(event.set)
# Use the connection twice to make sure keep-alives are working
for i in range(2):
stream.write(b"GET / HTTP/1.1\r\n\r\n")
- stream.read_until(b"\r\n\r\n", self.stop)
- self.wait()
- stream.read_bytes(11, self.stop)
- data = self.wait()
+ yield stream.read_until(b"\r\n\r\n")
+ data = yield stream.read_bytes(11)
self.assertEqual(data, b"Hello world")
# Now let the timeout trigger and close the connection.
- data = self.wait()
- self.assertEqual(data, "closed")
+ yield event.wait()
class BodyLimitsTest(AsyncHTTPTestCase):
from tornado import netutil
from tornado.iostream import IOStream, SSLIOStream, PipeIOStream, StreamClosedError, _StreamBuffer
from tornado.httputil import HTTPHeaders
+from tornado.locks import Condition, Event
from tornado.log import gen_log, app_log
from tornado.netutil import ssl_wrap_socket
from tornado.stack_context import NullContext
from tornado.tcpserver import TCPServer
from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, bind_unused_port, ExpectLog, gen_test # noqa: E501
-from tornado.test.util import unittest, skipIfNonUnix, refusing_port, skipPypy3V58
+from tornado.test.util import (unittest, skipIfNonUnix, refusing_port, skipPypy3V58,
+ ignore_deprecation)
from tornado.web import RequestHandler, Application
import errno
import hashlib
response = self.fetch("/", headers={"Connection": "close"})
response.rethrow()
+ @gen_test
def test_read_until_close(self):
stream = self._make_client_iostream()
- stream.connect(('127.0.0.1', self.get_http_port()), callback=self.stop)
- self.wait()
+ yield stream.connect(('127.0.0.1', self.get_http_port()))
stream.write(b"GET / HTTP/1.0\r\n\r\n")
- stream.read_until_close(self.stop)
- data = self.wait()
+ data = yield stream.read_until_close()
self.assertTrue(data.startswith(b"HTTP/1.1 200"))
self.assertTrue(data.endswith(b"Hello"))
+ @gen_test
def test_read_zero_bytes(self):
self.stream = self._make_client_iostream()
- self.stream.connect(("127.0.0.1", self.get_http_port()),
- callback=self.stop)
- self.wait()
+ yield self.stream.connect(("127.0.0.1", self.get_http_port()))
self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
# normal read
- self.stream.read_bytes(9, self.stop)
- data = self.wait()
+ data = yield self.stream.read_bytes(9)
self.assertEqual(data, b"HTTP/1.1 ")
# zero bytes
- self.stream.read_bytes(0, self.stop)
- data = self.wait()
+ data = yield self.stream.read_bytes(0)
self.assertEqual(data, b"")
# another normal read
- self.stream.read_bytes(3, self.stop)
- data = self.wait()
+ data = yield self.stream.read_bytes(3)
self.assertEqual(data, b"200")
self.stream.close()
+ @gen_test
def test_write_while_connecting(self):
stream = self._make_client_iostream()
connected = [False]
+ cond = Condition()
def connected_callback():
connected[0] = True
- self.stop()
+ cond.notify()
stream.connect(("127.0.0.1", self.get_http_port()),
callback=connected_callback)
# unlike the previous tests, try to write before the connection
def write_callback():
written[0] = True
- self.stop()
+ cond.notify()
stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n",
callback=write_callback)
self.assertTrue(not connected[0])
# by the time the write has flushed, the connection callback has
# also run
try:
- self.wait(lambda: connected[0] and written[0])
+ while not (connected[0] and written[0]):
+ yield cond.wait()
finally:
logging.debug((connected, written))
- stream.read_until_close(self.stop)
- data = self.wait()
+ data = yield stream.read_until_close()
self.assertTrue(data.endswith(b"Hello"))
stream.close()
def make_iostream_pair(self, **kwargs):
raise NotImplementedError
+ @gen_test
def test_write_zero_bytes(self):
# Attempting to write zero bytes should run the callback without
# going into an infinite loop.
- rs, ws = self.make_iostream_pair()
- ws.write(b'', callback=self.stop)
- self.wait()
+ rs, ws = yield self.make_iostream_pair()
+ yield ws.write(b'')
ws.close()
rs.close()
+ @gen_test
def test_streaming_callback(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
+ try:
+ chunks = []
+ cond = Condition()
+
+ def streaming_callback(data):
+ chunks.append(data)
+ cond.notify()
+
+ fut = rs.read_bytes(6, streaming_callback=streaming_callback)
+ ws.write(b"1234")
+ while not chunks:
+ yield cond.wait()
+ ws.write(b"5678")
+ final_data = yield(fut)
+ self.assertFalse(final_data)
+ self.assertEqual(chunks, [b"1234", b"56"])
+
+ # the rest of the last chunk is still in the buffer
+ data = yield rs.read_bytes(2)
+ self.assertEqual(data, b"78")
+ finally:
+ rs.close()
+ ws.close()
+
+ @gen_test
+ def test_streaming_callback_with_final_callback(self):
+ rs, ws = yield self.make_iostream_pair()
try:
chunks = []
final_called = []
+ cond = Condition()
def streaming_callback(data):
chunks.append(data)
- self.stop()
+ cond.notify()
def final_callback(data):
self.assertFalse(data)
final_called.append(True)
- self.stop()
- rs.read_bytes(6, callback=final_callback,
- streaming_callback=streaming_callback)
+ cond.notify()
+ with ignore_deprecation():
+ rs.read_bytes(6, callback=final_callback,
+ streaming_callback=streaming_callback)
ws.write(b"1234")
- self.wait(condition=lambda: chunks)
+ while not chunks:
+ yield cond.wait()
ws.write(b"5678")
- self.wait(condition=lambda: final_called)
+ while not final_called:
+ yield cond.wait()
self.assertEqual(chunks, [b"1234", b"56"])
# the rest of the last chunk is still in the buffer
- rs.read_bytes(2, callback=self.stop)
- data = self.wait()
+ data = yield rs.read_bytes(2)
self.assertEqual(data, b"78")
finally:
rs.close()
ws.close()
+ @gen_test
def test_streaming_callback_with_data_in_buffer(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
ws.write(b"abcd\r\nefgh")
- rs.read_until(b"\r\n", self.stop)
- data = self.wait()
+ data = yield rs.read_until(b"\r\n")
self.assertEqual(data, b"abcd\r\n")
- def closed_callback(chunk):
- self.fail()
- rs.read_until_close(callback=closed_callback,
- streaming_callback=self.stop)
- #self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
- data = self.wait()
+ streaming_fut = Future()
+ rs.read_until_close(streaming_callback=streaming_fut.set_result)
+ data = yield streaming_fut
self.assertEqual(data, b"efgh")
rs.close()
ws.close()
+ @gen_test
def test_streaming_until_close(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
try:
chunks = []
closed = [False]
+ cond = Condition()
def streaming_callback(data):
chunks.append(data)
- self.stop()
+ cond.notify()
def close_callback(data):
assert not data, data
closed[0] = True
- self.stop()
- rs.read_until_close(callback=close_callback,
- streaming_callback=streaming_callback)
+ cond.notify()
+ with ignore_deprecation():
+ rs.read_until_close(callback=close_callback,
+ streaming_callback=streaming_callback)
ws.write(b"1234")
- self.wait(condition=lambda: len(chunks) == 1)
- ws.write(b"5678", self.stop)
- self.wait()
+ while len(chunks) != 1:
+ yield cond.wait()
+ yield ws.write(b"5678")
ws.close()
- self.wait(condition=lambda: closed[0])
+ while not closed[0]:
+ yield cond.wait()
self.assertEqual(chunks, [b"1234", b"5678"])
finally:
ws.close()
rs.close()
+ @gen_test
def test_streaming_until_close_future(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
try:
chunks = []
yield ws.write(b"5678")
ws.close()
- @gen.coroutine
- def f():
- yield [rs_task(), ws_task()]
- self.io_loop.run_sync(f)
+ yield [rs_task(), ws_task()]
self.assertEqual(chunks, [b"1234", b"5678"])
finally:
ws.close()
rs.close()
+ @gen_test
def test_delayed_close_callback(self):
# The scenario: Server closes the connection while there is a pending
# read that can be served out of buffered data. The client does not
# run the close_callback as soon as it detects the close, but rather
# defers it until after the buffered read has finished.
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
try:
- rs.set_close_callback(self.stop)
+ event = Event()
+ rs.set_close_callback(event.set)
ws.write(b"12")
chunks = []
def callback1(data):
chunks.append(data)
- rs.read_bytes(1, callback2)
+ with ignore_deprecation():
+ rs.read_bytes(1, callback2)
ws.close()
def callback2(data):
chunks.append(data)
- rs.read_bytes(1, callback1)
- self.wait() # stopped by close_callback
+ with ignore_deprecation():
+ rs.read_bytes(1, callback1)
+ yield event.wait() # stopped by close_callback
self.assertEqual(chunks, [b"1", b"2"])
finally:
ws.close()
rs.close()
+ @gen_test
def test_future_delayed_close_callback(self):
# Same as test_delayed_close_callback, but with the future interface.
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
- # We can't call make_iostream_pair inside a gen_test function
- # because the ioloop is not reentrant.
- @gen_test
- def f(self):
+ try:
ws.write(b"12")
chunks = []
chunks.append((yield rs.read_bytes(1)))
ws.close()
chunks.append((yield rs.read_bytes(1)))
self.assertEqual(chunks, [b"1", b"2"])
- try:
- f(self)
finally:
ws.close()
rs.close()
+ @gen_test
def test_close_buffered_data(self):
# Similar to the previous test, but with data stored in the OS's
# socket buffers instead of the IOStream's read buffer. Out-of-band
#
# This depends on the read_chunk_size being smaller than the
# OS socket buffer, so make it small.
- rs, ws = self.make_iostream_pair(read_chunk_size=256)
+ rs, ws = yield self.make_iostream_pair(read_chunk_size=256)
try:
ws.write(b"A" * 512)
- rs.read_bytes(256, self.stop)
- data = self.wait()
+ data = yield rs.read_bytes(256)
self.assertEqual(b"A" * 256, data)
ws.close()
# Allow the close to propagate to the `rs` side of the
# connection. Using add_callback instead of add_timeout
# doesn't seem to work, even with multiple iterations
- self.io_loop.add_timeout(self.io_loop.time() + 0.01, self.stop)
- self.wait()
- rs.read_bytes(256, self.stop)
- data = self.wait()
+ yield gen.sleep(0.01)
+ data = yield rs.read_bytes(256)
self.assertEqual(b"A" * 256, data)
finally:
ws.close()
rs.close()
+ @gen_test
def test_read_until_close_after_close(self):
# Similar to test_delayed_close_callback, but read_until_close takes
# a separate code path so test it separately.
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
try:
ws.write(b"1234")
ws.close()
# Read one byte to make sure the client has received the data.
# It won't run the close callback as long as there is more buffered
# data that could satisfy a later read.
- rs.read_bytes(1, self.stop)
- data = self.wait()
+ data = yield rs.read_bytes(1)
self.assertEqual(data, b"1")
- rs.read_until_close(self.stop)
- data = self.wait()
+ data = yield rs.read_until_close()
self.assertEqual(data, b"234")
finally:
ws.close()
rs.close()
+ @gen_test
def test_streaming_read_until_close_after_close(self):
# Same as the preceding test but with a streaming_callback.
# All data should go through the streaming callback,
# and the final read callback just gets an empty string.
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
try:
ws.write(b"1234")
ws.close()
- rs.read_bytes(1, self.stop)
- data = self.wait()
+ data = yield rs.read_bytes(1)
self.assertEqual(data, b"1")
streaming_data = []
- rs.read_until_close(self.stop,
- streaming_callback=streaming_data.append)
- data = self.wait()
- self.assertEqual(b'', data)
+ final_future = Future()
+ with ignore_deprecation():
+ rs.read_until_close(final_future.set_result,
+ streaming_callback=streaming_data.append)
+ final_data = yield final_future
+ self.assertEqual(b'', final_data)
self.assertEqual(b''.join(streaming_data), b"234")
finally:
ws.close()
rs.close()
+ @gen_test
def test_large_read_until(self):
# Performance test: read_until used to have a quadratic component
# so a read_until of 4MB would take 8 seconds; now it takes 0.25
# seconds.
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
try:
# This test fails on pypy with ssl. I think it's because
# pypy's gc defeats moves objects, breaking the
for i in range(NUM_KB):
ws.write(b"A" * 1024)
ws.write(b"\r\n")
- rs.read_until(b"\r\n", self.stop)
- data = self.wait()
+ data = yield rs.read_until(b"\r\n")
self.assertEqual(len(data), NUM_KB * 1024 + 2)
finally:
ws.close()
rs.close()
+ @gen_test
def test_close_callback_with_pending_read(self):
# Regression test for a bug that was introduced in 2.3
# where the IOStream._close_callback would never be called
# if there were pending reads.
OK = b"OK\r\n"
- rs, ws = self.make_iostream_pair()
- rs.set_close_callback(self.stop)
+ rs, ws = yield self.make_iostream_pair()
+ event = Event()
+ rs.set_close_callback(event.set)
try:
ws.write(OK)
- rs.read_until(b"\r\n", self.stop)
- res = self.wait()
+ res = yield rs.read_until(b"\r\n")
self.assertEqual(res, OK)
ws.close()
- rs.read_until(b"\r\n", lambda x: x)
+ with ignore_deprecation():
+ rs.read_until(b"\r\n", lambda x: x)
# If _close_callback (self.stop) is not called,
# an AssertionError: Async operation timed out after 5 seconds
# will be raised.
- res = self.wait()
- self.assertTrue(res is None)
+ yield event.wait()
finally:
ws.close()
rs.close()
+ @gen_test
def test_future_close_callback(self):
# Regression test for interaction between the Future read interfaces
# and IOStream._maybe_add_error_listener.
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
closed = [False]
+ cond = Condition()
def close_callback():
closed[0] = True
- self.stop()
+ cond.notify()
rs.set_close_callback(close_callback)
try:
ws.write(b'a')
- future = rs.read_bytes(1)
- self.io_loop.add_future(future, self.stop)
- self.assertEqual(self.wait().result(), b'a')
+ res = yield rs.read_bytes(1)
+ self.assertEqual(res, b'a')
self.assertFalse(closed[0])
ws.close()
- self.wait()
+ yield cond.wait()
self.assertTrue(closed[0])
finally:
rs.close()
ws.close()
+ @gen_test
def test_write_memoryview(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
try:
- rs.read_bytes(4, self.stop)
+ fut = rs.read_bytes(4)
ws.write(memoryview(b"hello"))
- data = self.wait()
+ data = yield fut
self.assertEqual(data, b"hell")
finally:
ws.close()
rs.close()
+ @gen_test
def test_read_bytes_partial(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
try:
# Ask for more than is available with partial=True
- rs.read_bytes(50, self.stop, partial=True)
+ fut = rs.read_bytes(50, partial=True)
ws.write(b"hello")
- data = self.wait()
+ data = yield fut
self.assertEqual(data, b"hello")
# Ask for less than what is available; num_bytes is still
# respected.
- rs.read_bytes(3, self.stop, partial=True)
+ fut = rs.read_bytes(3, partial=True)
ws.write(b"world")
- data = self.wait()
+ data = yield fut
self.assertEqual(data, b"wor")
# Partial reads won't return an empty string, but read_bytes(0)
# will.
- rs.read_bytes(0, self.stop, partial=True)
- data = self.wait()
+ data = yield rs.read_bytes(0, partial=True)
self.assertEqual(data, b'')
finally:
ws.close()
rs.close()
+ @gen_test
def test_read_until_max_bytes(self):
- rs, ws = self.make_iostream_pair()
- rs.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = yield self.make_iostream_pair()
+ closed = Event()
+ rs.set_close_callback(closed.set)
try:
# Extra room under the limit
- rs.read_until(b"def", self.stop, max_bytes=50)
+ fut = rs.read_until(b"def", max_bytes=50)
ws.write(b"abcdef")
- data = self.wait()
+ data = yield fut
self.assertEqual(data, b"abcdef")
# Just enough space
- rs.read_until(b"def", self.stop, max_bytes=6)
+ fut = rs.read_until(b"def", max_bytes=6)
ws.write(b"abcdef")
- data = self.wait()
+ data = yield fut
self.assertEqual(data, b"abcdef")
# Not enough space, but we don't know it until all we can do is
# log a warning and close the connection.
with ExpectLog(gen_log, "Unsatisfiable read"):
- rs.read_until(b"def", self.stop, max_bytes=5)
+ fut = rs.read_until(b"def", max_bytes=5)
ws.write(b"123456")
- data = self.wait()
- self.assertEqual(data, "closed")
+ yield closed.wait()
finally:
ws.close()
rs.close()
+ @gen_test
def test_read_until_max_bytes_inline(self):
- rs, ws = self.make_iostream_pair()
- rs.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = yield self.make_iostream_pair()
+ closed = Event()
+ rs.set_close_callback(closed.set)
try:
# Similar to the error case in the previous test, but the
# ws writes first so rs reads are satisfied
# do not raise the error synchronously.
ws.write(b"123456")
with ExpectLog(gen_log, "Unsatisfiable read"):
- rs.read_until(b"def", self.stop, max_bytes=5)
- data = self.wait()
- self.assertEqual(data, "closed")
+ with ignore_deprecation():
+ rs.read_until(b"def", callback=lambda x: self.fail(), max_bytes=5)
+ yield closed.wait()
finally:
ws.close()
rs.close()
+ @gen_test
def test_read_until_max_bytes_ignores_extra(self):
- rs, ws = self.make_iostream_pair()
- rs.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = yield self.make_iostream_pair()
+ closed = Event()
+ rs.set_close_callback(closed.set)
try:
# Even though data that matches arrives the same packet that
# puts us over the limit, we fail the request because it was not
# found within the limit.
ws.write(b"abcdef")
with ExpectLog(gen_log, "Unsatisfiable read"):
- rs.read_until(b"def", self.stop, max_bytes=5)
- data = self.wait()
- self.assertEqual(data, "closed")
+ rs.read_until(b"def", max_bytes=5)
+ yield closed.wait()
finally:
ws.close()
rs.close()
+ @gen_test
def test_read_until_regex_max_bytes(self):
- rs, ws = self.make_iostream_pair()
- rs.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = yield self.make_iostream_pair()
+ closed = Event()
+ rs.set_close_callback(closed.set)
try:
# Extra room under the limit
- rs.read_until_regex(b"def", self.stop, max_bytes=50)
+ fut = rs.read_until_regex(b"def", max_bytes=50)
ws.write(b"abcdef")
- data = self.wait()
+ data = yield fut
self.assertEqual(data, b"abcdef")
# Just enough space
- rs.read_until_regex(b"def", self.stop, max_bytes=6)
+ fut = rs.read_until_regex(b"def", max_bytes=6)
ws.write(b"abcdef")
- data = self.wait()
+ data = yield fut
self.assertEqual(data, b"abcdef")
# Not enough space, but we don't know it until all we can do is
# log a warning and close the connection.
with ExpectLog(gen_log, "Unsatisfiable read"):
- rs.read_until_regex(b"def", self.stop, max_bytes=5)
+ rs.read_until_regex(b"def", max_bytes=5)
ws.write(b"123456")
- data = self.wait()
- self.assertEqual(data, "closed")
+ yield closed.wait()
finally:
ws.close()
rs.close()
+ @gen_test
def test_read_until_regex_max_bytes_inline(self):
- rs, ws = self.make_iostream_pair()
- rs.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = yield self.make_iostream_pair()
+ closed = Event()
+ rs.set_close_callback(closed.set)
try:
# Similar to the error case in the previous test, but the
# ws writes first so rs reads are satisfied
# do not raise the error synchronously.
ws.write(b"123456")
with ExpectLog(gen_log, "Unsatisfiable read"):
- rs.read_until_regex(b"def", self.stop, max_bytes=5)
- data = self.wait()
- self.assertEqual(data, "closed")
+ rs.read_until_regex(b"def", max_bytes=5)
+ yield closed.wait()
finally:
ws.close()
rs.close()
+ @gen_test
def test_read_until_regex_max_bytes_ignores_extra(self):
- rs, ws = self.make_iostream_pair()
- rs.set_close_callback(lambda: self.stop("closed"))
+ rs, ws = yield self.make_iostream_pair()
+ closed = Event()
+ rs.set_close_callback(closed.set)
try:
# Even though data that matches arrives the same packet that
# puts us over the limit, we fail the request because it was not
# found within the limit.
ws.write(b"abcdef")
with ExpectLog(gen_log, "Unsatisfiable read"):
- rs.read_until_regex(b"def", self.stop, max_bytes=5)
- data = self.wait()
- self.assertEqual(data, "closed")
+ rs.read_until_regex(b"def", max_bytes=5)
+ yield closed.wait()
finally:
ws.close()
rs.close()
+ @gen_test
def test_small_reads_from_large_buffer(self):
# 10KB buffer size, 100KB available to read.
# Read 1KB at a time and make sure that the buffer is not eagerly
# filled.
- rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024)
+ rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024)
try:
ws.write(b"a" * 1024 * 100)
for i in range(100):
- rs.read_bytes(1024, self.stop)
- data = self.wait()
+ data = yield rs.read_bytes(1024)
self.assertEqual(data, b"a" * 1024)
finally:
ws.close()
rs.close()
+ @gen_test
def test_small_read_untils_from_large_buffer(self):
# 10KB buffer size, 100KB available to read.
# Read 1KB at a time and make sure that the buffer is not eagerly
# filled.
- rs, ws = self.make_iostream_pair(max_buffer_size=10 * 1024)
+ rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024)
try:
ws.write((b"a" * 1023 + b"\n") * 100)
for i in range(100):
- rs.read_until(b"\n", self.stop, max_bytes=4096)
- data = self.wait()
+ data = yield rs.read_until(b"\n", max_bytes=4096)
self.assertEqual(data, b"a" * 1023 + b"\n")
finally:
ws.close()
rs.close()
+ @gen_test
def test_flow_control(self):
MB = 1024 * 1024
- rs, ws = self.make_iostream_pair(max_buffer_size=5 * MB)
+ rs, ws = yield self.make_iostream_pair(max_buffer_size=5 * MB)
try:
# Client writes more than the rs will accept.
ws.write(b"a" * 10 * MB)
# The rs pauses while reading.
- rs.read_bytes(MB, self.stop)
- self.wait()
- self.io_loop.call_later(0.1, self.stop)
- self.wait()
+ yield rs.read_bytes(MB)
+ yield gen.sleep(0.1)
# The ws's writes have been blocked; the rs can
# continue to read gradually.
for i in range(9):
- rs.read_bytes(MB, self.stop)
- self.wait()
+ yield rs.read_bytes(MB)
finally:
rs.close()
ws.close()
+ @gen_test
def test_read_into(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
def sleep_some():
self.io_loop.run_sync(lambda: gen.sleep(0.05))
try:
buf = bytearray(10)
- rs.read_into(buf, callback=self.stop)
+ fut = rs.read_into(buf)
ws.write(b"hello")
- sleep_some()
+ yield gen.sleep(0.05)
self.assertTrue(rs.reading())
ws.write(b"world!!")
- data = self.wait()
+ data = yield fut
self.assertFalse(rs.reading())
self.assertEqual(data, 10)
self.assertEqual(bytes(buf), b"helloworld")
# Existing buffer is fed into user buffer
- rs.read_into(buf, callback=self.stop)
- sleep_some()
+ fut = rs.read_into(buf)
+ yield gen.sleep(0.05)
self.assertTrue(rs.reading())
ws.write(b"1234567890")
- data = self.wait()
+ data = yield fut
self.assertFalse(rs.reading())
self.assertEqual(data, 10)
self.assertEqual(bytes(buf), b"!!12345678")
# Existing buffer can satisfy read immediately
buf = bytearray(4)
ws.write(b"abcdefghi")
- rs.read_into(buf, callback=self.stop)
- data = self.wait()
+ data = yield rs.read_into(buf)
self.assertEqual(data, 4)
self.assertEqual(bytes(buf), b"90ab")
- rs.read_bytes(7, self.stop)
- data = self.wait()
+ data = yield rs.read_bytes(7)
self.assertEqual(data, b"cdefghi")
finally:
ws.close()
rs.close()
+ @gen_test
def test_read_into_partial(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
- def sleep_some():
- self.io_loop.run_sync(lambda: gen.sleep(0.05))
try:
# Partial read
buf = bytearray(10)
- rs.read_into(buf, callback=self.stop, partial=True)
+ fut = rs.read_into(buf, partial=True)
ws.write(b"hello")
- data = self.wait()
+ data = yield fut
self.assertFalse(rs.reading())
self.assertEqual(data, 5)
self.assertEqual(bytes(buf), b"hello\0\0\0\0\0")
# Full read despite partial=True
ws.write(b"world!1234567890")
- rs.read_into(buf, callback=self.stop, partial=True)
- data = self.wait()
+ data = yield rs.read_into(buf, partial=True)
self.assertEqual(data, 10)
self.assertEqual(bytes(buf), b"world!1234")
# Existing buffer can satisfy read immediately
- rs.read_into(buf, callback=self.stop, partial=True)
- data = self.wait()
+ data = yield rs.read_into(buf, partial=True)
self.assertEqual(data, 6)
self.assertEqual(bytes(buf), b"5678901234")
ws.close()
rs.close()
+ @gen_test
def test_read_into_zero_bytes(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
try:
buf = bytearray()
fut = rs.read_into(buf)
ws.close()
rs.close()
+ @gen_test
def test_many_mixed_reads(self):
# Stress buffer handling when going back and forth between
# read_bytes() (using an internal buffer) and read_into()
# (using a user-allocated buffer).
r = random.Random(42)
nbytes = 1000000
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
produce_hash = hashlib.sha1()
consume_hash = hashlib.sha1()
remaining -= size
assert remaining == 0
- @gen.coroutine
- def main():
+ try:
yield [produce(), consume()]
assert produce_hash.hexdigest() == consume_hash.hexdigest()
-
- try:
- self.io_loop.run_sync(main)
finally:
ws.close()
rs.close()
def _make_client_iostream(self, connection, **kwargs):
raise NotImplementedError()
+ @gen.coroutine
def make_iostream_pair(self, **kwargs):
listener, port = bind_unused_port()
- streams = [None, None]
+ server_stream_fut = Future()
def accept_callback(connection, address):
- streams[0] = self._make_server_iostream(connection, **kwargs)
- self.stop()
+ server_stream_fut.set_result(self._make_server_iostream(connection, **kwargs))
- def connect_callback():
- streams[1] = client_stream
- self.stop()
netutil.add_accept_handler(listener, accept_callback)
client_stream = self._make_client_iostream(socket.socket(), **kwargs)
- client_stream.connect(('127.0.0.1', port),
- callback=connect_callback)
- self.wait(condition=lambda: all(streams))
+ connect_fut = client_stream.connect(('127.0.0.1', port))
+ server_stream, client_stream = yield [server_stream_fut, connect_fut]
self.io_loop.remove_handler(listener.fileno())
listener.close()
- return streams
+ raise gen.Return((server_stream, client_stream))
def test_connection_refused(self):
# When a connection is refused, the connect callback should not
self.wait()
self.assertIsInstance(stream.error, socket.gaierror)
+ @gen_test
def test_read_callback_error(self):
# Test that IOStream sets its exc_info when a read callback throws
- server, client = self.make_iostream_pair()
+ server, client = yield self.make_iostream_pair()
try:
- server.set_close_callback(self.stop)
+ closed = Event()
+ server.set_close_callback(closed.set)
with ExpectLog(
app_log, "(Uncaught exception|Exception in callback)"
):
# Clear ExceptionStackContext so IOStream catches error
with NullContext():
- server.read_bytes(1, callback=lambda data: 1 / 0)
+ with ignore_deprecation():
+ server.read_bytes(1, callback=lambda data: 1 / 0)
client.write(b"1")
- self.wait()
+ yield closed.wait()
self.assertTrue(isinstance(server.error, ZeroDivisionError))
finally:
server.close()
client.close()
@unittest.skipIf(mock is None, 'mock package not present')
+ @gen_test
def test_read_until_close_with_error(self):
- server, client = self.make_iostream_pair()
+ server, client = yield self.make_iostream_pair()
try:
with mock.patch('tornado.iostream.BaseIOStream._try_inline_read',
side_effect=IOError('boom')):
with self.assertRaisesRegexp(IOError, 'boom'):
- client.read_until_close(self.stop)
+ with ignore_deprecation():
+ client.read_until_close(lambda x: None)
finally:
server.close()
client.close()
@skipIfNonUnix
@skipPypy3V58
+ @gen_test
def test_inline_read_error(self):
# An error on an inline read is raised without logging (on the
# assumption that it will eventually be noticed or logged further
# on socket FDs, but we can't close the socket object normally
# because we won't get the error we want if the socket knows
# it's closed.
- server, client = self.make_iostream_pair()
+ server, client = yield self.make_iostream_pair()
try:
os.close(server.socket.fileno())
with self.assertRaises(socket.error):
- server.read_bytes(1, lambda data: None)
+ server.read_bytes(1)
finally:
server.close()
client.close()
@skipPypy3V58
+ @gen_test
def test_async_read_error_logging(self):
# Socket errors on asynchronous reads should be logged (but only
# once).
- server, client = self.make_iostream_pair()
- server.set_close_callback(self.stop)
+ server, client = yield self.make_iostream_pair()
+ closed = Event()
+ server.set_close_callback(closed.set)
try:
# Start a read that will be fulfilled asynchronously.
- server.read_bytes(1, lambda data: None)
+ with ignore_deprecation():
+ server.read_bytes(1, lambda data: None)
client.write(b'a')
# Stub out read_from_fd to make it fail.
server.read_from_fd = fake_read_from_fd
# This log message is from _handle_read (not read_from_fd).
with ExpectLog(gen_log, "error on read"):
- self.wait()
+ yield closed.wait()
finally:
server.close()
client.close()
+ @gen_test
def test_future_write(self):
"""
Test that write() Futures are never orphaned.
m, n = 10000, 1000
nproducers = 10
total_bytes = m * n * nproducers
- server, client = self.make_iostream_pair(max_buffer_size=total_bytes)
+ server, client = yield self.make_iostream_pair(max_buffer_size=total_bytes)
@gen.coroutine
def produce():
res = yield client.read_bytes(m)
nread += len(res)
- @gen.coroutine
- def main():
- yield [produce() for i in range(nproducers)] + [consume()]
-
try:
- self.io_loop.run_sync(main)
+ yield [produce() for i in range(nproducers)] + [consume()]
finally:
server.close()
client.close()
@skipIfNonUnix
class TestPipeIOStream(TestReadWriteMixin, AsyncTestCase):
+ @gen.coroutine
def make_iostream_pair(self, **kwargs):
r, w = os.pipe()
return PipeIOStream(r, **kwargs), PipeIOStream(w, **kwargs)
+ @gen_test
def test_pipe_iostream(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
ws.write(b"hel")
ws.write(b"lo world")
- rs.read_until(b' ', callback=self.stop)
- data = self.wait()
+ data = yield rs.read_until(b' ')
self.assertEqual(data, b"hello ")
- rs.read_bytes(3, self.stop)
- data = self.wait()
+ data = yield rs.read_bytes(3)
self.assertEqual(data, b"wor")
ws.close()
- rs.read_until_close(self.stop)
- data = self.wait()
+ data = yield rs.read_until_close()
self.assertEqual(data, b"ld")
rs.close()
+ @gen_test
def test_pipe_iostream_big_write(self):
- rs, ws = self.make_iostream_pair()
+ rs, ws = yield self.make_iostream_pair()
NUM_BYTES = 1048576
# Write 1MB of data, which should fill the buffer
ws.write(b"1" * NUM_BYTES)
- rs.read_bytes(NUM_BYTES, self.stop)
- data = self.wait()
+ data = yield rs.read_bytes(NUM_BYTES)
self.assertEqual(data, b"1" * NUM_BYTES)
ws.close()