import functools
import logging
import pycurl
+import re
import threading
import time
from io import BytesIO
curl_log = logging.getLogger("tornado.curl_httpclient")
+CR_OR_LF_RE = re.compile(b"\r|\n")
+
class CurlAsyncHTTPClient(AsyncHTTPClient):
def initialize( # type: ignore
if "Pragma" not in request.headers:
request.headers["Pragma"] = ""
- curl.setopt(
- pycurl.HTTPHEADER,
- [
- b"%s: %s"
- % (native_str(k).encode("ASCII"), native_str(v).encode("ISO8859-1"))
- for k, v in request.headers.get_all()
- ],
- )
+ encoded_headers = [
+ b"%s: %s"
+ % (native_str(k).encode("ASCII"), native_str(v).encode("ISO8859-1"))
+ for k, v in request.headers.get_all()
+ ]
+ for line in encoded_headers:
+ if CR_OR_LF_RE.search(line):
+ raise ValueError("Illegal characters in header (CR or LF): %r" % line)
+ curl.setopt(pycurl.HTTPHEADER, encoded_headers)
curl.setopt(
pycurl.HEADERFUNCTION,
from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple
+CR_OR_LF_RE = re.compile(b"\r|\n")
+
class _QuietException(Exception):
def __init__(self) -> None:
)
lines.extend(line.encode("latin1") for line in header_lines)
for line in lines:
- if b"\n" in line:
- raise ValueError("Newline in header: " + repr(line))
+ if CR_OR_LF_RE.search(line):
+ raise ValueError("Illegal characters (CR or LF) in header: %r" % line)
future = None
if self.stream.closed():
future = self._write_future = Future()
if el.logged_stack:
break
+ def test_header_crlf(self):
+ # Ensure that the client doesn't allow CRLF injection in headers. RFC 9112 section 2.2
+ # prohibits a bare CR specifically and "a recipient MAY recognize a single LF as a line
+ # terminator" so we check each character separately as well as the (redundant) CRLF pair.
+ for header, name in [
+ ("foo\rbar:", "cr"),
+ ("foo\nbar:", "lf"),
+ ("foo\r\nbar:", "crlf"),
+ ]:
+ with self.subTest(name=name, position="value"):
+ with self.assertRaises(ValueError):
+ self.fetch("/hello", headers={"foo": header})
+ with self.subTest(name=name, position="key"):
+ with self.assertRaises(ValueError):
+ self.fetch("/hello", headers={header: "foo"})
+
class RequestProxyTest(unittest.TestCase):
def test_request_set(self):