]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
curl_httpclient,http1connection: Prohibit CR and LF in headers 3386/head
authorBen Darnell <ben@bendarnell.com>
Wed, 5 Jun 2024 19:43:45 +0000 (15:43 -0400)
committerBen Darnell <ben@bendarnell.com>
Wed, 5 Jun 2024 19:43:45 +0000 (15:43 -0400)
libcurl does not check for CR and LF in headers, making this the
application's responsibility. However, Tornado's other HTTP interfaces
check for linefeeds so we should do the same here so that switching
between the simple and curl http clients does not introduce header
injection vulnerabilties.

http1connection previously checked only for LF in headers (alone or in a
CRLF pair). It now prohibits bare CR as well, following the requirement
in RFC 9112.

tornado/curl_httpclient.py
tornado/http1connection.py
tornado/test/httpclient_test.py

index 23320e48224aec2287e506af816621a86ca437b5..397c3a9752703d6dfe142da68dfc9bce7147ecf5 100644 (file)
@@ -19,6 +19,7 @@ import collections
 import functools
 import logging
 import pycurl
+import re
 import threading
 import time
 from io import BytesIO
@@ -44,6 +45,8 @@ if typing.TYPE_CHECKING:
 
 curl_log = logging.getLogger("tornado.curl_httpclient")
 
+CR_OR_LF_RE = re.compile(b"\r|\n")
+
 
 class CurlAsyncHTTPClient(AsyncHTTPClient):
     def initialize(  # type: ignore
@@ -347,14 +350,15 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
         if "Pragma" not in request.headers:
             request.headers["Pragma"] = ""
 
-        curl.setopt(
-            pycurl.HTTPHEADER,
-            [
-                b"%s: %s"
-                % (native_str(k).encode("ASCII"), native_str(v).encode("ISO8859-1"))
-                for k, v in request.headers.get_all()
-            ],
-        )
+        encoded_headers = [
+            b"%s: %s"
+            % (native_str(k).encode("ASCII"), native_str(v).encode("ISO8859-1"))
+            for k, v in request.headers.get_all()
+        ]
+        for line in encoded_headers:
+            if CR_OR_LF_RE.search(line):
+                raise ValueError("Illegal characters in header (CR or LF): %r" % line)
+        curl.setopt(pycurl.HTTPHEADER, encoded_headers)
 
         curl.setopt(
             pycurl.HEADERFUNCTION,
index ca50e8ff556d36a87adacf2252297e7176ef7c88..3bdffac115b60a44096fa8baa98d19b5d07c3e0a 100644 (file)
@@ -38,6 +38,8 @@ from tornado.util import GzipDecompressor
 
 from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple
 
+CR_OR_LF_RE = re.compile(b"\r|\n")
+
 
 class _QuietException(Exception):
     def __init__(self) -> None:
@@ -453,8 +455,8 @@ class HTTP1Connection(httputil.HTTPConnection):
         )
         lines.extend(line.encode("latin1") for line in header_lines)
         for line in lines:
-            if b"\n" in line:
-                raise ValueError("Newline in header: " + repr(line))
+            if CR_OR_LF_RE.search(line):
+                raise ValueError("Illegal characters (CR or LF) in header: %r" % line)
         future = None
         if self.stream.closed():
             future = self._write_future = Future()
index 31a1916199ad705a96ebf02ea7846313631193e8..17291f8f2e8151a5a81e0b19886a791bb4092259 100644 (file)
@@ -725,6 +725,22 @@ X-XSS-Protection: 1;
                 if el.logged_stack:
                     break
 
+    def test_header_crlf(self):
+        # Ensure that the client doesn't allow CRLF injection in headers. RFC 9112 section 2.2
+        # prohibits a bare CR specifically and "a recipient MAY recognize a single LF as a line
+        # terminator" so we check each character separately as well as the (redundant) CRLF pair.
+        for header, name in [
+            ("foo\rbar:", "cr"),
+            ("foo\nbar:", "lf"),
+            ("foo\r\nbar:", "crlf"),
+        ]:
+            with self.subTest(name=name, position="value"):
+                with self.assertRaises(ValueError):
+                    self.fetch("/hello", headers={"foo": header})
+            with self.subTest(name=name, position="key"):
+                with self.assertRaises(ValueError):
+                    self.fetch("/hello", headers={header: "foo"})
+
 
 class RequestProxyTest(unittest.TestCase):
     def test_request_set(self):