return ip
def _tunnel(self):
+ if _contains_disallowed_url_pchar_re.search(self._tunnel_host):
+ raise ValueError('Tunnel host can\'t contain control characters %r'
+ % (self._tunnel_host,))
connect = b"CONNECT %s:%d %s\r\n" % (
self._wrap_ipv6(self._tunnel_host.encode("idna")),
self._tunnel_port,
self._http_vsn_str.encode("ascii"))
headers = [connect]
for header, value in self._tunnel_headers.items():
- headers.append(f"{header}: {value}\r\n".encode("latin-1"))
+ header_bytes = header.encode("latin-1")
+ value_bytes = value.encode("latin-1")
+ if not _is_legal_header_name(header_bytes):
+ raise ValueError('Invalid header name %r' % (header_bytes,))
+ if _is_illegal_header_value(value_bytes):
+ raise ValueError('Invalid header value %r' % (value_bytes,))
+ headers.append(b"%s: %s\r\n" % (header_bytes, value_bytes))
headers.append(b"\r\n")
# Making a single send() call instead of one per line encourages
# the host OS to use a more optimal packet size instead of
with self.assertRaisesRegex(ValueError, 'Invalid header'):
conn.putheader(name, value)
+ def test_invalid_tunnel_headers(self):
+ cases = (
+ ('Invalid\r\nName', 'ValidValue'),
+ ('Invalid\rName', 'ValidValue'),
+ ('Invalid\nName', 'ValidValue'),
+ ('\r\nInvalidName', 'ValidValue'),
+ ('\rInvalidName', 'ValidValue'),
+ ('\nInvalidName', 'ValidValue'),
+ (' InvalidName', 'ValidValue'),
+ ('\tInvalidName', 'ValidValue'),
+ ('Invalid:Name', 'ValidValue'),
+ (':InvalidName', 'ValidValue'),
+ ('ValidName', 'Invalid\r\nValue'),
+ ('ValidName', 'Invalid\rValue'),
+ ('ValidName', 'Invalid\nValue'),
+ ('ValidName', 'InvalidValue\r\n'),
+ ('ValidName', 'InvalidValue\r'),
+ ('ValidName', 'InvalidValue\n'),
+ )
+ for name, value in cases:
+ with self.subTest((name, value)):
+ conn = client.HTTPConnection('example.com')
+ conn.set_tunnel('tunnel', headers={
+ name: value
+ })
+ conn.sock = FakeSocket('')
+ with self.assertRaisesRegex(ValueError, 'Invalid header'):
+ conn._tunnel() # Called in .connect()
+
+ def test_invalid_tunnel_host(self):
+ cases = (
+ 'invalid\r.host',
+ '\ninvalid.host',
+ 'invalid.host\r\n',
+ 'invalid.host\x00',
+ 'invalid host',
+ )
+ for tunnel_host in cases:
+ with self.subTest(tunnel_host):
+ conn = client.HTTPConnection('example.com')
+ conn.set_tunnel(tunnel_host)
+ conn.sock = FakeSocket('')
+ with self.assertRaisesRegex(ValueError, 'Tunnel host can\'t contain control characters'):
+ conn._tunnel() # Called in .connect()
+
def test_headers_debuglevel(self):
body = (
b'HTTP/1.1 200 OK\r\n'