new_request.url = urllib.parse.urljoin(
self.request.url, self.headers["Location"]
)
+ new_request.headers = self.request.headers.copy()
+ parsed_orig_url = urllib.parse.urlsplit(original_request.url)
+ parsed_new_url = urllib.parse.urlsplit(new_request.url)
+ if (
+ parsed_orig_url.scheme != parsed_new_url.scheme
+ or parsed_orig_url.netloc != parsed_new_url.netloc
+ ):
+ # Cross-origin redirect: strip auth headers.
+ # Note that while there is no formal specification of headers that should be
+ # stripped here, libcurl strips the Authorization and Cookie headers, so we
+ # do the same.
+ # Reference:
+ # https://github.com/curl/curl/blob/01d8191b25a05e8fa91553a6c0d48acb99907d26/lib/http.c#L1827-L1828
+ #
+ # Note that checking for cross-origin redirects is a crude heuristic. It is both
+ # too weak (e.g. cookies that have a path attribute may need to be stripped even on
+ # same-origin redirects) and too strong (e.g. cookies may be kept on cross-host
+ # redirects within the same domain). However, we cannot know the full details of
+ # the cookie policy at this layer, so we use the same heuristic as libcurl.
+ # Applications that need more control over behavior on redirects can set
+ # follow_redirects=False and handle 3xx responses themselves.
+ new_request.auth_username = None
+ new_request.auth_password = None
+ if "@" in parsed_new_url.netloc:
+ if parsed_new_url.port is not None:
+ new_netloc = f"{parsed_new_url.hostname}:{parsed_new_url.port}"
+ else:
+ assert parsed_new_url.hostname is not None
+ new_netloc = parsed_new_url.hostname
+ parsed_new_url = parsed_new_url._replace(netloc=new_netloc)
+ new_request.url = urllib.parse.urlunsplit(parsed_new_url)
+ for h in ["Authorization", "Cookie"]:
+ try:
+ del new_request.headers[h]
+ except KeyError:
+ pass
assert self.request.max_redirects is not None
new_request.max_redirects = self.request.max_redirects - 1
del new_request.headers["Host"]
"Transfer-Encoding",
]:
try:
- del self.request.headers[h]
+ del new_request.headers[h]
except KeyError:
pass
new_request.original_request = original_request # type: ignore
from contextlib import closing
from io import BytesIO
+from tornado.escape import utf8, native_str, to_unicode, json_encode, json_decode
from tornado import gen, netutil
-from tornado.escape import native_str, to_unicode, utf8
from tornado.httpclient import (
HTTPClient,
HTTPError,
self.finish(self.request.headers["Foo"].encode("ISO8859-1"))
+class EchoHeadersHandler(RequestHandler):
+ def get(self):
+ self.write(json_encode(dict(self.request.headers.get_all())))
+
+
# These tests end up getting run redundantly: once here with the default
# HTTPClient implementation, and then again in each implementation's own
# test suite.
url("/set_header", SetHeaderHandler),
url("/invalid_gzip", InvalidGzipHandler),
url("/header-encoding", HeaderEncodingHandler),
+ url("/echo_headers", EchoHeadersHandler),
],
gzip=True,
)
+ def setUp(self):
+ super().setUp()
+
+ # Add a second port (serving the same app) to the HTTP server, so we can test the effects
+ # of redirects that span different origins.
+ sock, port = bind_unused_port()
+ self.http_server.add_socket(sock)
+ self.__port2 = port
+
+ def get_url2(self, path: str) -> str:
+ return f"{self.get_protocol()}://127.0.0.1:{self.__port2}{path}"
+
def test_patch_receives_payload(self):
body = b"some patch data"
response = self.fetch("/patch", method="PATCH", body=body)
with self.assertRaises(ValueError):
self.fetch("/hello", headers={header: "foo"})
+ def test_strip_headers_on_redirect(self):
+ # Ensure that headers that should be stripped on cross-origin redirects
+ # are stripped, even if the redirect is to a different port on localhost.
+ test_cases: list[tuple[str, dict, str]] = [
+ ("manual auth header", dict(headers={"Authorization": "secret"}), ""),
+ ("credentials in URL", dict(), "me:secret"),
+ ("auth parameters", dict(auth_username="me", auth_password="secret"), ""),
+ ("manual cookie header", dict(headers={"Cookie": "secret"}), ""),
+ ]
+ for name, kwargs, url_creds in test_cases:
+ with self.subTest(name=name, origin="different"):
+ url = self.get_url(
+ "/redirect?url=%s&status=302" % self.get_url2("/echo_headers")
+ )
+ if url_creds:
+ url = url.replace("http://", "http://%s@" % url_creds)
+ response = self.fetch(**dict(path=url) | kwargs)
+ response.rethrow()
+ echoed_headers = json_decode(response.body)
+ # Confirm that non-auth headers are getting through
+ self.assertIn("User-Agent", echoed_headers)
+ # Auth headers are stripped, however they were set.
+ self.assertNotIn("Authorization", echoed_headers)
+ self.assertNotIn("Cookie", echoed_headers)
+ with self.subTest(name=name, origin="same"):
+ url = self.get_url(
+ "/redirect?url=%s&status=302" % self.get_url("/echo_headers")
+ )
+ if url_creds:
+ url = url.replace("http://", "http://%s@" % url_creds)
+ response = self.fetch(**dict(path=url) | kwargs)
+ response.rethrow()
+ echoed_headers = json_decode(response.body)
+ # Confirm that non-auth headers are getting through
+ self.assertIn("User-Agent", echoed_headers)
+ # Auth headers are not stripped when the redirect is same-origin.
+ # Each of our tests uses one of these headers, but not both.
+ self.assertTrue(
+ "Authorization" in echoed_headers or "Cookie" in echoed_headers
+ )
+
class RequestProxyTest(unittest.TestCase):
def test_request_set(self):