readme = "README.md"
license = "BSD-3-Clause"
requires-python = ">=3.8"
-authors = [
- { name = "Tom Christie", email = "tom@tomchristie.com" },
-]
+authors = [{ name = "Tom Christie", email = "tom@tomchristie.com" }]
classifiers = [
"Development Status :: 3 - Alpha",
"Environment :: Web Environment",
path = "starlette/__init__.py"
[tool.ruff.lint]
-select = ["E", "F", "I"]
+select = ["E", "F", "I", "FA", "UP"]
[tool.ruff.lint.isort]
combine-as-imports = true
]
[tool.coverage.run]
-source_pkgs = [
- "starlette",
- "tests",
-]
+source_pkgs = ["starlette", "tests"]
[tool.coverage.report]
exclude_lines = [
{} if exception_handlers is None else dict(exception_handlers)
)
self.user_middleware = [] if middleware is None else list(middleware)
- self.middleware_stack: typing.Optional[ASGIApp] = None
+ self.middleware_stack: ASGIApp | None = None
def build_middleware_stack(self) -> ASGIApp:
debug = self.debug
def add_middleware(
self,
- middleware_class: typing.Type[_MiddlewareClass[P]],
+ middleware_class: type[_MiddlewareClass[P]],
*args: P.args,
**kwargs: P.kwargs,
) -> None:
def add_exception_handler(
self,
- exc_class_or_status_code: int | typing.Type[Exception],
+ exc_class_or_status_code: int | type[Exception],
handler: ExceptionHandler,
) -> None: # pragma: no cover
self.exception_handlers[exc_class_or_status_code] = handler
self,
path: str,
route: typing.Callable[[Request], typing.Awaitable[Response] | Response],
- methods: typing.Optional[typing.List[str]] = None,
- name: typing.Optional[str] = None,
+ methods: list[str] | None = None,
+ name: str | None = None,
include_in_schema: bool = True,
) -> None: # pragma: no cover
self.router.add_route(
self.router.add_websocket_route(path, route, name=name)
def exception_handler(
- self, exc_class_or_status_code: int | typing.Type[Exception]
+ self, exc_class_or_status_code: int | type[Exception]
) -> typing.Callable: # type: ignore[type-arg]
warnings.warn(
"The `exception_handler` decorator is deprecated, and will be removed in version 1.0.0. " # noqa: E501
if not has_required_scope(request, scopes_list):
if redirect is not None:
orig_request_qparam = urlencode({"next": str(request.url)})
- next_url = "{redirect_path}?{orig_request}".format(
- redirect_path=request.url_for(redirect),
- orig_request=orig_request_qparam,
- )
+ next_url = f"{request.url_for(redirect)}?{orig_request_qparam}"
return RedirectResponse(url=next_url, status_code=303)
raise HTTPException(status_code=status_code)
return await func(*args, **kwargs)
if not has_required_scope(request, scopes_list):
if redirect is not None:
orig_request_qparam = urlencode({"next": str(request.url)})
- next_url = "{redirect_path}?{orig_request}".format(
- redirect_path=request.url_for(redirect),
- orig_request=orig_request_qparam,
- )
+ next_url = f"{request.url_for(redirect)}?{orig_request_qparam}"
return RedirectResponse(url=next_url, status_code=303)
raise HTTPException(status_code=status_code)
return func(*args, **kwargs)
class Environ(typing.MutableMapping[str, str]):
def __init__(self, environ: typing.MutableMapping[str, str] = os.environ):
self._environ = environ
- self._has_been_read: typing.Set[str] = set()
+ self._has_been_read: set[str] = set()
def __getitem__(self, key: str) -> str:
self._has_been_read.add(key)
) -> None:
self.environ = environ
self.env_prefix = env_prefix
- self.file_values: typing.Dict[str, str] = {}
+ self.file_values: dict[str, str] = {}
if env_file is not None:
if not os.path.isfile(env_file):
warnings.warn(f"Config file '{env_file}' not found.")
raise KeyError(f"Config '{key}' is missing, and has no default.")
def _read_file(self, file_name: str | Path) -> dict[str, str]:
- file_values: typing.Dict[str, str] = {}
+ file_values: dict[str, str] = {}
with open(file_name) as input_file:
for line in input_file.readlines():
line = line.strip()
+from __future__ import annotations
+
import math
import typing
import uuid
return str(value)
-CONVERTOR_TYPES: typing.Dict[str, Convertor[typing.Any]] = {
+CONVERTOR_TYPES: dict[str, Convertor[typing.Any]] = {
"str": StringConvertor(),
"path": PathConvertor(),
"int": IntegerConvertor(),
query = urlencode([(str(key), str(value)) for key, value in kwargs.items()])
return self.replace(query=query)
- def remove_query_params(self, keys: str | typing.Sequence[str]) -> "URL":
+ def remove_query_params(self, keys: str | typing.Sequence[str]) -> URL:
if isinstance(keys, str):
keys = [keys]
params = MultiDict(parse_qsl(self.query, keep_blank_values=True))
Used by the routing to return `url_path_for` matches.
"""
- def __new__(cls, path: str, protocol: str = "", host: str = "") -> "URLPath":
+ def __new__(cls, path: str, protocol: str = "", host: str = "") -> URLPath:
assert protocol in ("http", "websocket", "")
return str.__new__(cls, path)
class ImmutableMultiDict(typing.Mapping[_KeyType, _CovariantValueType]):
- _dict: typing.Dict[_KeyType, _CovariantValueType]
+ _dict: dict[_KeyType, _CovariantValueType]
def __init__(
self,
*args: ImmutableMultiDict[_KeyType, _CovariantValueType]
| typing.Mapping[_KeyType, _CovariantValueType]
- | typing.Iterable[typing.Tuple[_KeyType, _CovariantValueType]],
+ | typing.Iterable[tuple[_KeyType, _CovariantValueType]],
**kwargs: typing.Any,
) -> None:
assert len(args) < 2, "Too many arguments."
set_key = key.lower().encode("latin-1")
set_value = value.encode("latin-1")
- found_indexes: "typing.List[int]" = []
+ found_indexes: list[int] = []
for idx, (item_key, item_value) in enumerate(self._list):
if item_key == set_key:
found_indexes.append(idx)
"""
del_key = key.lower().encode("latin-1")
- pop_indexes: "typing.List[int]" = []
+ pop_indexes: list[int] = []
for idx, (item_key, item_value) in enumerate(self._list):
if item_key == del_key:
pop_indexes.append(idx)
field_name = b""
field_value = b""
- items: list[tuple[str, typing.Union[str, UploadFile]]] = []
+ items: list[tuple[str, str | UploadFile]] = []
# Feed the parser with data from the request.
async for chunk in self.stream:
+from __future__ import annotations
+
import typing
from starlette.authentication import (
self,
app: ASGIApp,
backend: AuthenticationBackend,
- on_error: typing.Optional[
- typing.Callable[[HTTPConnection, AuthenticationError], Response]
- ] = None,
+ on_error: typing.Callable[[HTTPConnection, AuthenticationError], Response]
+ | None = None,
) -> None:
self.app = app
self.backend = backend
+from __future__ import annotations
+
import typing
import anyio
class BaseHTTPMiddleware:
- def __init__(
- self, app: ASGIApp, dispatch: typing.Optional[DispatchFunction] = None
- ) -> None:
+ def __init__(self, app: ASGIApp, dispatch: DispatchFunction | None = None) -> None:
self.app = app
self.dispatch_func = self.dispatch if dispatch is None else dispatch
response_sent = anyio.Event()
async def call_next(request: Request) -> Response:
- app_exc: typing.Optional[Exception] = None
+ app_exc: Exception | None = None
send_stream: ObjectSendStream[typing.MutableMapping[str, typing.Any]]
recv_stream: ObjectReceiveStream[typing.MutableMapping[str, typing.Any]]
send_stream, recv_stream = anyio.create_memory_object_stream()
self,
content: ContentStream,
status_code: int = 200,
- headers: typing.Optional[typing.Mapping[str, str]] = None,
- media_type: typing.Optional[str] = None,
- background: typing.Optional[BackgroundTask] = None,
- info: typing.Optional[typing.Mapping[str, typing.Any]] = None,
+ headers: typing.Mapping[str, str] | None = None,
+ media_type: str | None = None,
+ background: BackgroundTask | None = None,
+ info: typing.Mapping[str, typing.Any] | None = None,
) -> None:
self._info = info
super().__init__(content, status_code, headers, media_type, background)
+from __future__ import annotations
+
import functools
import re
import typing
allow_methods: typing.Sequence[str] = ("GET",),
allow_headers: typing.Sequence[str] = (),
allow_credentials: bool = False,
- allow_origin_regex: typing.Optional[str] = None,
+ allow_origin_regex: str | None = None,
expose_headers: typing.Sequence[str] = (),
max_age: int = 600,
) -> None:
+from __future__ import annotations
+
import html
import inspect
import traceback
def __init__(
self,
app: ASGIApp,
- handler: typing.Optional[
- typing.Callable[[Request, Exception], typing.Any]
- ] = None,
+ handler: typing.Callable[[Request, Exception], typing.Any] | None = None,
debug: bool = False,
) -> None:
self.app = app
+from __future__ import annotations
+
import typing
from starlette._exception_handler import (
def __init__(
self,
app: ASGIApp,
- handlers: typing.Optional[
- typing.Mapping[typing.Any, typing.Callable[[Request, Exception], Response]]
- ] = None,
+ handlers: typing.Mapping[
+ typing.Any, typing.Callable[[Request, Exception], Response]
+ ]
+ | None = None,
debug: bool = False,
) -> None:
self.app = app
def add_exception_handler(
self,
- exc_class_or_status_code: typing.Union[int, typing.Type[Exception]],
+ exc_class_or_status_code: int | type[Exception],
handler: typing.Callable[[Request, Exception], Response],
) -> None:
if isinstance(exc_class_or_status_code, int):
self._status_handlers,
)
- conn: typing.Union[Request, WebSocket]
+ conn: Request | WebSocket
if scope["type"] == "http":
conn = Request(scope, receive, send)
else:
+from __future__ import annotations
+
import json
import typing
from base64 import b64decode, b64encode
def __init__(
self,
app: ASGIApp,
- secret_key: typing.Union[str, Secret],
+ secret_key: str | Secret,
session_cookie: str = "session",
- max_age: typing.Optional[int] = 14 * 24 * 60 * 60, # 14 days, in seconds
+ max_age: int | None = 14 * 24 * 60 * 60, # 14 days, in seconds
path: str = "/",
same_site: typing.Literal["lax", "strict", "none"] = "lax",
https_only: bool = False,
- domain: typing.Optional[str] = None,
+ domain: str | None = None,
) -> None:
self.app = app
self.signer = itsdangerous.TimestampSigner(str(secret_key))
+from __future__ import annotations
+
import typing
from starlette.datastructures import URL, Headers
def __init__(
self,
app: ASGIApp,
- allowed_hosts: typing.Optional[typing.Sequence[str]] = None,
+ allowed_hosts: typing.Sequence[str] | None = None,
www_redirect: bool = True,
) -> None:
if allowed_hosts is None:
+from __future__ import annotations
+
import io
import math
import sys
)
-def build_environ(scope: Scope, body: bytes) -> typing.Dict[str, typing.Any]:
+def build_environ(scope: Scope, body: bytes) -> dict[str, typing.Any]:
"""
Builds a scope and request body into a WSGI environ object.
"""
def start_response(
self,
status: str,
- response_headers: typing.List[typing.Tuple[str, str]],
+ response_headers: list[tuple[str, str]],
exc_info: typing.Any = None,
) -> None:
self.exc_info = exc_info
def wsgi(
self,
- environ: typing.Dict[str, typing.Any],
+ environ: dict[str, typing.Any],
start_response: typing.Callable[..., typing.Any],
) -> None:
for chunk in self.app(environ, start_response):
Note: we are explicitly _NOT_ using `SimpleCookie.load` because it is based
on an outdated spec and will fail on lots of input we want to support
"""
- cookie_dict: typing.Dict[str, str] = {}
+ cookie_dict: dict[str, str] = {}
for chunk in cookie_string.split(";"):
if "=" in chunk:
key, val = chunk.split("=", 1)
@property
def cookies(self) -> dict[str, str]:
if not hasattr(self, "_cookies"):
- cookies: typing.Dict[str, str] = {}
+ cookies: dict[str, str] = {}
cookie_header = self.headers.get("cookie")
if cookie_header:
class Request(HTTPConnection):
- _form: typing.Optional[FormData]
+ _form: FormData | None
def __init__(
self, scope: Scope, receive: Receive = empty_receive, send: Send = empty_send
async def body(self) -> bytes:
if not hasattr(self, "_body"):
- chunks: "typing.List[bytes]" = []
+ chunks: list[bytes] = []
async for chunk in self.stream():
chunks.append(chunk)
self._body = b"".join(chunks)
async def send_push_promise(self, path: str) -> None:
if "http.response.push" in self.scope.get("extensions", {}):
- raw_headers: "typing.List[typing.Tuple[bytes, bytes]]" = []
+ raw_headers: list[tuple[bytes, bytes]] = []
for name in SERVER_PUSH_HEADERS_TO_COPY:
for value in self.headers.getlist(name):
raw_headers.append(
def request_response(
- func: typing.Callable[
- [Request], typing.Union[typing.Awaitable[Response], Response]
- ],
+ func: typing.Callable[[Request], typing.Awaitable[Response] | Response],
) -> ASGIApp:
"""
Takes a function or coroutine `func(request) -> response`,
self.path_regex, self.path_format, self.param_convertors = compile_path(path)
def matches(self, scope: Scope) -> tuple[Match, Scope]:
- path_params: "typing.Dict[str, typing.Any]"
+ path_params: dict[str, typing.Any]
if scope["type"] == "http":
route_path = get_route_path(scope)
match = self.path_regex.match(route_path)
self.path_regex, self.path_format, self.param_convertors = compile_path(path)
def matches(self, scope: Scope) -> tuple[Match, Scope]:
- path_params: "typing.Dict[str, typing.Any]"
+ path_params: dict[str, typing.Any]
if scope["type"] == "websocket":
route_path = get_route_path(scope)
match = self.path_regex.match(route_path)
def routes(self) -> list[BaseRoute]:
return getattr(self._base_app, "routes", [])
- def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
- path_params: "typing.Dict[str, typing.Any]"
+ def matches(self, scope: Scope) -> tuple[Match, Scope]:
+ path_params: dict[str, typing.Any]
if scope["type"] in ("http", "websocket"):
root_path = scope.get("root_path", "")
route_path = get_route_path(scope)
def _setup_env_defaults(self, env: jinja2.Environment) -> None:
@pass_context
def url_for(
- context: typing.Dict[str, typing.Any],
+ context: dict[str, typing.Any],
name: str,
/,
**path_params: typing.Any,
class WebSocketDisconnect(Exception):
- def __init__(self, code: int = 1000, reason: typing.Optional[str] = None) -> None:
+ def __init__(self, code: int = 1000, reason: str | None = None) -> None:
self.code = code
self.reason = reason or ""
self.application_state = WebSocketState.DISCONNECTED
try:
await self._send(message)
- except IOError:
+ except OSError:
self.application_state = WebSocketState.DISCONNECTED
raise WebSocketDisconnect(code=1006)
elif self.application_state == WebSocketState.RESPONSE:
+from __future__ import annotations
+
import functools
-from typing import Any, Callable, Dict, Literal
+from typing import Any, Callable, Literal
import pytest
@pytest.fixture
def test_client_factory(
anyio_backend_name: Literal["asyncio", "trio"],
- anyio_backend_options: Dict[str, Any],
+ anyio_backend_options: dict[str, Any],
) -> TestClientFactory:
# anyio_backend_name defined by:
# https://anyio.readthedocs.io/en/stable/testing.html#specifying-the-backends-to-run-on
+from __future__ import annotations
+
import contextvars
from contextlib import AsyncExitStack
from typing import (
AsyncGenerator,
Callable,
Generator,
- List,
- Type,
- Union,
)
import anyio
)
def test_contextvars(
test_client_factory: TestClientFactory,
- middleware_cls: Type[_MiddlewareClass[Any]],
+ middleware_cls: type[_MiddlewareClass[Any]],
) -> None:
# this has to be an async endpoint because Starlette calls run_in_threadpool
# on sync endpoints which has it's own set of peculiarities w.r.t propagating
async def test_do_not_block_on_background_tasks() -> None:
request_body_sent = False
response_complete = anyio.Event()
- events: List[Union[str, Message]] = []
+ events: list[str | Message] = []
async def sleep_and_set() -> None:
events.append("Background task started")
call_next: RequestResponseEndpoint,
) -> Response:
expected = b"1"
- response: Union[Response, None] = None
+ response: Response | None = None
async for chunk in request.stream():
assert chunk == expected
if expected == b"1":
yield {"type": "http.request", "body": b"3"}
await anyio.sleep(float("inf"))
- sent: List[Message] = []
+ sent: list[Message] = []
async def send(msg: Message) -> None:
sent.append(msg)
"""
https://github.com/encode/starlette/pull/1519#issuecomment-1236166180
"""
- bodies: List[bytes] = []
+ bodies: list[bytes] = []
class LogRequestBodySize(BaseHTTPMiddleware):
async def dispatch(
+from __future__ import annotations
+
import base64
import binascii
-from typing import Any, Awaitable, Callable, Optional, Tuple
+from typing import Any, Awaitable, Callable
from urllib.parse import urlencode
import pytest
async def authenticate(
self,
request: HTTPConnection,
- ) -> Optional[Tuple[AuthCredentials, SimpleUser]]:
+ ) -> tuple[AuthCredentials, SimpleUser] | None:
if "Authorization" not in request.headers:
return None
+from __future__ import annotations
+
import os
import typing
from contextlib import nullcontext as does_not_raise
async def app(scope: Scope, receive: Receive, send: Send) -> None:
request = Request(scope, receive)
data = await request.form()
- output: typing.Dict[str, typing.Any] = {}
+ output: dict[str, typing.Any] = {}
for key, value in data.items():
if isinstance(value, UploadFile):
content = await value.read()
async def multi_items_app(scope: Scope, receive: Receive, send: Send) -> None:
request = Request(scope, receive)
data = await request.form()
- output: typing.Dict[str, typing.List[typing.Any]] = {}
+ output: dict[str, list[typing.Any]] = {}
for key, value in data.multi_items():
if key not in output:
output[key] = []
async def app_with_headers(scope: Scope, receive: Receive, send: Send) -> None:
request = Request(scope, receive)
data = await request.form()
- output: typing.Dict[str, typing.Any] = {}
+ output: dict[str, typing.Any] = {}
for key, value in data.items():
if isinstance(value, UploadFile):
content = await value.read()
async def app(scope: Scope, receive: Receive, send: Send) -> None:
request = Request(scope, receive)
data = await request.form(max_files=max_files, max_fields=max_fields)
- output: typing.Dict[str, typing.Any] = {}
+ output: dict[str, typing.Any] = {}
for key, value in data.items():
if isinstance(value, UploadFile):
content = await value.read()
+from __future__ import annotations
+
import sys
-from typing import Any, Callable, Dict, Iterator, List, Optional
+from typing import Any, Callable, Iterator
import anyio
import pytest
({}, None),
],
)
-def test_request_client(scope: Scope, expected_client: Optional[Address]) -> None:
+def test_request_client(scope: Scope, expected_client: Address | None) -> None:
scope.update({"type": "http"}) # required by Request's constructor
client = Request(scope).client
assert client == expected_client
def test_request_disconnect(
anyio_backend_name: str,
- anyio_backend_options: Dict[str, Any],
+ anyio_backend_options: dict[str, Any],
) -> None:
"""
If a client disconnect occurs while reading request body
)
def test_cookies_edge_cases(
set_cookie: str,
- expected: Dict[str, str],
+ expected: dict[str, str],
test_client_factory: TestClientFactory,
) -> None:
async def app(scope: Scope, receive: Receive, send: Send) -> None:
)
def test_cookies_invalid(
set_cookie: str,
- expected: Dict[str, str],
+ expected: dict[str, str],
test_client_factory: TestClientFactory,
) -> None:
"""
],
)
@pytest.mark.anyio
-async def test_request_rcv(messages: List[Message]) -> None:
+async def test_request_rcv(messages: list[Message]) -> None:
messages = messages.copy()
async def rcv() -> Message:
@pytest.mark.anyio
async def test_request_stream_called_twice() -> None:
- messages: List[Message] = [
+ messages: list[Message] = [
{"type": "http.request", "body": b"1", "more_body": True},
{"type": "http.request", "body": b"2", "more_body": True},
{"type": "http.request", "body": b"3"},
+from __future__ import annotations
+
import datetime as dt
import os
import time
from http.cookies import SimpleCookie
from pathlib import Path
-from typing import AsyncIterator, Callable, Iterator, Union
+from typing import AsyncIterator, Callable, Iterator
import anyio
import pytest
) -> None:
async def app(scope: Scope, receive: Receive, send: Send) -> None:
class CustomAsyncIterable:
- async def __aiter__(self) -> AsyncIterator[Union[str, bytes]]:
+ async def __aiter__(self) -> AsyncIterator[str | bytes]:
for i in range(5):
yield str(i + 1)
+from __future__ import annotations
+
import contextlib
import functools
import json
@contextlib.asynccontextmanager
async def lifespan(
app: ASGIApp,
- ) -> typing.AsyncGenerator[typing.Dict[str, str], None]:
+ ) -> typing.AsyncGenerator[dict[str, str], None]:
yield {"foo": "bar"}
app = Router(
class State(typing.TypedDict):
count: int
- items: typing.List[int]
+ items: list[int]
async def hello_world(request: Request) -> Response:
# modifications to the state should not leak across requests
+from __future__ import annotations
+
import os
import typing
from pathlib import Path
async def homepage(request: Request) -> Response:
return templates.TemplateResponse(request, "index.html")
- def hello_world_processor(request: Request) -> typing.Dict[str, str]:
+ def hello_world_processor(request: Request) -> dict[str, str]:
return {"username": "World"}
app = Starlette(
return
# Simulate the exception the server would send to the application when the
# client disconnects.
- raise IOError
+ raise OSError
with pytest.raises(WebSocketDisconnect) as ctx:
await app({"type": "websocket", "path": "/"}, receive, send)