import datetime
import email.utils
from http.client import responses
-import http.cookies as Cookie
+import http.cookies
import numbers
import re
from ssl import SSLError
# Reference it so pyflakes doesn't complain.
responses
-import typing # noqa: F401
+import typing
+from typing import (Tuple, Iterable, List, Mapping, Iterator, Dict, Union, Optional,
+ Awaitable, Generator)
+
+if typing.TYPE_CHECKING:
+ from typing import Deque # noqa
+ import unittest # noqa
# RFC 7230 section 3.5: a recipient MAY recognize a single LF as a line
>>> normalized_headers["coNtent-TYPE"]
'Content-Type'
"""
- def __init__(self, size):
+ def __init__(self, size: int) -> None:
super(_NormalizedHeaderCache, self).__init__()
self.size = size
- self.queue = collections.deque()
+ self.queue = collections.deque() # type: Deque[str]
- def __missing__(self, key):
+ def __missing__(self, key: str) -> str:
normalized = "-".join([w.capitalize() for w in key.split("-")])
self[key] = normalized
self.queue.append(key)
Set-Cookie: A=B
Set-Cookie: C=D
"""
- def __init__(self, *args, **kwargs):
+ @typing.overload
+ def __init__(self, __arg: Mapping[str, List[str]]) -> None:
+ pass
+
+ @typing.overload # noqa: F811
+ def __init__(self, *args: Tuple[str, str]) -> None:
+ pass
+
+ @typing.overload # noqa: F811
+ def __init__(self, **kwargs: str) -> None:
+ pass
+
+ def __init__(self, *args: typing.Any, **kwargs: str) -> None: # noqa: F811
self._dict = {} # type: typing.Dict[str, str]
self._as_list = {} # type: typing.Dict[str, typing.List[str]]
self._last_key = None
# new public methods
- def add(self, name, value):
- # type: (str, str) -> None
+ def add(self, name: str, value: str) -> None:
"""Adds a new value for the given key."""
norm_name = _normalized_headers[name]
self._last_key = norm_name
else:
self[norm_name] = value
- def get_list(self, name):
+ def get_list(self, name: str) -> List[str]:
"""Returns all values for the given header as a list."""
norm_name = _normalized_headers[name]
return self._as_list.get(norm_name, [])
- def get_all(self):
- # type: () -> typing.Iterable[typing.Tuple[str, str]]
+ def get_all(self) -> Iterable[Tuple[str, str]]:
"""Returns an iterable of all (name, value) pairs.
If a header has multiple values, multiple pairs will be
for value in values:
yield (name, value)
- def parse_line(self, line):
+ def parse_line(self, line: str) -> None:
"""Updates the dictionary with a single header line.
>>> h = HTTPHeaders()
self.add(name, value.strip())
@classmethod
- def parse(cls, headers):
+ def parse(cls, headers: str) -> 'HTTPHeaders':
"""Returns a dictionary from HTTP header text.
>>> h = HTTPHeaders.parse("Content-Type: text/html\\r\\nContent-Length: 42\\r\\n")
# MutableMapping abstract method implementations.
- def __setitem__(self, name, value):
+ def __setitem__(self, name: str, value: str) -> None:
norm_name = _normalized_headers[name]
self._dict[norm_name] = value
self._as_list[norm_name] = [value]
- def __getitem__(self, name):
- # type: (str) -> str
+ def __getitem__(self, name: str) -> str:
return self._dict[_normalized_headers[name]]
- def __delitem__(self, name):
+ def __delitem__(self, name: str) -> None:
norm_name = _normalized_headers[name]
del self._dict[norm_name]
del self._as_list[norm_name]
- def __len__(self):
+ def __len__(self) -> int:
return len(self._dict)
- def __iter__(self):
+ def __iter__(self) -> Iterator[typing.Any]:
return iter(self._dict)
- def copy(self):
+ def copy(self) -> 'HTTPHeaders':
# defined in dict but not in MutableMapping.
return HTTPHeaders(self)
# the appearance that HTTPHeaders is a single container.
__copy__ = copy
- def __str__(self):
+ def __str__(self) -> str:
lines = []
for name, value in self.get_all():
lines.append("%s: %s\n" % (name, value))
.. versionchanged:: 4.0
Moved from ``tornado.httpserver.HTTPRequest``.
"""
- def __init__(self, method=None, uri=None, version="HTTP/1.0", headers=None,
- body=None, host=None, files=None, connection=None,
- start_line=None, server_connection=None):
+ path = None # type: str
+ query = None # type: str
+
+ def __init__(self, method: str=None, uri: str=None, version: str="HTTP/1.0",
+ headers: HTTPHeaders=None, body: bytes=None, host: str=None,
+ files: Dict[str, 'HTTPFile']=None, connection: 'HTTPConnection'=None,
+ start_line: 'RequestStartLine'=None, server_connection: object=None) -> None:
if start_line is not None:
method, uri, version = start_line
self.method = method
self._start_time = time.time()
self._finish_time = None
- self.path, sep, self.query = uri.partition('?')
+ if uri is not None:
+ self.path, sep, self.query = uri.partition('?')
self.arguments = parse_qs_bytes(self.query, keep_blank_values=True)
self.query_arguments = copy.deepcopy(self.arguments)
- self.body_arguments = {}
+ self.body_arguments = {} # type: Dict[str, List[bytes]]
@property
- def cookies(self):
- """A dictionary of Cookie.Morsel objects."""
+ def cookies(self) -> Dict[str, http.cookies.Morsel]:
+ """A dictionary of ``http.cookies.Morsel`` objects."""
if not hasattr(self, "_cookies"):
- self._cookies = Cookie.SimpleCookie()
+ self._cookies = http.cookies.SimpleCookie()
if "Cookie" in self.headers:
try:
parsed = parse_cookie(self.headers["Cookie"])
pass
return self._cookies
- def full_url(self):
+ def full_url(self) -> str:
"""Reconstructs the full URL for this request."""
return self.protocol + "://" + self.host + self.uri
- def request_time(self):
+ def request_time(self) -> float:
"""Returns the amount of time it took for this request to execute."""
if self._finish_time is None:
return time.time() - self._start_time
else:
return self._finish_time - self._start_time
- def get_ssl_certificate(self, binary_form=False):
+ def get_ssl_certificate(self, binary_form: bool=False) -> Union[None, Dict, bytes]:
"""Returns the client's SSL certificate, if any.
To use client certificates, the HTTPServer's
http://docs.python.org/library/ssl.html#sslsocket-objects
"""
try:
- return self.connection.stream.socket.getpeercert(
+ if self.connection is None:
+ return None
+ # TODO: add a method to HTTPConnection for this so it can work with HTTP/2
+ return self.connection.stream.socket.getpeercert( # type: ignore
binary_form=binary_form)
except SSLError:
return None
- def _parse_body(self):
+ def _parse_body(self) -> None:
parse_body_arguments(
self.headers.get("Content-Type", ""), self.body,
self.body_arguments, self.files,
for k, v in self.body_arguments.items():
self.arguments.setdefault(k, []).extend(v)
- def __repr__(self):
+ def __repr__(self) -> str:
attrs = ("protocol", "host", "method", "uri", "version", "remote_ip")
args = ", ".join(["%s=%r" % (n, getattr(self, n)) for n in attrs])
return "%s(%s)" % (self.__class__.__name__, args)
.. versionadded:: 4.0
"""
- def start_request(self, server_conn, request_conn):
+ def start_request(self, server_conn: object,
+ request_conn: 'HTTPConnection')-> 'HTTPMessageDelegate':
"""This method is called by the server when a new request has started.
:arg server_conn: is an opaque object representing the long-lived
"""
raise NotImplementedError()
- def on_close(self, server_conn):
+ def on_close(self, server_conn: object) -> None:
"""This method is called when a connection has been closed.
:arg server_conn: is a server connection that has previously been
.. versionadded:: 4.0
"""
- def headers_received(self, start_line, headers):
+ def headers_received(self, start_line: Union['RequestStartLine', 'ResponseStartLine'],
+ headers: HTTPHeaders) -> Optional[Awaitable[None]]:
"""Called when the HTTP headers have been received and parsed.
:arg start_line: a `.RequestStartLine` or `.ResponseStartLine`
"""
pass
- def data_received(self, chunk):
+ def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
"""Called when a chunk of data has been received.
May return a `.Future` for flow control.
"""
pass
- def finish(self):
+ def finish(self) -> None:
"""Called after the last chunk of data has been received."""
pass
- def on_connection_close(self):
+ def on_connection_close(self) -> None:
"""Called if the connection is closed without finishing the request.
If ``headers_received`` is called, either ``finish`` or
.. versionadded:: 4.0
"""
- def write_headers(self, start_line, headers, chunk=None):
+ def write_headers(self, start_line: Union['RequestStartLine', 'ResponseStartLine'],
+ headers: HTTPHeaders, chunk: bytes=None) -> Awaitable[None]:
"""Write an HTTP header block.
:arg start_line: a `.RequestStartLine` or `.ResponseStartLine`.
:arg chunk: the first (optional) chunk of data. This is an optimization
so that small responses can be written in the same call as their
headers.
- :arg callback: a callback to be run when the write is complete.
The ``version`` field of ``start_line`` is ignored.
- Returns a `.Future` if no callback is given.
+ Returns an awaitable for flow control.
.. versionchanged:: 6.0
"""
raise NotImplementedError()
- def write(self, chunk):
+ def write(self, chunk: bytes) -> Awaitable[None]:
"""Writes a chunk of body data.
- The callback will be run when the write is complete. If no callback
- is given, returns a Future.
+ Returns an awaitable for flow control.
.. versionchanged:: 6.0
"""
raise NotImplementedError()
- def finish(self):
+ def finish(self) -> None:
"""Indicates that the last body data has been written.
"""
raise NotImplementedError()
-def url_concat(url, args):
+def url_concat(url: str, args: Union[Dict[str, str], List[Tuple[str, str]],
+ Tuple[Tuple[str, str], ...]]) -> str:
"""Concatenate url and arguments regardless of whether
url has existing query parameters.
pass
-def _parse_request_range(range_header):
+def _parse_request_range(range_header: str) -> Optional[Tuple[Optional[int], Optional[int]]]:
"""Parses a Range header.
Returns either ``None`` or tuple ``(start, end)``.
return (start, end)
-def _get_content_range(start, end, total):
+def _get_content_range(start: Optional[int], end: Optional[int], total: int) -> str:
"""Returns a suitable Content-Range header:
>>> print(_get_content_range(None, 1, 4))
return "bytes %s-%s/%s" % (start, end, total)
-def _int_or_none(val):
+def _int_or_none(val: str) -> Optional[int]:
val = val.strip()
if val == "":
return None
return int(val)
-def parse_body_arguments(content_type, body, arguments, files, headers=None):
+def parse_body_arguments(content_type: str, body: bytes, arguments: Dict[str, List[bytes]],
+ files: Dict[str, HTTPFile], headers: HTTPHeaders=None) -> None:
"""Parses a form request body.
Supports ``application/x-www-form-urlencoded`` and
gen_log.warning("Invalid multipart/form-data: %s", e)
-def parse_multipart_form_data(boundary, data, arguments, files):
+def parse_multipart_form_data(boundary: bytes, data: bytes, arguments: Dict[str, List[bytes]],
+ files: Dict[str, HTTPFile]) -> None:
"""Parses a ``multipart/form-data`` body.
The ``boundary`` and ``data`` parameters are both byte strings.
arguments.setdefault(name, []).append(value)
-def format_timestamp(ts):
+def format_timestamp(ts: Union[numbers.Real, tuple, time.struct_time, datetime.datetime]) -> str:
"""Formats a timestamp in the format used by HTTP.
The argument may be a numeric timestamp as returned by `time.time`,
'Sun, 27 Jan 2013 18:43:20 GMT'
"""
if isinstance(ts, numbers.Real):
- pass
+ time_float = typing.cast(float, ts)
elif isinstance(ts, (tuple, time.struct_time)):
- ts = calendar.timegm(ts)
+ time_float = calendar.timegm(ts)
elif isinstance(ts, datetime.datetime):
- ts = calendar.timegm(ts.utctimetuple())
+ time_float = calendar.timegm(ts.utctimetuple())
else:
raise TypeError("unknown timestamp type: %r" % ts)
- return email.utils.formatdate(ts, usegmt=True)
+ return email.utils.formatdate(time_float, usegmt=True)
RequestStartLine = collections.namedtuple(
'RequestStartLine', ['method', 'path', 'version'])
-def parse_request_start_line(line):
+def parse_request_start_line(line: str) -> RequestStartLine:
"""Returns a (method, path, version) tuple for an HTTP 1.x request line.
The response is a `collections.namedtuple`.
'ResponseStartLine', ['version', 'code', 'reason'])
-def parse_response_start_line(line):
+def parse_response_start_line(line: str) -> ResponseStartLine:
"""Returns a (version, code, reason) tuple for an HTTP 1.x response line.
The response is a `collections.namedtuple`.
# RFC 2231/5987 format.
-def _parseparam(s):
+def _parseparam(s: str) -> Generator[str, None, None]:
while s[:1] == ';':
s = s[1:]
end = s.find(';')
s = s[end:]
-def _parse_header(line):
+def _parse_header(line: str) -> Tuple[str, Dict[str, str]]:
r"""Parse a Content-type like header.
Return the main content-type and a dictionary of options.
name = p[:i].strip().lower()
value = p[i + 1:].strip()
params.append((name, native_str(value)))
- params = email.utils.decode_params(params)
- params.pop(0) # get rid of the dummy again
+ decoded_params = email.utils.decode_params(params)
+ decoded_params.pop(0) # get rid of the dummy again
pdict = {}
- for name, value in params:
- value = email.utils.collapse_rfc2231_value(value)
+ for name, decoded_value in decoded_params:
+ value = email.utils.collapse_rfc2231_value(decoded_value)
if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
value = value[1:-1]
pdict[name] = value
return key, pdict
-def _encode_header(key, pdict):
+def _encode_header(key: str, pdict: Dict[str, str]) -> str:
"""Inverse of _parse_header.
>>> _encode_header('permessage-deflate',
return '; '.join(out)
-def encode_username_password(username, password):
+def encode_username_password(username: Union[str, bytes], password: Union[str, bytes]) -> bytes:
"""Encodes a username/password pair in the format used by HTTP auth.
The return value is a byte string in the form ``username:password``.
def doctests():
+ # type: () -> unittest.TestSuite
import doctest
return doctest.DocTestSuite()
-def split_host_and_port(netloc):
+def split_host_and_port(netloc: str) -> Tuple[str, Optional[int]]:
"""Returns ``(host, port)`` tuple from ``netloc``.
Returned ``port`` will be ``None`` if not present.
match = re.match(r'^(.+):(\d+)$', netloc)
if match:
host = match.group(1)
- port = int(match.group(2))
+ port = int(match.group(2)) # type: Optional[int]
else:
host = netloc
port = None
return (host, port)
-def qs_to_qsl(qs):
+def qs_to_qsl(qs: Dict[str, List[str]]) -> Iterable[Tuple[str, str]]:
"""Generator converting a result of ``parse_qs`` back to name-value pairs.
.. versionadded:: 5.0
_nulljoin = ''.join
-def _unquote_cookie(str):
+def _unquote_cookie(s: str) -> str:
"""Handle double quotes and escaping in cookie values.
This method is copied verbatim from the Python 3.5 standard
"""
# If there aren't any doublequotes,
# then there can't be any special characters. See RFC 2109.
- if str is None or len(str) < 2:
- return str
- if str[0] != '"' or str[-1] != '"':
- return str
+ if s is None or len(s) < 2:
+ return s
+ if s[0] != '"' or s[-1] != '"':
+ return s
# We have to assume that we must decode this string.
# Down to work.
# Remove the "s
- str = str[1:-1]
+ s = s[1:-1]
# Check for special sequences. Examples:
# \012 --> \n
# \" --> "
#
i = 0
- n = len(str)
+ n = len(s)
res = []
while 0 <= i < n:
- o_match = _OctalPatt.search(str, i)
- q_match = _QuotePatt.search(str, i)
+ o_match = _OctalPatt.search(s, i)
+ q_match = _QuotePatt.search(s, i)
if not o_match and not q_match: # Neither matched
- res.append(str[i:])
+ res.append(s[i:])
break
# else:
j = k = -1
if q_match:
k = q_match.start(0)
if q_match and (not o_match or k < j): # QuotePatt matched
- res.append(str[i:k])
- res.append(str[k + 1])
+ res.append(s[i:k])
+ res.append(s[k + 1])
i = k + 2
else: # OctalPatt matched
- res.append(str[i:j])
- res.append(chr(int(str[j + 1:j + 4], 8)))
+ res.append(s[i:j])
+ res.append(chr(int(s[j + 1:j + 4], 8)))
i = j + 4
return _nulljoin(res)
-def parse_cookie(cookie):
+def parse_cookie(cookie: str) -> Dict[str, str]:
"""Parse a ``Cookie`` HTTP header into a dict of name/value pairs.
This function attempts to mimic browser cookie parsing behavior;