"""
handler = cast(RequestHandler, self)
# Verify the OpenID response via direct request to the OP
- args = {
+ args: Dict[str, Union[str, bytes]] = {
k: v[-1] for k, v in handler.request.arguments.items()
- } # type: Dict[str, Union[str, bytes]]
+ }
args["openid.mode"] = "check_authentication"
url = self._OPENID_ENDPOINT # type: ignore
if http_client is None:
}
)
ax_attrs = set(ax_attrs)
- required = [] # type: List[str]
+ required: List[str] = []
if "name" in ax_attrs:
ax_attrs -= {"name", "firstname", "fullname", "lastname"}
required += ["firstname", "fullname", "lastname"]
)
if cookie_key != request_key:
raise AuthError("Request token does not match cookie")
- token = dict(
- key=cookie_key, secret=cookie_secret
- ) # type: Dict[str, Union[str, bytes]]
+ token: Dict[str, Union[str, bytes]] = dict(key=cookie_key, secret=cookie_secret)
if oauth_verifier:
token["verifier"] = oauth_verifier
if http_client is None:
extra_params: Optional[Dict[str, Any]] = None,
) -> str:
url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore
- args = {} # type: Dict[str, str]
+ args: Dict[str, str] = {}
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if code is not None:
def submit( # type: ignore[override]
self, fn: Callable[..., _T], *args: Any, **kwargs: Any
) -> "futures.Future[_T]":
- future = futures.Future() # type: futures.Future[_T]
+ future: futures.Future[_T] = futures.Future()
try:
future_set_result_unless_cancelled(future, fn(*args, **kwargs))
except Exception:
@functools.wraps(fn)
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Future:
- async_future = Future() # type: Future
+ async_future: Future = Future()
conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
chain_future(conc_future, async_future)
return async_future
) -> None:
super().initialize(defaults=defaults)
# Typeshed is incomplete for CurlMulti, so just use Any for now.
- self._multi = pycurl.CurlMulti() # type: Any
+ self._multi: Any = pycurl.CurlMulti()
self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
self._curls = [self._curl_create() for i in range(max_clients)]
self._free_list = self._curls[:]
- self._requests = (
- collections.deque()
- ) # type: Deque[Tuple[HTTPRequest, Callable[[HTTPResponse], None], float]]
- self._fds = {} # type: Dict[int, int]
- self._timeout = None # type: Optional[object]
+ self._requests: Deque[
+ Tuple[HTTPRequest, Callable[[HTTPResponse], None], float]
+ ] = collections.deque()
+ self._fds: Dict[int, int] = {}
+ self._timeout: Optional[object] = None
# libcurl has bugs that sometimes cause it to not report all
# relevant file descriptors and timeouts to TIMERFUNCTION/
buffer = info["buffer"]
if curl_error:
assert curl_message is not None
- error = CurlError(curl_error, curl_message) # type: Optional[CurlError]
+ error: Optional[CurlError] = CurlError(curl_error, curl_message)
assert error is not None
code = error.code
effective_url = None
def _create_future() -> Future:
- future = Future() # type: Future
+ future: Future = Future()
# Fixup asyncio debug info by removing extraneous stack entries
source_traceback = getattr(future, "_source_traceback", ())
while source_traceback:
"""
@functools.wraps(func)
- def wrapper(*args, **kwargs):
- # type: (*Any, **Any) -> Future[_T]
+ def wrapper(*args: Any, **kwargs: Any) -> Future[_T]:
# This function is type-annotated with a comment to work around
# https://bitbucket.org/pypy/pypy/issues/2868/segfault-with-args-type-annotation-in
future = _create_future()
if contextvars is not None:
- ctx_run = contextvars.copy_context().run # type: Callable
+ ctx_run: Callable = contextvars.copy_context().run
else:
ctx_run = _fake_ctx_run
try:
"""
- _unfinished = {} # type: Dict[Future, Union[int, str]]
+ _unfinished: Dict[Future, Union[int, str]] = {}
def __init__(self, *args: Future, **kwargs: Future) -> None:
if args and kwargs:
if kwargs:
self._unfinished = {f: k for (k, f) in kwargs.items()}
- futures = list(kwargs.values()) # type: Sequence[Future]
+ futures: Sequence[Future] = list(kwargs.values())
else:
self._unfinished = {f: i for (i, f) in enumerate(args)}
futures = args
- self._finished = collections.deque() # type: Deque[Future]
- self.current_index = None # type: Optional[Union[str, int]]
- self.current_future = None # type: Optional[Future]
- self._running_future = None # type: Optional[Future]
+ self._finished: Deque[Future] = collections.deque()
+ self.current_index: Optional[Union[str, int]] = None
+ self.current_future: Optional[Future] = None
+ self._running_future: Optional[Future] = None
for future in futures:
future_add_done_callback(future, self._done_callback)
Use `multi` instead.
"""
if isinstance(children, dict):
- keys = list(children.keys()) # type: Optional[List]
- children_seq = children.values() # type: Iterable
+ keys: Optional[List] = list(children.keys())
+ children_seq: Iterable = children.values()
else:
keys = None
children_seq = children
else:
future_set_result_unless_cancelled(future, result_list)
- listening = set() # type: Set[Future]
+ listening: Set[Future] = set()
for f in children_futs:
if f not in listening:
listening.add(f)
self.ctx_run = ctx_run
self.gen = gen
self.result_future = result_future
- self.future = _null_future # type: Union[None, Future]
+ self.future: Union[None, Future] = _null_future
self.running = False
self.finished = False
self.io_loop = IOLoop.current()
self._read_finished = False
# _finish_future resolves when all data has been written and flushed
# to the IOStream.
- self._finish_future = Future() # type: Future[None]
+ self._finish_future: Future[None] = Future()
# If true, the connection should be closed after this request
# (after the response has been written in the server side,
# and after it has been read in the client)
# Save the start lines after we read or write them; they
# affect later processing (e.g. 304 responses and HEAD methods
# have content-length but no bodies)
- self._request_start_line = None # type: Optional[httputil.RequestStartLine]
- self._response_start_line = None # type: Optional[httputil.ResponseStartLine]
- self._request_headers = None # type: Optional[httputil.HTTPHeaders]
+ self._request_start_line: Optional[httputil.RequestStartLine] = None
+ self._response_start_line: Optional[httputil.ResponseStartLine] = None
+ self._request_headers: Optional[httputil.HTTPHeaders] = None
# True if we are writing output with chunked encoding.
self._chunking_output = False
# While reading a body with a content-length, this is the
# amount left to read.
- self._expected_content_remaining = None # type: Optional[int]
+ self._expected_content_remaining: Optional[int] = None
# A Future for our outgoing writes, returned by IOStream.write.
- self._pending_write = None # type: Optional[Future[None]]
+ self._pending_write: Optional[Future[None]] = None
def read_response(self, delegate: httputil.HTTPMessageDelegate) -> Awaitable[bool]:
"""Read a single HTTP response.
if self.is_client:
resp_start_line = httputil.parse_response_start_line(start_line_str)
self._response_start_line = resp_start_line
- start_line = (
- resp_start_line
- ) # type: Union[httputil.RequestStartLine, httputil.ResponseStartLine]
+ start_line: Union[
+ httputil.RequestStartLine, httputil.ResponseStartLine
+ ] = resp_start_line
# TODO: this will need to change to support client-side keepalive
self._disconnect_on_finish = False
else:
quickly in CPython by breaking up reference cycles.
"""
self._write_callback = None
- self._write_future = None # type: Optional[Future[None]]
- self._close_callback = None # type: Optional[Callable[[], None]]
+ self._write_future: Optional[Future[None]] = None
+ self._close_callback: Optional[Callable[[], None]] = None
if self.stream is not None:
self.stream.set_close_callback(None)
def __init__(self, delegate: httputil.HTTPMessageDelegate, chunk_size: int) -> None:
self._delegate = delegate
self._chunk_size = chunk_size
- self._decompressor = None # type: Optional[GzipDecompressor]
+ self._decompressor: Optional[GzipDecompressor] = None
def headers_received(
self,
params = HTTP1ConnectionParameters()
self.params = params
self.context = context
- self._serving_future = None # type: Optional[Future[None]]
+ self._serving_future: Optional[Future[None]] = None
async def close(self) -> None:
"""Closes the connection.
# where normal dicts get converted to HTTPHeaders objects.
request.headers = httputil.HTTPHeaders(request.headers)
request_proxy = _RequestProxy(request, self.defaults)
- future = Future() # type: Future[HTTPResponse]
+ future: Future[HTTPResponse] = Future()
def handle_response(response: "HTTPResponse") -> None:
if response.error:
self.max_redirects = max_redirects
self.user_agent = user_agent
if decompress_response is not None:
- self.decompress_response = decompress_response # type: Optional[bool]
+ self.decompress_response: Optional[bool] = decompress_response
else:
self.decompress_response = use_gzip
self.network_interface = network_interface
"""
# I'm not sure why these don't get type-inferred from the references in __init__.
- error = None # type: Optional[BaseException]
+ error: Optional[BaseException] = None
_error_is_response_code = False
request: HTTPRequest
else:
self.headers = httputil.HTTPHeaders()
self.buffer = buffer
- self._body = None # type: Optional[bytes]
+ self._body: Optional[bytes] = None
if effective_url is None:
self.effective_url = request.url
else:
max_buffer_size=max_buffer_size,
read_chunk_size=chunk_size,
)
- self._connections = set() # type: Set[HTTP1ServerConnection]
+ self._connections: Set[HTTP1ServerConnection] = set()
self.trusted_downstream = trusted_downstream
@classmethod
) -> None:
self.connection = request_conn
self.request_callback = request_callback
- self.request = None # type: Optional[httputil.HTTPServerRequest]
+ self.request: Optional[httputil.HTTPServerRequest] = None
self.delegate = None
- self._chunks = [] # type: List[bytes]
+ self._chunks: List[bytes] = []
def headers_received(
self,
via `tornado.web.RequestHandler.request`.
"""
+from __future__ import annotations
+
import calendar
import collections.abc
import copy
# on demand (and cleared whenever the list is modified).
self._as_list: dict[str, list[str]] = {}
self._combined_cache: dict[str, str] = {}
- self._last_key = None # type: Optional[str]
+ self._last_key: Optional[str] = None
if len(args) == 1 and len(kwargs) == 0 and isinstance(args[0], HTTPHeaders):
# Copy constructor
for k, v in args[0].get_all():
self.path, sep, self.query = uri.partition("?")
self.arguments = parse_qs_bytes(self.query, keep_blank_values=True)
self.query_arguments = copy.deepcopy(self.arguments)
- self.body_arguments = {} # type: Dict[str, List[bytes]]
+ self.body_arguments: Dict[str, List[bytes]] = {}
@property
def cookies(self) -> Dict[str, http.cookies.Morsel]:
"""A dictionary of ``http.cookies.Morsel`` objects."""
if not hasattr(self, "_cookies"):
- self._cookies = (
- http.cookies.SimpleCookie()
- ) # type: http.cookies.SimpleCookie
+ self._cookies: http.cookies.SimpleCookie = http.cookies.SimpleCookie()
if "Cookie" in self.headers:
try:
parsed = parse_cookie(self.headers["Cookie"])
return utf8(username) + b":" + utf8(password)
-def doctests():
- # type: () -> unittest.TestSuite
+def doctests() -> unittest.TestSuite:
import doctest
return doctest.DocTestSuite(optionflags=doctest.ELLIPSIS)
match = _netloc_re.match(netloc)
if match:
host = match.group(1)
- port = int(match.group(2)) # type: Optional[int]
+ port: Optional[int] = int(match.group(2))
else:
host = netloc
port = None
"""
+from __future__ import annotations
+
import asyncio
import concurrent.futures
import datetime
ERROR = 0x018
# In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops.
- _ioloop_for_asyncio = dict() # type: Dict[asyncio.AbstractEventLoop, IOLoop]
+ _ioloop_for_asyncio: Dict[asyncio.AbstractEventLoop, IOLoop] = dict()
# Maintain a set of all pending tasks to follow the warning in the docs
# of asyncio.create_tasks:
# https://github.com/python/cpython/issues/91887
# If that change is accepted, this can eventually be removed.
# If it is not, we will consider the rationale and may remove this.
- _pending_tasks = set() # type: Set[Future]
+ _pending_tasks: Set[Future] = set()
@classmethod
def configure(
if instance:
from tornado.platform.asyncio import AsyncIOMainLoop
- current = AsyncIOMainLoop() # type: Optional[IOLoop]
+ current: Optional[IOLoop] = AsyncIOMainLoop()
else:
current = None
return current
FutureCell = TypedDict( # noqa: F841
"FutureCell", {"future": Optional[Future], "timeout_called": bool}
)
- future_cell = {"future": None, "timeout_called": False} # type: FutureCell
+ future_cell: FutureCell = {"future": None, "timeout_called": False}
def run() -> None:
try:
result = convert_yielded(result)
except Exception:
- fut = Future() # type: Future[Any]
+ fut: Future[Any] = Future()
future_cell["future"] = fut
future_set_exc_info(fut, sys.exc_info())
else:
if not hasattr(self, "_executor"):
from tornado.process import cpu_count
- self._executor = concurrent.futures.ThreadPoolExecutor(
- max_workers=(cpu_count() * 5)
- ) # type: concurrent.futures.Executor
+ self._executor: concurrent.futures.Executor = (
+ concurrent.futures.ThreadPoolExecutor(max_workers=(cpu_count() * 5))
+ )
executor = self._executor
c_future = executor.submit(func, *args)
# Concurrent Futures are not usable with await. Wrap this in a
# Tornado Future instead, using self.add_future for thread-safety.
- t_future = Future() # type: Future[_T]
+ t_future: Future[_T] = Future()
self.add_future(c_future, lambda f: chain_future(f, t_future))
return t_future
raise TypeError("Unsupported deadline %r" % deadline)
self.deadline = deadline
self.callback = callback
- self.tdeadline = (
+ self.tdeadline: Tuple[float, int] = (
deadline,
next(io_loop._timeout_counter),
- ) # type: Tuple[float, int]
+ )
# Comparison methods to sort by deadline, with object id as a tiebreaker
# to guarantee a consistent ordering. The heapq module uses __le__
self.callback_time = callback_time
self.jitter = jitter
self._running = False
- self._timeout = None # type: object
+ self._timeout: object = None
def start(self) -> None:
"""Starts the timer."""
def __init__(self) -> None:
# A sequence of (False, bytearray) and (True, memoryview) objects
- self._buffers = (
+ self._buffers: Deque[Tuple[bool, Union[bytearray, memoryview]]] = (
collections.deque()
- ) # type: Deque[Tuple[bool, Union[bytearray, memoryview]]]
+ )
# Position in the first buffer
self._first_pos = 0
self._size = 0
# spurious failures.
self.read_chunk_size = min(read_chunk_size or 65536, self.max_buffer_size // 2)
self.max_write_buffer_size = max_write_buffer_size
- self.error = None # type: Optional[BaseException]
+ self.error: Optional[BaseException] = None
self._read_buffer = bytearray()
self._read_buffer_size = 0
self._user_read_buffer = False
- self._after_user_read_buffer = None # type: Optional[bytearray]
+ self._after_user_read_buffer: Optional[bytearray] = None
self._write_buffer = _StreamBuffer()
self._total_write_index = 0
self._total_write_done_index = 0
- self._read_delimiter = None # type: Optional[bytes]
- self._read_regex = None # type: Optional[Pattern]
- self._read_max_bytes = None # type: Optional[int]
- self._read_bytes = None # type: Optional[int]
+ self._read_delimiter: Optional[bytes] = None
+ self._read_regex: Optional[Pattern] = None
+ self._read_max_bytes: Optional[int] = None
+ self._read_bytes: Optional[int] = None
self._read_partial = False
self._read_until_close = False
- self._read_future = None # type: Optional[Future]
- self._write_futures = (
- collections.deque()
- ) # type: Deque[Tuple[int, Future[None]]]
- self._close_callback = None # type: Optional[Callable[[], None]]
- self._connect_future = None # type: Optional[Future[IOStream]]
+ self._read_future: Optional[Future] = None
+ self._write_futures: Deque[Tuple[int, Future[None]]] = collections.deque()
+ self._close_callback: Optional[Callable[[], None]] = None
+ self._connect_future: Optional[Future[IOStream]] = None
# _ssl_connect_future should be defined in SSLIOStream
# but it's here so we can clean it up in _signal_closed
# TODO: refactor that so subclasses can add additional futures
# to be cancelled.
- self._ssl_connect_future = None # type: Optional[Future[SSLIOStream]]
+ self._ssl_connect_future: Optional[Future[SSLIOStream]] = None
self._connecting = False
- self._state = None # type: Optional[int]
+ self._state: Optional[int] = None
self._closed = False
def fileno(self) -> Union[int, ioloop._Selectable]:
raise StreamBufferFullError("Reached maximum write buffer size")
self._write_buffer.append(data)
self._total_write_index += len(data)
- future = Future() # type: Future[None]
+ future: Future[None] = Future()
future.add_done_callback(lambda f: f.exception())
self._write_futures.append((self._total_write_index, future))
if not self._connecting:
self._signal_closed()
def _signal_closed(self) -> None:
- futures = [] # type: List[Future]
+ futures: List[Future] = []
if self._read_future is not None:
futures.append(self._read_future)
self._read_future = None
def _read_to_buffer_loop(self) -> Optional[int]:
# This method is called from _handle_read and _try_inline_read.
if self._read_bytes is not None:
- target_bytes = self._read_bytes # type: Optional[int]
+ target_bytes: Optional[int] = self._read_bytes
elif self._read_max_bytes is not None:
target_bytes = self._read_max_bytes
elif self.reading():
self._after_user_read_buffer = None
self._read_buffer_size = len(self._read_buffer)
self._user_read_buffer = False
- result = size # type: Union[int, bytes]
+ result: Union[int, bytes] = size
else:
result = self._consume(size)
if self._read_future is not None:
while True:
try:
if self._user_read_buffer:
- buf = memoryview(self._read_buffer)[
- self._read_buffer_size :
- ] # type: Union[memoryview, bytearray]
+ buf: Union[memoryview, bytearray] = memoryview(
+ self._read_buffer
+ )[self._read_buffer_size :]
else:
buf = bytearray(self.read_chunk_size)
bytes_read = self.read_from_fd(buf)
"""
self._connecting = True
- future = Future() # type: Future[_IOStreamType]
+ future: Future[_IOStreamType] = Future()
self._connect_future = typing.cast("Future[IOStream]", future)
try:
self.socket.connect(address)
orig_close_callback = self._close_callback
self._close_callback = None
- future = Future() # type: Future[SSLIOStream]
+ future: Future[SSLIOStream] = Future()
ssl_stream = SSLIOStream(socket, ssl_options=ssl_options)
ssl_stream.set_close_callback(orig_close_callback)
ssl_stream._ssl_connect_future = future
self._ssl_accepting = True
self._handshake_reading = False
self._handshake_writing = False
- self._server_hostname = None # type: Optional[str]
+ self._server_hostname: Optional[str] = None
# If the socket is already connected, attempt to start the handshake.
try:
the `Locale.translate` method will simply return the original string.
"""
+from __future__ import annotations
+
import codecs
import csv
import datetime
from typing import Iterable, Any, Union, Dict, Optional
_default_locale = "en_US"
-_translations = {} # type: Dict[str, Any]
+_translations: Dict[str, Any] = {}
_supported_locales = frozenset([_default_locale])
_use_gettext = False
CONTEXT_SEPARATOR = "\x04"
call `get` or `get_closest` to get a Locale object.
"""
- _cache = {} # type: Dict[str, Locale]
+ _cache: Dict[str, Locale] = {}
@classmethod
def get_closest(cls, *locale_codes: str) -> "Locale":
assert code in _supported_locales
translations = _translations.get(code, None)
if translations is None:
- locale = CSVLocale(code, {}) # type: Locale
+ locale: Locale = CSVLocale(code, {})
elif _use_gettext:
locale = GettextLocale(code, translations)
else:
"""
def __init__(self) -> None:
- self._waiters = collections.deque() # type: Deque[Future]
+ self._waiters: Deque[Future] = collections.deque()
self._timeouts = 0
def _garbage_collect(self) -> None:
Returns a `.Future` that resolves ``True`` if the condition is notified,
or ``False`` after a timeout.
"""
- waiter = Future() # type: Future[bool]
+ waiter: Future[bool] = Future()
self._waiters.append(waiter)
if timeout:
def __init__(self) -> None:
self._value = False
- self._waiters = set() # type: Set[Future[None]]
+ self._waiters: Set[Future[None]] = set()
def __repr__(self) -> str:
return "<{} {}>".format(
Returns an awaitable, which raises `tornado.util.TimeoutError` after a
timeout.
"""
- fut = Future() # type: Future[None]
+ fut: Future[None] = Future()
if self._value:
fut.set_result(None)
return fut
Block if the counter is zero and wait for a `.release`. The awaitable
raises `.TimeoutError` after the deadline.
"""
- waiter = Future() # type: Future[_ReleasingContextManager]
+ waiter: Future[_ReleasingContextManager] = Future()
if self._value > 0:
self._value -= 1
waiter.set_result(_ReleasingContextManager(self))
logging.Formatter.__init__(self, datefmt=datefmt)
self._fmt = fmt
- self._colors = {} # type: Dict[int, str]
+ self._colors: Dict[int, str] = {}
if color and _stderr_supports_color():
if curses is not None:
fg_color = curses.tigetstr("setaf") or curses.tigetstr("setf") or b""
if options.log_file_prefix:
rotate_mode = options.log_rotate_mode
if rotate_mode == "size":
- channel = logging.handlers.RotatingFileHandler(
+ channel: logging.Handler = logging.handlers.RotatingFileHandler(
filename=options.log_file_prefix,
maxBytes=options.log_file_max_size,
backupCount=options.log_file_num_backups,
encoding="utf-8",
- ) # type: logging.Handler
+ )
elif rotate_mode == "time":
channel = logging.handlers.TimedRotatingFileHandler(
filename=options.log_file_prefix,
if flags is None:
flags = socket.AI_PASSIVE
bound_port = None
- unique_addresses = set() # type: set
+ unique_addresses: set = set()
for res in sorted(
socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags),
key=lambda x: x[0],
else:
type = str
if group:
- group_name = group # type: Optional[str]
+ group_name: Optional[str] = group
else:
group_name = file_name
option = _Option(
"""
if args is None:
args = sys.argv
- remaining = [] # type: List[str]
+ remaining: List[str] = []
for i in range(1, len(args)):
# All things after the last option are command line arguments
if not args[i].startswith("-"):
file = sys.stderr
print("Usage: %s [OPTIONS]" % sys.argv[0], file=file)
print("\nOptions:\n", file=file)
- by_group = {} # type: Dict[str, List[_Option]]
+ by_group: Dict[str, List[_Option]] = {}
for option in self._options.values():
by_group.setdefault(option.group_name, []).append(option)
self.group_name = group_name
self.callback = callback
self.default = default
- self._value = _Option.UNSET # type: Any
+ self._value: Any = _Option.UNSET
def value(self) -> Any:
return self.default if self._value is _Option.UNSET else self._value
def parse(self, value: str) -> Any:
- _parse = {
+ _parse: Callable[[str], Any] = {
datetime.datetime: self._parse_datetime,
datetime.timedelta: self._parse_timedelta,
bool: self._parse_bool,
basestring_type: self._parse_string,
- }.get(
- self.type, self.type
- ) # type: Callable[[str], Any]
+ }.get(self.type, self.type)
if self.multiple:
self._value = []
for part in value.split(","):
def initialize(self) -> None:
self.io_loop = IOLoop.current()
self.channel = pycares.Channel(sock_state_cb=self._sock_state_cb)
- self.fds = {} # type: Dict[int, int]
+ self.fds: Dict[int, int] = {}
def _sock_state_cb(self, fd: int, readable: bool, writable: bool) -> None:
state = (IOLoop.READ if readable else 0) | (IOLoop.WRITE if writable else 0)
addresses = [host]
else:
# gethostbyname doesn't take callback as a kwarg
- fut = Future() # type: Future[Tuple[Any, Any]]
+ fut: Future[Tuple[Any, Any]] = Future()
self.channel.gethostbyname(
host, family, lambda result, error: fut.set_result((result, error))
)
@gen.convert_yielded.register(Deferred)
def _(d: Deferred) -> Future:
- f = Future() # type: Future[typing.Any]
+ f: Future[typing.Any] = Future()
def errback(failure: failure.Failure) -> None:
try:
self.io_loop = ioloop.IOLoop.current()
# All FDs we create should be closed on error; those in to_close
# should be closed in the parent process on success.
- pipe_fds = [] # type: List[int]
- to_close = [] # type: List[int]
+ pipe_fds: List[int] = []
+ to_close: List[int] = []
if kwargs.get("stdin") is Subprocess.STREAM:
in_r, in_w = os.pipe()
kwargs["stdin"] = in_r
for attr in ["stdin", "stdout", "stderr"]:
if not hasattr(self, attr): # don't clobber streams set above
setattr(self, attr, getattr(self.proc, attr))
- self._exit_callback = None # type: Optional[Callable[[int], None]]
- self.returncode = None # type: Optional[int]
+ self._exit_callback: Optional[Callable[[int], None]] = None
+ self.returncode: Optional[int] = None
def set_exit_callback(self, callback: Callable[[int], None]) -> None:
"""Runs ``callback`` when this process exits.
Availability: Unix
"""
- future = Future() # type: Future[int]
+ future: Future[int] = Future()
def callback(ret: int) -> None:
if ret != 0 and raise_error:
"""
+from __future__ import annotations
+
import collections
import datetime
import heapq
# Exact type depends on subclass. Could be another generic
# parameter and use protocols to be more precise here.
- _queue = None # type: Any
+ _queue: Any = None
def __init__(self, maxsize: int = 0) -> None:
if maxsize is None:
self._maxsize = maxsize
self._init()
- self._getters = collections.deque([]) # type: Deque[Future[_T]]
- self._putters = collections.deque([]) # type: Deque[Tuple[_T, Future[None]]]
+ self._getters: Deque[Future[_T]] = collections.deque([])
+ self._putters: Deque[Tuple[_T, Future[None]]] = collections.deque([])
self._unfinished_tasks = 0
self._finished = Event()
self._finished.set()
`datetime.timedelta` object for a deadline relative to the
current time.
"""
- future = Future() # type: Future[None]
+ future: Future[None] = Future()
try:
self.put_nowait(item)
except QueueFull:
with other timeouts in Tornado).
"""
- future = Future() # type: Future[_T]
+ future: Future[_T] = Future()
try:
future.set_result(self.get_nowait())
except QueueEmpty:
) -> None:
self.server_conn = server_conn
self.request_conn = request_conn
- self.delegate = None # type: Optional[httputil.HTTPMessageDelegate]
- self.router = router # type: Router
+ self.delegate: Optional[httputil.HTTPMessageDelegate] = None
+ self.router: Router = router
def headers_received(
self,
:arg rules: a list of `Rule` instances or tuples of `Rule`
constructor arguments.
"""
- self.rules = [] # type: List[Rule]
+ self.rules: List[Rule] = []
if rules:
self.add_rules(rules)
"""
def __init__(self, rules: Optional[_RuleList] = None) -> None:
- self.named_rules = {} # type: Dict[str, Any]
+ self.named_rules: Dict[str, Any] = {}
super().__init__(rules)
def process_rule(self, rule: "Rule") -> "Rule":
# Must be a fully qualified name (module.ClassName)
target = import_object(target)
- self.matcher = matcher # type: Matcher
+ self.matcher: Matcher = matcher
self.target = target
self.target_kwargs = target_kwargs if target_kwargs else {}
self.name = name
if not self.regex.groups:
return {}
- path_args = [] # type: List[bytes]
- path_kwargs = {} # type: Dict[str, bytes]
+ path_args: List[bytes] = []
+ path_kwargs: Dict[str, bytes] = {}
# Pass matched groups to the handler. Since
# match.groups() includes both named and
) -> None:
super().initialize(defaults=defaults)
self.max_clients = max_clients
- self.queue = (
- collections.deque()
- ) # type: Deque[Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]]
- self.active = (
- {}
- ) # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]]
- self.waiting = (
- {}
- ) # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]]
+ self.queue: Deque[
+ Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]
+ ] = collections.deque()
+ self.active: Dict[
+ object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]
+ ] = {}
+ self.waiting: Dict[
+ object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]
+ ] = {}
self.max_buffer_size = max_buffer_size
self.max_header_size = max_header_size
self.max_body_size = max_body_size
self.tcp_client = tcp_client
self.max_header_size = max_header_size
self.max_body_size = max_body_size
- self.code = None # type: Optional[int]
- self.headers = None # type: Optional[httputil.HTTPHeaders]
- self.chunks = [] # type: List[bytes]
+ self.code: Optional[int] = None
+ self.headers: Optional[httputil.HTTPHeaders] = None
+ self.chunks: List[bytes] = []
self._decompressor = None
# Timeout handle returned by IOLoop.add_timeout
- self._timeout = None # type: object
+ self._timeout: object = None
self._sockaddr = None
IOLoop.current().add_future(
gen.convert_yielded(self.run()), lambda f: f.result()
self.io_loop = IOLoop.current()
self.connect = connect
- self.future = (
- Future()
- ) # type: Future[Tuple[socket.AddressFamily, Any, IOStream]]
- self.timeout = None # type: Optional[object]
- self.connect_timeout = None # type: Optional[object]
- self.last_error = None # type: Optional[Exception]
+ self.future: Future[Tuple[socket.AddressFamily, Any, IOStream]] = Future()
+ self.timeout: Optional[object] = None
+ self.connect_timeout: Optional[object] = None
+ self.last_error: Optional[Exception] = None
self.remaining = len(addrinfo)
self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
- self.streams = set() # type: Set[IOStream]
+ self.streams: Set[IOStream] = set()
@staticmethod
def split(
try:
stream = IOStream(socket_obj, max_buffer_size=max_buffer_size)
except OSError as e:
- fu = Future() # type: Future[IOStream]
+ fu: Future[IOStream] = Future()
fu.set_exception(e)
return stream, fu
else:
read_chunk_size: Optional[int] = None,
) -> None:
self.ssl_options = ssl_options
- self._sockets = {} # type: Dict[int, socket.socket]
- self._handlers = {} # type: Dict[int, Callable[[], None]]
- self._pending_sockets = [] # type: List[socket.socket]
+ self._sockets: Dict[int, socket.socket] = {}
+ self._handlers: Dict[int, Callable[[], None]] = {}
+ self._pending_sockets: List[socket.socket] = []
self._started = False
self._stopped = False
self.max_buffer_size = max_buffer_size
raise
try:
if self.ssl_options is not None:
- stream = SSLIOStream(
+ stream: IOStream = SSLIOStream(
connection,
max_buffer_size=self.max_buffer_size,
read_chunk_size=self.read_chunk_size,
- ) # type: IOStream
+ )
else:
stream = IOStream(
connection,
filter_whitespace(whitespace, "")
if not isinstance(autoescape, _UnsetMarker):
- self.autoescape = autoescape # type: Optional[str]
+ self.autoescape: Optional[str] = autoescape
elif loader:
self.autoescape = loader.autoescape
else:
buffer = StringIO()
try:
# named_blocks maps from names to _NamedBlock objects
- named_blocks = {} # type: Dict[str, _NamedBlock]
+ named_blocks: Dict[str, _NamedBlock] = {}
ancestors = self._get_ancestors(loader)
ancestors.reverse()
for ancestor in ancestors:
self.autoescape = autoescape
self.namespace = namespace or {}
self.whitespace = whitespace
- self.templates = {} # type: Dict[str, Template]
+ self.templates: Dict[str, Template] = {}
# self.lock protects self.templates. It's a reentrant lock
# because templates may load other templates via `include` or
# `extends`. Note that thanks to the GIL this code would be safe
self.loader = loader
self.current_template = current_template
self.apply_counter = 0
- self.include_stack = [] # type: List[Tuple[Template, int]]
+ self.include_stack: List[Tuple[Template, int]] = []
self._indent = 0
def indent_size(self) -> int:
suffix = suffix.strip('"').strip("'")
if not suffix:
reader.raise_parse_error("extends missing file path")
- block = _ExtendsBlock(suffix) # type: _Node
+ block: _Node = _ExtendsBlock(suffix)
elif operator in ("import", "from"):
if not suffix:
reader.raise_parse_error("import missing statement")
reader.raise_parse_error("set missing statement")
block = _Statement(suffix, line)
elif operator == "autoescape":
- fn = suffix.strip() # type: Optional[str]
+ fn: Optional[str] = suffix.strip()
if fn == "None":
fn = None
template.autoescape = fn
class MiscFutureTest(AsyncTestCase):
def test_future_set_result_unless_cancelled(self):
- fut = Future() # type: Future[int]
+ fut: Future[int] = Future()
future_set_result_unless_cancelled(fut, 42)
self.assertEqual(fut.result(), 42)
self.assertFalse(fut.cancelled())
from typing import List, Tuple, Union, Dict, Any # noqa: F401
-linkify_tests = [
+linkify_tests: List[Tuple[Union[str, bytes], Dict[str, Any], str]] = [
# (input, linkify_kwargs, expected_output)
(
"hello http://world.com/!",
{"extra_params": lambda href: ' rel="nofollow" class="external" '},
'<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a>', # noqa: E501
),
-] # type: List[Tuple[Union[str, bytes], Dict[str, Any], str]]
+]
class EscapeTestCase(unittest.TestCase):
self.assertEqual(linked, html)
def test_xhtml_escape(self):
- tests = [
+ tests: List[Tuple[Union[str, bytes], Union[str, bytes]]] = [
("<foo>", "<foo>"),
("<foo>", "<foo>"),
(b"<foo>", b"<foo>"),
("&", "&amp;"),
("<\u00e9>", "<\u00e9>"),
(b"<\xc3\xa9>", b"<\xc3\xa9>"),
- ] # type: List[Tuple[Union[str, bytes], Union[str, bytes]]]
+ ]
for unescaped, escaped in tests:
self.assertEqual(utf8(xhtml_escape(unescaped)), utf8(escaped))
self.assertEqual(utf8(unescaped), utf8(xhtml_unescape(escaped)))
self.assertEqual(unescaped, xhtml_unescape(escaped))
def test_url_escape_unicode(self):
- tests = [
+ tests: List[Tuple[Union[str, bytes], str]] = [
# byte strings are passed through as-is
("\u00e9".encode(), "%C3%A9"),
("\u00e9".encode("latin1"), "%E9"),
# unicode strings become utf8
("\u00e9", "%C3%A9"),
- ] # type: List[Tuple[Union[str, bytes], str]]
+ ]
for unescaped, escaped in tests:
self.assertEqual(url_escape(unescaped), escaped)
# First, confirm the behavior without moment: each coroutine
# monopolizes the event loop until it finishes.
- immediate = Future() # type: Future[None]
+ immediate: Future[None] = Future()
immediate.set_result(None)
yield [f("a", immediate), f("b", immediate)]
self.assertEqual("".join(calls), "aaaaabbbbb")
class UndecoratedCoroutinesHandler(RequestHandler):
@gen.coroutine
def prepare(self):
- self.chunks = [] # type: List[str]
+ self.chunks: List[str] = []
yield gen.moment
self.chunks.append("1")
@gen_test
def test_completes_before_timeout(self):
- future = Future() # type: Future[str]
+ future: Future[str] = Future()
self.io_loop.add_timeout(
datetime.timedelta(seconds=0.1), lambda: future.set_result("asdf")
)
@gen_test
def test_fails_before_timeout(self):
- future = Future() # type: Future[str]
+ future: Future[str] = Future()
self.io_loop.add_timeout(
datetime.timedelta(seconds=0.1),
lambda: future.set_exception(ZeroDivisionError()),
@gen_test
def test_already_resolved(self):
- future = Future() # type: Future[str]
+ future: Future[str] = Future()
future.set_result("asdf")
result = yield gen.with_timeout(datetime.timedelta(seconds=3600), future)
self.assertEqual(result, "asdf")
@gen_test
def test_already_done(self):
- f1 = Future() # type: Future[int]
- f2 = Future() # type: Future[int]
- f3 = Future() # type: Future[int]
+ f1: Future[int] = Future()
+ f2: Future[int] = Future()
+ f3: Future[int] = Future()
f1.set_result(24)
f2.set_result(42)
f3.set_result(84)
@gen_test
def test_iterator(self):
- futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]]
+ futures: List[Future[int]] = [Future(), Future(), Future(), Future()]
self.finish_coroutines(0, futures)
# Recreate the previous test with py35 syntax. It's a little clunky
# because of the way the previous test handles an exception on
# a single iteration.
- futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]]
+ futures: List[Future[int]] = [Future(), Future(), Future(), Future()]
self.finish_coroutines(0, futures)
self.finished = False
def test_gc(self):
# GitHub issue 1769: Runner objects can get GCed unexpectedly
# while their future is alive.
- weakref_scope = [None] # type: List[Optional[weakref.ReferenceType]]
+ weakref_scope: List[Optional[weakref.ReferenceType]] = [None]
def callback():
gc.collect(2)
@gen.coroutine
def tester():
- fut = Future() # type: Future[int]
+ fut: Future[int] = Future()
weakref_scope[0] = weakref.ref(fut)
self.io_loop.add_callback(callback)
yield fut
# their loop is closed, even if they're involved in a reference
# cycle.
loop = self.get_new_ioloop()
- result = [] # type: List[Optional[bool]]
+ result: List[Optional[bool]] = []
wfut = []
@gen.coroutine
result.append(None)
loop = self.get_new_ioloop()
- result = [] # type: List[Optional[bool]]
+ result: List[Optional[bool]] = []
wfut = []
@gen.coroutine
if contextvars is not None:
- ctx_var = contextvars.ContextVar("ctx_var") # type: contextvars.ContextVar[int]
+ ctx_var: contextvars.ContextVar[int] = contextvars.ContextVar("ctx_var")
@unittest.skipIf(contextvars is None, "contextvars module not present")
class HTTP1ConnectionTest(AsyncTestCase):
- code = None # type: typing.Optional[int]
+ code: typing.Optional[int] = None
def setUp(self):
super().setUp()
def test_streaming_callback(self):
# streaming_callback is also tested in test_chunked
- chunks = [] # type: typing.List[bytes]
+ chunks: typing.List[bytes] = []
response = self.fetch("/hello", streaming_callback=chunks.append)
# with streaming_callback, data goes to the callback and not response.body
self.assertEqual(chunks, [b"Hello world!"])
response = self.fetch("/chunk")
self.assertEqual(response.body, b"asdfqwer")
- chunks = [] # type: typing.List[bytes]
+ chunks: typing.List[bytes] = []
response = self.fetch("/chunk", streaming_callback=chunks.append)
self.assertEqual(chunks, [b"asdf", b"qwer"])
self.assertFalse(response.body)
class TypeCheckHandler(RequestHandler):
def prepare(self):
- self.errors = {} # type: Dict[str, str]
+ self.errors: Dict[str, str] = {}
fields = [
("method", str),
("uri", str),
self.connection = connection
def headers_received(self, start_line, headers):
- self.chunk_lengths = [] # type: List[int]
+ self.chunk_lengths: List[int] = []
def data_received(self, chunk):
self.chunk_lengths.append(len(chunk))
def setUp(self):
super().setUp()
- self.streams = [] # type: List[IOStream]
+ self.streams: List[IOStream] = []
def tearDown(self):
super().tearDown()
def test_timeout_with_arguments(self):
# This tests that all the timeout methods pass through *args correctly.
- results = [] # type: List[int]
+ results: List[int] = []
self.io_loop.add_timeout(self.io_loop.time(), results.append, 1)
self.io_loop.add_timeout(datetime.timedelta(seconds=0), results.append, 2)
self.io_loop.call_at(self.io_loop.time(), results.append, 3)
class TestIOLoopCurrent(unittest.TestCase):
def setUp(self):
setup_with_context_manager(self, ignore_deprecation())
- self.io_loop = None # type: typing.Optional[IOLoop]
+ self.io_loop: typing.Optional[IOLoop] = None
IOLoop.clear_current()
def tearDown(self):
@gen.coroutine
def make_iostream_pair(self, **kwargs):
listener, port = bind_unused_port()
- server_stream_fut = Future() # type: Future[IOStream]
+ server_stream_fut: Future[IOStream] = Future()
def accept_callback(connection, address):
server_stream_fut.set_result(
super().setUp()
self.listener, self.port = bind_unused_port()
self.server_stream = None
- self.server_accepted = Future() # type: Future[None]
+ self.server_accepted: Future[None] = Future()
netutil.add_accept_handler(self.listener, self.accept)
- self.client_stream = IOStream(
- socket.socket()
- ) # type: typing.Optional[IOStream]
+ self.client_stream: typing.Optional[IOStream] = IOStream(socket.socket())
self.io_loop.add_future(
self.client_stream.connect(("127.0.0.1", self.port)), self.stop
)
@gen_test
def test_wait_for_handshake_future(self):
test = self
- handshake_future = Future() # type: Future[None]
+ handshake_future: Future[None] = Future()
class TestServer(TCPServer):
def handle_stream(self, stream, address):
@gen_test
def test_wait_for_handshake_already_waiting_error(self):
test = self
- handshake_future = Future() # type: Future[None]
+ handshake_future: Future[None] = Future()
class TestServer(TCPServer):
@gen.coroutine
@gen_test
def test_wait_for_handshake_already_connected(self):
- handshake_future = Future() # type: Future[None]
+ handshake_future: Future[None] = Future()
class TestServer(TCPServer):
@gen.coroutine
tornado.locale.Locale._cache = {}
def setUp(self):
- self.saved = {} # type: dict
+ self.saved: dict = {}
for var in TranslationLoaderTest.SAVE_VARS:
self.saved[var] = getattr(tornado.locale, var)
self.clear_locale_cache()
class ConditionTest(AsyncTestCase):
def setUp(self):
super().setUp()
- self.history = [] # type: typing.List[typing.Union[int, str]]
+ self.history: typing.List[typing.Union[int, str]] = []
def record_done(self, future, key):
"""Record the resolution of a Future returned by Condition.wait."""
@abstract_base_test
class _ResolverTestMixin(AsyncTestCase):
- resolver = None # type: typing.Any
+ resolver: typing.Any = None
@gen_test
def test_localhost(self):
# resolution, so test this case separately, using mocks as needed.
@abstract_base_test
class _ResolverErrorTestMixin(AsyncTestCase):
- resolver = None # type: typing.Any
+ resolver: typing.Any = None
@gen_test
def test_bad_host(self):
options.foo = "2"
def test_setattr_with_callback(self):
- values = [] # type: List[int]
+ values: List[int] = []
options = OptionParser()
options.define("foo", default=1, type=int, callback=values.append)
options.foo = 2
class QueueBasicTest(AsyncTestCase):
def test_repr_and_str(self):
- q = queues.Queue(maxsize=1) # type: queues.Queue[None]
+ q: queues.Queue[None] = queues.Queue(maxsize=1)
self.assertIn(hex(id(q)), repr(q))
self.assertNotIn(hex(id(q)), str(q))
q.get()
self.assertIn("tasks=2", q_str)
def test_order(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
for i in [1, 3, 2]:
q.put_nowait(i)
self.assertRaises(TypeError, queues.Queue, maxsize=None)
self.assertRaises(ValueError, queues.Queue, maxsize=-1)
- q = queues.Queue(maxsize=2) # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue(maxsize=2)
self.assertTrue(q.empty())
self.assertFalse(q.full())
self.assertEqual(2, q.maxsize)
class QueueGetTest(AsyncTestCase):
@gen_test
def test_blocking_get(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
q.put_nowait(0)
self.assertEqual(0, (yield q.get()))
def test_nonblocking_get(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
q.put_nowait(0)
self.assertEqual(0, q.get_nowait())
def test_nonblocking_get_exception(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
self.assertRaises(queues.QueueEmpty, q.get_nowait)
@gen_test
def test_get_with_putters(self):
- q = queues.Queue(1) # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue(1)
q.put_nowait(0)
put = q.put(1)
self.assertEqual(0, (yield q.get()))
@gen_test
def test_blocking_get_wait(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
q.put(0)
self.io_loop.call_later(0.01, q.put_nowait, 1)
self.io_loop.call_later(0.02, q.put_nowait, 2)
@gen_test
def test_get_timeout(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
get_timeout = q.get(timeout=timedelta(seconds=0.01))
get = q.get()
with self.assertRaises(TimeoutError):
@gen_test
def test_get_timeout_preempted(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
get = q.get(timeout=timedelta(seconds=0.01))
q.put(0)
yield gen.sleep(0.02)
@gen_test
def test_get_clears_timed_out_putters(self):
- q = queues.Queue(1) # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue(1)
# First putter succeeds, remainder block.
putters = [q.put(i, timedelta(seconds=0.01)) for i in range(10)]
put = q.put(10)
@gen_test
def test_get_clears_timed_out_getters(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
getters = [
asyncio.ensure_future(q.get(timedelta(seconds=0.01))) for _ in range(10)
]
@gen_test
def test_async_for(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
for i in range(5):
q.put(i)
class QueuePutTest(AsyncTestCase):
@gen_test
def test_blocking_put(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
q.put(0)
self.assertEqual(0, q.get_nowait())
def test_nonblocking_put_exception(self):
- q = queues.Queue(1) # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue(1)
q.put(0)
self.assertRaises(queues.QueueFull, q.put_nowait, 1)
@gen_test
def test_put_with_getters(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
get0 = q.get()
get1 = q.get()
yield q.put(0)
@gen_test
def test_nonblocking_put_with_getters(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
get0 = q.get()
get1 = q.get()
q.put_nowait(0)
@gen_test
def test_blocking_put_wait(self):
- q = queues.Queue(1) # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue(1)
q.put_nowait(0)
def get_and_discard():
@gen_test
def test_put_timeout(self):
- q = queues.Queue(1) # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue(1)
q.put_nowait(0) # Now it's full.
put_timeout = q.put(1, timeout=timedelta(seconds=0.01))
put = q.put(2)
@gen_test
def test_put_timeout_preempted(self):
- q = queues.Queue(1) # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue(1)
q.put_nowait(0)
put = q.put(1, timeout=timedelta(seconds=0.01))
q.get()
@gen_test
def test_put_clears_timed_out_putters(self):
- q = queues.Queue(1) # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue(1)
# First putter succeeds, remainder block.
putters = [q.put(i, timedelta(seconds=0.01)) for i in range(10)]
put = q.put(10)
@gen_test
def test_put_clears_timed_out_getters(self):
- q = queues.Queue() # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue()
getters = [
asyncio.ensure_future(q.get(timedelta(seconds=0.01))) for _ in range(10)
]
queue_class = queues.Queue
def test_task_done_underflow(self):
- q = self.queue_class() # type: queues.Queue
+ q: queues.Queue = self.queue_class()
self.assertRaises(ValueError, q.task_done)
@gen_test
def test_task_done(self):
- q = self.queue_class() # type: queues.Queue
+ q: queues.Queue = self.queue_class()
for i in range(100):
q.put_nowait(i)
@gen_test
def test_task_done_delay(self):
# Verify it is task_done(), not get(), that unblocks join().
- q = self.queue_class() # type: queues.Queue
+ q: queues.Queue = self.queue_class()
q.put_nowait(0)
join = asyncio.ensure_future(q.join())
self.assertFalse(join.done())
@gen_test
def test_join_empty_queue(self):
- q = self.queue_class() # type: queues.Queue
+ q: queues.Queue = self.queue_class()
yield q.join()
yield q.join()
@gen_test
def test_join_timeout(self):
- q = self.queue_class() # type: queues.Queue
+ q: queues.Queue = self.queue_class()
q.put(0)
with self.assertRaises(TimeoutError):
yield q.join(timeout=timedelta(seconds=0.01))
class ProducerConsumerTest(AsyncTestCase):
@gen_test
def test_producer_consumer(self):
- q = queues.Queue(maxsize=3) # type: queues.Queue[int]
+ q: queues.Queue[int] = queues.Queue(maxsize=3)
history = []
# We don't yield between get() and task_done(), so get() must wait for
self.assertEqual(response.body, b"OK")
-resources = {} # type: typing.Dict[str, bytes]
+resources: typing.Dict[str, bytes] = {}
class GetResource(RequestHandler):
class CustomRouter(ReversibleRouter):
def __init__(self):
super().__init__()
- self.routes = {} # type: typing.Dict[str, typing.Any]
+ self.routes: typing.Dict[str, typing.Any] = {}
def add_routes(self, routes):
self.routes.update(routes)
def mixin_get_app(self):
# callable objects to finish pending /trigger requests
- self.triggers = (
- collections.deque()
- ) # type: typing.Deque[typing.Callable[[], None]]
+ self.triggers: typing.Deque[typing.Callable[[], None]] = collections.deque()
return Application(
[
url(
# simple_httpclient_test, but it fails with the version of libcurl
# available on travis-ci. Move it when that has been upgraded
# or we have a better framework to skip tests based on curl version.
- headers = [] # type: typing.List[str]
- chunk_bytes = [] # type: typing.List[bytes]
+ headers: typing.List[str] = []
+ chunk_bytes: typing.List[bytes] = []
self.fetch(
"/redirect?url=/hello",
header_callback=headers.append,
self.assertEqual(num_start_lines, 1)
def test_streaming_callback_coroutine(self: typing.Any):
- headers = [] # type: typing.List[str]
- chunk_bytes = [] # type: typing.List[bytes]
+ headers: typing.List[str] = []
+ chunk_bytes: typing.List[bytes] = []
import asyncio
class TestTCPServer(TCPServer):
def __init__(self, family):
super().__init__()
- self.streams = [] # type: List[IOStream]
- self.queue = Queue() # type: Queue[IOStream]
+ self.streams: List[IOStream] = []
+ self.queue: Queue[IOStream] = Queue()
sockets = bind_sockets(0, "localhost", family)
self.add_sockets(sockets)
self.port = sockets[0].getsockname()[1]
def setUp(self):
super().setUp()
- self.connect_futures = (
- {}
- ) # type: Dict[Tuple[int, typing.Any], Future[ConnectorTest.FakeStream]]
- self.streams = {} # type: Dict[typing.Any, ConnectorTest.FakeStream]
+ self.connect_futures: Dict[
+ Tuple[int, typing.Any], Future[ConnectorTest.FakeStream]
+ ] = {}
+ self.streams: Dict[typing.Any, ConnectorTest.FakeStream] = {}
self.addrinfo = [(AF1, "a"), (AF1, "b"), (AF2, "c"), (AF2, "d")]
def tearDown(self):
def create_stream(self, af, addr):
stream = ConnectorTest.FakeStream()
self.streams[addr] = stream
- future = Future() # type: Future[ConnectorTest.FakeStream]
+ future: Future[ConnectorTest.FakeStream] = Future()
self.connect_futures[(af, addr)] = future
return stream, future
self.assertTrue("# test.html:2" in traceback.format_exc())
def test_error_line_number_module(self):
- loader = None # type: typing.Optional[DictLoader]
+ loader: typing.Optional[DictLoader] = None
def load_generate(path, **kwargs):
assert loader is not None
# globals: it's all global from the perspective of code defined
# in s.
global_namespace = dict(caller_globals, **caller_locals) # type: ignore
- local_namespace = {} # type: typing.Dict[str, typing.Any]
+ local_namespace: typing.Dict[str, typing.Any] = {}
exec(textwrap.dedent(s), global_namespace, local_namespace)
return local_namespace
# stub out enough methods to make the signed_cookie functions work
def __init__(self, cookie_secret="0123456789", key_version=None):
# don't call super.__init__
- self._cookies = {} # type: typing.Dict[str, bytes]
+ self._cookies: typing.Dict[str, bytes] = {}
if key_version is None:
self.application = ObjectDict( # type: ignore
settings=dict(cookie_secret=cookie_secret)
class TypeCheckHandler(RequestHandler):
def prepare(self):
- self.errors = {} # type: typing.Dict[str, str]
+ self.errors: typing.Dict[str, str] = {}
self.check_type("status", self.get_status(), int)
@gen_test
def test_streaming_body(self):
- self.prepared = Future() # type: Future[None]
- self.data = Future() # type: Future[bytes]
- self.finished = Future() # type: Future[None]
+ self.prepared: Future[None] = Future()
+ self.data: Future[bytes] = Future()
+ self.finished: Future[None] = Future()
stream = self.connect(b"/stream_body", connection_close=True)
yield self.prepared
@gen_test
def test_close_during_upload(self):
- self.close_future = Future() # type: Future[None]
+ self.close_future: Future[None] = Future()
stream = self.connect(b"/close_detection", connection_close=False)
stream.close()
yield self.close_future
def initialize(self, test):
self.test = test
self.method = None
- self.methods = [] # type: typing.List[str]
+ self.methods: typing.List[str] = []
@contextlib.contextmanager
def in_method(self, method):
class WebSocketTest(WebSocketBaseTestCase):
def get_app(self):
- self.close_future = Future() # type: Future[None]
+ self.close_future: Future[None] = Future()
return Application(
[
("/echo", EchoHandler, dict(close_future=self.close_future)),
super().__init__(methodName)
self.__stopped = False
self.__running = False
- self.__failure = None # type: Optional[_ExcInfoTuple]
- self.__stop_args = None # type: Any
- self.__timeout = None # type: Optional[object]
+ self.__failure: Optional[_ExcInfoTuple] = None
+ self.__stop_args: Any = None
+ self.__timeout: Optional[object] = None
# Not used in this class itself, but used by @gen_test
- self._test_generator = None # type: Optional[Union[Generator, Coroutine]]
+ self._test_generator: Optional[Union[Generator, Coroutine]] = None
def setUp(self) -> None:
py_ver = sys.version_info
# This is a good case study arguing for either some sort of
# extensibility in the gen decorators or cancellation support.
@functools.wraps(f)
- def pre_coroutine(self, *args, **kwargs):
- # type: (AsyncTestCase, *Any, **Any) -> Union[Generator, Coroutine]
+ def pre_coroutine(
+ self: AsyncTestCase, *args: Any, **kwargs: Any
+ ) -> Union[Generator, Coroutine]:
# Type comments used to avoid pypy3 bug.
result = f(self, *args, **kwargs)
if isinstance(result, Generator) or inspect.iscoroutine(result):
coro = gen.coroutine(pre_coroutine) # type: ignore[assignment]
@functools.wraps(coro)
- def post_coroutine(self, *args, **kwargs):
- # type: (AsyncTestCase, *Any, **Any) -> None
+ def post_coroutine(self: AsyncTestCase, *args: Any, **kwargs: Any) -> None:
try:
return self.io_loop.run_sync(
functools.partial(coro, self, *args, **kwargs), timeout=timeout
self.deprecated_level_matched = 0
self.logged_stack = False
self.level = level
- self.orig_level = None # type: Optional[int]
+ self.orig_level: Optional[int] = None
def filter(self, record: logging.LogRecord) -> bool:
if record.exc_info:
and `.Resolver`.
"""
+from __future__ import annotations
+
import array
import asyncio
from inspect import getfullargspec
def __new__(cls, *args: Any, **kwargs: Any) -> Any:
base = cls.configurable_base()
- init_kwargs = {} # type: Dict[str, Any]
+ init_kwargs: Dict[str, Any] = {}
if cls is base:
impl = cls.configured_class()
if base.__impl_kwargs:
return instance
@classmethod
- def configurable_base(cls):
- # type: () -> Type[Configurable]
+ def configurable_base(cls) -> Type[Configurable]:
"""Returns the base class of a configurable hierarchy.
This will normally return the class in which it is defined.
raise NotImplementedError()
@classmethod
- def configurable_default(cls):
- # type: () -> Type[Configurable]
+ def configurable_default(cls) -> Type[Configurable]:
"""Returns the implementation class to be used if none is configured."""
raise NotImplementedError()
def _initialize(self) -> None:
pass
- initialize = _initialize # type: Callable[..., None]
+ initialize: Callable[..., None] = _initialize
"""Initialize a `Configurable` subclass instance.
Configurable classes should use `initialize` instead of ``__init__``.
"""
@classmethod
- def configure(cls, impl, **kwargs):
- # type: (Union[None, str, Type[Configurable]], Any) -> None
+ def configure(
+ cls, impl: Union[None, str, Type[Configurable]], **kwargs: Any
+ ) -> None:
"""Sets the class to use when the base class is instantiated.
Keyword arguments will be saved and added to the arguments passed
base.__impl_kwargs = kwargs
@classmethod
- def configured_class(cls):
- # type: () -> Type[Configurable]
+ def configured_class(cls) -> Type[Configurable]:
"""Returns the currently configured class."""
base = cls.configurable_base()
# Manually mangle the private name to see whether this base
raise ValueError("configured class not found")
@classmethod
- def _save_configuration(cls):
- # type: () -> Tuple[Optional[Type[Configurable]], Optional[Dict[str, Any]]]
+ def _save_configuration(
+ cls,
+ ) -> Tuple[Optional[Type[Configurable]], Optional[Dict[str, Any]]]:
base = cls.configurable_base()
return (base.__impl_class, base.__impl_kwargs)
@classmethod
- def _restore_configuration(cls, saved):
- # type: (Tuple[Optional[Type[Configurable]], Optional[Dict[str, Any]]]) -> None
+ def _restore_configuration(
+ cls, saved: Tuple[Optional[Type[Configurable]], Optional[Dict[str, Any]]]
+ ) -> None:
base = cls.configurable_base()
base.__impl_class = saved[0]
base.__impl_kwargs = saved[1]
def __init__(self, func: Callable, name: str) -> None:
self.name = name
try:
- self.arg_pos = self._getargnames(func).index(name) # type: Optional[int]
+ self.arg_pos: Optional[int] = self._getargnames(func).index(name)
except ValueError:
# Not a positional parameter
self.arg_pos = None
return old_value, args, kwargs
-def timedelta_to_seconds(td):
- # type: (datetime.timedelta) -> float
+def timedelta_to_seconds(td: datetime.timedelta) -> float:
"""Equivalent to ``td.total_seconds()`` (introduced in Python 2.7)."""
return td.total_seconds()
_websocket_mask = _websocket_mask_python
-def doctests():
- # type: () -> unittest.TestSuite
+def doctests() -> unittest.TestSuite:
import doctest
return doctest.DocTestSuite()
"OPTIONS",
)
- _template_loaders = {} # type: Dict[str, template.BaseLoader]
+ _template_loaders: Dict[str, template.BaseLoader] = {}
_template_loader_lock = threading.Lock()
_remove_control_chars_regex = re.compile(r"[\x00-\x08\x0e-\x1f]")
def _initialize(self) -> None:
pass
- initialize = _initialize # type: Callable[..., None]
+ initialize: Callable[..., None] = _initialize
"""Hook for subclass initialization. Called for each request.
A dictionary passed as the third argument of a ``URLSpec`` will be
def _unimplemented_method(self, *args: str, **kwargs: str) -> None:
raise HTTPError(405)
- head = _unimplemented_method # type: Callable[..., Optional[Awaitable[None]]]
- get = _unimplemented_method # type: Callable[..., Optional[Awaitable[None]]]
- post = _unimplemented_method # type: Callable[..., Optional[Awaitable[None]]]
- delete = _unimplemented_method # type: Callable[..., Optional[Awaitable[None]]]
- patch = _unimplemented_method # type: Callable[..., Optional[Awaitable[None]]]
- put = _unimplemented_method # type: Callable[..., Optional[Awaitable[None]]]
- options = _unimplemented_method # type: Callable[..., Optional[Awaitable[None]]]
+ head: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
+ get: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
+ post: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
+ delete: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
+ patch: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
+ put: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
+ options: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
def prepare(self) -> Optional[Awaitable[None]]:
"""Called at the beginning of a request before `get`/`post`/etc.
}
)
self.set_default_headers()
- self._write_buffer = [] # type: List[bytes]
+ self._write_buffer: List[bytes] = []
self._status_code = 200
self._reason = httputil.responses[200]
f"Invalid cookie attribute {attr_name}={attr_value!r} for cookie {name!r}"
)
if not hasattr(self, "_new_cookie"):
- self._new_cookie = (
- http.cookies.SimpleCookie()
- ) # type: http.cookies.SimpleCookie
+ self._new_cookie: http.cookies.SimpleCookie = http.cookies.SimpleCookie()
if name in self._new_cookie:
del self._new_cookie[name]
self._new_cookie[name] = value
Override this method in a sub-classed controller to change the output.
"""
paths = []
- unique_paths = set() # type: Set[str]
+ unique_paths: Set[str] = set()
for path in js_files:
if not is_absolute(path):
Override this method in a sub-classed controller to change the output.
"""
paths = []
- unique_paths = set() # type: Set[str]
+ unique_paths: Set[str] = set()
for path in css_files:
if not is_absolute(path):
if self.request.method != "HEAD":
return self.request.connection.write(chunk)
else:
- future = Future() # type: Future[None]
+ future: Future[None] = Future()
future.set_result(None)
return future
def _ui_module(self, name: str, module: Type["UIModule"]) -> Callable[..., str]:
def render(*args, **kwargs) -> str: # type: ignore
if not hasattr(self, "_active_modules"):
- self._active_modules = {} # type: Dict[str, UIModule]
+ self._active_modules: Dict[str, UIModule] = {}
if name not in self._active_modules:
self._active_modules[name] = module(self)
rendered = self._active_modules[name].render(*args, **kwargs)
**settings: Any,
) -> None:
if transforms is None:
- self.transforms = [] # type: List[Type[OutputTransform]]
+ self.transforms: List[Type[OutputTransform]] = []
if settings.get("compress_response") or settings.get("gzip"):
self.transforms.append(GZipContentEncoding)
else:
"xsrf_form_html": _xsrf_form_html,
"Template": TemplateModule,
}
- self.ui_methods = {} # type: Dict[str, Callable[..., str]]
+ self.ui_methods: Dict[str, Callable[..., str]] = {}
self._load_ui_modules(settings.get("ui_modules", {}))
self._load_ui_methods(settings.get("ui_methods", {}))
if self.settings.get("static_path"):
self.handler_kwargs = handler_kwargs or {}
self.path_args = path_args or []
self.path_kwargs = path_kwargs or {}
- self.chunks = [] # type: List[bytes]
+ self.chunks: List[bytes] = []
self.stream_request_body = _has_stream_request_body(self.handler_class)
def headers_received(
CACHE_MAX_AGE = 86400 * 365 * 10 # 10 years
- _static_hashes = {} # type: Dict[str, Optional[str]]
+ _static_hashes: Dict[str, Optional[str]] = {}
_lock = threading.Lock() # protects _static_hashes
def initialize(self, path: str, default_filename: Optional[str] = None) -> None:
if start is not None:
file.seek(start)
if end is not None:
- remaining = end - (start or 0) # type: Optional[int]
+ remaining: Optional[int] = end - (start or 0)
else:
remaining = None
while True:
def __init__(self, handler: RequestHandler) -> None:
super().__init__(handler)
# keep resources in both a list and a dict to preserve order
- self._resource_list = [] # type: List[Dict[str, Any]]
- self._resource_dict = {} # type: Dict[str, Dict[str, Any]]
+ self._resource_list: List[Dict[str, Any]] = []
+ self._resource_dict: Dict[str, Dict[str, Any]] = {}
def render(self, path: str, **kwargs: Any) -> bytes:
def set_resources(**kwargs) -> str: # type: ignore
**kwargs: Any,
) -> None:
super().__init__(application, request, **kwargs)
- self.ws_connection = None # type: Optional[WebSocketProtocol]
- self.close_code = None # type: Optional[int]
- self.close_reason = None # type: Optional[str]
+ self.ws_connection: Optional[WebSocketProtocol] = None
+ self.close_code: Optional[int] = None
+ self.close_reason: Optional[str] = None
self._on_close_called = False
async def get(self, *args: Any, **kwargs: Any) -> None:
def _open(self, *args: str, **kwargs: str) -> Optional[Awaitable[None]]:
pass
- open = _open # type: Callable[..., Optional[Awaitable[None]]]
+ open: Callable[..., Optional[Awaitable[None]]] = _open
"""Invoked when a new WebSocket is opened.
The arguments to `open` are extracted from the `tornado.web.URLSpec`
def __init__(self, handler: "_WebSocketDelegate") -> None:
self.handler = handler
- self.stream = None # type: Optional[IOStream]
+ self.stream: Optional[IOStream] = None
self.client_terminated = False
self.server_terminated = False
self._mem_level = compression_options["mem_level"]
if persistent:
- self._compressor = self._create_compressor() # type: Optional[_Compressor]
+ self._compressor: Optional[_Compressor] = self._create_compressor()
else:
self._compressor = None
)
self._max_wbits = max_wbits
if persistent:
- self._decompressor = (
- self._create_decompressor()
- ) # type: Optional[_Decompressor]
+ self._decompressor: Optional[_Decompressor] = self._create_decompressor()
else:
self._decompressor = None
self._final_frame = False
self._frame_opcode = None
self._masked_frame = None
- self._frame_mask = None # type: Optional[bytes]
+ self._frame_mask: Optional[bytes] = None
self._frame_length = None
- self._fragmented_message_buffer = None # type: Optional[bytearray]
+ self._fragmented_message_buffer: Optional[bytearray] = None
self._fragmented_message_opcode = None
- self._waiting = None # type: object
+ self._waiting: object = None
self._compression_options = params.compression_options
- self._decompressor = None # type: Optional[_PerMessageDeflateDecompressor]
- self._compressor = None # type: Optional[_PerMessageDeflateCompressor]
- self._frame_compressed = None # type: Optional[bool]
+ self._decompressor: Optional[_PerMessageDeflateDecompressor] = None
+ self._compressor: Optional[_PerMessageDeflateCompressor] = None
+ self._frame_compressed: Optional[bool] = None
# The total uncompressed size of all messages received or sent.
# Unicode messages are encoded to utf8.
# Only for testing; subject to change.
# the effect of compression, frame overhead, and control frames.
self._wire_bytes_in = 0
self._wire_bytes_out = 0
- self._received_pong = False # type: bool
- self.close_code = None # type: Optional[int]
- self.close_reason = None # type: Optional[str]
- self._ping_coroutine = None # type: Optional[asyncio.Task]
+ self._received_pong: bool = False
+ self.close_code: Optional[int] = None
+ self.close_reason: Optional[str] = None
+ self._ping_coroutine: Optional[asyncio.Task] = None
# Use a property for this to satisfy the abc.
@property
"""Converts a websocket agreed_parameters set to keyword arguments
for our compressor objects.
"""
- options = dict(
+ options: Dict[str, Any] = dict(
persistent=(side + "_no_context_takeover") not in agreed_parameters
- ) # type: Dict[str, Any]
+ )
wbits_header = agreed_parameters.get(side + "_max_window_bits", None)
if wbits_header is None:
options["max_wbits"] = zlib.MAX_WBITS
subprotocols: Optional[List[str]] = None,
resolver: Optional[Resolver] = None,
) -> None:
- self.connect_future = Future() # type: Future[WebSocketClientConnection]
- self.read_queue = Queue(1) # type: Queue[Union[None, str, bytes]]
+ self.connect_future: Future[WebSocketClientConnection] = Future()
+ self.read_queue: Queue[Union[None, str, bytes]] = Queue(1)
self.key = base64.b64encode(os.urandom(16))
self._on_message_callback = on_message_callback
- self.close_code = None # type: Optional[int]
- self.close_reason = None # type: Optional[str]
+ self.close_code: Optional[int] = None
+ self.close_reason: Optional[str] = None
self.params = _WebSocketParams(
ping_interval=ping_interval,
ping_timeout=ping_timeout,
IOLoop.current().spawn_callback(self.handle_request, request)
async def handle_request(self, request: httputil.HTTPServerRequest) -> None:
- data = {} # type: Dict[str, Any]
- response = [] # type: List[bytes]
+ data: Dict[str, Any] = {}
+ response: List[bytes] = []
def start_response(
status: str,
status_code_str, reason = data["status"].split(" ", 1)
status_code = int(status_code_str)
- headers = data["headers"] # type: List[Tuple[str, str]]
+ headers: List[Tuple[str, str]] = data["headers"]
header_set = {k.lower() for (k, v) in headers}
body = escape.utf8(body)
if status_code != 304: