From bc717035f3972cde303e752c586a41941271fe98 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Tue, 26 May 2026 21:30:28 -0400 Subject: [PATCH] simple_httpclient: Strip auth headers on cross-origin redirects When following a redirect to a different origin (scheme, host, or port), auth-related headers (Authorization and Cookie) should be stripped to avoid exposing them to the new host. --- tornado/simple_httpclient.py | 38 +++++++++++++++++++- tornado/test/httpclient_test.py | 61 ++++++++++++++++++++++++++++++++- 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/tornado/simple_httpclient.py b/tornado/simple_httpclient.py index f878a584..b8e4d8c9 100644 --- a/tornado/simple_httpclient.py +++ b/tornado/simple_httpclient.py @@ -624,6 +624,42 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): new_request.url = urllib.parse.urljoin( self.request.url, self.headers["Location"] ) + new_request.headers = self.request.headers.copy() + parsed_orig_url = urllib.parse.urlsplit(original_request.url) + parsed_new_url = urllib.parse.urlsplit(new_request.url) + if ( + parsed_orig_url.scheme != parsed_new_url.scheme + or parsed_orig_url.netloc != parsed_new_url.netloc + ): + # Cross-origin redirect: strip auth headers. + # Note that while there is no formal specification of headers that should be + # stripped here, libcurl strips the Authorization and Cookie headers, so we + # do the same. + # Reference: + # https://github.com/curl/curl/blob/01d8191b25a05e8fa91553a6c0d48acb99907d26/lib/http.c#L1827-L1828 + # + # Note that checking for cross-origin redirects is a crude heuristic. It is both + # too weak (e.g. cookies that have a path attribute may need to be stripped even on + # same-origin redirects) and too strong (e.g. cookies may be kept on cross-host + # redirects within the same domain). However, we cannot know the full details of + # the cookie policy at this layer, so we use the same heuristic as libcurl. + # Applications that need more control over behavior on redirects can set + # follow_redirects=False and handle 3xx responses themselves. + new_request.auth_username = None + new_request.auth_password = None + if "@" in parsed_new_url.netloc: + if parsed_new_url.port is not None: + new_netloc = f"{parsed_new_url.hostname}:{parsed_new_url.port}" + else: + assert parsed_new_url.hostname is not None + new_netloc = parsed_new_url.hostname + parsed_new_url = parsed_new_url._replace(netloc=new_netloc) + new_request.url = urllib.parse.urlunsplit(parsed_new_url) + for h in ["Authorization", "Cookie"]: + try: + del new_request.headers[h] + except KeyError: + pass assert self.request.max_redirects is not None new_request.max_redirects = self.request.max_redirects - 1 del new_request.headers["Host"] @@ -648,7 +684,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): "Transfer-Encoding", ]: try: - del self.request.headers[h] + del new_request.headers[h] except KeyError: pass new_request.original_request = original_request # type: ignore diff --git a/tornado/test/httpclient_test.py b/tornado/test/httpclient_test.py index c7fd735d..64e5cc5f 100644 --- a/tornado/test/httpclient_test.py +++ b/tornado/test/httpclient_test.py @@ -12,8 +12,8 @@ import unittest from contextlib import closing from io import BytesIO +from tornado.escape import utf8, native_str, to_unicode, json_encode, json_decode from tornado import gen, netutil -from tornado.escape import native_str, to_unicode, utf8 from tornado.httpclient import ( HTTPClient, HTTPError, @@ -154,6 +154,11 @@ class HeaderEncodingHandler(RequestHandler): self.finish(self.request.headers["Foo"].encode("ISO8859-1")) +class EchoHeadersHandler(RequestHandler): + def get(self): + self.write(json_encode(dict(self.request.headers.get_all()))) + + # These tests end up getting run redundantly: once here with the default # HTTPClient implementation, and then again in each implementation's own # test suite. @@ -179,10 +184,23 @@ class HTTPClientCommonTestCase(AsyncHTTPTestCase): url("/set_header", SetHeaderHandler), url("/invalid_gzip", InvalidGzipHandler), url("/header-encoding", HeaderEncodingHandler), + url("/echo_headers", EchoHeadersHandler), ], gzip=True, ) + def setUp(self): + super().setUp() + + # Add a second port (serving the same app) to the HTTP server, so we can test the effects + # of redirects that span different origins. + sock, port = bind_unused_port() + self.http_server.add_socket(sock) + self.__port2 = port + + def get_url2(self, path: str) -> str: + return f"{self.get_protocol()}://127.0.0.1:{self.__port2}{path}" + def test_patch_receives_payload(self): body = b"some patch data" response = self.fetch("/patch", method="PATCH", body=body) @@ -752,6 +770,47 @@ X-XSS-Protection: 1; with self.assertRaises(ValueError): self.fetch("/hello", headers={header: "foo"}) + def test_strip_headers_on_redirect(self): + # Ensure that headers that should be stripped on cross-origin redirects + # are stripped, even if the redirect is to a different port on localhost. + test_cases: list[tuple[str, dict, str]] = [ + ("manual auth header", dict(headers={"Authorization": "secret"}), ""), + ("credentials in URL", dict(), "me:secret"), + ("auth parameters", dict(auth_username="me", auth_password="secret"), ""), + ("manual cookie header", dict(headers={"Cookie": "secret"}), ""), + ] + for name, kwargs, url_creds in test_cases: + with self.subTest(name=name, origin="different"): + url = self.get_url( + "/redirect?url=%s&status=302" % self.get_url2("/echo_headers") + ) + if url_creds: + url = url.replace("http://", "http://%s@" % url_creds) + response = self.fetch(**dict(path=url) | kwargs) + response.rethrow() + echoed_headers = json_decode(response.body) + # Confirm that non-auth headers are getting through + self.assertIn("User-Agent", echoed_headers) + # Auth headers are stripped, however they were set. + self.assertNotIn("Authorization", echoed_headers) + self.assertNotIn("Cookie", echoed_headers) + with self.subTest(name=name, origin="same"): + url = self.get_url( + "/redirect?url=%s&status=302" % self.get_url("/echo_headers") + ) + if url_creds: + url = url.replace("http://", "http://%s@" % url_creds) + response = self.fetch(**dict(path=url) | kwargs) + response.rethrow() + echoed_headers = json_decode(response.body) + # Confirm that non-auth headers are getting through + self.assertIn("User-Agent", echoed_headers) + # Auth headers are not stripped when the redirect is same-origin. + # Each of our tests uses one of these headers, but not both. + self.assertTrue( + "Authorization" in echoed_headers or "Cookie" in echoed_headers + ) + class RequestProxyTest(unittest.TestCase): def test_request_set(self): -- 2.47.3