"Implicit-optional" mode is on by default, but that default is intended to change in the indefinite future (python/peps#689, python/typing#275). Go ahead and change to the future explicit use of Optional.
HTTP Server
-----------
- .. autoclass:: HTTPServer(request_callback: Union[httputil.HTTPServerConnectionDelegate, Callable[[httputil.HTTPServerRequest], None]], no_keep_alive: bool = False, xheaders: bool = False, ssl_options: Union[Dict[str, Any], ssl.SSLContext] = None, protocol: str = None, decompress_request: bool = False, chunk_size: int = None, max_header_size: int = None, idle_connection_timeout: float = None, body_timeout: float = None, max_body_size: int = None, max_buffer_size: int = None, trusted_downstream: List[str] = None)
+ .. autoclass:: HTTPServer(request_callback: Union[httputil.HTTPServerConnectionDelegate, Callable[[httputil.HTTPServerRequest], None]], no_keep_alive: bool = False, xheaders: bool = False, ssl_options: Union[Dict[str, Any], ssl.SSLContext] = None, protocol: Optional[str] = None, decompress_request: bool = False, chunk_size: Optional[int] = None, max_header_size: Optional[int] = None, idle_connection_timeout: Optional[float] = None, body_timeout: Optional[float] = None, max_body_size: Optional[int] = None, max_buffer_size: Optional[int] = None, trusted_downstream: Optional[List[str]] = None)
:members:
The public interface of this class is mostly inherited from
Application configuration
-------------------------
- .. autoclass:: Application(handlers: List[Union[Rule, Tuple]] = None, default_host: str = None, transforms: List[Type[OutputTransform]] = None, **settings)
+ .. autoclass:: Application(handlers: Optional[List[Union[Rule, Tuple]]] = None, default_host: Optional[str] = None, transforms: Optional[List[Type[OutputTransform]]] = None, **settings)
.. attribute:: settings
[mypy]
python_version = 3.5
+no_implicit_optional = True
[mypy-tornado.*,tornado.platform.*]
disallow_untyped_defs = True
def authenticate_redirect(
self,
- callback_uri: str = None,
+ callback_uri: Optional[str] = None,
ax_attrs: List[str] = ["name", "email", "language", "username"],
) -> None:
"""Redirects to the authentication URL for this service.
handler.redirect(endpoint + "?" + urllib.parse.urlencode(args))
async def get_authenticated_user(
- self, http_client: httpclient.AsyncHTTPClient = None
+ self, http_client: Optional[httpclient.AsyncHTTPClient] = None
) -> Dict[str, Any]:
"""Fetches the authenticated user data upon redirect.
return self._on_authentication_verified(resp)
def _openid_args(
- self, callback_uri: str, ax_attrs: Iterable[str] = [], oauth_scope: str = None
+ self,
+ callback_uri: str,
+ ax_attrs: Iterable[str] = [],
+ oauth_scope: Optional[str] = None,
) -> Dict[str, str]:
handler = cast(RequestHandler, self)
url = urllib.parse.urljoin(handler.request.full_url(), callback_uri)
async def authorize_redirect(
self,
- callback_uri: str = None,
- extra_params: Dict[str, Any] = None,
- http_client: httpclient.AsyncHTTPClient = None,
+ callback_uri: Optional[str] = None,
+ extra_params: Optional[Dict[str, Any]] = None,
+ http_client: Optional[httpclient.AsyncHTTPClient] = None,
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
self._on_request_token(url, callback_uri, response)
async def get_authenticated_user(
- self, http_client: httpclient.AsyncHTTPClient = None
+ self, http_client: Optional[httpclient.AsyncHTTPClient] = None
) -> Dict[str, Any]:
"""Gets the OAuth authorized user and access token.
return user
def _oauth_request_token_url(
- self, callback_uri: str = None, extra_params: Dict[str, Any] = None
+ self,
+ callback_uri: Optional[str] = None,
+ extra_params: Optional[Dict[str, Any]] = None,
) -> str:
handler = cast(RequestHandler, self)
consumer_token = self._oauth_consumer_token()
def authorize_redirect(
self,
- redirect_uri: str = None,
- client_id: str = None,
- client_secret: str = None,
- extra_params: Dict[str, Any] = None,
- scope: str = None,
+ redirect_uri: Optional[str] = None,
+ client_id: Optional[str] = None,
+ client_secret: Optional[str] = None,
+ extra_params: Optional[Dict[str, Any]] = None,
+ scope: Optional[str] = None,
response_type: str = "code",
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
def _oauth_request_token_url(
self,
- redirect_uri: str = None,
- client_id: str = None,
- client_secret: str = None,
- code: str = None,
- extra_params: Dict[str, Any] = None,
+ redirect_uri: Optional[str] = None,
+ client_id: Optional[str] = None,
+ client_secret: Optional[str] = None,
+ code: Optional[str] = None,
+ extra_params: Optional[Dict[str, Any]] = None,
) -> str:
url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore
args = {} # type: Dict[str, str]
async def oauth2_request(
self,
url: str,
- access_token: str = None,
- post_args: Dict[str, Any] = None,
+ access_token: Optional[str] = None,
+ post_args: Optional[Dict[str, Any]] = None,
**args: Any
) -> Any:
"""Fetches the given URL auth an OAuth2 access token.
_OAUTH_NO_CALLBACKS = False
_TWITTER_BASE_URL = "https://api.twitter.com/1.1"
- async def authenticate_redirect(self, callback_uri: str = None) -> None:
+ async def authenticate_redirect(self, callback_uri: Optional[str] = None) -> None:
"""Just like `~OAuthMixin.authorize_redirect`, but
auto-redirects if authorized.
self,
path: str,
access_token: Dict[str, Any],
- post_args: Dict[str, Any] = None,
+ post_args: Optional[Dict[str, Any]] = None,
**args: Any
) -> Any:
"""Fetches the given API path, e.g., ``statuses/user_timeline/btaylor``
client_id: str,
client_secret: str,
code: str,
- extra_fields: Dict[str, Any] = None,
+ extra_fields: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
"""Handles the login for the Facebook user, returning a user object.
async def facebook_request(
self,
path: str,
- access_token: str = None,
- post_args: Dict[str, Any] = None,
+ access_token: Optional[str] = None,
+ post_args: Optional[Dict[str, Any]] = None,
**args: Any
) -> Any:
"""Fetches the given relative API path, e.g., "/btaylor/picture"
method: str,
url: str,
parameters: Dict[str, Any] = {},
- token: Dict[str, Any] = None,
+ token: Optional[Dict[str, Any]] = None,
) -> bytes:
"""Calculates the HMAC-SHA1 OAuth signature for the given request.
method: str,
url: str,
parameters: Dict[str, Any] = {},
- token: Dict[str, Any] = None,
+ token: Optional[Dict[str, Any]] = None,
) -> bytes:
"""Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.
)
from tornado.log import app_log
-from typing import Dict, Any, Callable, Union
+from typing import Dict, Any, Callable, Union, Tuple, Optional
import typing
if typing.TYPE_CHECKING:
- from typing import Deque, Tuple, Optional # noqa: F401
+ from typing import Deque # noqa: F401
curl_log = logging.getLogger("tornado.curl_httpclient")
class CurlAsyncHTTPClient(AsyncHTTPClient):
def initialize( # type: ignore
- self, max_clients: int = 10, defaults: Dict[str, Any] = None
+ self, max_clients: int = 10, defaults: Optional[Dict[str, Any]] = None
) -> None:
super(CurlAsyncHTTPClient, self).initialize(defaults=defaults)
# Typeshed is incomplete for CurlMulti, so just use Any for now.
break
def _finish(
- self, curl: pycurl.Curl, curl_error: int = None, curl_message: str = None
+ self,
+ curl: pycurl.Curl,
+ curl_error: Optional[int] = None,
+ curl_message: Optional[str] = None,
) -> None:
info = curl.info # type: ignore
curl.info = None # type: ignore
def __init__(
self,
no_keep_alive: bool = False,
- chunk_size: int = None,
- max_header_size: int = None,
- header_timeout: float = None,
- max_body_size: int = None,
- body_timeout: float = None,
+ chunk_size: Optional[int] = None,
+ max_header_size: Optional[int] = None,
+ header_timeout: Optional[float] = None,
+ max_body_size: Optional[int] = None,
+ body_timeout: Optional[float] = None,
decompress: bool = False,
) -> None:
"""
self,
stream: iostream.IOStream,
is_client: bool,
- params: HTTP1ConnectionParameters = None,
- context: object = None,
+ params: Optional[HTTP1ConnectionParameters] = None,
+ context: Optional[object] = None,
) -> None:
"""
:arg stream: an `.IOStream`
self,
start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
headers: httputil.HTTPHeaders,
- chunk: bytes = None,
+ chunk: Optional[bytes] = None,
) -> "Future[None]":
"""Implements `.HTTPConnection.write_headers`."""
lines = []
def __init__(
self,
stream: iostream.IOStream,
- params: HTTP1ConnectionParameters = None,
- context: object = None,
+ params: Optional[HTTP1ConnectionParameters] = None,
+ context: Optional[object] = None,
) -> None:
"""
:arg stream: an `.IOStream`
"""
def __init__(
- self, async_client_class: Type["AsyncHTTPClient"] = None, **kwargs: Any
+ self,
+ async_client_class: "Optional[Type[AsyncHTTPClient]]" = None,
+ **kwargs: Any
) -> None:
# Initialize self._closed at the beginning of the constructor
# so that an exception raised here doesn't lead to confusing
instance_cache[instance.io_loop] = instance
return instance
- def initialize(self, defaults: Dict[str, Any] = None) -> None:
+ def initialize(self, defaults: Optional[Dict[str, Any]] = None) -> None:
self.io_loop = IOLoop.current()
self.defaults = dict(HTTPRequest._DEFAULTS)
if defaults is not None:
self,
url: str,
method: str = "GET",
- headers: Union[Dict[str, str], httputil.HTTPHeaders] = None,
- body: Union[bytes, str] = None,
- auth_username: str = None,
- auth_password: str = None,
- auth_mode: str = None,
- connect_timeout: float = None,
- request_timeout: float = None,
- if_modified_since: Union[float, datetime.datetime] = None,
- follow_redirects: bool = None,
- max_redirects: int = None,
- user_agent: str = None,
- use_gzip: bool = None,
- network_interface: str = None,
- streaming_callback: Callable[[bytes], None] = None,
- header_callback: Callable[[str], None] = None,
- prepare_curl_callback: Callable[[Any], None] = None,
- proxy_host: str = None,
- proxy_port: int = None,
- proxy_username: str = None,
- proxy_password: str = None,
- proxy_auth_mode: str = None,
- allow_nonstandard_methods: bool = None,
- validate_cert: bool = None,
- ca_certs: str = None,
- allow_ipv6: bool = None,
- client_key: str = None,
- client_cert: str = None,
- body_producer: Callable[[Callable[[bytes], None]], "Future[None]"] = None,
+ headers: Optional[Union[Dict[str, str], httputil.HTTPHeaders]] = None,
+ body: Optional[Union[bytes, str]] = None,
+ auth_username: Optional[str] = None,
+ auth_password: Optional[str] = None,
+ auth_mode: Optional[str] = None,
+ connect_timeout: Optional[float] = None,
+ request_timeout: Optional[float] = None,
+ if_modified_since: Optional[Union[float, datetime.datetime]] = None,
+ follow_redirects: Optional[bool] = None,
+ max_redirects: Optional[int] = None,
+ user_agent: Optional[str] = None,
+ use_gzip: Optional[bool] = None,
+ network_interface: Optional[str] = None,
+ streaming_callback: Optional[Callable[[bytes], None]] = None,
+ header_callback: Optional[Callable[[str], None]] = None,
+ prepare_curl_callback: Optional[Callable[[Any], None]] = None,
+ proxy_host: Optional[str] = None,
+ proxy_port: Optional[int] = None,
+ proxy_username: Optional[str] = None,
+ proxy_password: Optional[str] = None,
+ proxy_auth_mode: Optional[str] = None,
+ allow_nonstandard_methods: Optional[bool] = None,
+ validate_cert: Optional[bool] = None,
+ ca_certs: Optional[str] = None,
+ allow_ipv6: Optional[bool] = None,
+ client_key: Optional[str] = None,
+ client_cert: Optional[str] = None,
+ body_producer: Optional[
+ Callable[[Callable[[bytes], None]], "Future[None]"]
+ ] = None,
expect_100_continue: bool = False,
- decompress_response: bool = None,
- ssl_options: Union[Dict[str, Any], ssl.SSLContext] = None,
+ decompress_response: Optional[bool] = None,
+ ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
) -> None:
r"""All parameters except ``url`` are optional.
self,
request: HTTPRequest,
code: int,
- headers: httputil.HTTPHeaders = None,
- buffer: BytesIO = None,
- effective_url: str = None,
- error: BaseException = None,
- request_time: float = None,
- time_info: Dict[str, float] = None,
- reason: str = None,
- start_time: float = None,
+ headers: Optional[httputil.HTTPHeaders] = None,
+ buffer: Optional[BytesIO] = None,
+ effective_url: Optional[str] = None,
+ error: Optional[BaseException] = None,
+ request_time: Optional[float] = None,
+ time_info: Optional[Dict[str, float]] = None,
+ reason: Optional[str] = None,
+ start_time: Optional[float] = None,
) -> None:
if isinstance(request, _RequestProxy):
self.request = request.request
"""
def __init__(
- self, code: int, message: str = None, response: HTTPResponse = None
+ self,
+ code: int,
+ message: Optional[str] = None,
+ response: Optional[HTTPResponse] = None,
) -> None:
self.code = code
self.message = message or httputil.responses.get(code, "Unknown")
],
no_keep_alive: bool = False,
xheaders: bool = False,
- ssl_options: Union[Dict[str, Any], ssl.SSLContext] = None,
- protocol: str = None,
+ ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
+ protocol: Optional[str] = None,
decompress_request: bool = False,
- chunk_size: int = None,
- max_header_size: int = None,
- idle_connection_timeout: float = None,
- body_timeout: float = None,
- max_body_size: int = None,
- max_buffer_size: int = None,
- trusted_downstream: List[str] = None,
+ chunk_size: Optional[int] = None,
+ max_header_size: Optional[int] = None,
+ idle_connection_timeout: Optional[float] = None,
+ body_timeout: Optional[float] = None,
+ max_body_size: Optional[int] = None,
+ max_buffer_size: Optional[int] = None,
+ trusted_downstream: Optional[List[str]] = None,
) -> None:
# This method's signature is not extracted with autodoc
# because we want its arguments to appear on the class
stream: iostream.IOStream,
address: Tuple,
protocol: Optional[str],
- trusted_downstream: List[str] = None,
+ trusted_downstream: Optional[List[str]] = None,
) -> None:
self.address = address
# Save the socket's address family now so we know how to
def __init__(
self,
- method: str = None,
- uri: str = None,
+ method: Optional[str] = None,
+ uri: Optional[str] = None,
version: str = "HTTP/1.0",
- headers: HTTPHeaders = None,
- body: bytes = None,
- host: str = None,
- files: Dict[str, List["HTTPFile"]] = None,
- connection: "HTTPConnection" = None,
- start_line: "RequestStartLine" = None,
- server_connection: object = None,
+ headers: Optional[HTTPHeaders] = None,
+ body: Optional[bytes] = None,
+ host: Optional[str] = None,
+ files: Optional[Dict[str, List["HTTPFile"]]] = None,
+ connection: Optional["HTTPConnection"] = None,
+ start_line: Optional["RequestStartLine"] = None,
+ server_connection: Optional[object] = None,
) -> None:
if start_line is not None:
method, uri, version = start_line
self,
start_line: Union["RequestStartLine", "ResponseStartLine"],
headers: HTTPHeaders,
- chunk: bytes = None,
+ chunk: Optional[bytes] = None,
) -> "Future[None]":
"""Write an HTTP header block.
body: bytes,
arguments: Dict[str, List[bytes]],
files: Dict[str, List[HTTPFile]],
- headers: HTTPHeaders = None,
+ headers: Optional[HTTPHeaders] = None,
) -> None:
"""Parses a form request body.
return AsyncIOLoop
- def initialize(self, make_current: bool = None) -> None:
+ def initialize(self, make_current: Optional[bool] = None) -> None:
if make_current is None:
if IOLoop.current(instance=False) is None:
self.make_current()
"""
raise NotImplementedError()
- def run_sync(self, func: Callable, timeout: float = None) -> Any:
+ def run_sync(self, func: Callable, timeout: Optional[float] = None) -> Any:
"""Starts the `IOLoop`, runs the given function, and stops the loop.
The function must return either an awaitable object or
Added the ``real_error`` attribute.
"""
- def __init__(self, real_error: BaseException = None) -> None:
+ def __init__(self, real_error: Optional[BaseException] = None) -> None:
super(StreamClosedError, self).__init__("Stream is closed")
self.real_error = real_error
def __init__(
self,
- max_buffer_size: int = None,
- read_chunk_size: int = None,
- max_write_buffer_size: int = None,
+ max_buffer_size: Optional[int] = None,
+ read_chunk_size: Optional[int] = None,
+ max_write_buffer_size: Optional[int] = None,
) -> None:
"""`BaseIOStream` constructor.
"""
return None
- def read_until_regex(self, regex: bytes, max_bytes: int = None) -> Awaitable[bytes]:
+ def read_until_regex(
+ self, regex: bytes, max_bytes: Optional[int] = None
+ ) -> Awaitable[bytes]:
"""Asynchronously read until we have matched the given regex.
The result includes the data that matches the regex and anything
raise
return future
- def read_until(self, delimiter: bytes, max_bytes: int = None) -> Awaitable[bytes]:
+ def read_until(
+ self, delimiter: bytes, max_bytes: Optional[int] = None
+ ) -> Awaitable[bytes]:
"""Asynchronously read until we have found the given delimiter.
The result includes all the data read including the delimiter.
del data
def connect(
- self: _IOStreamType, address: tuple, server_hostname: str = None
+ self: _IOStreamType, address: tuple, server_hostname: Optional[str] = None
) -> "Future[_IOStreamType]":
"""Connects the socket to a remote address without blocking.
def start_tls(
self,
server_side: bool,
- ssl_options: Union[Dict[str, Any], ssl.SSLContext] = None,
- server_hostname: str = None,
+ ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
+ server_hostname: Optional[str] = None,
) -> Awaitable["SSLIOStream"]:
"""Convert this `IOStream` to an `SSLIOStream`.
super(SSLIOStream, self)._handle_write()
def connect(
- self, address: Tuple, server_hostname: str = None
+ self, address: Tuple, server_hostname: Optional[str] = None
) -> "Future[SSLIOStream]":
self._server_hostname = server_hostname
# Ignore the result of connect(). If it fails,
from tornado._locale_data import LOCALE_NAMES
-from typing import Iterable, Any, Union, Dict
+from typing import Iterable, Any, Union, Dict, Optional
_default_locale = "en_US"
_translations = {} # type: Dict[str, Any]
_supported_locales = frozenset(list(_translations.keys()) + [_default_locale])
-def load_translations(directory: str, encoding: str = None) -> None:
+def load_translations(directory: str, encoding: Optional[str] = None) -> None:
"""Loads translations from CSV files in a directory.
Translations are strings with optional Python-style named placeholders
]
def translate(
- self, message: str, plural_message: str = None, count: int = None
+ self,
+ message: str,
+ plural_message: Optional[str] = None,
+ count: Optional[int] = None,
) -> str:
"""Returns the translation for the given message for this locale.
raise NotImplementedError()
def pgettext(
- self, context: str, message: str, plural_message: str = None, count: int = None
+ self,
+ context: str,
+ message: str,
+ plural_message: Optional[str] = None,
+ count: Optional[int] = None,
) -> str:
raise NotImplementedError()
super(CSVLocale, self).__init__(code)
def translate(
- self, message: str, plural_message: str = None, count: int = None
+ self,
+ message: str,
+ plural_message: Optional[str] = None,
+ count: Optional[int] = None,
) -> str:
if plural_message is not None:
assert count is not None
return message_dict.get(message, message)
def pgettext(
- self, context: str, message: str, plural_message: str = None, count: int = None
+ self,
+ context: str,
+ message: str,
+ plural_message: Optional[str] = None,
+ count: Optional[int] = None,
) -> str:
if self.translations:
gen_log.warning("pgettext is not supported by CSVLocale")
super(GettextLocale, self).__init__(code)
def translate(
- self, message: str, plural_message: str = None, count: int = None
+ self,
+ message: str,
+ plural_message: Optional[str] = None,
+ count: Optional[int] = None,
) -> str:
if plural_message is not None:
assert count is not None
return self.gettext(message)
def pgettext(
- self, context: str, message: str, plural_message: str = None, count: int = None
+ self,
+ context: str,
+ message: str,
+ plural_message: Optional[str] = None,
+ count: Optional[int] = None,
) -> str:
"""Allows to set context for translation, accepts plural forms.
result += " waiters[%s]" % len(self._waiters)
return result + ">"
- def wait(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[bool]:
+ def wait(
+ self, timeout: Optional[Union[float, datetime.timedelta]] = None
+ ) -> Awaitable[bool]:
"""Wait for `.notify`.
Returns a `.Future` that resolves ``True`` if the condition is notified,
"""
self._value = False
- def wait(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[None]:
+ def wait(
+ self, timeout: Optional[Union[float, datetime.timedelta]] = None
+ ) -> Awaitable[None]:
"""Block until the internal flag is true.
Returns an awaitable, which raises `tornado.util.TimeoutError` after a
break
def acquire(
- self, timeout: Union[float, datetime.timedelta] = None
+ self, timeout: Optional[Union[float, datetime.timedelta]] = None
) -> Awaitable[_ReleasingContextManager]:
"""Decrement the counter. Returns an awaitable.
return "<%s _block=%s>" % (self.__class__.__name__, self._block)
def acquire(
- self, timeout: Union[float, datetime.timedelta] = None
+ self, timeout: Optional[Union[float, datetime.timedelta]] = None
) -> Awaitable[_ReleasingContextManager]:
"""Attempt to lock. Returns an awaitable.
except ImportError:
curses = None # type: ignore
-from typing import Dict, Any, cast
+from typing import Dict, Any, cast, Optional
# Logger objects for internal tornado use
access_log = logging.getLogger("tornado.access")
return formatted.replace("\n", "\n ")
-def enable_pretty_logging(options: Any = None, logger: logging.Logger = None) -> None:
+def enable_pretty_logging(
+ options: Any = None, logger: Optional[logging.Logger] = None
+) -> None:
"""Turns on formatted logging output as configured.
This is called automatically by `tornado.options.parse_command_line`
from tornado.platform.auto import set_close_exec
from tornado.util import Configurable, errno_from_exception
-import typing
-from typing import List, Callable, Any, Type, Dict, Union, Tuple, Awaitable
-
-if typing.TYPE_CHECKING:
- from asyncio import Future # noqa: F401
+from typing import List, Callable, Any, Type, Dict, Union, Tuple, Awaitable, Optional
# Note that the naming of ssl.Purpose is confusing; the purpose
# of a context is to authentiate the opposite side of the connection.
def bind_sockets(
port: int,
- address: str = None,
+ address: Optional[str] = None,
family: socket.AddressFamily = socket.AF_UNSPEC,
backlog: int = _DEFAULT_BACKLOG,
- flags: int = None,
+ flags: Optional[int] = None,
reuse_port: bool = False,
) -> List[socket.socket]:
"""Creates listening sockets bound to the given port and address.
"""
def initialize(
- self, executor: concurrent.futures.Executor = None, close_executor: bool = True
+ self,
+ executor: Optional[concurrent.futures.Executor] = None,
+ close_executor: bool = True,
) -> None:
self.io_loop = IOLoop.current()
if executor is not None:
def ssl_wrap_socket(
socket: socket.socket,
ssl_options: Union[Dict[str, Any], ssl.SSLContext],
- server_hostname: str = None,
+ server_hostname: Optional[str] = None,
**kwargs: Any
) -> ssl.SSLSocket:
"""Returns an ``ssl.SSLSocket`` wrapping the given socket.
from tornado.log import define_logging_options
from tornado.util import basestring_type, exec_in
-import typing
-from typing import Any, Iterator, Iterable, Tuple, Set, Dict, Callable, List, TextIO
-
-if typing.TYPE_CHECKING:
- from typing import Optional # noqa: F401
+from typing import (
+ Any,
+ Iterator,
+ Iterable,
+ Tuple,
+ Set,
+ Dict,
+ Callable,
+ List,
+ TextIO,
+ Optional,
+)
class Error(Exception):
self,
name: str,
default: Any = None,
- type: type = None,
- help: str = None,
- metavar: str = None,
+ type: Optional[type] = None,
+ help: Optional[str] = None,
+ metavar: Optional[str] = None,
multiple: bool = False,
- group: str = None,
- callback: Callable[[Any], None] = None,
+ group: Optional[str] = None,
+ callback: Optional[Callable[[Any], None]] = None,
) -> None:
"""Defines a new command line option.
self._options[normalized] = option
def parse_command_line(
- self, args: List[str] = None, final: bool = True
+ self, args: Optional[List[str]] = None, final: bool = True
) -> List[str]:
"""Parses all options given on the command line (defaults to
`sys.argv`).
if final:
self.run_parse_callbacks()
- def print_help(self, file: TextIO = None) -> None:
+ def print_help(self, file: Optional[TextIO] = None) -> None:
"""Prints all the command line options to stderr (or another file)."""
if file is None:
file = sys.stderr
self,
name: str,
default: Any = None,
- type: type = None,
- help: str = None,
- metavar: str = None,
+ type: Optional[type] = None,
+ help: Optional[str] = None,
+ metavar: Optional[str] = None,
multiple: bool = False,
- file_name: str = None,
- group_name: str = None,
- callback: Callable[[Any], None] = None,
+ file_name: Optional[str] = None,
+ group_name: Optional[str] = None,
+ callback: Optional[Callable[[Any], None]] = None,
) -> None:
if default is None and multiple:
default = []
def define(
name: str,
default: Any = None,
- type: type = None,
- help: str = None,
- metavar: str = None,
+ type: Optional[type] = None,
+ help: Optional[str] = None,
+ metavar: Optional[str] = None,
multiple: bool = False,
- group: str = None,
- callback: Callable[[Any], None] = None,
+ group: Optional[str] = None,
+ callback: Optional[Callable[[Any], None]] = None,
) -> None:
"""Defines an option in the global namespace.
)
-def parse_command_line(args: List[str] = None, final: bool = True) -> List[str]:
+def parse_command_line(
+ args: Optional[List[str]] = None, final: bool = True
+) -> List[str]:
"""Parses global options from the command line.
See `OptionParser.parse_command_line`.
return options.parse_config_file(path, final=final)
-def print_help(file: TextIO = None) -> None:
+def print_help(file: Optional[TextIO] = None) -> None:
"""Prints all the command line options to stderr (or another file).
See `OptionParser.print_help`.
_task_id = None
-def fork_processes(num_processes: Optional[int], max_restarts: int = None) -> int:
+def fork_processes(
+ num_processes: Optional[int], max_restarts: Optional[int] = None
+) -> int:
"""Starts multiple worker processes.
If ``num_processes`` is None or <= 0, we detect the number of cores
from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado.locks import Event
-from typing import Union, TypeVar, Generic, Awaitable
+from typing import Union, TypeVar, Generic, Awaitable, Optional
import typing
if typing.TYPE_CHECKING:
- from typing import Deque, Tuple, List, Any # noqa: F401
+ from typing import Deque, Tuple, Any # noqa: F401
_T = TypeVar("_T")
return self.qsize() >= self.maxsize
def put(
- self, item: _T, timeout: Union[float, datetime.timedelta] = None
+ self, item: _T, timeout: Optional[Union[float, datetime.timedelta]] = None
) -> "Future[None]":
"""Put an item into the queue, perhaps waiting until there is room.
else:
self.__put_internal(item)
- def get(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[_T]:
+ def get(
+ self, timeout: Optional[Union[float, datetime.timedelta]] = None
+ ) -> Awaitable[_T]:
"""Remove and return an item from the queue.
Returns an awaitable which resolves once an item is available, or raises
if self._unfinished_tasks == 0:
self._finished.set()
- def join(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[None]:
+ def join(
+ self, timeout: Optional[Union[float, datetime.timedelta]] = None
+ ) -> Awaitable[None]:
"""Block until all items in the queue are processed.
Returns an awaitable, which raises `tornado.util.TimeoutError` after a
class RuleRouter(Router):
"""Rule-based router implementation."""
- def __init__(self, rules: _RuleList = None) -> None:
+ def __init__(self, rules: Optional[_RuleList] = None) -> None:
"""Constructs a router from an ordered list of rules::
RuleRouter([
in a rule's matcher (see `Matcher.reverse`).
"""
- def __init__(self, rules: _RuleList = None) -> None:
+ def __init__(self, rules: Optional[_RuleList] = None) -> None:
self.named_rules = {} # type: Dict[str, Any]
super(ReversibleRuleRouter, self).__init__(rules)
self,
matcher: "Matcher",
target: Any,
- target_kwargs: Dict[str, Any] = None,
- name: str = None,
+ target_kwargs: Optional[Dict[str, Any]] = None,
+ name: Optional[str] = None,
) -> None:
"""Constructs a Rule instance.
self,
pattern: Union[str, Pattern],
handler: Any,
- kwargs: Dict[str, Any] = None,
- name: str = None,
+ kwargs: Optional[Dict[str, Any]] = None,
+ name: Optional[str] = None,
) -> None:
"""Parameters:
def initialize( # type: ignore
self,
max_clients: int = 10,
- hostname_mapping: Dict[str, str] = None,
+ hostname_mapping: Optional[Dict[str, str]] = None,
max_buffer_size: int = 104857600,
- resolver: Resolver = None,
- defaults: Dict[str, Any] = None,
- max_header_size: int = None,
- max_body_size: int = None,
+ resolver: Optional[Resolver] = None,
+ defaults: Optional[Dict[str, Any]] = None,
+ max_header_size: Optional[int] = None,
+ max_body_size: Optional[int] = None,
) -> None:
"""Creates a AsyncHTTPClient.
self.io_loop.remove_timeout(timeout_handle)
del self.waiting[key]
- def _on_timeout(self, key: object, info: str = None) -> None:
+ def _on_timeout(self, key: object, info: Optional[str] = None) -> None:
"""Timeout callback of request.
Construct a timeout HTTPResponse when a timeout occurs.
return ssl_ctx
return None
- def _on_timeout(self, info: str = None) -> None:
+ def _on_timeout(self, info: Optional[str] = None) -> None:
"""Timeout callback of _HTTPConnection instance.
Raise a `HTTPTimeoutError` when a timeout occurs.
from tornado.platform.auto import set_close_exec
from tornado.gen import TimeoutError
-import typing
-from typing import Any, Union, Dict, Tuple, List, Callable, Iterator
-
-if typing.TYPE_CHECKING:
- from typing import Optional, Set # noqa: F401
+from typing import Any, Union, Dict, Tuple, List, Callable, Iterator, Optional, Set
_INITIAL_CONNECT_TIMEOUT = 0.3
def start(
self,
timeout: float = _INITIAL_CONNECT_TIMEOUT,
- connect_timeout: Union[float, datetime.timedelta] = None,
+ connect_timeout: Optional[Union[float, datetime.timedelta]] = None,
) -> "Future[Tuple[socket.AddressFamily, Any, IOStream]]":
self.try_connect(iter(self.primary_addrs))
self.set_timeout(timeout)
The ``io_loop`` argument (deprecated since version 4.1) has been removed.
"""
- def __init__(self, resolver: Resolver = None) -> None:
+ def __init__(self, resolver: Optional[Resolver] = None) -> None:
if resolver is not None:
self.resolver = resolver
self._own_resolver = False
host: str,
port: int,
af: socket.AddressFamily = socket.AF_UNSPEC,
- ssl_options: Union[Dict[str, Any], ssl.SSLContext] = None,
- max_buffer_size: int = None,
- source_ip: str = None,
- source_port: int = None,
- timeout: Union[float, datetime.timedelta] = None,
+ ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
+ max_buffer_size: Optional[int] = None,
+ source_ip: Optional[str] = None,
+ source_port: Optional[int] = None,
+ timeout: Optional[Union[float, datetime.timedelta]] = None,
) -> IOStream:
"""Connect to the given host and port.
max_buffer_size: int,
af: socket.AddressFamily,
addr: Tuple,
- source_ip: str = None,
- source_port: int = None,
+ source_ip: Optional[str] = None,
+ source_port: Optional[int] = None,
) -> Tuple[IOStream, "Future[IOStream]"]:
# Always connect in plaintext; we'll convert to ssl if necessary
# after one connection has completed.
def __init__(
self,
- ssl_options: Union[Dict[str, Any], ssl.SSLContext] = None,
- max_buffer_size: int = None,
- read_chunk_size: int = None,
+ ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
+ max_buffer_size: Optional[int] = None,
+ read_chunk_size: Optional[int] = None,
) -> None:
self.ssl_options = ssl_options
self._sockets = {} # type: Dict[int, socket.socket]
def bind(
self,
port: int,
- address: str = None,
+ address: Optional[str] = None,
family: socket.AddressFamily = socket.AF_UNSPEC,
backlog: int = 128,
reuse_port: bool = False,
else:
self._pending_sockets.extend(sockets)
- def start(self, num_processes: Optional[int] = 1, max_restarts: int = None) -> None:
+ def start(
+ self, num_processes: Optional[int] = 1, max_restarts: Optional[int] = None
+ ) -> None:
"""Starts this server in the `.IOLoop`.
By default, we run the server in this process and do not fork any
self,
template_string: Union[str, bytes],
name: str = "<string>",
- loader: "BaseLoader" = None,
+ loader: Optional["BaseLoader"] = None,
compress_whitespace: Union[bool, _UnsetMarker] = _UNSET,
autoescape: Union[str, _UnsetMarker] = _UNSET,
- whitespace: str = None,
+ whitespace: Optional[str] = None,
) -> None:
"""Construct a Template.
def __init__(
self,
autoescape: str = _DEFAULT_AUTOESCAPE,
- namespace: Dict[str, Any] = None,
- whitespace: str = None,
+ namespace: Optional[Dict[str, Any]] = None,
+ whitespace: Optional[str] = None,
) -> None:
"""Construct a template loader.
with self.lock:
self.templates = {}
- def resolve_path(self, name: str, parent_path: str = None) -> str:
+ def resolve_path(self, name: str, parent_path: Optional[str] = None) -> str:
"""Converts a possibly-relative path to absolute (used internally)."""
raise NotImplementedError()
- def load(self, name: str, parent_path: str = None) -> Template:
+ def load(self, name: str, parent_path: Optional[str] = None) -> Template:
"""Loads a template."""
name = self.resolve_path(name, parent_path=parent_path)
with self.lock:
super(Loader, self).__init__(**kwargs)
self.root = os.path.abspath(root_directory)
- def resolve_path(self, name: str, parent_path: str = None) -> str:
+ def resolve_path(self, name: str, parent_path: Optional[str] = None) -> str:
if (
parent_path
and not parent_path.startswith("<")
super(DictLoader, self).__init__(**kwargs)
self.dict = dict
- def resolve_path(self, name: str, parent_path: str = None) -> str:
+ def resolve_path(self, name: str, parent_path: Optional[str] = None) -> str:
if (
parent_path
and not parent_path.startswith("<")
Added ``filename`` and ``lineno`` attributes.
"""
- def __init__(self, message: str, filename: str = None, lineno: int = 0) -> None:
+ def __init__(
+ self, message: str, filename: Optional[str] = None, lineno: int = 0
+ ) -> None:
self.message = message
# The names "filename" and "lineno" are chosen for consistency
# with python SyntaxError.
return IncludeTemplate()
- def write_line(self, line: str, line_number: int, indent: int = None) -> None:
+ def write_line(
+ self, line: str, line_number: int, indent: Optional[int] = None
+ ) -> None:
if indent is None:
indent = self._indent
line_comment = " # %s:%d" % (self.current_template.name, line_number)
self.line = 1
self.pos = 0
- def find(self, needle: str, start: int = 0, end: int = None) -> int:
+ def find(self, needle: str, start: int = 0, end: Optional[int] = None) -> int:
assert start >= 0, start
pos = self.pos
start += pos
index -= pos
return index
- def consume(self, count: int = None) -> str:
+ def consume(self, count: Optional[int] = None) -> str:
if count is None:
count = len(self.text) - self.pos
newpos = self.pos + count
def _parse(
reader: _TemplateReader,
template: Template,
- in_block: str = None,
- in_loop: str = None,
+ in_block: Optional[str] = None,
+ in_loop: Optional[str] = None,
) -> _ChunkList:
body = _ChunkList([])
while True:
self.__failure = None
raise_exc_info(failure)
- def run(self, result: unittest.TestResult = None) -> Optional[unittest.TestResult]:
+ def run(
+ self, result: Optional[unittest.TestResult] = None
+ ) -> Optional[unittest.TestResult]:
ret = super(AsyncTestCase, self).run(result)
# As a last resort, if an exception escaped super.run() and wasn't
# re-raised in tearDown, raise it here. This will cause the
self.__stopped = True
def wait(
- self, condition: Callable[..., bool] = None, timeout: float = None
+ self,
+ condition: Optional[Callable[..., bool]] = None,
+ timeout: Optional[float] = None,
) -> None:
"""Runs the `.IOLoop` until stop is called or timeout has passed.
@typing.overload
def gen_test(
- *, timeout: float = None
+ *, timeout: Optional[float] = None
) -> Callable[[Callable[..., Union[Generator, "Coroutine"]]], Callable[..., None]]:
pass
def gen_test( # noqa: F811
- func: Callable[..., Union[Generator, "Coroutine"]] = None, timeout: float = None
+ func: Optional[Callable[..., Union[Generator, "Coroutine"]]] = None,
+ timeout: Optional[float] = None,
) -> Union[
Callable[..., None],
Callable[[Callable[..., Union[Generator, "Coroutine"]]], Callable[..., None]],
raise ImportError("No module named %s" % parts[-1])
-def exec_in(code: Any, glob: Dict[str, Any], loc: Mapping[str, Any] = None) -> None:
+def exec_in(
+ code: Any, glob: Dict[str, Any], loc: Optional[Optional[Mapping[str, Any]]] = None
+) -> None:
if isinstance(code, str):
# exec(string) inherits the caller's future imports; compile
# the string first to prevent that.
"""
pass
- def set_status(self, status_code: int, reason: str = None) -> None:
+ def set_status(self, status_code: int, reason: Optional[str] = None) -> None:
"""Sets the status code for our response.
:arg int status_code: Response status code.
values.append(s)
return values
- def decode_argument(self, value: bytes, name: str = None) -> str:
+ def decode_argument(self, value: bytes, name: Optional[str] = None) -> str:
"""Decodes an argument from the request.
The argument has been percent-decoded and is now a byte string.
`self.request.cookies <.httputil.HTTPServerRequest.cookies>`."""
return self.request.cookies
- def get_cookie(self, name: str, default: str = None) -> Optional[str]:
+ def get_cookie(self, name: str, default: Optional[str] = None) -> Optional[str]:
"""Returns the value of the request cookie with the given name.
If the named cookie is not present, returns ``default``.
self,
name: str,
value: Union[str, bytes],
- domain: str = None,
- expires: Union[float, Tuple, datetime.datetime] = None,
+ domain: Optional[str] = None,
+ expires: Optional[Union[float, Tuple, datetime.datetime]] = None,
path: str = "/",
- expires_days: int = None,
+ expires_days: Optional[int] = None,
**kwargs: Any
) -> None:
"""Sets an outgoing cookie name/value with the given options.
morsel[k] = v
- def clear_cookie(self, name: str, path: str = "/", domain: str = None) -> None:
+ def clear_cookie(
+ self, name: str, path: str = "/", domain: Optional[str] = None
+ ) -> None:
"""Deletes the cookie with the given name.
Due to limitations of the cookie protocol, you must pass the same
expires = datetime.datetime.utcnow() - datetime.timedelta(days=365)
self.set_cookie(name, value="", path=path, expires=expires, domain=domain)
- def clear_all_cookies(self, path: str = "/", domain: str = None) -> None:
+ def clear_all_cookies(self, path: str = "/", domain: Optional[str] = None) -> None:
"""Deletes all the cookies the user sent with this request.
See `clear_cookie` for more information on the path and domain
self,
name: str,
value: Union[str, bytes],
- expires_days: int = 30,
- version: int = None,
+ expires_days: Optional[int] = 30,
+ version: Optional[int] = None,
**kwargs: Any
) -> None:
"""Signs and timestamps a cookie so it cannot be forged.
Note that the ``expires_days`` parameter sets the lifetime of the
cookie in the browser, but is independent of the ``max_age_days``
parameter to `get_secure_cookie`.
+ A value of None limits the lifetime to the current browser session.
Secure cookies may contain arbitrary byte values, not just unicode
strings (unlike regular cookies)
)
def create_signed_value(
- self, name: str, value: Union[str, bytes], version: int = None
+ self, name: str, value: Union[str, bytes], version: Optional[int] = None
) -> bytes:
"""Signs and timestamps a string so it cannot be forged.
def get_secure_cookie(
self,
name: str,
- value: str = None,
+ value: Optional[str] = None,
max_age_days: int = 31,
- min_version: int = None,
+ min_version: Optional[int] = None,
) -> Optional[bytes]:
"""Returns the given signed cookie if it validates, or None.
)
def get_secure_cookie_key_version(
- self, name: str, value: str = None
+ self, name: str, value: Optional[str] = None
) -> Optional[int]:
"""Returns the signing key version of the secure cookie.
return None
return get_signature_key_version(value)
- def redirect(self, url: str, permanent: bool = False, status: int = None) -> None:
+ def redirect(
+ self, url: str, permanent: bool = False, status: Optional[int] = None
+ ) -> None:
"""Sends a redirect to the given (optionally relative) URL.
If the ``status`` argument is specified, that value is used as the
future.set_result(None)
return future
- def finish(self, chunk: Union[str, bytes, dict] = None) -> "Future[None]":
+ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[None]":
"""Finishes this response, ending the HTTP request.
Passing a ``chunk`` to ``finish()`` is equivalent to passing that
+ '"/>'
)
- def static_url(self, path: str, include_host: bool = None, **kwargs: Any) -> str:
+ def static_url(
+ self, path: str, include_host: Optional[bool] = None, **kwargs: Any
+ ) -> str:
"""Returns a static URL for the given relative static file path.
This method requires you set the ``static_path`` setting in your
`_ApplicationRouter` instance.
"""
- def __init__(self, application: "Application", rules: _RuleList = None) -> None:
+ def __init__(
+ self, application: "Application", rules: Optional[_RuleList] = None
+ ) -> None:
assert isinstance(application, Application)
self.application = application
super(_ApplicationRouter, self).__init__(rules)
def __init__(
self,
- handlers: _RuleList = None,
- default_host: str = None,
- transforms: List[Type["OutputTransform"]] = None,
+ handlers: Optional[_RuleList] = None,
+ default_host: Optional[str] = None,
+ transforms: Optional[List[Type["OutputTransform"]]] = None,
**settings: Any
) -> None:
if transforms is None:
self,
request: httputil.HTTPServerRequest,
target_class: Type[RequestHandler],
- target_kwargs: Dict[str, Any] = None,
- path_args: List[bytes] = None,
- path_kwargs: Dict[str, bytes] = None,
+ target_kwargs: Optional[Dict[str, Any]] = None,
+ path_args: Optional[List[bytes]] = None,
+ path_kwargs: Optional[Dict[str, bytes]] = None,
) -> "_HandlerDelegate":
"""Returns `~.httputil.HTTPMessageDelegate` that can serve a request
for application and `RequestHandler` subclass.
"""
def __init__(
- self, status_code: int = 500, log_message: str = None, *args: Any, **kwargs: Any
+ self,
+ status_code: int = 500,
+ log_message: Optional[str] = None,
+ *args: Any,
+ **kwargs: Any
) -> None:
self.status_code = status_code
self.log_message = log_message
_static_hashes = {} # type: Dict[str, Optional[str]]
_lock = threading.Lock() # protects _static_hashes
- def initialize(self, path: str, default_filename: str = None) -> None:
+ def initialize(self, path: str, default_filename: Optional[str] = None) -> None:
self.root = path
self.default_filename = default_filename
@classmethod
def get_content(
- cls, abspath: str, start: int = None, end: int = None
+ cls, abspath: str, start: Optional[int] = None, end: Optional[int] = None
) -> Generator[bytes, None, None]:
"""Retrieve the content of the requested resource which is located
at the given absolute path.
secret: _CookieSecretTypes,
name: str,
value: Union[str, bytes],
- version: int = None,
- clock: Callable[[], float] = None,
- key_version: int = None,
+ version: Optional[int] = None,
+ clock: Optional[Callable[[], float]] = None,
+ key_version: Optional[int] = None,
) -> bytes:
if version is None:
version = DEFAULT_SIGNED_VALUE_VERSION
name: str,
value: Union[None, str, bytes],
max_age_days: int = 31,
- clock: Callable[[], float] = None,
- min_version: int = None,
+ clock: Optional[Callable[[], float]] = None,
+ min_version: Optional[int] = None,
) -> Optional[bytes]:
if clock is None:
clock = time.time
# the server side and WebSocketClientConnection on the client
# side.
def on_ws_connection_close(
- self, close_code: int = None, close_reason: str = None
+ self, close_code: Optional[int] = None, close_reason: Optional[str] = None
) -> None:
pass
class _WebSocketParams(object):
def __init__(
self,
- ping_interval: float = None,
- ping_timeout: float = None,
+ ping_interval: Optional[float] = None,
+ ping_timeout: Optional[float] = None,
max_message_size: int = _default_max_message_size,
- compression_options: Dict[str, Any] = None,
+ compression_options: Optional[Dict[str, Any]] = None,
) -> None:
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
"""
pass
- def close(self, code: int = None, reason: str = None) -> None:
+ def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
"""Closes this Web Socket.
Once the close handshake is successful the socket will be closed.
self._break_cycles()
def on_ws_connection_close(
- self, close_code: int = None, close_reason: str = None
+ self, close_code: Optional[int] = None, close_reason: Optional[str] = None
) -> None:
self.close_code = close_code
self.close_reason = close_reason
self.close() # let the subclass cleanup
@abc.abstractmethod
- def close(self, code: int = None, reason: str = None) -> None:
+ def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
raise NotImplementedError()
@abc.abstractmethod
self,
persistent: bool,
max_wbits: Optional[int],
- compression_options: Dict[str, Any] = None,
+ compression_options: Optional[Dict[str, Any]] = None,
) -> None:
if max_wbits is None:
max_wbits = zlib.MAX_WBITS
persistent: bool,
max_wbits: Optional[int],
max_message_size: int,
- compression_options: Dict[str, Any] = None,
+ compression_options: Optional[Dict[str, Any]] = None,
) -> None:
self._max_message_size = max_message_size
if max_wbits is None:
self,
side: str,
agreed_parameters: Dict[str, Any],
- compression_options: Dict[str, Any] = None,
+ compression_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Converts a websocket agreed_parameters set to keyword arguments
for our compressor objects.
self,
side: str,
agreed_parameters: Dict[str, Any],
- compression_options: Dict[str, Any] = None,
+ compression_options: Optional[Dict[str, Any]] = None,
) -> None:
# TODO: handle invalid parameters gracefully
allowed_keys = set(
self._abort()
return None
- def close(self, code: int = None, reason: str = None) -> None:
+ def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
"""Closes the WebSocket connection."""
if not self.server_terminated:
if not self.stream.closed():
def __init__(
self,
request: httpclient.HTTPRequest,
- on_message_callback: Callable[[Union[None, str, bytes]], None] = None,
- compression_options: Dict[str, Any] = None,
- ping_interval: float = None,
- ping_timeout: float = None,
+ on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
+ compression_options: Optional[Dict[str, Any]] = None,
+ ping_interval: Optional[float] = None,
+ ping_timeout: Optional[float] = None,
max_message_size: int = _default_max_message_size,
subprotocols: Optional[List[str]] = [],
) -> None:
104857600,
)
- def close(self, code: int = None, reason: str = None) -> None:
+ def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
"""Closes the websocket connection.
``code`` and ``reason`` are documented under
super(WebSocketClientConnection, self).on_connection_close()
def on_ws_connection_close(
- self, close_code: int = None, close_reason: str = None
+ self, close_code: Optional[int] = None, close_reason: Optional[str] = None
) -> None:
self.close_code = close_code
self.close_reason = close_reason
return self.protocol.write_message(message, binary=binary)
def read_message(
- self, callback: Callable[["Future[Union[None, str, bytes]]"], None] = None
+ self,
+ callback: Optional[Callable[["Future[Union[None, str, bytes]]"], None]] = None,
) -> Awaitable[Union[None, str, bytes]]:
"""Reads a message from the WebSocket server.
def websocket_connect(
url: Union[str, httpclient.HTTPRequest],
- callback: Callable[["Future[WebSocketClientConnection]"], None] = None,
- connect_timeout: float = None,
- on_message_callback: Callable[[Union[None, str, bytes]], None] = None,
- compression_options: Dict[str, Any] = None,
- ping_interval: float = None,
- ping_timeout: float = None,
+ callback: Optional[Callable[["Future[WebSocketClientConnection]"], None]] = None,
+ connect_timeout: Optional[float] = None,
+ on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
+ compression_options: Optional[Dict[str, Any]] = None,
+ ping_interval: Optional[float] = None,
+ ping_timeout: Optional[float] = None,
max_message_size: int = _default_max_message_size,
- subprotocols: List[str] = None,
+ subprotocols: Optional[List[str]] = None,
) -> "Awaitable[WebSocketClientConnection]":
"""Client-side websocket support.