login_url="/auth/login",
debug=True,
)
- super(Application, self).__init__(handlers, **settings)
+ super().__init__(handlers, **settings)
class BaseHandler(tornado.web.RequestHandler):
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
)
- super(Application, self).__init__(handlers, **settings)
+ super().__init__(handlers, **settings)
class MainHandler(tornado.web.RequestHandler):
return Application(self.get_handlers(), gzip=True, **self.get_app_kwargs())
def get_allowed_errors(self):
- return super(GzipHTTPTest, self).get_allowed_errors() + [
+ return super().get_allowed_errors() + [
# TODO: The Etag is supposed to change when Content-Encoding is
# used. This should be fixed, but it's difficult to do with the
# way GZipContentEncoding fits into the pipeline, and in practice
def initialize( # type: ignore
self, max_clients: int = 10, defaults: Optional[Dict[str, Any]] = None
) -> None:
- super(CurlAsyncHTTPClient, self).initialize(defaults=defaults)
+ super().initialize(defaults=defaults)
# Typeshed is incomplete for CurlMulti, so just use Any for now.
self._multi = pycurl.CurlMulti() # type: Any
self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
for curl in self._curls:
curl.close()
self._multi.close()
- super(CurlAsyncHTTPClient, self).close()
+ super().close()
# Set below properties to None to reduce the reference count of current
# instance, because those properties hold some methods of current
"""
def __init__(self, value: Any = None) -> None:
- super(Return, self).__init__()
+ super().__init__()
self.value = value
# Cython recognizes subclasses of StopIteration with a .args tuple.
self.args = (value,)
self.code = code
self.message = message or httputil.responses.get(code, "Unknown")
self.response = response
- super(HTTPClientError, self).__init__(code, message, response)
+ super().__init__(code, message, response)
def __str__(self) -> str:
return "HTTP %d: %s" % (self.code, self.message)
"""
def __init__(self, real_error: Optional[BaseException] = None) -> None:
- super(StreamClosedError, self).__init__("Stream is closed")
+ super().__init__("Stream is closed")
self.real_error = real_error
def __init__(self, socket: socket.socket, *args: Any, **kwargs: Any) -> None:
self.socket = socket
self.socket.setblocking(False)
- super(IOStream, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
def fileno(self) -> Union[int, ioloop._Selectable]:
return self.socket
for `ssl.wrap_socket`
"""
self._ssl_options = kwargs.pop("ssl_options", _client_ssl_defaults)
- super(SSLIOStream, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
self._ssl_accepting = True
self._handshake_reading = False
self._handshake_writing = False
self._add_io_state(self.io_loop.WRITE)
def reading(self) -> bool:
- return self._handshake_reading or super(SSLIOStream, self).reading()
+ return self._handshake_reading or super().reading()
def writing(self) -> bool:
- return self._handshake_writing or super(SSLIOStream, self).writing()
+ return self._handshake_writing or super().writing()
def _do_ssl_handshake(self) -> None:
# Based on code from test_ssl.py in the python stdlib
if self._ssl_accepting:
self._do_ssl_handshake()
return
- super(SSLIOStream, self)._handle_read()
+ super()._handle_read()
def _handle_write(self) -> None:
if self._ssl_accepting:
self._do_ssl_handshake()
return
- super(SSLIOStream, self)._handle_write()
+ super()._handle_write()
def connect(
self, address: Tuple, server_hostname: Optional[str] = None
# (There's a test for it, but I think in practice users
# either wait for the connect before performing a write or
# they don't care about the connect Future at all)
- fut = super(SSLIOStream, self).connect(address)
+ fut = super().connect(address)
fut.add_done_callback(lambda f: f.exception())
return self.wait_for_handshake()
def _handle_connect(self) -> None:
# Call the superclass method to check for errors.
- super(SSLIOStream, self)._handle_connect()
+ super()._handle_connect()
if self.closed():
return
# When the connection is complete, wrap the socket for SSL
def _is_connreset(self, e: BaseException) -> bool:
if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF:
return True
- return super(SSLIOStream, self)._is_connreset(e)
+ return super()._is_connreset(e)
class PipeIOStream(BaseIOStream):
self.fd = fd
self._fio = io.FileIO(self.fd, "r+")
os.set_blocking(fd, False)
- super(PipeIOStream, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
def fileno(self) -> int:
return self.fd
def __init__(self, code: str, translations: Dict[str, Dict[str, str]]) -> None:
self.translations = translations
- super(CSVLocale, self).__init__(code)
+ super().__init__(code)
def translate(
self,
self.gettext = translations.gettext
# self.gettext must exist before __init__ is called, since it
# calls into self.translate
- super(GettextLocale, self).__init__(code)
+ super().__init__(code)
def translate(
self,
"""
def __init__(self) -> None:
- super(Condition, self).__init__()
+ super().__init__()
self.io_loop = ioloop.IOLoop.current()
def __repr__(self) -> str:
"""
def __init__(self, value: int = 1) -> None:
- super(Semaphore, self).__init__()
+ super().__init__()
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._value = value
def __repr__(self) -> str:
- res = super(Semaphore, self).__repr__()
+ res = super().__repr__()
extra = (
"locked" if self._value == 0 else "unlocked,value:{0}".format(self._value)
)
"""
def __init__(self, value: int = 1) -> None:
- super(BoundedSemaphore, self).__init__(value=value)
+ super().__init__(value=value)
self._initial_value = value
def release(self) -> None:
"""Increment the counter and wake one waiter."""
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
- super(BoundedSemaphore, self).release()
+ super().release()
class Lock(object):
"""
def initialize(self) -> None: # type: ignore
- super(BlockingResolver, self).initialize()
+ super().initialize()
class ThreadedResolver(ExecutorResolver):
def initialize(self, num_threads: int = 10) -> None: # type: ignore
threadpool = ThreadedResolver._create_threadpool(num_threads)
- super(ThreadedResolver, self).initialize(
- executor=threadpool, close_executor=False
- )
+ super().initialize(executor=threadpool, close_executor=False)
@classmethod
def _create_threadpool(
self._thread_identity = 0
- super(BaseAsyncIOLoop, self).initialize(**kwargs)
+ super().initialize(**kwargs)
def assign_thread_identity() -> None:
self._thread_identity = get_ident()
"""
def initialize(self, **kwargs: Any) -> None: # type: ignore
- super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(), **kwargs)
+ super().initialize(asyncio.get_event_loop(), **kwargs)
def make_current(self) -> None:
# AsyncIOMainLoop already refers to the current asyncio loop so
self.is_current = False
loop = asyncio.new_event_loop()
try:
- super(AsyncIOLoop, self).initialize(loop, **kwargs)
+ super().initialize(loop, **kwargs)
except Exception:
# If initialize() does not succeed (taking ownership of the loop),
# we have to close it.
def close(self, all_fds: bool = False) -> None:
if self.is_current:
self.clear_current()
- super(AsyncIOLoop, self).close(all_fds=all_fds)
+ super().close(all_fds=all_fds)
def make_current(self) -> None:
if not self.is_current:
def __init__(self, rules: Optional[_RuleList] = None) -> None:
self.named_rules = {} # type: Dict[str, Any]
- super(ReversibleRuleRouter, self).__init__(rules)
+ super().__init__(rules)
def process_rule(self, rule: "Rule") -> "Rule":
- rule = super(ReversibleRuleRouter, self).process_rule(rule)
+ rule = super().process_rule(rule)
if rule.name:
if rule.name in self.named_rules:
"""
matcher = PathMatches(pattern)
- super(URLSpec, self).__init__(matcher, handler, kwargs, name)
+ super().__init__(matcher, handler, kwargs, name)
self.regex = matcher.regex
self.handler_class = self.target
"""
def __init__(self, message: str) -> None:
- super(HTTPTimeoutError, self).__init__(599, message=message)
+ super().__init__(599, message=message)
def __str__(self) -> str:
return self.message or "Timeout"
"""
def __init__(self, message: str) -> None:
- super(HTTPStreamClosedError, self).__init__(599, message=message)
+ super().__init__(599, message=message)
def __str__(self) -> str:
return self.message or "Stream closed"
.. versionchanged:: 4.2
Added the ``max_body_size`` argument.
"""
- super(SimpleAsyncHTTPClient, self).initialize(defaults=defaults)
+ super().initialize(defaults=defaults)
self.max_clients = max_clients
self.queue = (
collections.deque()
self.tcp_client = TCPClient(resolver=self.resolver)
def close(self) -> None:
- super(SimpleAsyncHTTPClient, self).close()
+ super().close()
if self.own_resolver:
self.resolver.close()
self.tcp_client.close()
"""
def __init__(self, root_directory: str, **kwargs: Any) -> None:
- super(Loader, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.root = os.path.abspath(root_directory)
def resolve_path(self, name: str, parent_path: Optional[str] = None) -> str:
"""A template loader that loads from a dictionary."""
def __init__(self, dict: Dict[str, str], **kwargs: Any) -> None:
- super(DictLoader, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.dict = dict
def resolve_path(self, name: str, parent_path: Optional[str] = None) -> str:
class _Module(_Expression):
def __init__(self, expression: str, line: int) -> None:
- super(_Module, self).__init__("_tt_modules." + expression, line, raw=True)
+ super().__init__("_tt_modules." + expression, line, raw=True)
class _Text(_Node):
client_class = None # type: typing.Callable
def setUp(self):
- super(ClientTestMixin, self).setUp() # type: ignore
+ super().setUp() # type: ignore
self.server = CapServer()
sock, port = bind_unused_port()
self.server.add_sockets([sock])
def tearDown(self):
self.server.stop()
- super(ClientTestMixin, self).tearDown() # type: ignore
+ super().tearDown() # type: ignore
def test_future(self: typing.Any):
future = self.client.capitalize("hello")
@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientTestCase(AsyncHTTPTestCase):
def setUp(self):
- super(CurlHTTPClientTestCase, self).setUp()
+ super().setUp()
self.http_client = self.create_client()
def get_app(self):
# so we need explicit checks here to make sure the tests run all
# the way through.
self.finished = False
- super(GenCoroutineTest, self).setUp()
+ super().setUp()
def tearDown(self):
- super(GenCoroutineTest, self).tearDown()
+ super().tearDown()
assert self.finished
def test_attributes(self):
code = None # type: typing.Optional[int]
def setUp(self):
- super(HTTP1ConnectionTest, self).setUp()
+ super().setUp()
self.asyncSetUp()
@gen_test
return Application([("/echo", EchoHandler)])
def setUp(self):
- super(HTTPServerRawTest, self).setUp()
+ super().setUp()
self.stream = IOStream(socket.socket())
self.io_loop.run_sync(
lambda: self.stream.connect(("127.0.0.1", self.get_http_port()))
def tearDown(self):
self.stream.close()
- super(HTTPServerRawTest, self).tearDown()
+ super().tearDown()
def test_empty_request(self):
self.stream.close()
return Application([("/", XHeaderTest.Handler)])
def get_httpserver_options(self):
- output = super(SSLXHeaderTest, self).get_httpserver_options()
+ output = super().get_httpserver_options()
output["xheaders"] = True
return output
"""
def setUp(self):
- super(UnixSocketTest, self).setUp()
+ super().setUp()
self.tmpdir = tempfile.mkdtemp()
self.sockfile = os.path.join(self.tmpdir, "test.sock")
sock = netutil.bind_unix_socket(self.sockfile)
self.io_loop.run_sync(self.server.close_all_connections)
self.server.stop()
shutil.rmtree(self.tmpdir)
- super(UnixSocketTest, self).tearDown()
+ super().tearDown()
@gen_test
def test_unix_socket(self):
)
def setUp(self):
- super(KeepAliveTest, self).setUp()
+ super().setUp()
self.http_version = b"HTTP/1.1"
def tearDown(self):
if hasattr(self, "stream"):
self.stream.close()
- super(KeepAliveTest, self).tearDown()
+ super().tearDown()
# The next few methods are a crude manual http client
@gen.coroutine
return dict(idle_connection_timeout=0.1)
def setUp(self):
- super(IdleTimeoutTest, self).setUp()
+ super().setUp()
self.streams = [] # type: List[IOStream]
def tearDown(self):
- super(IdleTimeoutTest, self).tearDown()
+ super().tearDown()
for stream in self.streams:
stream.close()
class MyExecutor(futures.ThreadPoolExecutor):
def submit(self, func, *args):
count[0] += 1
- return super(MyExecutor, self).submit(func, *args)
+ return super().submit(func, *args)
event = threading.Event()
class TestIOStreamStartTLS(AsyncTestCase):
def setUp(self):
try:
- super(TestIOStreamStartTLS, self).setUp()
+ super().setUp()
self.listener, self.port = bind_unused_port()
self.server_stream = None
self.server_accepted = Future() # type: Future[None]
if self.client_stream is not None:
self.client_stream.close()
self.listener.close()
- super(TestIOStreamStartTLS, self).tearDown()
+ super().tearDown()
def accept(self, connection, address):
if self.server_stream is not None:
class ConditionTest(AsyncTestCase):
def setUp(self):
- super(ConditionTest, self).setUp()
+ super().setUp()
self.history = [] # type: typing.List[typing.Union[int, str]]
def record_done(self, future, key):
class EnablePrettyLoggingTest(unittest.TestCase):
def setUp(self):
- super(EnablePrettyLoggingTest, self).setUp()
+ super().setUp()
self.options = OptionParser()
define_logging_options(self.options)
self.logger = logging.Logger("tornado.test.log_test.EnablePrettyLoggingTest")
@skipIfNoNetwork
class BlockingResolverTest(AsyncTestCase, _ResolverTestMixin):
def setUp(self):
- super(BlockingResolverTest, self).setUp()
+ super().setUp()
self.resolver = BlockingResolver()
# our default timeout.
class BlockingResolverErrorTest(AsyncTestCase, _ResolverErrorTestMixin):
def setUp(self):
- super(BlockingResolverErrorTest, self).setUp()
+ super().setUp()
self.resolver = BlockingResolver()
self.real_getaddrinfo = socket.getaddrinfo
socket.getaddrinfo = _failing_getaddrinfo
def tearDown(self):
socket.getaddrinfo = self.real_getaddrinfo
- super(BlockingResolverErrorTest, self).tearDown()
+ super().tearDown()
class OverrideResolverTest(AsyncTestCase, _ResolverTestMixin):
def setUp(self):
- super(OverrideResolverTest, self).setUp()
+ super().setUp()
mapping = {
("google.com", 80): ("1.2.3.4", 80),
("google.com", 80, socket.AF_INET): ("1.2.3.4", 80),
@skipIfNoNetwork
class ThreadedResolverTest(AsyncTestCase, _ResolverTestMixin):
def setUp(self):
- super(ThreadedResolverTest, self).setUp()
+ super().setUp()
self.resolver = ThreadedResolver()
def tearDown(self):
self.resolver.close()
- super(ThreadedResolverTest, self).tearDown()
+ super().tearDown()
class ThreadedResolverErrorTest(AsyncTestCase, _ResolverErrorTestMixin):
def setUp(self):
- super(ThreadedResolverErrorTest, self).setUp()
+ super().setUp()
self.resolver = BlockingResolver()
self.real_getaddrinfo = socket.getaddrinfo
socket.getaddrinfo = _failing_getaddrinfo
def tearDown(self):
socket.getaddrinfo = self.real_getaddrinfo
- super(ThreadedResolverErrorTest, self).tearDown()
+ super().tearDown()
@skipIfNoNetwork
@unittest.skipIf(sys.platform == "win32", "pycares doesn't return loopback on windows")
class CaresResolverTest(AsyncTestCase, _ResolverTestMixin):
def setUp(self):
- super(CaresResolverTest, self).setUp()
+ super().setUp()
self.resolver = CaresResolver()
@unittest.skipIf(sys.platform == "win32", "twisted resolver hangs on windows")
class TwistedResolverTest(AsyncTestCase, _ResolverTestMixin):
def setUp(self):
- super(TwistedResolverTest, self).setUp()
+ super().setUp()
self.resolver = TwistedResolver()
os._exit(1)
# In the surviving process, clear the alarm we set earlier
signal.alarm(0)
- super(ProcessTest, self).tearDown()
+ super().tearDown()
def test_multi_process(self):
# This test doesn't work on twisted because we use the global
class CustomRouter(ReversibleRouter):
def __init__(self):
- super(CustomRouter, self).__init__()
+ super().__init__()
self.routes = {} # type: typing.Dict[str, typing.Any]
def add_routes(self, routes):
class TornadoTextTestRunner(unittest.TextTestRunner):
def __init__(self, *args, **kwargs):
kwargs["stream"] = stderr
- super(TornadoTextTestRunner, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
def run(self, test):
- result = super(TornadoTextTestRunner, self).run(test)
+ result = super().run(test)
if result.skipped:
skip_reasons = set(reason for (test, reason) in result.skipped)
self.stream.write( # type: ignore
"""Counts the number of WARNING or higher log records."""
def __init__(self, *args, **kwargs):
- super(LogCounter, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
self.info_count = self.warning_count = self.error_count = 0
def filter(self, record):
class SimpleHTTPClientTestCase(SimpleHTTPClientTestMixin, AsyncHTTPTestCase):
def setUp(self):
- super(SimpleHTTPClientTestCase, self).setUp()
+ super().setUp()
self.http_client = self.create_client()
def create_client(self, **kwargs):
class SimpleHTTPSClientTestCase(SimpleHTTPClientTestMixin, AsyncHTTPSTestCase):
def setUp(self):
- super(SimpleHTTPSClientTestCase, self).setUp()
+ super().setUp()
self.http_client = self.create_client()
def create_client(self, **kwargs):
class CreateAsyncHTTPClientTestCase(AsyncTestCase):
def setUp(self):
- super(CreateAsyncHTTPClientTestCase, self).setUp()
+ super().setUp()
self.saved = AsyncHTTPClient._save_configuration()
def tearDown(self):
AsyncHTTPClient._restore_configuration(self.saved)
- super(CreateAsyncHTTPClientTestCase, self).tearDown()
+ super().tearDown()
def test_max_clients(self):
AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
class HostnameMappingTestCase(AsyncHTTPTestCase):
def setUp(self):
- super(HostnameMappingTestCase, self).setUp()
+ super().setUp()
self.http_client = SimpleAsyncHTTPClient(
hostname_mapping={
"www.example.com": "127.0.0.1",
# Return something valid so the test doesn't raise during cleanup.
return [(socket.AF_INET, ("127.0.0.1", test.get_http_port()))]
- super(ResolveTimeoutTestCase, self).setUp()
+ super().setUp()
self.http_client = SimpleAsyncHTTPClient(resolver=BadResolver())
def get_app(self):
class TestTCPServer(TCPServer):
def __init__(self, family):
- super(TestTCPServer, self).__init__()
+ super().__init__()
self.streams = [] # type: List[IOStream]
self.queue = Queue() # type: Queue[IOStream]
sockets = bind_sockets(0, "localhost", family)
self.queue.put(stream)
def stop(self):
- super(TestTCPServer, self).stop()
+ super().stop()
for stream in self.streams:
stream.close()
class TCPClientTest(AsyncTestCase):
def setUp(self):
- super(TCPClientTest, self).setUp()
+ super().setUp()
self.server = None
self.client = TCPClient()
def tearDown(self):
self.client.close()
self.stop_server()
- super(TCPClientTest, self).tearDown()
+ super().tearDown()
def skipIfLocalhostV4(self):
# The port used here doesn't matter, but some systems require it
self.closed = True
def setUp(self):
- super(ConnectorTest, self).setUp()
+ super().setUp()
self.connect_futures = (
{}
) # type: Dict[Tuple[int, typing.Any], Future[ConnectorTest.FakeStream]]
# be closing any streams
for stream in self.streams.values():
self.assertFalse(stream.closed)
- super(ConnectorTest, self).tearDown()
+ super().tearDown()
def create_stream(self, af, addr):
stream = ConnectorTest.FakeStream()
class AsyncHTTPTestCaseTest(AsyncHTTPTestCase):
def setUp(self):
- super(AsyncHTTPTestCaseTest, self).setUp()
+ super().setUp()
# Bind a second port.
sock, port = bind_unused_port()
app = Application()
def tearDown(self):
self.second_server.stop()
- super(AsyncHTTPTestCaseTest, self).tearDown()
+ super().tearDown()
class AsyncTestCaseWrapperTest(unittest.TestCase):
class GenTest(AsyncTestCase):
def setUp(self):
- super(GenTest, self).setUp()
+ super().setUp()
self.finished = False
def tearDown(self):
self.assertTrue(self.finished)
- super(GenTest, self).tearDown()
+ super().tearDown()
@gen_test
def test_sync(self):
self.orig_loop = asyncio.get_event_loop()
self.new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.new_loop)
- super(GetNewIOLoopTest, self).setUp()
+ super().setUp()
def tearDown(self):
- super(GetNewIOLoopTest, self).tearDown()
+ super().tearDown()
# AsyncTestCase must not affect the existing asyncio loop.
self.assertFalse(asyncio.get_event_loop().is_closed())
asyncio.set_event_loop(self.orig_loop)
# doesn't have a "copy constructor"
class TwoArgException(Exception):
def __init__(self, a, b):
- super(TwoArgException, self).__init__()
+ super().__init__()
self.a, self.b = a, b
try:
)
def tearDown(self):
- super(WSGISafeWebTest, self).tearDown()
+ super().tearDown()
RequestHandler._template_loaders.clear()
def get_handlers(self):
)
def tearDown(self):
- super(UIMethodUIModuleTest, self).tearDown()
+ super().tearDown()
# TODO: fix template loader caching so this isn't necessary.
RequestHandler._template_loaders.clear()
)
def tearDown(self):
- super(GetCurrentUserTest, self).tearDown()
+ super().tearDown()
RequestHandler._template_loaders.clear()
def get_handlers(self):
self.test = test
def on_connection_close(self):
- super(CloseDetectionHandler, self).on_connection_close()
+ super().on_connection_close()
self.test.close_future.set_result(None)
return [
return dict(xsrf_cookies=True)
def setUp(self):
- super(XSRFTest, self).setUp()
+ super().setUp()
self.xsrf_token = self.get_token()
def get_token(self, old_token=None, version=None):
class CoroutineOnMessageHandler(TestWebSocketHandler):
def initialize(self, **kwargs):
- super(CoroutineOnMessageHandler, self).initialize(**kwargs)
+ super().initialize(**kwargs)
self.sleeping = 0
@gen.coroutine
class SubprotocolHandler(TestWebSocketHandler):
def initialize(self, **kwargs):
- super(SubprotocolHandler, self).initialize(**kwargs)
+ super().initialize(**kwargs)
self.select_subprotocol_called = False
def select_subprotocol(self, subprotocols):
class OpenCoroutineHandler(TestWebSocketHandler):
def initialize(self, test, **kwargs):
- super(OpenCoroutineHandler, self).initialize(**kwargs)
+ super().initialize(**kwargs)
self.test = test
self.open_finished = False
return SimpleAsyncHTTPClient()
def tearDown(self):
- super(WebSocketTest, self).tearDown()
+ super().tearDown()
RequestHandler._template_loaders.clear()
def test_http_request(self):
"""
def __init__(self, methodName: str = "runTest") -> None:
- super(AsyncTestCase, self).__init__(methodName)
+ super().__init__(methodName)
self.__stopped = False
self.__running = False
self.__failure = None # type: Optional[_ExcInfoTuple]
self._test_generator = None # type: Optional[Union[Generator, Coroutine]]
def setUp(self) -> None:
- super(AsyncTestCase, self).setUp()
+ super().setUp()
self.io_loop = self.get_new_ioloop()
self.io_loop.make_current()
# in the same process with autoreload (because curl does not
# set FD_CLOEXEC on its file descriptors)
self.io_loop.close(all_fds=True)
- super(AsyncTestCase, self).tearDown()
+ super().tearDown()
# In case an exception escaped or the StackContext caught an exception
# when there wasn't a wait() to re-raise it, do so here.
# This is our last chance to raise an exception in a way that the
def run(
self, result: Optional[unittest.TestResult] = None
) -> Optional[unittest.TestResult]:
- ret = super(AsyncTestCase, self).run(result)
+ ret = super().run(result)
# As a last resort, if an exception escaped super.run() and wasn't
# re-raised in tearDown, raise it here. This will cause the
# unittest run to fail messily, but that's better than silently
"""
def setUp(self) -> None:
- super(AsyncHTTPTestCase, self).setUp()
+ super().setUp()
sock, port = bind_unused_port()
self.__port = port
self.http_client.close()
del self.http_server
del self._app
- super(AsyncHTTPTestCase, self).tearDown()
+ super().tearDown()
class AsyncHTTPSTestCase(AsyncHTTPTestCase):
request: httputil.HTTPServerRequest,
**kwargs: Any
) -> None:
- super(RequestHandler, self).__init__()
+ super().__init__()
self.application = application
self.request = request
) -> None:
assert isinstance(application, Application)
self.application = application
- super(_ApplicationRouter, self).__init__(rules)
+ super().__init__(rules)
def process_rule(self, rule: Rule) -> Rule:
- rule = super(_ApplicationRouter, self).process_rule(rule)
+ rule = super().process_rule(rule)
if isinstance(rule.target, (list, tuple)):
rule.target = _ApplicationRouter(
request, target, **target_params
)
- return super(_ApplicationRouter, self).get_target_delegate(
- target, request, **target_params
- )
+ return super().get_target_delegate(target, request, **target_params)
class Application(ReversibleRouter):
"""
def __init__(self, arg_name: str) -> None:
- super(MissingArgumentError, self).__init__(
- 400, "Missing argument %s" % arg_name
- )
+ super().__init__(400, "Missing argument %s" % arg_name)
self.arg_name = arg_name
"""
def __init__(self, handler: RequestHandler) -> None:
- super(TemplateModule, self).__init__(handler)
+ super().__init__(handler)
# keep resources in both a list and a dict to preserve order
self._resource_list = [] # type: List[Dict[str, Any]]
self._resource_dict = {} # type: Dict[str, Dict[str, Any]]
request: httputil.HTTPServerRequest,
**kwargs: Any
) -> None:
- super(WebSocketHandler, self).__init__(application, request, **kwargs)
+ super().__init__(application, request, **kwargs)
self.ws_connection = None # type: Optional[WebSocketProtocol]
self.close_code = None # type: Optional[int]
self.close_reason = None # type: Optional[str]
# connection (if it was established in the first place,
# indicated by status code 101).
if self.get_status() != 101 or self._on_close_called:
- super(WebSocketHandler, self)._break_cycles()
+ super()._break_cycles()
def send_error(self, *args: Any, **kwargs: Any) -> None:
if self.stream is None:
- super(WebSocketHandler, self).send_error(*args, **kwargs)
+ super().send_error(*args, **kwargs)
else:
# If we get an uncaught exception during the handshake,
# we have no choice but to abruptly close the connection.
] = "permessage-deflate; client_max_window_bits"
self.tcp_client = TCPClient()
- super(WebSocketClientConnection, self).__init__(
+ super().__init__(
None,
request,
lambda: None,
self.connect_future.set_exception(StreamClosedError())
self._on_message(None)
self.tcp_client.close()
- super(WebSocketClientConnection, self).on_connection_close()
+ super().on_connection_close()
def on_ws_connection_close(
self, close_code: Optional[int] = None, close_reason: Optional[str] = None
) -> None:
assert isinstance(start_line, httputil.ResponseStartLine)
if start_line.code != 101:
- await super(WebSocketClientConnection, self).headers_received(
- start_line, headers
- )
+ await super().headers_received(start_line, headers)
return
if self._timeout is not None: