assert callable(depends.dependency), (
"A parameter-less dependency must have a callable dependency"
)
- return get_sub_dependant(depends=depends, dependency=depends.dependency, path=path)
-
-
-def get_sub_dependant(
- *,
- depends: params.Depends,
- dependency: Callable[..., Any],
- path: str,
- name: Optional[str] = None,
- security_scopes: Optional[List[str]] = None,
-) -> Dependant:
- security_requirement = None
- security_scopes = security_scopes or []
- if isinstance(depends, params.Security):
- if depends.scopes:
- security_scopes.extend(depends.scopes)
- if isinstance(dependency, SecurityBase):
- use_scopes: List[str] = []
- if isinstance(dependency, (OAuth2, OpenIdConnect)):
- use_scopes = security_scopes
- security_requirement = SecurityRequirement(
- security_scheme=dependency, scopes=use_scopes
- )
- sub_dependant = get_dependant(
- path=path,
- call=dependency,
- name=name,
- security_scopes=security_scopes,
- use_cache=depends.use_cache,
+ use_security_scopes: List[str] = []
+ if isinstance(depends, params.Security) and depends.scopes:
+ use_security_scopes.extend(depends.scopes)
+ return get_dependant(
+ path=path, call=depends.dependency, security_scopes=use_security_scopes
)
- if security_requirement:
- sub_dependant.security_requirements.append(security_requirement)
- return sub_dependant
CacheKey = Tuple[Optional[Callable[..., Any]], Tuple[str, ...]]
)
if param_details.depends is not None:
assert param_details.depends.dependency
- sub_dependant = get_sub_dependant(
- depends=param_details.depends,
- dependency=param_details.depends.dependency,
+ use_security_scopes = security_scopes or []
+ if isinstance(param_details.depends, params.Security):
+ if param_details.depends.scopes:
+ use_security_scopes.extend(param_details.depends.scopes)
+ sub_dependant = get_dependant(
path=path,
+ call=param_details.depends.dependency,
name=param_name,
- security_scopes=security_scopes,
+ security_scopes=use_security_scopes,
+ use_cache=param_details.depends.use_cache,
)
+ if isinstance(param_details.depends.dependency, SecurityBase):
+ use_scopes: List[str] = []
+ if isinstance(
+ param_details.depends.dependency, (OAuth2, OpenIdConnect)
+ ):
+ use_scopes = use_security_scopes
+ security_requirement = SecurityRequirement(
+ security_scheme=param_details.depends.dependency, scopes=use_scopes
+ )
+ sub_dependant.security_requirements.append(security_requirement)
dependant.dependencies.append(sub_dependant)
continue
if add_non_field_param_to_dependency(
--- /dev/null
+from typing import Union
+
+from fastapi import FastAPI, HTTPException, Security
+from fastapi.security import (
+ OAuth2PasswordBearer,
+ SecurityScopes,
+)
+from fastapi.testclient import TestClient
+from typing_extensions import Annotated
+
+app = FastAPI()
+
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
+
+
+def process_auth(
+ credentials: Annotated[Union[str, None], Security(oauth2_scheme)],
+ security_scopes: SecurityScopes,
+):
+ # This is an incorrect way of using it, this is not checking if the scopes are
+ # provided by the token, only if the endpoint is requesting them, but the test
+ # here is just to check if FastAPI is indeed registering and passing the scopes
+ # correctly when using Security with parameterless dependencies.
+ if "a" not in security_scopes.scopes or "b" not in security_scopes.scopes:
+ raise HTTPException(detail="a or b not in scopes", status_code=401)
+ return {"token": credentials, "scopes": security_scopes.scopes}
+
+
+@app.get("/get-credentials")
+def get_credentials(
+ credentials: Annotated[dict, Security(process_auth, scopes=["a", "b"])],
+):
+ return credentials
+
+
+@app.get(
+ "/parameterless-with-scopes",
+ dependencies=[Security(process_auth, scopes=["a", "b"])],
+)
+def get_parameterless_with_scopes():
+ return {"status": "ok"}
+
+
+@app.get(
+ "/parameterless-without-scopes",
+ dependencies=[Security(process_auth)],
+)
+def get_parameterless_without_scopes():
+ return {"status": "ok"}
+
+
+client = TestClient(app)
+
+
+def test_get_credentials():
+ response = client.get("/get-credentials", headers={"authorization": "Bearer token"})
+ assert response.status_code == 200, response.text
+ assert response.json() == {"token": "token", "scopes": ["a", "b"]}
+
+
+def test_parameterless_with_scopes():
+ response = client.get(
+ "/parameterless-with-scopes", headers={"authorization": "Bearer token"}
+ )
+ assert response.status_code == 200, response.text
+ assert response.json() == {"status": "ok"}
+
+
+def test_parameterless_without_scopes():
+ response = client.get(
+ "/parameterless-without-scopes", headers={"authorization": "Bearer token"}
+ )
+ assert response.status_code == 401, response.text
+ assert response.json() == {"detail": "a or b not in scopes"}
+
+
+def test_call_get_parameterless_without_scopes_for_coverage():
+ assert get_parameterless_without_scopes() == {"status": "ok"}