StreamClosedError,
_StreamBuffer,
)
-from tornado.httpclient import AsyncHTTPClient
+from tornado.httpclient import AsyncHTTPClient, HTTPResponse
from tornado.httputil import HTTPHeaders
from tornado.locks import Condition, Event
from tornado.log import gen_log
refusing_port,
skipPypy3V58,
ignore_deprecation,
+ abstract_base_test,
)
from tornado.web import RequestHandler, Application
import asyncio
self.write("Hello")
-class TestIOStreamWebMixin:
+@abstract_base_test
+class TestIOStreamWebMixin(AsyncTestCase):
+ # We want to run these tests with both AsyncHTTPTestCase and AsyncHTTPSTestCase,
+ # but this leads to some tricky inheritance situations. We want this class's
+ # get_app, but the test classes's get_http_port and fetch. There's no way to make
+ # the method resolution order to do what we want in all cases, so the current
+ # state is that that AsyncHTTP(S)TestCase must be the first base class of the
+ # final class, and that class must define a get_app method that calls mixin_get_app.
+ #
+ # Alternatives include defining this class in a factory that can change the base class
+ # or refactoring to use composition instead of inheritance for the http components.
def _make_client_iostream(self):
raise NotImplementedError()
- def get_app(self):
+ def mixin_get_app(self):
return Application([("/", HelloHandler)])
- def test_connection_closed(self: typing.Any):
+ def get_http_port(self) -> int:
+ raise NotImplementedError()
+
+ def fetch(
+ self, path: str, raise_error: bool = False, **kwargs: typing.Any
+ ) -> HTTPResponse:
+ # To be filled in by mixing in AsyncHTTPTestCase or AsyncHTTPSTestCase
+ raise NotImplementedError()
+
+ def test_connection_closed(self):
# When a server sends a response and then closes the connection,
# the client must be allowed to read the data before the IOStream
# closes itself. Epoll reports closed connections with a separate
response.rethrow()
@gen_test
- def test_read_until_close(self: typing.Any):
+ def test_read_until_close(self):
stream = self._make_client_iostream()
yield stream.connect(("127.0.0.1", self.get_http_port()))
stream.write(b"GET / HTTP/1.0\r\n\r\n")
self.assertTrue(data.endswith(b"Hello"))
@gen_test
- def test_read_zero_bytes(self: typing.Any):
+ def test_read_zero_bytes(self):
self.stream = self._make_client_iostream()
yield self.stream.connect(("127.0.0.1", self.get_http_port()))
self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
self.stream.close()
@gen_test
- def test_write_while_connecting(self: typing.Any):
+ def test_write_while_connecting(self):
stream = self._make_client_iostream()
connect_fut = stream.connect(("127.0.0.1", self.get_http_port()))
# unlike the previous tests, try to write before the connection
stream.close()
@gen_test
- def test_future_interface(self: typing.Any):
+ def test_future_interface(self):
"""Basic test of IOStream's ability to return Futures."""
stream = self._make_client_iostream()
connect_result = yield stream.connect(("127.0.0.1", self.get_http_port()))
stream.close()
@gen_test
- def test_future_close_while_reading(self: typing.Any):
+ def test_future_close_while_reading(self):
stream = self._make_client_iostream()
yield stream.connect(("127.0.0.1", self.get_http_port()))
yield stream.write(b"GET / HTTP/1.0\r\n\r\n")
stream.close()
@gen_test
- def test_future_read_until_close(self: typing.Any):
+ def test_future_read_until_close(self):
# Ensure that the data comes through before the StreamClosedError.
stream = self._make_client_iostream()
yield stream.connect(("127.0.0.1", self.get_http_port()))
stream.read_bytes(1)
-class TestReadWriteMixin:
+@abstract_base_test
+class TestReadWriteMixin(AsyncTestCase):
# Tests where one stream reads and the other writes.
# These should work for BaseIOStream implementations.
rs.close()
@gen_test
- def test_future_delayed_close_callback(self: typing.Any):
+ def test_future_delayed_close_callback(self):
# Same as test_delayed_close_callback, but with the future interface.
rs, ws = yield self.make_iostream_pair()
rs.close()
@gen_test
- def test_close_buffered_data(self: typing.Any):
+ def test_close_buffered_data(self):
# Similar to the previous test, but with data stored in the OS's
# socket buffers instead of the IOStream's read buffer. Out-of-band
# close notifications must be delayed until all data has been
rs.close()
@gen_test
- def test_read_until_close_after_close(self: typing.Any):
+ def test_read_until_close_after_close(self):
# Similar to test_delayed_close_callback, but read_until_close takes
# a separate code path so test it separately.
rs, ws = yield self.make_iostream_pair()
rs.close()
@gen_test
- def test_large_read_until(self: typing.Any):
+ def test_large_read_until(self):
# Performance test: read_until used to have a quadratic component
# so a read_until of 4MB would take 8 seconds; now it takes 0.25
# seconds.
await rf
@gen_test
- async def test_read_until_unsatisfied_after_close(self: typing.Any):
+ async def test_read_until_unsatisfied_after_close(self):
# If a stream is closed while reading, it raises
# StreamClosedError instead of UnsatisfiableReadError (the
# latter should only be raised when byte limits are reached).
await rf
@gen_test
- def test_close_callback_with_pending_read(self: typing.Any):
+ def test_close_callback_with_pending_read(self):
# Regression test for a bug that was introduced in 2.3
# where the IOStream._close_callback would never be called
# if there were pending reads.
rs.close()
@gen_test
- def test_future_close_callback(self: typing.Any):
+ def test_future_close_callback(self):
# Regression test for interaction between the Future read interfaces
# and IOStream._maybe_add_error_listener.
rs, ws = yield self.make_iostream_pair()
ws.close()
@gen_test
- def test_write_memoryview(self: typing.Any):
+ def test_write_memoryview(self):
rs, ws = yield self.make_iostream_pair()
try:
fut = rs.read_bytes(4)
rs.close()
@gen_test
- def test_read_bytes_partial(self: typing.Any):
+ def test_read_bytes_partial(self):
rs, ws = yield self.make_iostream_pair()
try:
# Ask for more than is available with partial=True
rs.close()
@gen_test
- def test_read_until_max_bytes(self: typing.Any):
+ def test_read_until_max_bytes(self):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_read_until_max_bytes_inline(self: typing.Any):
+ def test_read_until_max_bytes_inline(self):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_read_until_max_bytes_ignores_extra(self: typing.Any):
+ def test_read_until_max_bytes_ignores_extra(self):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_read_until_regex_max_bytes(self: typing.Any):
+ def test_read_until_regex_max_bytes(self):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_read_until_regex_max_bytes_inline(self: typing.Any):
+ def test_read_until_regex_max_bytes_inline(self):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_small_reads_from_large_buffer(self: typing.Any):
+ def test_small_reads_from_large_buffer(self):
# 10KB buffer size, 100KB available to read.
# Read 1KB at a time and make sure that the buffer is not eagerly
# filled.
rs.close()
@gen_test
- def test_small_read_untils_from_large_buffer(self: typing.Any):
+ def test_small_read_untils_from_large_buffer(self):
# 10KB buffer size, 100KB available to read.
# Read 1KB at a time and make sure that the buffer is not eagerly
# filled.
ws.close()
@gen_test
- def test_read_into(self: typing.Any):
+ def test_read_into(self):
rs, ws = yield self.make_iostream_pair()
def sleep_some():
rs.close()
@gen_test
- def test_read_into_partial(self: typing.Any):
+ def test_read_into_partial(self):
rs, ws = yield self.make_iostream_pair()
try:
rs.close()
@gen_test
- def test_read_into_zero_bytes(self: typing.Any):
+ def test_read_into_zero_bytes(self):
rs, ws = yield self.make_iostream_pair()
try:
buf = bytearray()
rs.close()
+@abstract_base_test
class TestIOStreamMixin(TestReadWriteMixin):
def _make_server_iostream(self, connection, **kwargs):
raise NotImplementedError()
raise NotImplementedError()
@gen.coroutine
- def make_iostream_pair(self: typing.Any, **kwargs):
+ def make_iostream_pair(self, **kwargs):
listener, port = bind_unused_port()
server_stream_fut = Future() # type: Future[IOStream]
raise gen.Return((server_stream, client_stream))
@gen_test
- def test_connection_refused(self: typing.Any):
+ def test_connection_refused(self):
# When a connection is refused, the connect callback should not
# be run. (The kqueue IOLoop used to behave differently from the
# epoll IOLoop in this respect)
self.assertTrue(isinstance(stream.error, ConnectionRefusedError), stream.error)
@gen_test
- def test_gaierror(self: typing.Any):
+ def test_gaierror(self):
# Test that IOStream sets its exc_info on getaddrinfo error.
# It's difficult to reliably trigger a getaddrinfo error;
# some resolvers own't even return errors for malformed names,
self.assertTrue(isinstance(stream.error, socket.gaierror))
@gen_test
- def test_read_until_close_with_error(self: typing.Any):
+ def test_read_until_close_with_error(self):
server, client = yield self.make_iostream_pair()
try:
with mock.patch(
@skipIfNonUnix
@skipPypy3V58
@gen_test
- def test_inline_read_error(self: typing.Any):
+ def test_inline_read_error(self):
# An error on an inline read is raised without logging (on the
# assumption that it will eventually be noticed or logged further
# up the stack).
client.close()
-class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
+class TestIOStreamWebHTTP(AsyncHTTPTestCase, TestIOStreamWebMixin):
def _make_client_iostream(self):
return IOStream(socket.socket())
+ def get_app(self):
+ return self.mixin_get_app()
+
-class TestIOStreamWebHTTPS(TestIOStreamWebMixin, AsyncHTTPSTestCase):
+class TestIOStreamWebHTTPS(AsyncHTTPSTestCase, TestIOStreamWebMixin):
def _make_client_iostream(self):
return SSLIOStream(socket.socket(), ssl_options=dict(cert_reqs=ssl.CERT_NONE))
+ def get_app(self):
+ return self.mixin_get_app()
+
-class TestIOStream(TestIOStreamMixin, AsyncTestCase):
+class TestIOStream(TestIOStreamMixin):
def _make_server_iostream(self, connection, **kwargs):
return IOStream(connection, **kwargs)
return IOStream(connection, **kwargs)
-class TestIOStreamSSL(TestIOStreamMixin, AsyncTestCase):
+class TestIOStreamSSL(TestIOStreamMixin):
def _make_server_iostream(self, connection, **kwargs):
ssl_ctx = ssl_options_to_context(_server_ssl_options(), server_side=True)
connection = ssl_ctx.wrap_socket(
# This will run some tests that are basically redundant but it's the
# simplest way to make sure that it works to pass an SSLContext
# instead of an ssl_options dict to the SSLIOStream constructor.
-class TestIOStreamSSLContext(TestIOStreamMixin, AsyncTestCase):
+class TestIOStreamSSLContext(TestIOStreamMixin):
def _make_server_iostream(self, connection, **kwargs):
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(