import urlparse
import urllib as urllib_parse
-try:
- long # py2
-except NameError:
- long = int # py3
-
class AuthError(Exception):
pass
import functools
import logging
import os
-import pkgutil
+import pkgutil # type: ignore
import sys
import traceback
import types
_watched_files = set()
_reload_hooks = []
_reload_attempted = False
-_io_loops = weakref.WeakKeyDictionary()
+_io_loops = weakref.WeakKeyDictionary() # type: ignore
def start(io_loop=None, check_time=500):
import collections
import functools
import logging
-import pycurl
+import pycurl # type: ignore
import threading
import time
from io import BytesIO
try:
try:
- from functools import singledispatch # py34+
+ # py34+
+ from functools import singledispatch # type: ignore
except ImportError:
from singledispatch import singledispatch # backport
except ImportError:
try:
try:
- from collections.abc import Generator as GeneratorType # py35+
+ # py35+
+ from collections.abc import Generator as GeneratorType # type: ignore
except ImportError:
- from backports_abc import Generator as GeneratorType
+ from backports_abc import Generator as GeneratorType # type: ignore
try:
from inspect import isawaitable # py35+
raise
from types import GeneratorType
- def isawaitable(x):
+ def isawaitable(x): # type: ignore
return False
if PY3:
self.request_time = request_time
self.time_info = time_info or {}
- def _get_body(self):
+ @property
+ def body(self):
if self.buffer is None:
return None
elif self._body is None:
return self._body
- body = property(_get_body)
-
def rethrow(self):
"""If there was an error on the request, raise an `HTTPError`."""
if self.error:
from ssl import SSLError
except ImportError:
# ssl is unavailable on app engine.
- class SSLError(Exception):
+ class _SSLError(Exception):
pass
-
+ # Hack around a mypy limitation. We can't simply put "type: ignore"
+ # on the class definition itself; must go through an assignment.
+ SSLError = _SSLError # type: ignore
# RFC 7230 section 3.5: a recipient MAY recognize a single LF as a line
# terminator and ignore any preceding CR.
name = disp_params["name"]
if disp_params.get("filename"):
ctype = headers.get("Content-Type", "application/unknown")
- files.setdefault(name, []).append(HTTPFile(
+ files.setdefault(name, []).append(HTTPFile( # type: ignore
filename=disp_params["filename"], body=value,
content_type=ctype))
else:
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
if hasattr(errno, "WSAEWOULDBLOCK"):
- _ERRNO_WOULDBLOCK += (errno.WSAEWOULDBLOCK,)
+ _ERRNO_WOULDBLOCK += (errno.WSAEWOULDBLOCK,) # type: ignore
# These errnos indicate that a connection has been abruptly terminated.
# They should be caught and handled less noisily than other errors.
errno.ETIMEDOUT)
if hasattr(errno, "WSAECONNRESET"):
- _ERRNO_CONNRESET += (errno.WSAECONNRESET, errno.WSAECONNABORTED, errno.WSAETIMEDOUT)
+ _ERRNO_CONNRESET += (errno.WSAECONNRESET, errno.WSAECONNABORTED, errno.WSAETIMEDOUT) # type: ignore
if sys.platform == 'darwin':
# OSX appears to have a race condition that causes send(2) to return
# http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
# Since the socket is being closed anyway, treat this as an ECONNRESET
# instead of an unexpected error.
- _ERRNO_CONNRESET += (errno.EPROTOTYPE,)
+ _ERRNO_CONNRESET += (errno.EPROTOTYPE,) # type: ignore
# More non-portable errnos:
_ERRNO_INPROGRESS = (errno.EINPROGRESS,)
if hasattr(errno, "WSAEINPROGRESS"):
- _ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,)
+ _ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,) # type: ignore
class StreamClosedError(IOError):
from tornado._locale_data import LOCALE_NAMES
_default_locale = "en_US"
-_translations = {}
+_translations = {} # type: dict
_supported_locales = frozenset([_default_locale])
_use_gettext = False
CONTEXT_SEPARATOR = "\x04"
ssl_match_hostname = ssl.match_hostname
SSLCertificateError = ssl.CertificateError
elif ssl is None:
- ssl_match_hostname = SSLCertificateError = None
+ ssl_match_hostname = SSLCertificateError = None # type: ignore
else:
import backports.ssl_match_hostname
ssl_match_hostname = backports.ssl_match_hostname.match_hostname
- SSLCertificateError = backports.ssl_match_hostname.CertificateError
+ SSLCertificateError = backports.ssl_match_hostname.CertificateError # type: ignore
if hasattr(ssl, 'SSLContext'):
if hasattr(ssl, 'create_default_context'):
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
if hasattr(errno, "WSAEWOULDBLOCK"):
- _ERRNO_WOULDBLOCK += (errno.WSAEWOULDBLOCK,)
+ _ERRNO_WOULDBLOCK += (errno.WSAEWOULDBLOCK,) # type: ignore
# Default backlog used when calling sock.listen()
_DEFAULT_BACKLOG = 128
All ``ThreadedResolvers`` share a single thread pool, whose
size is set by the first one to be created.
"""
- _threadpool = None
- _threadpool_pid = None
+ _threadpool = None # type: ignore
+ _threadpool_pid = None # type: int
def initialize(self, io_loop=None, num_threads=10):
threadpool = ThreadedResolver._create_threadpool(num_threads)
else:
return context.wrap_socket(socket, **kwargs)
else:
- return ssl.wrap_socket(socket, **dict(context, **kwargs))
+ return ssl.wrap_socket(socket, **dict(context, **kwargs)) # type: ignore
except ImportError as e:
# Asyncio itself isn't available; see if trollius is (backport to py26+).
try:
- import trollius as asyncio
+ import trollius as asyncio # type: ignore
except ImportError:
# Re-raise the original asyncio error, not the trollius one.
raise e
return af
if hasattr(convert_yielded, 'register'):
- convert_yielded.register(asyncio.Future, to_tornado_future)
+ convert_yielded.register(asyncio.Future, to_tornado_future) # type: ignore
from __future__ import absolute_import, division, print_function, with_statement
-import pycares
+import pycares # type: ignore
import socket
from tornado import gen
def close(self):
"""Closes the waker's file descriptor(s)."""
raise NotImplementedError()
+
+def monotonic_time():
+ raise NotImplementedError()
import socket
import sys
-import twisted.internet.abstract
-from twisted.internet.defer import Deferred
-from twisted.internet.posixbase import PosixReactorBase
-from twisted.internet.interfaces import \
- IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor
-from twisted.python import failure, log
-from twisted.internet import error
-import twisted.names.cache
-import twisted.names.client
-import twisted.names.hosts
-import twisted.names.resolve
-
-from zope.interface import implementer
+import twisted.internet.abstract # type: ignore
+from twisted.internet.defer import Deferred # type: ignore
+from twisted.internet.posixbase import PosixReactorBase # type: ignore
+from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor # type: ignore
+from twisted.python import failure, log # type: ignore
+from twisted.internet import error # type: ignore
+import twisted.names.cache # type: ignore
+import twisted.names.client # type: ignore
+import twisted.names.hosts # type: ignore
+import twisted.names.resolve # type: ignore
+
+from zope.interface import implementer # type: ignore
from tornado.concurrent import Future
from tornado.escape import utf8
if not io_loop:
io_loop = tornado.ioloop.IOLoop.current()
reactor = TornadoReactor(io_loop)
- from twisted.internet.main import installReactor
+ from twisted.internet.main import installReactor # type: ignore
installReactor(reactor)
return reactor
def initialize(self, reactor=None, **kwargs):
super(TwistedIOLoop, self).initialize(**kwargs)
if reactor is None:
- import twisted.internet.reactor
+ import twisted.internet.reactor # type: ignore
reactor = twisted.internet.reactor
self.reactor = reactor
self.fds = {}
raise gen.Return(result)
if hasattr(gen.convert_yielded, 'register'):
- @gen.convert_yielded.register(Deferred)
+ @gen.convert_yielded.register(Deferred) # type: ignore
def _(d):
f = Future()
from __future__ import absolute_import, division, print_function, with_statement
-import ctypes
-import ctypes.wintypes
+import ctypes # type: ignore
+import ctypes.wintypes # type: ignore
# See: http://msdn.microsoft.com/en-us/library/ms724935(VS.85).aspx
SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation
STREAM = object()
_initialized = False
- _waiting = {}
+ _waiting = {} # type: ignore
def __init__(self, *args, **kwargs):
self.io_loop = kwargs.pop('io_loop', None) or ioloop.IOLoop.current()
from tornado import escape
from tornado.log import app_log
-from tornado.util import ObjectDict, exec_in, unicode_type
+from tornado.util import ObjectDict, exec_in, unicode_type, PY3
-try:
- from cStringIO import StringIO # py2
-except ImportError:
- from io import StringIO # py3
+if PY3:
+ from io import StringIO
+else:
+ from cStringIO import StringIO
_DEFAULT_AUTOESCAPE = "xhtml_escape"
_UNSET = object()
# __name__ and __loader__ allow the traceback mechanism to find
# the generated source code.
"__name__": self.name.replace('.', '_'),
- "__loader__": ObjectDict(get_source=lambda name: self.code),
+ "__loader__": ObjectDict(get_source=lambda name: self.code), # type: ignore
}
namespace.update(self.namespace)
namespace.update(kwargs)
class ClientTestMixin(object):
def setUp(self):
- super(ClientTestMixin, self).setUp()
+ super(ClientTestMixin, self).setUp() # type: ignore
self.server = CapServer(io_loop=self.io_loop)
sock, port = bind_unused_port()
self.server.add_sockets([sock])
def tearDown(self):
self.server.stop()
- super(ClientTestMixin, self).tearDown()
+ super(ClientTestMixin, self).tearDown() # type: ignore
def test_callback(self):
self.client.capitalize("hello", callback=self.stop)
try:
- import pycurl
+ import pycurl # type: ignore
except ImportError:
pycurl = None
class SSLTestMixin(object):
def get_ssl_options(self):
- return dict(ssl_version=self.get_ssl_version(),
+ return dict(ssl_version=self.get_ssl_version(), # type: ignore
**AsyncHTTPSTestCase.get_ssl_options())
def get_ssl_version(self):
def test_import_pycurl(self):
try:
- import pycurl
+ import pycurl # type: ignore
except ImportError:
pass
else:
import sys
try:
- from unittest import mock # python 3.3
+ from unittest import mock # type: ignore
except ImportError:
try:
- import mock # third-party mock package
+ import mock # type: ignore
except ImportError:
mock = None
stream = IOStream(s, io_loop=self.io_loop)
stream.set_close_callback(self.stop)
with mock.patch('socket.socket.connect',
- side_effect=socket.gaierror('boom')):
+ side_effect=socket.gaierror(errno.EIO, 'boom')):
with ExpectLog(gen_log, "Connect error"):
stream.connect(('localhost', 80), callback=self.stop)
self.wait()
from __future__ import absolute_import, division, print_function, with_statement
+import errno
import os
import signal
import socket
futures = None
try:
- import pycares
+ import pycares # type: ignore
except ImportError:
pycares = None
else:
from tornado.platform.caresresolver import CaresResolver
try:
- import twisted
- import twisted.names
+ import twisted # type: ignore
+ import twisted.names # type: ignore
except ImportError:
twisted = None
else:
def _failing_getaddrinfo(*args):
"""Dummy implementation of getaddrinfo for use in mocks"""
- raise socket.gaierror("mock: lookup failed")
+ raise socket.gaierror(errno.EIO, "mock: lookup failed")
@skipIfNoNetwork
from cStringIO import StringIO
try:
- from unittest import mock # python 3.3
+ # py33+
+ from unittest import mock # type: ignore
except ImportError:
try:
- import mock # third-party mock package
+ import mock # type: ignore
except ImportError:
mock = None
try:
import fcntl
- from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
- from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
- from twisted.internet.protocol import Protocol
- from twisted.python import log
+ from twisted.internet.defer import Deferred, inlineCallbacks, returnValue # type: ignore
+ from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor # type: ignore
+ from twisted.internet.protocol import Protocol # type: ignore
+ from twisted.python import log # type: ignore
from tornado.platform.twisted import TornadoReactor, TwistedIOLoop
- from zope.interface import implementer
+ from zope.interface import implementer # type: ignore
have_twisted = True
except ImportError:
have_twisted = False
# The core of Twisted 12.3.0 is available on python 3, but twisted.web is not
# so test for it separately.
try:
- from twisted.web.client import Agent, readBody
- from twisted.web.resource import Resource
- from twisted.web.server import Site
+ from twisted.web.client import Agent, readBody # type: ignore
+ from twisted.web.resource import Resource # type: ignore
+ from twisted.web.server import Site # type: ignore
# As of Twisted 15.0.0, twisted.web is present but fails our
# tests due to internal str/bytes errors.
have_twisted_web = sys.version_info < (3,)
self._reactor.run()
-class Reader(object):
- def __init__(self, fd, callback):
- self._fd = fd
- self._callback = callback
-
- def logPrefix(self):
- return "Reader"
+if have_twisted:
+ @implementer(IReadDescriptor)
+ class Reader(object):
+ def __init__(self, fd, callback):
+ self._fd = fd
+ self._callback = callback
- def close(self):
- self._fd.close()
+ def logPrefix(self):
+ return "Reader"
- def fileno(self):
- return self._fd.fileno()
+ def close(self):
+ self._fd.close()
- def readConnectionLost(self, reason):
- self.close()
+ def fileno(self):
+ return self._fd.fileno()
- def connectionLost(self, reason):
- self.close()
+ def readConnectionLost(self, reason):
+ self.close()
- def doRead(self):
- self._callback(self._fd)
-if have_twisted:
- Reader = implementer(IReadDescriptor)(Reader)
+ def connectionLost(self, reason):
+ self.close()
+ def doRead(self):
+ self._callback(self._fd)
-class Writer(object):
- def __init__(self, fd, callback):
- self._fd = fd
- self._callback = callback
+ @implementer(IWriteDescriptor)
+ class Writer(object):
+ def __init__(self, fd, callback):
+ self._fd = fd
+ self._callback = callback
- def logPrefix(self):
- return "Writer"
+ def logPrefix(self):
+ return "Writer"
- def close(self):
- self._fd.close()
+ def close(self):
+ self._fd.close()
- def fileno(self):
- return self._fd.fileno()
+ def fileno(self):
+ return self._fd.fileno()
- def connectionLost(self, reason):
- self.close()
+ def connectionLost(self, reason):
+ self.close()
- def doWrite(self):
- self._callback(self._fd)
-if have_twisted:
- Writer = implementer(IWriteDescriptor)(Writer)
+ def doWrite(self):
+ self._callback(self._fd)
@skipIfNoTwisted
test_class = import_object(test_name)
except (ImportError, AttributeError):
continue
- for test_func in blacklist:
+ for test_func in blacklist: # type: ignore
if hasattr(test_class, test_func):
# The test_func may be defined in a mixin, so clobber
# it instead of delattr()
setattr(test_class, test_func, lambda self: None)
def make_test_subclass(test_class):
- class TornadoTest(test_class):
+ class TornadoTest(test_class): # type: ignore
_reactors = ["tornado.platform.twisted._TestReactor"]
def setUp(self):
self.__curdir = os.getcwd()
self.__tempdir = tempfile.mkdtemp()
os.chdir(self.__tempdir)
- super(TornadoTest, self).setUp()
+ super(TornadoTest, self).setUp() # type: ignore
def tearDown(self):
- super(TornadoTest, self).tearDown()
+ super(TornadoTest, self).tearDown() # type: ignore
os.chdir(self.__curdir)
shutil.rmtree(self.__tempdir)
# enabled) but without our filter rules to ignore those
# warnings from Twisted code.
filtered = []
- for w in super(TornadoTest, self).flushWarnings(
+ for w in super(TornadoTest, self).flushWarnings( # type: ignore
*args, **kwargs):
if w['category'] in (BytesWarning, ResourceWarning):
continue
# Twisted recently introduced a new logger; disable that one too.
try:
- from twisted.logger import globalLogBeginner
+ from twisted.logger import globalLogBeginner # type: ignore
except ImportError:
pass
else:
# When configured to use LayeredTwistedIOLoop we can't easily
# get the next-best IOLoop implementation, so use the lowest common
# denominator.
- self.real_io_loop = SelectIOLoop(make_current=False)
+ self.real_io_loop = SelectIOLoop(make_current=False) # type: ignore
reactor = TornadoReactor(io_loop=self.real_io_loop)
super(LayeredTwistedIOLoop, self).initialize(reactor=reactor, **kwargs)
self.add_callback(self.make_current)
# To be used as 'from tornado.test.util import unittest'.
if sys.version_info < (2, 7):
# In py26, we must always use unittest2.
- import unittest2 as unittest
+ import unittest2 as unittest # type: ignore
else:
# Otherwise, use whichever version of unittest was imported in
# tornado.testing.
# Flatten the real global and local namespace into our fake
# globals: it's all global from the perspective of code defined
# in s.
- global_namespace = dict(caller_globals, **caller_locals)
+ global_namespace = dict(caller_globals, **caller_locals) # type: ignore
local_namespace = {}
exec(textwrap.dedent(s), global_namespace, local_namespace)
return local_namespace
import tornado.escape
from tornado.escape import utf8
-from tornado.util import raise_exc_info, Configurable, exec_in, ArgReplacer, timedelta_to_seconds, import_object, re_unescape
+from tornado.util import raise_exc_info, Configurable, exec_in, ArgReplacer, timedelta_to_seconds, import_object, re_unescape, PY3
from tornado.test.util import unittest
-try:
- from cStringIO import StringIO # py2
-except ImportError:
- from io import StringIO # py3
+if PY3:
+ from io import StringIO
+else:
+ from cStringIO import StringIO
class RaiseExcInfoTest(unittest.TestCase):
# don't call super.__init__
self._cookies = {}
if key_version is None:
- self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret))
+ self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret)) # type: ignore
else:
- self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret,
+ self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret, # type: ignore
key_version=key_version))
def get_cookie(self, name):
def test_xsrf_success_header(self):
response = self.fetch("/", method="POST", body=b"",
- headers=dict({"X-Xsrftoken": self.xsrf_token},
+ headers=dict({"X-Xsrftoken": self.xsrf_token}, # type: ignore
**self.cookie_headers()))
self.assertEqual(response.code, 200)
def wrap_web_tests_application():
result = {}
for cls in web_test.wsgi_safe_tests:
- class WSGIApplicationWrappedTest(cls):
+ class WSGIApplicationWrappedTest(cls): # type: ignore
def get_app(self):
self.app = WSGIApplication(self.get_handlers(),
**self.get_app_kwargs())
def wrap_web_tests_adapter():
result = {}
for cls in web_test.wsgi_safe_tests:
- class WSGIAdapterWrappedTest(cls):
+ class WSGIAdapterWrappedTest(cls): # type: ignore
def get_app(self):
self.app = Application(self.get_handlers(),
**self.get_app_kwargs())
except ImportError:
# These modules are not importable on app engine. Parts of this module
# won't work, but e.g. LogTrapTestCase and main() will.
- AsyncHTTPClient = None
- gen = None
- HTTPServer = None
- IOLoop = None
- netutil = None
- SimpleAsyncHTTPClient = None
- Subprocess = None
+ AsyncHTTPClient = None # type: ignore
+ gen = None # type: ignore
+ HTTPServer = None # type: ignore
+ IOLoop = None # type: ignore
+ netutil = None # type: ignore
+ SimpleAsyncHTTPClient = None # type: ignore
+ Subprocess = None # type: ignore
from tornado.log import gen_log, app_log
from tornado.stack_context import ExceptionStackContext
from tornado.util import raise_exc_info, basestring_type, PY3
from cStringIO import StringIO
try:
- from collections.abc import Generator as GeneratorType # py35+
+ from collections.abc import Generator as GeneratorType # type: ignore
except ImportError:
- from types import GeneratorType
+ from types import GeneratorType # type: ignore
if sys.version_info >= (3, 5):
iscoroutine = inspect.iscoroutine
self.assertIn("FriendFeed", response.body)
self.stop()
"""
- def __init__(self, methodName='runTest', **kwargs):
- super(AsyncTestCase, self).__init__(methodName, **kwargs)
+ def __init__(self, methodName='runTest'):
+ super(AsyncTestCase, self).__init__(methodName)
self.__stopped = False
self.__running = False
self.__failure = None
# Without this attribute, nosetests will try to run gen_test as a test
# anywhere it is imported.
-gen_test.__test__ = False
+gen_test.__test__ = False # type: ignore
class LogTrapTestCase(unittest.TestCase):
import urlparse
from urllib import urlencode
+try:
+ import typing # noqa
+except ImportError:
+ pass
+
MIN_SUPPORTED_SIGNED_VALUE_VERSION = 1
"""The oldest signed value version supported by this version of Tornado.
SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PATCH", "PUT",
"OPTIONS")
- _template_loaders = {} # {path: template.BaseLoader}
+ _template_loaders = {} # type: typing.Dict[str, template.BaseLoader]
_template_loader_lock = threading.Lock()
_remove_control_chars_regex = re.compile(r"[\x00-\x08\x0e-\x1f]")
raise ValueError("Unsafe header value %r", value)
return value
- _ARG_DEFAULT = []
+ _ARG_DEFAULT = object()
def get_argument(self, name, default=_ARG_DEFAULT, strip=True):
"""Returns the value of the argument with the given name.
"""
CACHE_MAX_AGE = 86400 * 365 * 10 # 10 years
- _static_hashes = {}
+ _static_hashes = {} # type: typing.Dict
_lock = threading.Lock() # protects _static_hashes
def initialize(self, path, default_filename=None):