black
flake8
-mypy==0.740
+mypy
tox
black==20.8b1
flake8==3.8.4
-mypy==0.740
+mypy==0.790
tox==3.20.1
## The following requirements were added by pip freeze:
appdirs==1.4.4
_reload_attempted = True
for fn in _reload_hooks:
fn()
- if hasattr(signal, "setitimer"):
+ if sys.platform != "win32":
# Clear the alarm signal set by
# ioloop.set_blocking_log_threshold so it doesn't fire
# after the exec.
# SyntaxErrors are special: their innermost stack frame is fake
# so extract_tb won't see it and we have to get the filename
# from the exception object.
- watch(e.filename)
+ if e.filename is not None:
+ watch(e.filename)
else:
logging.basicConfig()
gen_log.info("Script exited normally")
return
if hasattr(a, "exc_info") and a.exc_info() is not None: # type: ignore
future_set_exc_info(b, a.exc_info()) # type: ignore
- elif a.exception() is not None:
- b.set_exception(a.exception())
else:
- b.set_result(a.result())
+ a_exc = a.exception()
+ if a_exc is not None:
+ b.set_exception(a_exc)
+ else:
+ b.set_result(a.result())
if isinstance(a, Future):
future_add_done_callback(a, copy)
"callback": callback,
"queue_start_time": queue_start_time,
"curl_start_time": time.time(),
- "curl_start_ioloop_time": self.io_loop.current().time(),
+ "curl_start_ioloop_time": self.io_loop.current().time(), # type: ignore
}
try:
self._curl_setup_request(
return len(b)
else:
- write_function = buffer.write
+ write_function = buffer.write # type: ignore
curl.setopt(pycurl.WRITEFUNCTION, write_function)
curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
"""
import calendar
-import collections
+import collections.abc
import copy
import datetime
import email.utils
% (normalized, self._options[normalized].file_name)
)
frame = sys._getframe(0)
- options_file = frame.f_code.co_filename
-
- # Can be called directly, or through top level define() fn, in which
- # case, step up above that frame to look for real caller.
- if (
- frame.f_back.f_code.co_filename == options_file
- and frame.f_back.f_code.co_name == "define"
- ):
- frame = frame.f_back
-
- file_name = frame.f_back.f_code.co_filename
+ if frame is not None:
+ options_file = frame.f_code.co_filename
+
+ # Can be called directly, or through top level define() fn, in which
+ # case, step up above that frame to look for real caller.
+ if (
+ frame.f_back is not None
+ and frame.f_back.f_code.co_filename == options_file
+ and frame.f_back.f_code.co_name == "define"
+ ):
+ frame = frame.f_back
+
+ assert frame.f_back is not None
+ file_name = frame.f_back.f_code.co_filename
+ else:
+ file_name = "<unknown>"
if file_name == options_file:
file_name = ""
if type is None:
% (self.request.network_interface,)
)
- timeout = (
- min(self.request.connect_timeout, self.request.request_timeout)
- or self.request.connect_timeout
- or self.request.request_timeout
- ) # min but skip zero
+ if self.request.connect_timeout and self.request.request_timeout:
+ timeout = min(
+ self.request.connect_timeout, self.request.request_timeout
+ )
+ elif self.request.connect_timeout:
+ timeout = self.request.connect_timeout
+ elif self.request.request_timeout:
+ timeout = self.request.request_timeout
+ else:
+ timeout = 0
if timeout:
self._timeout = self.io_loop.add_timeout(
self.start_time + timeout,
original_request = getattr(self.request, "original_request", self.request)
if self._should_follow_redirect():
assert isinstance(self.request, _RequestProxy)
+ assert self.headers is not None
new_request = copy.copy(self.request.request)
new_request.url = urllib.parse.urljoin(
self.request.url, self.headers["Location"]
)
+ assert self.request.max_redirects is not None
new_request.max_redirects = self.request.max_redirects - 1
del new_request.headers["Host"]
# https://tools.ietf.org/html/rfc7231#section-6.4
self.code in (301, 302) and self.request.method == "POST"
):
new_request.method = "GET"
- new_request.body = None
+ new_request.body = None # type: ignore
for h in [
"Content-Length",
"Content-Type",
del self.request.headers[h]
except KeyError:
pass
- new_request.original_request = original_request
+ new_request.original_request = original_request # type: ignore
final_callback = self.final_callback
- self.final_callback = None
+ self.final_callback = None # type: ignore
self._release()
+ assert self.client is not None
fut = self.client.fetch(new_request, raise_error=False)
fut.add_done_callback(lambda f: final_callback(f.result()))
self._on_end_request()
# cases are covered by the first match for str.
if isinstance(value, str):
retval = value
- elif isinstance(value, bytes): # py3
+ elif isinstance(value, bytes):
# Non-ascii characters in headers are not well supported,
# but if you pass bytes, use latin1 so they pass through as-is.
retval = value.decode("latin1")
- elif isinstance(value, unicode_type): # py2
- # TODO: This is inconsistent with the use of latin1 above,
- # but it's been that way for a long time. Should it change?
- retval = escape.utf8(value)
elif isinstance(value, numbers.Integral):
# return immediately since we know the converted value will be safe
return str(value)
if not template_path:
frame = sys._getframe(0)
web_file = frame.f_code.co_filename
- while frame.f_code.co_filename == web_file:
+ while frame.f_code.co_filename == web_file and frame.f_back is not None:
frame = frame.f_back
assert frame.f_code.co_filename is not None
template_path = os.path.dirname(frame.f_code.co_filename)