version: str
version_info: typing.Tuple[int, int, int, int]
-from . import auth
-from . import autoreload
-from . import concurrent
-from . import curl_httpclient
-from . import escape
-from . import gen
-from . import http1connection
-from . import httpclient
-from . import httpserver
-from . import httputil
-from . import ioloop
-from . import iostream
-from . import locale
-from . import locks
-from . import log
-from . import netutil
-from . import options
-from . import platform
-from . import process
-from . import queues
-from . import routing
-from . import simple_httpclient
-from . import tcpclient
-from . import tcpserver
-from . import template
-from . import testing
-from . import util
-from . import web
+from . import (
+ auth,
+ autoreload,
+ concurrent,
+ curl_httpclient,
+ escape,
+ gen,
+ http1connection,
+ httpclient,
+ httpserver,
+ httputil,
+ ioloop,
+ iostream,
+ locale,
+ locks,
+ log,
+ netutil,
+ options,
+ platform,
+ process,
+ queues,
+ routing,
+ simple_httpclient,
+ tcpclient,
+ tcpserver,
+ template,
+ testing,
+ util,
+ web,
+)
import urllib.parse
import uuid
import warnings
+from collections.abc import Iterable
+from typing import Any, cast
-from tornado import httpclient
-from tornado import escape
+from tornado import escape, httpclient
from tornado.httputil import url_concat
from tornado.util import unicode_type
from tornado.web import RequestHandler
-from typing import Any, cast
-from collections.abc import Iterable
-
class AuthError(Exception):
pass
import importlib.abc
import os
import pkgutil
+import subprocess
import sys
import traceback
import types
-import subprocess
import weakref
-from tornado import ioloop
+from tornado import ioloop, process
from tornado.log import gen_log
-from tornado import process
try:
import signal
# parsing at the first positional argument. optparse supports
# this but as far as I can tell argparse does not.
import optparse
+
import tornado.autoreload
global _autoreload_is_main
"""
import asyncio
-from concurrent import futures
import functools
import sys
import types
-
-from tornado.log import app_log
-
import typing
-from typing import Any, Union
from collections.abc import Callable
+from concurrent import futures
+from typing import Any, Union
+
+from tornado.log import app_log
_T = typing.TypeVar("_T")
import collections
import functools
+import inspect
import logging
-import pycurl
import re
import threading
import time
-import inspect
+from collections.abc import Callable
from io import BytesIO
+from typing import Any
-from tornado import gen
-from tornado import httputil
-from tornado import ioloop
+import pycurl
-from tornado.escape import utf8, native_str
+from tornado import gen, httputil, ioloop
+from tornado.escape import native_str, utf8
from tornado.httpclient import (
+ AsyncHTTPClient,
+ HTTPError,
HTTPRequest,
HTTPResponse,
- HTTPError,
- AsyncHTTPClient,
main,
)
from tornado.log import app_log
-from typing import Any
-from collections.abc import Callable
-
curl_log = logging.getLogger("tornado.curl_httpclient")
CR_OR_LF_RE = re.compile(b"\r|\n")
import html
import json
import re
+import typing
import urllib.parse
+from collections.abc import Callable
+from typing import Any
from tornado.util import unicode_type
-import typing
-from typing import Any
-from collections.abc import Callable
-
def xhtml_escape(value: str | bytes) -> str:
"""Escapes a string so it is valid within HTML or XML.
import asyncio
import builtins
import collections
-from collections.abc import Generator
import concurrent.futures
import contextvars
import datetime
import functools
-from functools import singledispatch
-from inspect import isawaitable
import sys
import types
+import typing
+from collections.abc import Awaitable, Callable, Generator, Iterable, Mapping, Sequence
+from functools import singledispatch
+from inspect import isawaitable
+from typing import (
+ Any,
+ Dict,
+ List,
+ Tuple,
+ Type,
+ Union,
+ overload,
+)
from tornado.concurrent import (
Future,
- is_future,
chain_future,
- future_set_exc_info,
future_add_done_callback,
+ future_set_exc_info,
future_set_result_unless_cancelled,
+ is_future,
)
from tornado.ioloop import IOLoop
from tornado.log import app_log
from tornado.util import TimeoutError
-import typing
-from typing import (
- Union,
- Any,
- List,
- Type,
- Tuple,
- Dict,
- overload,
-)
-from collections.abc import Callable, Iterable
-from collections.abc import Mapping, Awaitable, Sequence
-
_T = typing.TypeVar("_T")
_Yieldable = Union[
import logging
import re
import types
+from collections.abc import Awaitable, Callable
+from typing import Optional, Type, cast
+from tornado import gen, httputil, iostream
from tornado.concurrent import (
Future,
future_add_done_callback,
future_set_result_unless_cancelled,
)
from tornado.escape import native_str, utf8
-from tornado import gen
-from tornado import httputil
-from tornado import iostream
-from tornado.log import gen_log, app_log
+from tornado.log import app_log, gen_log
from tornado.util import GzipDecompressor
-
-from typing import cast, Optional, Type
-from collections.abc import Callable
-from collections.abc import Awaitable
-
CR_OR_LF_RE = re.compile(b"\r|\n")
import datetime
import functools
-from io import BytesIO
import ssl
import time
import weakref
+from collections.abc import Awaitable, Callable
+from io import BytesIO
+from typing import Any, Optional, Type, Union, cast
+from tornado import gen, httputil
from tornado.concurrent import (
Future,
- future_set_result_unless_cancelled,
future_set_exception_unless_cancelled,
+ future_set_result_unless_cancelled,
)
-from tornado.escape import utf8, native_str
-from tornado import gen, httputil
+from tornado.escape import native_str, utf8
from tornado.ioloop import IOLoop
from tornado.util import Configurable
-from typing import Type, Any, Union, Optional, cast
-from collections.abc import Callable
-from collections.abc import Awaitable
-
class HTTPClient:
"""A blocking HTTP client.
import socket
import ssl
+import typing
+from collections.abc import Awaitable, Callable
+from typing import Any
+from tornado import httputil, iostream, netutil
from tornado.escape import native_str
-from tornado.http1connection import HTTP1ServerConnection, HTTP1ConnectionParameters
-from tornado import httputil
-from tornado import iostream
-from tornado import netutil
+from tornado.http1connection import HTTP1ConnectionParameters, HTTP1ServerConnection
from tornado.tcpserver import TCPServer
from tornado.util import Configurable
-import typing
-from typing import Any
-from collections.abc import Callable
-from collections.abc import Awaitable
-
class HTTPServer(TCPServer, Configurable, httputil.HTTPServerConnectionDelegate):
r"""A non-blocking, single-threaded HTTP server.
import dataclasses
import datetime
import email.utils
-from functools import lru_cache
-from http.client import responses
import http.cookies
import re
-from ssl import SSLError
import time
import unicodedata
-from urllib.parse import urlencode, urlparse, urlunparse, parse_qsl
+from functools import lru_cache
+from http.client import responses
+from ssl import SSLError
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
-from tornado.escape import native_str, parse_qs_bytes, utf8, to_unicode
+from tornado.escape import native_str, parse_qs_bytes, to_unicode, utf8
from tornado.util import ObjectDict, unicode_type
# responses is unused in this file, but we re-export it to other files.
responses
import typing
+from collections.abc import Awaitable, Generator, Iterable, Iterator, Mapping
from typing import (
AnyStr,
)
-from collections.abc import Iterable, Mapping, Iterator, Awaitable, Generator
if typing.TYPE_CHECKING:
# These are relatively heavy imports and aren't needed in this file
# unless we're type-checking.
- from asyncio import Future
import unittest
+ from asyncio import Future
# To be used with str.strip() and related methods.
HTTP_WHITESPACE = " \t"
import concurrent.futures
import datetime
import functools
+import math
import numbers
import os
+import random
import sys
import time
-import math
-import random
+import typing
import warnings
+from collections.abc import Awaitable, Callable
from inspect import isawaitable
+from typing import Any, Protocol, TypedDict, TypeVar
from tornado.concurrent import (
Future,
- is_future,
chain_future,
- future_set_exc_info,
future_add_done_callback,
+ future_set_exc_info,
+ is_future,
)
from tornado.log import app_log
from tornado.util import Configurable, TimeoutError, import_object
-import typing
-from typing import Any, TypeVar, TypedDict, Protocol
-from collections.abc import Callable
-from collections.abc import Awaitable
-
class _Selectable(Protocol):
def fileno(self) -> int:
import io
import numbers
import os
+import re
import socket
import ssl
import sys
-import re
-
-from tornado.concurrent import Future, future_set_result_unless_cancelled
-from tornado import ioloop
-from tornado.log import gen_log
-from tornado.netutil import ssl_wrap_socket, _client_ssl_defaults, _server_ssl_defaults
-from tornado.util import errno_from_exception
-
import typing
+from collections.abc import Awaitable, Callable
+from re import Pattern
+from types import TracebackType
from typing import (
Any,
TypeVar,
)
-from collections.abc import Callable
-from collections.abc import Awaitable
-from re import Pattern
-from types import TracebackType
+
+from tornado import ioloop
+from tornado.concurrent import Future, future_set_result_unless_cancelled
+from tornado.log import gen_log
+from tornado.netutil import _client_ssl_defaults, _server_ssl_defaults, ssl_wrap_socket
+from tornado.util import errno_from_exception
_IOStreamType = TypeVar("_IOStreamType", bound="IOStream")
import glob
import os
import re
+from collections.abc import Iterable
+from typing import Any
from tornado import escape
-from tornado.log import gen_log
-
from tornado._locale_data import LOCALE_NAMES
-
-from typing import Any
-from collections.abc import Iterable
+from tornado.log import gen_log
_default_locale = "en_US"
_translations: dict[str, Any] = {}
import collections
import datetime
import types
+from collections.abc import Awaitable
+from typing import Any, Optional, Type
from tornado import gen, ioloop
from tornado.concurrent import Future, future_set_result_unless_cancelled
-from typing import Optional, Type, Any
-from collections.abc import Awaitable
-
__all__ = ["Condition", "Event", "Semaphore", "BoundedSemaphore", "Lock"]
import sys
from tornado.escape import _unicode
-from tornado.util import unicode_type, basestring_type
+from tornado.util import basestring_type, unicode_type
try:
import colorama # type: ignore
import concurrent.futures
import errno
import os
-import sys
import socket
import ssl
import stat
+import sys
+from collections.abc import Awaitable, Callable
+from typing import Any
from tornado.concurrent import dummy_executor, run_on_executor
from tornado.ioloop import IOLoop
from tornado.util import Configurable, errno_from_exception
-from typing import Any
-from collections.abc import Callable
-from collections.abc import Awaitable
-
# Note that the naming of ssl.Purpose is confusing; the purpose
# of a context is to authenticate the opposite side of the connection.
_client_ssl_defaults = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
import datetime
import numbers
+import os
import re
import sys
-import os
import textwrap
-
-from tornado.escape import _unicode, native_str
-from tornado.log import define_logging_options
-from tornado.util import basestring_type, exec_in
-
+from collections.abc import Callable, Iterable, Iterator
from typing import (
Any,
TextIO,
)
-from collections.abc import Callable
-from collections.abc import Iterator, Iterable
+
+from tornado.escape import _unicode, native_str
+from tornado.log import define_logging_options
+from tornado.util import basestring_type, exec_in
class Error(Exception):
import threading
import typing
import warnings
-from tornado.gen import convert_yielded
-from tornado.ioloop import IOLoop, _Selectable
-
+from collections.abc import Callable
from typing import (
Any,
Protocol,
TypeVar,
Union,
)
-from collections.abc import Callable
+
+from tornado.gen import convert_yielded
+from tornado.ioloop import IOLoop, _Selectable
if typing.TYPE_CHECKING:
from typing_extensions import TypeVarTuple, Unpack
-import pycares # type: ignore
import socket
+import typing
+from collections.abc import Generator
+from typing import Any
+
+import pycares # type: ignore
-from tornado.concurrent import Future
from tornado import gen
+from tornado.concurrent import Future
from tornado.ioloop import IOLoop
from tornado.netutil import Resolver, is_valid_ip
-import typing
-from typing import Any
-from collections.abc import Generator
-
class CaresResolver(Resolver):
"""Name resolver based on the c-ares library.
"""Bridges between the Twisted package and Tornado."""
import sys
+import typing
from twisted.internet.defer import Deferred # type: ignore
from twisted.python import failure # type: ignore
-from tornado.concurrent import Future, future_set_exc_info
from tornado import gen
-
-import typing
+from tornado.concurrent import Future, future_set_exc_info
def install() -> None:
"""
import asyncio
-import os
import multiprocessing
+import os
import signal
import subprocess
import sys
import time
-
from binascii import hexlify
+from collections.abc import Callable
+from typing import Any
+from tornado import ioloop
from tornado.concurrent import (
Future,
- future_set_result_unless_cancelled,
future_set_exception_unless_cancelled,
+ future_set_result_unless_cancelled,
)
-from tornado import ioloop
from tornado.iostream import PipeIOStream
from tornado.log import gen_log
-from typing import Any
-from collections.abc import Callable
-
# Re-export this exception for convenience.
CalledProcessError = subprocess.CalledProcessError
import collections
import datetime
import heapq
+from collections.abc import Awaitable
+from typing import Any, Generic, TypeVar
from tornado import gen, ioloop
from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado.locks import Event
-from typing import TypeVar, Generic, Any
-from collections.abc import Awaitable
-
_T = TypeVar("_T")
__all__ = ["Queue", "PriorityQueue", "LifoQueue", "QueueFull", "QueueEmpty"]
"""
import re
+from collections.abc import Awaitable, Sequence
from functools import partial
-
-from tornado import httputil
-from tornado.httpserver import _CallableAdapter
-from tornado.escape import url_escape, url_unescape, utf8
-from tornado.log import app_log
-from tornado.util import basestring_type, import_object, re_unescape, unicode_type
-
+from re import Pattern
from typing import (
Any,
Union,
overload,
)
-from collections.abc import Awaitable, Sequence
-from re import Pattern
+
+from tornado import httputil
+from tornado.escape import url_escape, url_unescape, utf8
+from tornado.httpserver import _CallableAdapter
+from tornado.log import app_log
+from tornado.util import basestring_type, import_object, re_unescape, unicode_type
class Router(httputil.HTTPServerConnectionDelegate):
+import base64
+import collections
+import copy
+import functools
+import re
+import socket
+import ssl
+import sys
+import time
+import urllib.parse
+from collections.abc import Awaitable, Callable
+from io import BytesIO
+from types import TracebackType
+from typing import Any, Optional, Type
+
+from tornado import gen, httputil, version
from tornado.escape import _unicode
-from tornado import gen, version
+from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
from tornado.httpclient import (
- HTTPResponse,
- HTTPError,
AsyncHTTPClient,
- main,
- _RequestProxy,
+ HTTPError,
HTTPRequest,
+ HTTPResponse,
+ _RequestProxy,
+ main,
)
-from tornado import httputil
-from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
from tornado.ioloop import IOLoop
-from tornado.iostream import StreamClosedError, IOStream
+from tornado.iostream import IOStream, StreamClosedError
+from tornado.log import gen_log
from tornado.netutil import (
- Resolver,
OverrideResolver,
+ Resolver,
_client_ssl_defaults,
is_valid_ip,
)
-from tornado.log import gen_log
from tornado.tcpclient import TCPClient
-import base64
-import collections
-import copy
-import functools
-import re
-import socket
-import ssl
-import sys
-import time
-from io import BytesIO
-import urllib.parse
-
-from typing import Any, Optional, Type
-from collections.abc import Callable
-from collections.abc import Awaitable
-from types import TracebackType
-
class HTTPTimeoutError(HTTPError):
"""Error raised by SimpleAsyncHTTPClient on timeout.
"""A non-blocking TCP connection factory."""
+import datetime
import functools
-import socket
import numbers
-import datetime
+import socket
import ssl
+from collections.abc import Callable, Iterator
+from typing import Any, Tuple
+from tornado import gen
from tornado.concurrent import Future, future_add_done_callback
+from tornado.gen import TimeoutError
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
-from tornado import gen
from tornado.netutil import Resolver
-from tornado.gen import TimeoutError
-
-from typing import Any, Tuple
-from collections.abc import Callable
-from collections.abc import Iterator
_INITIAL_CONNECT_TIMEOUT = 0.3
import os
import socket
import ssl
+from collections.abc import Awaitable, Callable, Iterable
+from typing import Any
-from tornado import gen
-from tornado.log import app_log
+from tornado import gen, process
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, SSLIOStream
+from tornado.log import app_log
from tornado.netutil import (
- bind_sockets,
+ _DEFAULT_BACKLOG,
add_accept_handler,
+ bind_sockets,
ssl_wrap_socket,
- _DEFAULT_BACKLOG,
)
-from tornado import process
from tornado.util import errno_from_exception
-from typing import Any
-from collections.abc import Iterable, Awaitable, Callable
-
class TCPServer:
r"""A non-blocking, single-threaded TCP server.
"""
import datetime
-from io import StringIO
import linecache
import os.path
import posixpath
import re
import threading
+import typing
+from collections.abc import Callable, Iterable
+from io import StringIO
+from typing import Any, ContextManager, Optional, TextIO
from tornado import escape
from tornado.log import app_log
from tornado.util import ObjectDict, exec_in, unicode_type
-from typing import Any, Optional, TextIO, ContextManager
-from collections.abc import Callable
-from collections.abc import Iterable
-import typing
-
_DEFAULT_AUTOESCAPE = "xhtml_escape"
import time
import unittest
import warnings
-
from concurrent.futures import ThreadPoolExecutor
+
import tornado.platform.asyncio
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.platform.asyncio import (
+ AddThreadSelectorEventLoop,
AsyncIOLoop,
to_asyncio_future,
- AddThreadSelectorEventLoop,
)
+from tornado.test.util import ignore_deprecation
from tornado.testing import (
+ AsyncHTTPTestCase,
AsyncTestCase,
gen_test,
setup_with_context_manager,
- AsyncHTTPTestCase,
)
-from tornado.test.util import ignore_deprecation
from tornado.web import Application, RequestHandler
from unittest import mock
+from tornado import gen
from tornado.auth import (
- OpenIdMixin,
- OAuthMixin,
- OAuth2Mixin,
- GoogleOAuth2Mixin,
FacebookGraphMixin,
+ GoogleOAuth2Mixin,
+ OAuth2Mixin,
+ OAuthMixin,
+ OpenIdMixin,
TwitterMixin,
)
from tornado.escape import json_decode
-from tornado import gen
from tornado.httpclient import HTTPClientError
from tornado.httputil import url_concat
from tornado.log import app_log
from tornado.testing import AsyncHTTPTestCase, ExpectLog
-from tornado.web import RequestHandler, Application, HTTPError
+from tornado.web import Application, HTTPError, RequestHandler
class OpenIdClientLoginHandler(RequestHandler, OpenIdMixin):
import os
import shutil
import subprocess
-from subprocess import Popen
import sys
-from tempfile import mkdtemp
import textwrap
import time
import unittest
+from subprocess import Popen
+from tempfile import mkdtemp
class AutoreloadTest(unittest.TestCase):
import unittest
import tornado
-from tornado import web, gen, httpclient
+from tornado import gen, httpclient, web
from tornado.test.util import skipNotCPython
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from concurrent import futures
import logging
import re
import socket
import unittest
+from concurrent import futures
+from tornado import gen
from tornado.concurrent import (
Future,
chain_future,
- run_on_executor,
future_set_result_unless_cancelled,
+ run_on_executor,
)
-from tornado.escape import utf8, to_unicode
-from tornado import gen
+from tornado.escape import to_unicode, utf8
from tornado.iostream import IOStream
from tornado.tcpserver import TCPServer
from tornado.testing import AsyncTestCase, bind_unused_port, gen_test
-from hashlib import md5
import unittest
+from hashlib import md5
+from tornado import gen
from tornado.escape import utf8
-from tornado.testing import AsyncHTTPTestCase
from tornado.test import httpclient_test
+from tornado.testing import AsyncHTTPTestCase
from tornado.web import Application, RequestHandler
-from tornado import gen
try:
import pycurl
import unittest
+from typing import Any
import tornado
from tornado.escape import (
- utf8,
- xhtml_escape,
- xhtml_unescape,
- url_escape,
- url_unescape,
- to_unicode,
json_decode,
json_encode,
- squeeze,
recursive_unicode,
+ squeeze,
+ to_unicode,
+ url_escape,
+ url_unescape,
+ utf8,
+ xhtml_escape,
+ xhtml_unescape,
)
from tornado.util import unicode_type
-from typing import Any
-
linkify_tests: list[tuple[str | bytes, dict[str, Any], str]] = [
# (input, linkify_kwargs, expected_output)
(
import asyncio
-from concurrent import futures
import contextvars
-import gc
import datetime
+import gc
import platform
import sys
import time
-import weakref
import unittest
+import weakref
+from concurrent import futures
+from tornado import gen
from tornado.concurrent import Future
from tornado.log import app_log
-from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
from tornado.test.util import skipNotCPython
-from tornado.web import Application, RequestHandler, HTTPError
-
-from tornado import gen
+from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
+from tornado.web import Application, HTTPError, RequestHandler
class GenBasicTest(AsyncTestCase):
import base64
import binascii
-from contextlib import closing
import copy
-import gzip
-import threading
import datetime
-from io import BytesIO
+import gzip
import subprocess
import sys
+import threading
import time
import unicodedata
import unittest
+from contextlib import closing
+from io import BytesIO
-from tornado.escape import utf8, native_str, to_unicode
-from tornado import gen
+from tornado import gen, netutil
+from tornado.escape import native_str, to_unicode, utf8
from tornado.httpclient import (
+ HTTPClient,
+ HTTPError,
HTTPRequest,
HTTPResponse,
_RequestProxy,
- HTTPError,
- HTTPClient,
)
from tornado.httpserver import HTTPServer
+from tornado.httputil import HTTPHeaders, format_timestamp
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
-from tornado.log import gen_log, app_log
-from tornado import netutil
-from tornado.testing import AsyncHTTPTestCase, bind_unused_port, gen_test, ExpectLog
+from tornado.log import app_log, gen_log
from tornado.test.util import ignore_deprecation
+from tornado.testing import AsyncHTTPTestCase, ExpectLog, bind_unused_port, gen_test
from tornado.web import Application, RequestHandler, url
-from tornado.httputil import format_timestamp, HTTPHeaders
class HelloWorldHandler(RequestHandler):
+import datetime
+import gzip
+import logging
+import os
+import shutil
+import socket
+import ssl
+import sys
+import tempfile
+import textwrap
+import typing
+import unittest
+import urllib.parse
+import uuid
+from contextlib import closing, contextmanager
+from io import BytesIO
+
from tornado import gen, netutil
from tornado.escape import (
+ _unicode,
json_decode,
json_encode,
- utf8,
- _unicode,
- recursive_unicode,
native_str,
+ recursive_unicode,
+ utf8,
)
from tornado.http1connection import HTTP1Connection
from tornado.httpclient import HTTPError
)
from tornado.iostream import IOStream
from tornado.locks import Event
-from tornado.log import gen_log, app_log
+from tornado.log import app_log, gen_log
from tornado.simple_httpclient import SimpleAsyncHTTPClient
+from tornado.test.util import abstract_base_test
from tornado.testing import (
- AsyncHTTPTestCase,
AsyncHTTPSTestCase,
+ AsyncHTTPTestCase,
AsyncTestCase,
ExpectLog,
gen_test,
)
-from tornado.test.util import abstract_base_test
from tornado.web import Application, RequestHandler, stream_request_body
-from contextlib import closing, contextmanager
-import datetime
-import gzip
-import logging
-import os
-import shutil
-import socket
-import ssl
-import sys
-import tempfile
-import textwrap
-import unittest
-import urllib.parse
-import uuid
-from io import BytesIO
-
-import typing
-
async def read_stream_body(stream):
"""Reads an HTTP response from `stream` and returns a tuple of its
-from tornado.httputil import (
- url_concat,
- parse_multipart_form_data,
- HTTPHeaders,
- format_timestamp,
- HTTPServerRequest,
- parse_request_start_line,
- parse_cookie,
- qs_to_qsl,
- HTTPInputError,
- HTTPFile,
- ParseMultipartConfig,
-)
-from tornado.escape import utf8, native_str
-from tornado.log import gen_log
-from tornado.test.util import ignore_deprecation
-
import copy
import datetime
import logging
import pickle
import time
-import urllib.parse
import unittest
+import urllib.parse
-from tornado.test.util import skipIfEmulated
+from tornado.escape import native_str, utf8
+from tornado.httputil import (
+ HTTPFile,
+ HTTPHeaders,
+ HTTPInputError,
+ HTTPServerRequest,
+ ParseMultipartConfig,
+ format_timestamp,
+ parse_cookie,
+ parse_multipart_form_data,
+ parse_request_start_line,
+ qs_to_qsl,
+ url_concat,
+)
+from tornado.log import gen_log
+from tornado.test.util import ignore_deprecation, skipIfEmulated
def form_data_args() -> tuple[dict[str, list[bytes]], dict[str, list[HTTPFile]]]:
def test_import_aliases(self):
# Ensure we don't delete formerly-documented aliases accidentally.
- import tornado
import asyncio
+ import tornado
+
self.assertIs(tornado.ioloop.TimeoutError, tornado.util.TimeoutError)
self.assertIs(tornado.gen.TimeoutError, tornado.util.TimeoutError)
self.assertIs(tornado.util.TimeoutError, asyncio.TimeoutError)
import asyncio
-from concurrent.futures import ThreadPoolExecutor
-from concurrent import futures
-from collections.abc import Generator
import contextlib
import datetime
import functools
import threading
import time
import types
-from unittest import mock
import unittest
+from collections.abc import Generator
+from concurrent import futures
+from concurrent.futures import ThreadPoolExecutor
+from unittest import mock
-from tornado.escape import native_str
from tornado import gen
-from tornado.ioloop import IOLoop, TimeoutError, PeriodicCallback
+from tornado.concurrent import Future
+from tornado.escape import native_str
+from tornado.ioloop import IOLoop, PeriodicCallback, TimeoutError
from tornado.log import app_log
+from tornado.test.util import (
+ ignore_deprecation,
+ skipIfNonUnix,
+)
from tornado.testing import (
AsyncTestCase,
- bind_unused_port,
ExpectLog,
+ bind_unused_port,
gen_test,
setup_with_context_manager,
)
-from tornado.test.util import (
- ignore_deprecation,
- skipIfNonUnix,
-)
-from tornado.concurrent import Future
class TestIOLoop(AsyncTestCase):
+import asyncio
+import errno
+import hashlib
+import logging
+import os
+import platform
+import random
+import socket
+import ssl
+import typing
+import unittest
+from unittest import mock
+
+from tornado import gen, netutil
from tornado.concurrent import Future
-from tornado import gen
-from tornado import netutil
+from tornado.httpclient import AsyncHTTPClient, HTTPResponse
+from tornado.httputil import HTTPHeaders
from tornado.ioloop import IOLoop
from tornado.iostream import (
IOStream,
- SSLIOStream,
PipeIOStream,
+ SSLIOStream,
StreamClosedError,
_StreamBuffer,
)
-from tornado.httpclient import AsyncHTTPClient, HTTPResponse
-from tornado.httputil import HTTPHeaders
from tornado.locks import Condition, Event
from tornado.log import gen_log
from tornado.netutil import ssl_options_to_context, ssl_wrap_socket
from tornado.platform.asyncio import AddThreadSelectorEventLoop
from tornado.tcpserver import TCPServer
+from tornado.test.util import (
+ abstract_base_test,
+ ignore_deprecation,
+ refusing_port,
+ skipIfNonUnix,
+)
from tornado.testing import (
- AsyncHTTPTestCase,
AsyncHTTPSTestCase,
+ AsyncHTTPTestCase,
AsyncTestCase,
- bind_unused_port,
ExpectLog,
+ bind_unused_port,
gen_test,
)
-from tornado.test.util import (
- skipIfNonUnix,
- refusing_port,
- ignore_deprecation,
- abstract_base_test,
-)
-from tornado.web import RequestHandler, Application
-import asyncio
-import errno
-import hashlib
-import logging
-import os
-import platform
-import random
-import socket
-import ssl
-import typing
-from unittest import mock
-import unittest
+from tornado.web import Application, RequestHandler
def _server_ssl_options():
import unittest
import tornado.locale
-from tornado.escape import utf8, to_unicode
+from tornado.escape import to_unicode, utf8
from tornado.util import unicode_type
# under the License.
import asyncio
-from datetime import timedelta
import unittest
+from datetime import timedelta
from tornado import gen, locks
from tornado.gen import TimeoutError
-from tornado.testing import gen_test, AsyncTestCase
+from tornado.testing import AsyncTestCase, gen_test
class ConditionTest(AsyncTestCase):
import errno
import signal
import socket
-from subprocess import Popen
import sys
import time
+import typing
import unittest
+from subprocess import Popen
from tornado.netutil import (
BlockingResolver,
OverrideResolver,
ThreadedResolver,
- is_valid_ip,
bind_sockets,
+ is_valid_ip,
)
-from tornado.testing import AsyncTestCase, gen_test, bind_unused_port
-from tornado.test.util import skipIfNoNetwork, abstract_base_test
-
-import typing
+from tornado.test.util import abstract_base_test, skipIfNoNetwork
+from tornado.testing import AsyncTestCase, bind_unused_port, gen_test
try:
import pycares # type: ignore
import datetime
-from io import StringIO
import os
import sys
-from unittest import mock
import unittest
+from io import StringIO
+from unittest import mock
-from tornado.options import OptionParser, Error
+from tornado.options import Error, OptionParser
from tornado.util import basestring_type
from tornado.httpclient import HTTPClient, HTTPError
from tornado.httpserver import HTTPServer
from tornado.log import gen_log
-from tornado.process import fork_processes, task_id, Subprocess
+from tornado.process import Subprocess, fork_processes, task_id
from tornado.simple_httpclient import SimpleAsyncHTTPClient
-from tornado.testing import bind_unused_port, ExpectLog, AsyncTestCase, gen_test
from tornado.test.util import skipIfNonUnix
-from tornado.web import RequestHandler, Application
+from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
+from tornado.web import Application, RequestHandler
# Not using AsyncHTTPTestCase because we need control over the IOLoop.
# under the License.
import asyncio
+import unittest
from datetime import timedelta
from random import random
-import unittest
from tornado import gen, queues
from tornado.gen import TimeoutError
-from tornado.testing import gen_test, AsyncTestCase
+from tornado.testing import AsyncTestCase, gen_test
class QueueBasicTest(AsyncTestCase):
# License for the specific language governing permissions and limitations
# under the License.
+import typing
+
from tornado.httputil import (
HTTPHeaders,
HTTPMessageDelegate,
from tornado.web import Application, HTTPError, RequestHandler
from tornado.wsgi import WSGIContainer
-import typing
-
class BasicRouter(Router):
def find_handler(self, request, **kwargs):
-from functools import reduce
import gc
import io
import locale # system locale module, not tornado.locale
import logging
import operator
-import textwrap
import sys
+import textwrap
import unittest
import warnings
+from functools import reduce
from tornado.httpclient import AsyncHTTPClient
from tornado.httpserver import HTTPServer
from tornado.netutil import Resolver
-from tornado.options import define, add_parse_callback, options
+from tornado.options import add_parse_callback, define, options
from tornado.test.util import ABT_SKIP_MESSAGE
TEST_MODULES = [
import collections
-from contextlib import closing
import errno
import logging
import os
import ssl
import sys
import typing
+from contextlib import closing
-from tornado.escape import to_unicode, utf8
from tornado import gen, version
+from tornado.escape import to_unicode, utf8
from tornado.httpclient import AsyncHTTPClient, HTTPResponse
from tornado.httpserver import HTTPServer
from tornado.httputil import HTTPHeaders, ResponseStartLine
from tornado.log import gen_log
from tornado.netutil import Resolver, bind_sockets
from tornado.simple_httpclient import (
- SimpleAsyncHTTPClient,
HTTPStreamClosedError,
HTTPTimeoutError,
+ SimpleAsyncHTTPClient,
)
+from tornado.test import httpclient_test
from tornado.test.httpclient_test import (
ChunkHandler,
CountdownHandler,
RedirectHandler,
UserAgentHandler,
)
-from tornado.test import httpclient_test
+from tornado.test.util import (
+ abstract_base_test,
+ refusing_port,
+ skipIfNoIPv6,
+)
from tornado.testing import (
- AsyncHTTPTestCase,
AsyncHTTPSTestCase,
+ AsyncHTTPTestCase,
AsyncTestCase,
ExpectLog,
gen_test,
)
-from tornado.test.util import (
- abstract_base_test,
- skipIfNoIPv6,
- refusing_port,
-)
-from tornado.web import RequestHandler, Application, url, stream_request_body
+from tornado.web import Application, RequestHandler, stream_request_body, url
class SimpleHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from contextlib import closing
import getpass
import socket
+import typing
import unittest
+from contextlib import closing
from tornado.concurrent import Future
+from tornado.gen import TimeoutError
from tornado.iostream import IOStream
-from tornado.netutil import bind_sockets, Resolver
+from tornado.netutil import Resolver, bind_sockets
from tornado.queues import Queue
from tornado.tcpclient import TCPClient, _Connector
from tornado.tcpserver import TCPServer
+from tornado.test.util import refusing_port, skipIfNoIPv6, skipIfNonUnix
from tornado.testing import AsyncTestCase, gen_test
-from tornado.test.util import skipIfNoIPv6, refusing_port, skipIfNonUnix
-from tornado.gen import TimeoutError
-
-import typing
# Fake address families for testing. Used in place of AF_INET
# and AF_INET6 because some installations do not have AF_INET6.
import traceback
import unittest
-from tornado.escape import utf8, native_str, to_unicode
-from tornado.template import Template, DictLoader, ParseError, Loader
+from tornado.escape import native_str, to_unicode, utf8
+from tornado.template import DictLoader, Loader, ParseError, Template
from tornado.util import ObjectDict
-from tornado import gen, ioloop
-from tornado.httpserver import HTTPServer
-from tornado.locks import Event
-from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, bind_unused_port, gen_test
-from tornado.web import Application
import asyncio
import contextlib
import gc
import unittest
import warnings
+from tornado import gen, ioloop
+from tornado.httpserver import HTTPServer
+from tornado.locks import Event
+from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, bind_unused_port, gen_test
+from tornado.web import Application
+
@contextlib.contextmanager
def set_environ(name, value):
+import datetime
import re
import sys
-import datetime
import textwrap
import unittest
+from typing import Any, cast
import tornado
from tornado.escape import utf8
from tornado.util import (
- raise_exc_info,
+ ArgReplacer,
Configurable,
exec_in,
- ArgReplacer,
- timedelta_to_seconds,
import_object,
+ raise_exc_info,
re_unescape,
+ timedelta_to_seconds,
)
-from typing import cast, Any
-
class RaiseExcInfoTest(unittest.TestCase):
def test_two_arg_exception(self):
+import binascii
+import contextlib
+import copy
+import datetime
+import email.utils
+import gzip
import http
+import itertools
+import logging
+import os
+import re
+import socket
+import typing
+import unittest
+import urllib.parse
+from io import BytesIO
+from tornado import gen, locale
from tornado.concurrent import Future
-from tornado import gen
from tornado.escape import (
json_decode,
- utf8,
- to_unicode,
- recursive_unicode,
native_str,
+ recursive_unicode,
to_basestring,
+ to_unicode,
+ utf8,
)
from tornado.httpclient import HTTPClientError
from tornado.httputil import format_timestamp
from tornado.iostream import IOStream
-from tornado import locale
from tornado.locks import Event
from tornado.log import app_log, gen_log
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.template import DictLoader
-from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
from tornado.test.util import ignore_deprecation
+from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
from tornado.util import ObjectDict, unicode_type
from tornado.web import (
Application,
- RequestHandler,
- StaticFileHandler,
- RedirectHandler as WebRedirectHandler,
+ ErrorHandler,
+ Finish,
+ GZipContentEncoding,
HTTPError,
MissingArgumentError,
- ErrorHandler,
- authenticated,
- url,
+)
+from tornado.web import RedirectHandler as WebRedirectHandler
+from tornado.web import (
+ RequestHandler,
+ StaticFileHandler,
+ UIModule,
_create_signature_v1,
+ addslash,
+ authenticated,
create_signed_value,
decode_signed_value,
get_signature_key_version,
- UIModule,
- Finish,
- stream_request_body,
removeslash,
- addslash,
- GZipContentEncoding,
+ stream_request_body,
+ url,
)
-import binascii
-import contextlib
-import copy
-import datetime
-import email.utils
-import gzip
-from io import BytesIO
-import itertools
-import logging
-import os
-import re
-import socket
-import typing
-import unittest
-import urllib.parse
-
def relpath(*a):
return os.path.join(os.path.dirname(__file__), *a)
import typing
import unittest
-from tornado.concurrent import Future
from tornado import gen
+from tornado.concurrent import Future
from tornado.httpclient import HTTPError, HTTPRequest
from tornado.locks import Event
-from tornado.log import gen_log, app_log
+from tornado.log import app_log, gen_log
from tornado.netutil import Resolver
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.template import DictLoader
from tornado.test.util import abstract_base_test, ignore_deprecation
-from tornado.testing import AsyncHTTPTestCase, gen_test, bind_unused_port, ExpectLog
+from tornado.testing import AsyncHTTPTestCase, ExpectLog, bind_unused_port, gen_test
from tornado.web import Application, RequestHandler
try:
raise
from tornado.websocket import (
+ WebSocketClosedError,
+ WebSocketError,
WebSocketHandler,
websocket_connect,
- WebSocketError,
- WebSocketClosedError,
)
try:
import asyncio
import concurrent.futures
import threading
-
from wsgiref.validate import validator
from tornado.routing import RuleRouter
"""
import asyncio
-from collections.abc import Generator
import functools
import inspect
import logging
import signal
import socket
import sys
+import typing
import unittest
import warnings
+from collections.abc import Callable, Coroutine, Generator
+from types import TracebackType
+from typing import Any, Optional, Type, Union
-from tornado import gen
+from tornado import gen, netutil
from tornado.httpclient import AsyncHTTPClient, HTTPResponse
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop, TimeoutError
-from tornado import netutil
+from tornado.log import app_log
from tornado.platform.asyncio import AsyncIOMainLoop
from tornado.process import Subprocess
-from tornado.log import app_log
-from tornado.util import raise_exc_info, basestring_type
+from tornado.util import basestring_type, raise_exc_info
from tornado.web import Application
-import typing
-from typing import Any, Type, Union, Optional
-from collections.abc import Callable
-from collections.abc import Coroutine
-from types import TracebackType
-
_ExcInfoTuple = tuple[
type[BaseException] | None, BaseException | None, TracebackType | None
]
import array
import asyncio
-from inspect import getfullargspec
import os
import re
import typing
import zlib
-
+from collections.abc import Callable, Mapping, Sequence
+from inspect import getfullargspec
+from re import Match
from typing import (
Any,
)
-from collections.abc import Callable
-from collections.abc import Mapping, Sequence
-from re import Match
if typing.TYPE_CHECKING:
# Additional imports only used in type comments.
# This lets us make these imports lazy.
import datetime
- from types import TracebackType
import unittest
+ from types import TracebackType
# Aliases for types that are spelled differently in different Python
# versions. bytes_type is deprecated and no longer used in Tornado
import hashlib
import hmac
import http.cookies
-from inspect import isclass
-from io import BytesIO
import mimetypes
import numbers
import os.path
import sys
import threading
import time
-import warnings
-import tornado
import traceback
import types
import urllib.parse
+import warnings
+from inspect import isclass
+from io import BytesIO
from urllib.parse import urlencode
+import tornado
+from tornado import escape, gen, httputil, iostream, locale, template
from tornado.concurrent import Future, future_set_result_unless_cancelled
-from tornado import escape
-from tornado import gen
+from tornado.escape import _unicode, utf8
from tornado.httpserver import HTTPServer
-from tornado import httputil
-from tornado import iostream
-from tornado import locale
from tornado.log import access_log, app_log, gen_log
-from tornado import template
-from tornado.escape import utf8, _unicode
from tornado.routing import (
AnyMatches,
DefaultHostMatches,
HostMatches,
ReversibleRouter,
- Rule,
ReversibleRuleRouter,
+ Rule,
URLSpec,
_RuleList,
)
-from tornado.util import ObjectDict, unicode_type, _websocket_mask
+from tornado.util import ObjectDict, _websocket_mask, unicode_type
url = URLSpec
+from collections.abc import Awaitable, Callable, Generator, Iterable
+from types import TracebackType
from typing import (
Any,
- Union,
Optional,
Type,
TypeVar,
+ Union,
cast,
overload,
)
-from collections.abc import Callable
-from collections.abc import Awaitable, Iterable, Generator
-from types import TracebackType
# The following types are accepted by RequestHandler.set_header
# and related methods.
import hashlib
import logging
import os
-import sys
import struct
-import tornado
-from urllib.parse import urlparse
+import sys
import warnings
import zlib
+from collections.abc import Awaitable, Callable
+from types import TracebackType
+from typing import (
+ Any,
+ Optional,
+ Protocol,
+ Type,
+ Union,
+ cast,
+)
+from urllib.parse import urlparse
+import tornado
+from tornado import gen, httpclient, httputil, simple_httpclient
from tornado.concurrent import Future, future_set_result_unless_cancelled
-from tornado.escape import utf8, native_str, to_unicode
-from tornado import gen, httpclient, httputil
+from tornado.escape import native_str, to_unicode, utf8
from tornado.ioloop import IOLoop
-from tornado.iostream import StreamClosedError, IOStream
-from tornado.log import gen_log, app_log
+from tornado.iostream import IOStream, StreamClosedError
+from tornado.log import app_log, gen_log
from tornado.netutil import Resolver
-from tornado import simple_httpclient
from tornado.queues import Queue
from tornado.tcpclient import TCPClient
from tornado.util import _websocket_mask
-from typing import (
- cast,
- Any,
- Optional,
- Union,
- Type,
- Protocol,
-)
-from collections.abc import Callable
-from collections.abc import Awaitable
-from types import TracebackType
-
# The zlib compressor types aren't actually exposed anywhere
# publicly, so declare protocols for the portions we use.
"""
import concurrent.futures
-from io import BytesIO
-import tornado
import sys
+import typing
+from collections.abc import Callable
+from io import BytesIO
+from types import TracebackType
+from typing import Any
+import tornado
+from tornado import escape, httputil
from tornado.concurrent import dummy_executor
-from tornado import escape
-from tornado import httputil
from tornado.ioloop import IOLoop
from tornado.log import access_log
-from typing import Any
-from collections.abc import Callable
-from types import TracebackType
-import typing
-
if typing.TYPE_CHECKING:
from _typeshed.wsgi import WSGIApplication as WSGIAppType