import base64
import binascii
-import gzip
-import socket
-from contextlib import closing
from tornado.escape import utf8
from tornado.httpclient import AsyncHTTPClient
-from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase, get_unused_port
+from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase
from tornado.util import b, bytes_type
-from tornado.web import Application, RequestHandler, asynchronous, url
+from tornado.web import Application, RequestHandler, url
class HelloWorldHandler(RequestHandler):
def get(self):
class AuthHandler(RequestHandler):
def get(self):
self.finish(self.request.headers["Authorization"])
-class HangHandler(RequestHandler):
- @asynchronous
- def get(self):
- pass
-
class CountdownHandler(RequestHandler):
def get(self, count):
count = int(count)
url("/post", PostHandler),
url("/chunk", ChunkHandler),
url("/auth", AuthHandler),
- url("/hang", HangHandler),
url("/countdown/([0-9]+)", CountdownHandler, name="countdown"),
url("/echopost", EchoPostHandler),
], gzip=True)
auth_password="open sesame").body,
b("Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="))
- def test_gzip(self):
- # All the tests in this file should be using gzip, but this test
- # ensures that it is in fact getting compressed.
- # Setting Accept-Encoding manually bypasses the client's
- # decompression so we can see the raw data.
- response = self.fetch("/chunk", use_gzip=False,
- headers={"Accept-Encoding": "gzip"})
- self.assertEqual(response.headers["Content-Encoding"], "gzip")
- self.assertNotEqual(response.body, b("asdfqwer"))
- # Our test data gets bigger when gzipped. Oops. :)
- self.assertEqual(len(response.body), 34)
- f = gzip.GzipFile(mode="r", fileobj=response.buffer)
- self.assertEqual(f.read(), b("asdfqwer"))
-
- def test_connect_timeout(self):
- # create a socket and bind it to a port, but don't
- # call accept so the connection will timeout.
- #get_unused_port()
- port = get_unused_port()
-
- with closing(socket.socket()) as sock:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(('127.0.0.1', port))
- sock.listen(1)
- self.http_client.fetch("http://localhost:%d/" % port,
- self.stop,
- connect_timeout=0.1)
- response = self.wait()
- self.assertEqual(response.code, 599)
- self.assertEqual(str(response.error), "HTTP 599: Timeout")
-
- def test_request_timeout(self):
- response = self.fetch('/hang', request_timeout=0.1)
- self.assertEqual(response.code, 599)
- self.assertEqual(str(response.error), "HTTP 599: Timeout")
-
def test_follow_redirect(self):
response = self.fetch("/countdown/2", follow_redirects=False)
self.assertEqual(302, response.code)
self.assertTrue(response.effective_url.endswith("/countdown/0"))
self.assertEqual(b("Zero"), response.body)
- def test_max_redirects(self):
- response = self.fetch("/countdown/5", max_redirects=3)
- self.assertEqual(302, response.code)
- # We requested 5, followed three redirects for 4, 3, 2, then the last
- # unfollowed redirect is to 1.
- self.assertTrue(response.request.url.endswith("/countdown/5"))
- self.assertTrue(response.effective_url.endswith("/countdown/2"))
- self.assertTrue(response.headers["Location"].endswith("/countdown/1"))
-
def test_credentials_in_url(self):
url = self.get_url("/auth").replace("http://", "http://me:secret@")
self.http_client.fetch(url, self.stop)
self.assertEqual(response.headers["Content-Length"], "1")
self.assertEqual(response.body, byte_body)
- def test_ipv6(self):
- try:
- self.http_server.listen(self.get_http_port(), address='::1')
- except socket.gaierror, e:
- if e.errno == socket.EAI_ADDRFAMILY:
- # ipv6 is not configured on this system, so skip this test
- return
- raise
- url = self.get_url("/hello").replace("localhost", "[::1]")
-
- # ipv6 is currently disabled by default and must be explicitly requested
- self.http_client.fetch(url, self.stop)
- response = self.wait()
- self.assertEqual(response.code, 599)
-
- self.http_client.fetch(url, self.stop, allow_ipv6=True)
- response = self.wait()
- self.assertEqual(response.body, b("Hello world!"))
-
def test_types(self):
response = self.fetch("/hello")
self.assertEqual(type(response.body), bytes_type)
+from __future__ import with_statement
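+# Needed on Python 2.5 for the "with closing(...)" block in test_connect_timeout below.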
+
import collections
+import gzip
import logging
+import socket
+from contextlib import closing
from tornado.ioloop import IOLoop
from tornado.simple_httpclient import SimpleAsyncHTTPClient, _DEFAULT_CA_CERTS
-from tornado.test.httpclient_test import HTTPClientCommonTestCase
-from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase
+from tornado.test.httpclient_test import HTTPClientCommonTestCase, ChunkHandler, CountdownHandler, HelloWorldHandler
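+# These handlers are reused by the tests moved into this file from httpclient_test.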
+from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase, get_unused_port
+from tornado.util import b
from tornado.web import RequestHandler, Application, asynchronous, url
class SimpleHTTPClientCommonTestCase(HTTPClientCommonTestCase):
self.queue.append(self.finish)
self.wake_callback()
+class HangHandler(RequestHandler):
+ @asynchronous
+ def get(self):
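+ # Never writes a response, so the request stays open until the
+ # client's request_timeout fires (see test_request_timeout).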
+ pass
+
class SimpleHTTPClientTestCase(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
# callable objects to finish pending /trigger requests
return Application([
url("/trigger", TriggerHandler, dict(queue=self.triggers,
wake_callback=self.stop)),
- ])
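+ # Extra handlers for the gzip, redirect, timeout, and ipv6 tests below;
+ # gzip=True lets test_gzip verify that responses are actually compressed.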
+ url("/chunk", ChunkHandler),
+ url("/countdown/([0-9]+)", CountdownHandler, name="countdown"),
+ url("/hang", HangHandler),
+ url("/hello", HelloWorldHandler),
+ ], gzip=True)
def test_singleton(self):
# Class "constructor" reuses objects on the same IOLoop
def test_default_certificates_exist(self):
open(_DEFAULT_CA_CERTS).close()
+ def test_gzip(self):
+ # The Application for this test case is created with gzip=True, so
+ # responses should be compressed; this test ensures that they in fact are.
+ # Setting Accept-Encoding manually bypasses the client's automatic
+ # decompression so we can see the raw data.
+ response = self.fetch("/chunk", use_gzip=False,
+ headers={"Accept-Encoding": "gzip"})
+ self.assertEqual(response.headers["Content-Encoding"], "gzip")
+ self.assertNotEqual(response.body, b("asdfqwer"))
+ # Our test data gets bigger when gzipped. Oops. :)
+ self.assertEqual(len(response.body), 34)
+ f = gzip.GzipFile(mode="r", fileobj=response.buffer)
+ self.assertEqual(f.read(), b("asdfqwer"))
+
+ def test_max_redirects(self):
+ response = self.fetch("/countdown/5", max_redirects=3)
+ self.assertEqual(302, response.code)
+ # We requested 5, followed three redirects for 4, 3, 2, then the last
+ # unfollowed redirect is to 1.
+ self.assertTrue(response.request.url.endswith("/countdown/5"))
+ self.assertTrue(response.effective_url.endswith("/countdown/2"))
+ self.assertTrue(response.headers["Location"].endswith("/countdown/1"))
+
+ def test_connect_timeout(self):
+ # create a socket and bind it to a port, but don't
+ # call accept so the connection will time out.
+ port = get_unused_port()
+
+ with closing(socket.socket()) as sock:
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(('127.0.0.1', port))
+ sock.listen(1)
+ self.http_client.fetch("http://localhost:%d/" % port,
+ self.stop,
+ connect_timeout=0.1)
+ response = self.wait()
+ self.assertEqual(response.code, 599)
+ self.assertEqual(str(response.error), "HTTP 599: Timeout")
+
+ def test_request_timeout(self):
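+ # /hang never finishes its response, so the request_timeout
+ # (rather than the connect_timeout) is what fires here.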
+ response = self.fetch('/hang', request_timeout=0.1)
+ self.assertEqual(response.code, 599)
+ self.assertEqual(str(response.error), "HTTP 599: Timeout")
+
+ def test_ipv6(self):
+ try:
+ self.http_server.listen(self.get_http_port(), address='::1')
+ except socket.gaierror, e:
+ if e.errno == socket.EAI_ADDRFAMILY:
+ # ipv6 is not configured on this system, so skip this test
+ return
+ raise
+ url = self.get_url("/hello").replace("localhost", "[::1]")
+
+ # ipv6 is currently disabled by default and must be explicitly requested
+ self.http_client.fetch(url, self.stop)
+ response = self.wait()
+ self.assertEqual(response.code, 599)
+
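+ # With allow_ipv6=True the same request succeeds.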
+ self.http_client.fetch(url, self.stop, allow_ipv6=True)
+ response = self.wait()
+ self.assertEqual(response.body, b("Hello world!"))
+