This was only necessary in python 3.
This change was made with a version of pyupgrade that was modified
to perform only this change.
define("debug", default=True, help="run in debug mode")
-class MessageBuffer(object):
+class MessageBuffer:
def __init__(self):
# cond is notified whenever the message cache is updated
self.cond = tornado.locks.Condition()
pass
-class OpenIdMixin(object):
+class OpenIdMixin:
"""Abstract implementation of OpenID and Attribute Exchange.
Class attributes:
return httpclient.AsyncHTTPClient()
-class OAuthMixin(object):
+class OAuthMixin:
"""Abstract implementation of OAuth 1.0 and 1.0a.
See `TwitterMixin` below for an example implementation.
return httpclient.AsyncHTTPClient()
-class OAuth2Mixin(object):
+class OAuth2Mixin:
"""Abstract implementation of OAuth 2.0.
See `FacebookGraphMixin` or `GoogleOAuth2Mixin` below for example
self.args = (value,)
-class WaitIterator(object):
+class WaitIterator:
"""Provides an iterator to yield the results of awaitables as they finish.
Yielding a set of awaitables like this:
return f
-class _NullFuture(object):
+class _NullFuture:
"""_NullFuture resembles a Future that finished with a result of None.
It's not actually a `Future` to avoid depending on a particular event loop.
"""
-class Runner(object):
+class Runner:
"""Internal implementation of `tornado.gen.coroutine`.
Maintains information about pending callbacks and their results.
pass
-class _ExceptionLoggingContext(object):
+class _ExceptionLoggingContext:
"""Used with the ``with`` statement when calling delegate methods to
log any exceptions with the given logger. Any exceptions caught are
converted to _QuietException
raise _QuietException
-class HTTP1ConnectionParameters(object):
+class HTTP1ConnectionParameters:
"""Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`."""
def __init__(
return self._delegate.on_connection_close()
-class HTTP1ServerConnection(object):
+class HTTP1ServerConnection:
"""An HTTP/1.x server."""
def __init__(
from typing import Type, Any, Union, Dict, Callable, Optional, cast
-class HTTPClient(object):
+class HTTPClient:
"""A blocking HTTP client.
This interface is provided to make it easier to share code between
super(AsyncHTTPClient, cls).configure(impl, **kwargs)
-class HTTPRequest(object):
+class HTTPRequest:
"""HTTP client request object."""
_headers = None # type: Union[Dict[str, str], httputil.HTTPHeaders]
self._body = utf8(value)
-class HTTPResponse(object):
+class HTTPResponse:
"""HTTP Response object.
Attributes:
HTTPError = HTTPClientError
-class _RequestProxy(object):
+class _RequestProxy:
"""Combines an object with a dictionary of defaults.
Used internally by AsyncHTTPClient implementations.
del self._chunks
-class _HTTPRequestContext(object):
+class _HTTPRequestContext:
def __init__(
self,
stream: iostream.IOStream,
__unicode__ = __str__
-class HTTPServerRequest(object):
+class HTTPServerRequest:
"""A single HTTP request.
All attributes are type `str` unless otherwise noted.
pass
-class HTTPServerConnectionDelegate(object):
+class HTTPServerConnectionDelegate:
"""Implement this interface to handle requests from `.HTTPServer`.
.. versionadded:: 4.0
pass
-class HTTPMessageDelegate(object):
+class HTTPMessageDelegate:
"""Implement this interface to handle an HTTP request or response.
.. versionadded:: 4.0
pass
-class HTTPConnection(object):
+class HTTPConnection:
"""Applications use this interface to write their responses.
.. versionadded:: 4.0
self._pending_tasks.discard(f)
-class _Timeout(object):
+class _Timeout:
"""An IOLoop timeout, a UNIX timestamp and a callback"""
# Reduce memory overhead when there are lots of pending callbacks
return self.tdeadline <= other.tdeadline
-class PeriodicCallback(object):
+class PeriodicCallback:
"""Schedules the given callback to be called periodically.
The callback is called every ``callback_time`` milliseconds when
"""Exception raised by `IOStream` methods when the buffer is full."""
-class _StreamBuffer(object):
+class _StreamBuffer:
"""
A specialized buffer that tries to avoid copies when large pieces
of data are encountered.
self._first_pos = pos
-class BaseIOStream(object):
+class BaseIOStream:
"""A utility class to write to and read from a non-blocking file or socket.
We support a non-blocking ``write()`` and a family of ``read_*()``
return _supported_locales
-class Locale(object):
+class Locale:
"""Object representing a locale.
After calling one of `load_translations` or `load_gettext_translations`,
__all__ = ["Condition", "Event", "Semaphore", "BoundedSemaphore", "Lock"]
-class _TimeoutGarbageCollector(object):
+class _TimeoutGarbageCollector:
"""Base class for objects that periodically clean up timed-out waiters.
Avoids memory leak in a common pattern like:
self.notify(len(self._waiters))
-class Event(object):
+class Event:
"""An event blocks coroutines until its internal flag is set to True.
Similar to `threading.Event`.
return timeout_fut
-class _ReleasingContextManager(object):
+class _ReleasingContextManager:
"""Releases a Lock or Semaphore at the end of a "with" statement.
with (yield semaphore.acquire()):
super().release()
-class Lock(object):
+class Lock:
"""A lock for coroutines.
A Lock begins unlocked, and `acquire` locks it immediately. While it is
pass
-class OptionParser(object):
+class OptionParser:
"""A collection of options, a dictionary with object-like access.
Normally accessed via static functions in the `tornado.options` module,
return _Mockable(self)
-class _Mockable(object):
+class _Mockable:
"""`mock.patch` compatible wrapper for `OptionParser`.
As of ``mock`` version 1.0.1, when an object uses ``__getattr__``
setattr(self._options, name, self._originals.pop(name))
-class _Option(object):
+class _Option:
# This class could almost be made generic, but the way the types
# interact with the multiple argument makes this tricky. (default
# and the callback use List[T], but type is still Type[T]).
return _task_id
-class Subprocess(object):
+class Subprocess:
"""Wraps ``subprocess.Popen`` with IOStream support.
The constructor is the same as ``subprocess.Popen`` with the following
return None
-class Rule(object):
+class Rule:
"""A routing rule."""
def __init__(
)
-class Matcher(object):
+class Matcher:
"""Represents a matcher for request features."""
def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]:
_INITIAL_CONNECT_TIMEOUT = 0.3
-class _Connector(object):
+class _Connector:
"""A stateless implementation of the "Happy Eyeballs" algorithm.
"Happy Eyeballs" is documented in RFC6555 as the recommended practice
stream.close()
-class TCPClient(object):
+class TCPClient:
"""A non-blocking TCP connection factory.
.. versionchanged:: 5.0
from typing import Callable, List # noqa: F401
-class TCPServer(object):
+class TCPServer:
r"""A non-blocking, single-threaded TCP server.
To use `TCPServer`, define a subclass which overrides the `handle_stream`
raise Exception("invalid whitespace mode %s" % mode)
-class Template(object):
+class Template:
"""A compiled template.
We compile into Python from the given template_string. You can generate
return ancestors
-class BaseLoader(object):
+class BaseLoader:
"""Base class for template loaders.
You must use a template loader to use template constructs like
return Template(self.dict[name], name=name, loader=self)
-class _Node(object):
+class _Node:
def each_child(self) -> Iterable["_Node"]:
return ()
return "%s at %s:%d" % (self.message, self.filename, self.lineno)
-class _CodeWriter(object):
+class _CodeWriter:
def __init__(
self,
file: TextIO,
return self._indent
def indent(self) -> "ContextManager":
- class Indenter(object):
+ class Indenter:
def __enter__(_) -> "_CodeWriter":
self._indent += 1
return self
self.include_stack.append((self.current_template, line))
self.current_template = template
- class IncludeTemplate(object):
+ class IncludeTemplate:
def __enter__(_) -> "_CodeWriter":
return self
print(" " * indent + line + line_comment, file=self.file)
-class _TemplateReader(object):
+class _TemplateReader:
def __init__(self, name: str, text: str, whitespace: str) -> None:
self.name = name
self.text = text
class CircleRefsTest(unittest.TestCase):
def test_known_leak(self):
# Construct a known leak scenario to make sure the test harness works.
- class C(object):
+ class C:
def __init__(self, name):
self.name = name
self.a: typing.Optional[C] = None
with concurrent.futures.ThreadPoolExecutor(1) as thread_pool:
- class Factory(object):
+ class Factory:
executor = thread_pool
@tornado.concurrent.run_on_executor
pass
-class BaseCapClient(object):
+class BaseCapClient:
def __init__(self, port):
self.port = port
raise gen.Return(self.process_response(data))
-class ClientTestMixin(object):
+class ClientTestMixin:
client_class = None # type: typing.Callable
def setUp(self):
class RunOnExecutorTest(AsyncTestCase):
@gen_test
def test_no_calling(self):
- class Object(object):
+ class Object:
def __init__(self):
self.executor = futures.thread.ThreadPoolExecutor(1)
@gen_test
def test_call_with_no_args(self):
- class Object(object):
+ class Object:
def __init__(self):
self.executor = futures.thread.ThreadPoolExecutor(1)
@gen_test
def test_call_with_executor(self):
- class Object(object):
+ class Object:
def __init__(self):
self.__executor = futures.thread.ThreadPoolExecutor(1)
@gen_test
def test_async_await(self):
- class Object(object):
+ class Object:
def __init__(self):
self.executor = futures.thread.ThreadPoolExecutor(1)
# without waiting for garbage collection.
@gen.coroutine
def inner():
- class Foo(object):
+ class Foo:
pass
local_var = Foo()
return Application([("/", HelloWorldRequestHandler, dict(protocol="https"))])
-class SSLTestMixin(object):
+class SSLTestMixin:
def get_ssl_options(self):
return dict(
ssl_version=self.get_ssl_version(),
# Use a socket since they are supported by IOLoop on all platforms.
# Unfortunately, sockets don't support the .closed attribute for
# inspecting their close status, so we must use a wrapper.
- class SocketWrapper(object):
+ class SocketWrapper:
def __init__(self, sockobj):
self.sockobj = sockobj
self.closed = False
self.write("Hello")
-class TestIOStreamWebMixin(object):
+class TestIOStreamWebMixin:
def _make_client_iostream(self):
raise NotImplementedError()
stream.read_bytes(1)
-class TestReadWriteMixin(object):
+class TestReadWriteMixin:
# Tests where one stream reads and the other writes.
# These should work for BaseIOStream implementations.
from tornado.platform.caresresolver import CaresResolver
-class _ResolverTestMixin(object):
+class _ResolverTestMixin:
resolver = None # type: typing.Any
@gen_test
# It is impossible to quickly and consistently generate an error in name
# resolution, so test this case separately, using mocks as needed.
-class _ResolverErrorTestMixin(object):
+class _ResolverErrorTestMixin:
resolver = None # type: typing.Any
@gen_test
from typing import List # noqa: F401
-class Email(object):
+class Email:
def __init__(self, value):
if isinstance(value, str) and "@" in value:
self._value = value
self.finish("forbidden")
-class SimpleHTTPClientTestMixin(object):
+class SimpleHTTPClientTestMixin:
def create_client(self, **kwargs):
raise NotImplementedError()
class ConnectorTest(AsyncTestCase):
- class FakeStream(object):
+ class FakeStream:
def __init__(self):
self.closed = False
self.write(dict(methods=self.methods))
-class BaseStreamingRequestFlowControlTest(object):
+class BaseStreamingRequestFlowControlTest:
def get_httpserver_options(self):
# Use a small chunk size so flow control is relevant even though
# all the data arrives at once.
self.assertEqual(res, "hello2")
-class CompressionTestMixin(object):
+class CompressionTestMixin:
MESSAGE = "Hello world. Testing 123 123"
def get_app(self):
self.assertEqual(bytes_out, bytes_in + 12)
-class MaskFunctionMixin(object):
+class MaskFunctionMixin:
# Subclasses should define self.mask(mask, data)
def mask(self, mask: bytes, data: bytes) -> bytes:
raise NotImplementedError()
self[name] = value
-class GzipDecompressor(object):
+class GzipDecompressor:
"""Streaming gzip decompressor.
The interface is like that of `zlib.decompressobj` (without some of the
return _re_unescape_pattern.sub(_re_unescape_replacement, s)
-class Configurable(object):
+class Configurable:
"""Base class for configurable interfaces.
A configurable interface is an (abstract) class whose constructor
base.__impl_kwargs = saved[1]
-class ArgReplacer(object):
+class ArgReplacer:
"""Replaces one value in an ``args, kwargs`` pair.
Inspects the function signature to find an argument by name
_ARG_DEFAULT = _ArgDefaultMarker()
-class RequestHandler(object):
+class RequestHandler:
"""Base class for HTTP request handlers.
Subclasses must define at least one of the methods defined in the
self.on_finish()
-class OutputTransform(object):
+class OutputTransform:
"""A transform modifies the result of an HTTP request (e.g., GZip encoding)
Applications are not expected to create their own OutputTransforms
return wrapper
-class UIModule(object):
+class UIModule:
"""A re-usable, modular UI unit on a page.
UI modules often execute additional queries, and they can include
return "".join(self._get_resources("html_body"))
-class _UIModuleNamespace(object):
+class _UIModuleNamespace:
"""Lazy namespace which creates UIModule proxies bound to a handler."""
def __init__(
pass
-class _WebSocketParams(object):
+class _WebSocketParams:
def __init__(
self,
ping_interval: Optional[float] = None,
raise NotImplementedError()
-class _PerMessageDeflateCompressor(object):
+class _PerMessageDeflateCompressor:
def __init__(
self,
persistent: bool,
return data[:-4]
-class _PerMessageDeflateDecompressor(object):
+class _PerMessageDeflateDecompressor:
def __init__(
self,
persistent: bool,
return s.decode("latin1")
-class WSGIContainer(object):
+class WSGIContainer:
r"""Makes a WSGI-compatible application runnable on Tornado's HTTP server.
.. warning::