contextmanager_in_threadpool,
)
from fastapi.dependencies.models import Dependant, SecurityRequirement
+from fastapi.logger import logger
from fastapi.security.base import SecurityBase
from fastapi.security.oauth2 import OAuth2, SecurityScopes
from fastapi.security.open_id_connect_url import OpenIdConnect
}
+multipart_not_installed_error = (
+ 'Form data requires "python-multipart" to be installed. \n'
+ 'You can install "python-multipart" with: \n\n'
+ "pip install python-multipart\n"
+)
+multipart_incorrect_install_error = (
+ 'Form data requires "python-multipart" to be installed. '
+ 'It seems you installed "multipart" instead. \n'
+ 'You can remove "multipart" with: \n\n'
+ "pip uninstall multipart\n\n"
+ 'And then install "python-multipart" with: \n\n'
+ "pip install python-multipart\n"
+)
+
+
+def check_file_field(field: ModelField) -> None:
+ field_info = get_field_info(field)
+ if isinstance(field_info, params.Form):
+ try:
+ # __version__ is available in both multiparts, and can be mocked
+ from multipart import __version__
+
+ assert __version__
+ try:
+ # parse_options_header is only available in the right multlipart
+ from multipart.multipart import parse_options_header
+
+ assert parse_options_header
+ except ImportError:
+ logger.error(multipart_incorrect_install_error)
+ raise RuntimeError(multipart_incorrect_install_error)
+ except ImportError:
+ logger.error(multipart_not_installed_error)
+ raise RuntimeError(multipart_not_installed_error)
+
+
def get_param_sub_dependant(
*, param: inspect.Parameter, path: str, security_scopes: Optional[List[str]] = None
) -> Dependant:
default=field.default,
required=field.required,
alias=field.alias,
- field_info=field.field_info if PYDANTIC_1 else field.schema, # type: ignore
+ field_info=get_field_info(field),
)
-
return out_field
embed = getattr(field_info, "embed", None)
body_param_names_set = {param.name for param in flat_dependant.body_params}
if len(body_param_names_set) == 1 and not embed:
- return get_schema_compatible_field(field=first_param)
+ final_field = get_schema_compatible_field(field=first_param)
+ check_file_field(final_field)
+ return final_field
# If one field requires to embed, all have to be embedded
# in case a sub-dependency is evaluated with a single unique body field
# That is combined (embedded) with other body fields
]
if len(set(body_param_media_types)) == 1:
BodyFieldInfo_kwargs["media_type"] = body_param_media_types[0]
- return create_response_field(
+ final_field = create_response_field(
name="body",
type_=BodyModel,
required=required,
alias="body",
field_info=BodyFieldInfo(**BodyFieldInfo_kwargs),
)
+ check_file_field(final_field)
+ return final_field
--- /dev/null
+import pytest
+from fastapi import FastAPI, File, Form, UploadFile
+from fastapi.dependencies.utils import (
+ multipart_incorrect_install_error,
+ multipart_not_installed_error,
+)
+
+
+def test_incorrect_multipart_installed_form(monkeypatch):
+ monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_incorrect_install_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(username: str = Form(...)):
+ return username # pragma: nocover
+
+
+def test_incorrect_multipart_installed_file_upload(monkeypatch):
+ monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_incorrect_install_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(f: UploadFile = File(...)):
+ return f # pragma: nocover
+
+
+def test_incorrect_multipart_installed_file_bytes(monkeypatch):
+ monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_incorrect_install_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(f: bytes = File(...)):
+ return f # pragma: nocover
+
+
+def test_incorrect_multipart_installed_multi_form(monkeypatch):
+ monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_incorrect_install_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(username: str = Form(...), pasword: str = Form(...)):
+ return username # pragma: nocover
+
+
+def test_incorrect_multipart_installed_form_file(monkeypatch):
+ monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_incorrect_install_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(username: str = Form(...), f: UploadFile = File(...)):
+ return username # pragma: nocover
+
+
+def test_no_multipart_installed(monkeypatch):
+ monkeypatch.delattr("multipart.__version__", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_not_installed_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(username: str = Form(...)):
+ return username # pragma: nocover
+
+
+def test_no_multipart_installed_file(monkeypatch):
+ monkeypatch.delattr("multipart.__version__", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_not_installed_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(f: UploadFile = File(...)):
+ return f # pragma: nocover
+
+
+def test_no_multipart_installed_file_bytes(monkeypatch):
+ monkeypatch.delattr("multipart.__version__", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_not_installed_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(f: bytes = File(...)):
+ return f # pragma: nocover
+
+
+def test_no_multipart_installed_multi_form(monkeypatch):
+ monkeypatch.delattr("multipart.__version__", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_not_installed_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(username: str = Form(...), password: str = Form(...)):
+ return username # pragma: nocover
+
+
+def test_no_multipart_installed_form_file(monkeypatch):
+ monkeypatch.delattr("multipart.__version__", raising=False)
+ with pytest.raises(RuntimeError, match=multipart_not_installed_error):
+ app = FastAPI()
+
+ @app.post("/")
+ async def root(username: str = Form(...), f: UploadFile = File(...)):
+ return username # pragma: nocover