#### Using `StreamingResponse` with file-like objects
-If you have a file-like object (e.g. the object returned by `open()`), you can return it in a `StreamingResponse`.
+If you have a file-like object (e.g. the object returned by `open()`), you can create a generator function to iterate over that file-like object.
+
+That way, you don't have to read it all first in memory, and you can pass that generator function to the `StreamingResponse`, and return it.
This includes many libraries to interact with cloud storage, video processing, and others.
-```Python hl_lines="2 10-11"
+```{ .python .annotate hl_lines="2 10-12 14" }
{!../../../docs_src/custom_response/tutorial008.py!}
```
+1. This is the generator function. It's a "generator function" because it contains `yield` statements inside.
+2. By using a `with` block, we make sure that the file-like object is closed after the generator function is done. So, after it finishes sending the response.
+3. This `yield from` tells the function to iterate over that thing named `file_like`. And then, for each part iterated, yield that part as coming from this generator function.
+
+ So, it is a generator function that transfers the "generating" work to something else internally.
+
+ By doing it this way, we can put it in a `with` block, and that way, ensure that it is closed after finishing.
+
!!! tip
Notice that here as we are using standard `open()` that doesn't support `async` and `await`, we declare the path operation with normal `def`.
これにはクラウドストレージとの連携や映像処理など、多くのライブラリが含まれています。
-```Python hl_lines="2 10-11"
+```Python hl_lines="2 10-12 14"
{!../../../docs_src/custom_response/tutorial008.py!}
```
包括许多与云存储,视频处理等交互的库。
-```Python hl_lines="2 10 11"
+```Python hl_lines="2 10-12 14"
{!../../../docs_src/custom_response/tutorial008.py!}
```
@app.get("/")
def main():
- file_like = open(some_file_path, mode="rb")
- return StreamingResponse(file_like, media_type="video/mp4")
+ def iterfile(): # (1)
+ with open(some_file_path, mode="rb") as file_like: # (2)
+ yield from file_like # (3)
+
+ return StreamingResponse(iterfile(), media_type="video/mp4")
[tool.flit.metadata.requires-extra]
test = [
- "pytest ==5.4.3",
- "pytest-cov ==2.10.0",
+ "pytest >=6.2.4,<7.0.0",
+ "pytest-cov >=2.12.0,<3.0.0",
"pytest-asyncio >=0.14.0,<0.15.0",
"mypy ==0.812",
"flake8 >=3.8.3,<4.0.0",
[tool.isort]
profile = "black"
known_third_party = ["fastapi", "pydantic", "starlette"]
+
+[tool.pytest.ini_options]
+addopts = [
+ "--strict-config",
+ "--strict-markers",
+]
+xfail_strict = true
+junit_family = "xunit2"
+filterwarnings = [
+ "error",
+ 'ignore:"@coroutine" decorator is deprecated since Python 3\.8, use "async def" instead:DeprecationWarning',
+ # TODO: if these ignores are needed, enable them, otherwise remove them
+ # 'ignore:The explicit passing of coroutine objects to asyncio\.wait\(\) is deprecated since Python 3\.8:DeprecationWarning',
+ # 'ignore:Exception ignored in. <socket\.socket fd=-1:pytest.PytestUnraisableExceptionWarning',
+]
from pydantic import BaseModel
-class Test(BaseModel):
+class Model1(BaseModel):
foo: str
bar: str
-class Test2(BaseModel):
- test: Test
+class Model2(BaseModel):
+ ref: Model1
baz: str
-class Test3(BaseModel):
+class Model3(BaseModel):
name: str
age: int
- test2: Test2
+ ref2: Model2
app = FastAPI()
@app.get(
"/simple_include",
- response_model=Test2,
- response_model_include={"baz": ..., "test": {"foo"}},
+ response_model=Model2,
+ response_model_include={"baz": ..., "ref": {"foo"}},
)
def simple_include():
- return Test2(
- test=Test(foo="simple_include test foo", bar="simple_include test bar"),
- baz="simple_include test2 baz",
+ return Model2(
+ ref=Model1(foo="simple_include model foo", bar="simple_include model bar"),
+ baz="simple_include model2 baz",
)
@app.get(
"/simple_include_dict",
- response_model=Test2,
- response_model_include={"baz": ..., "test": {"foo"}},
+ response_model=Model2,
+ response_model_include={"baz": ..., "ref": {"foo"}},
)
def simple_include_dict():
return {
- "test": {
- "foo": "simple_include_dict test foo",
- "bar": "simple_include_dict test bar",
+ "ref": {
+ "foo": "simple_include_dict model foo",
+ "bar": "simple_include_dict model bar",
},
- "baz": "simple_include_dict test2 baz",
+ "baz": "simple_include_dict model2 baz",
}
@app.get(
"/simple_exclude",
- response_model=Test2,
- response_model_exclude={"test": {"bar"}},
+ response_model=Model2,
+ response_model_exclude={"ref": {"bar"}},
)
def simple_exclude():
- return Test2(
- test=Test(foo="simple_exclude test foo", bar="simple_exclude test bar"),
- baz="simple_exclude test2 baz",
+ return Model2(
+ ref=Model1(foo="simple_exclude model foo", bar="simple_exclude model bar"),
+ baz="simple_exclude model2 baz",
)
@app.get(
"/simple_exclude_dict",
- response_model=Test2,
- response_model_exclude={"test": {"bar"}},
+ response_model=Model2,
+ response_model_exclude={"ref": {"bar"}},
)
def simple_exclude_dict():
return {
- "test": {
- "foo": "simple_exclude_dict test foo",
- "bar": "simple_exclude_dict test bar",
+ "ref": {
+ "foo": "simple_exclude_dict model foo",
+ "bar": "simple_exclude_dict model bar",
},
- "baz": "simple_exclude_dict test2 baz",
+ "baz": "simple_exclude_dict model2 baz",
}
@app.get(
"/mixed",
- response_model=Test3,
- response_model_include={"test2", "name"},
- response_model_exclude={"test2": {"baz"}},
+ response_model=Model3,
+ response_model_include={"ref2", "name"},
+ response_model_exclude={"ref2": {"baz"}},
)
def mixed():
- return Test3(
- name="mixed test3 name",
+ return Model3(
+ name="mixed model3 name",
age=3,
- test2=Test2(
- test=Test(foo="mixed test foo", bar="mixed test bar"), baz="mixed test2 baz"
+ ref2=Model2(
+ ref=Model1(foo="mixed model foo", bar="mixed model bar"),
+ baz="mixed model2 baz",
),
)
@app.get(
"/mixed_dict",
- response_model=Test3,
- response_model_include={"test2", "name"},
- response_model_exclude={"test2": {"baz"}},
+ response_model=Model3,
+ response_model_include={"ref2", "name"},
+ response_model_exclude={"ref2": {"baz"}},
)
def mixed_dict():
return {
- "name": "mixed_dict test3 name",
+ "name": "mixed_dict model3 name",
"age": 3,
- "test2": {
- "test": {"foo": "mixed_dict test foo", "bar": "mixed_dict test bar"},
- "baz": "mixed_dict test2 baz",
+ "ref2": {
+ "ref": {"foo": "mixed_dict model foo", "bar": "mixed_dict model bar"},
+ "baz": "mixed_dict model2 baz",
},
}
assert response.status_code == 200, response.text
assert response.json() == {
- "baz": "simple_include test2 baz",
- "test": {"foo": "simple_include test foo"},
+ "baz": "simple_include model2 baz",
+ "ref": {"foo": "simple_include model foo"},
}
assert response.status_code == 200, response.text
assert response.json() == {
- "baz": "simple_include_dict test2 baz",
- "test": {"foo": "simple_include_dict test foo"},
+ "baz": "simple_include_dict model2 baz",
+ "ref": {"foo": "simple_include_dict model foo"},
}
response = client.get("/simple_exclude")
assert response.status_code == 200, response.text
assert response.json() == {
- "baz": "simple_exclude test2 baz",
- "test": {"foo": "simple_exclude test foo"},
+ "baz": "simple_exclude model2 baz",
+ "ref": {"foo": "simple_exclude model foo"},
}
response = client.get("/simple_exclude_dict")
assert response.status_code == 200, response.text
assert response.json() == {
- "baz": "simple_exclude_dict test2 baz",
- "test": {"foo": "simple_exclude_dict test foo"},
+ "baz": "simple_exclude_dict model2 baz",
+ "ref": {"foo": "simple_exclude_dict model foo"},
}
response = client.get("/mixed")
assert response.status_code == 200, response.text
assert response.json() == {
- "name": "mixed test3 name",
- "test2": {
- "test": {"foo": "mixed test foo", "bar": "mixed test bar"},
+ "name": "mixed model3 name",
+ "ref2": {
+ "ref": {"foo": "mixed model foo", "bar": "mixed model bar"},
},
}
response = client.get("/mixed_dict")
assert response.status_code == 200, response.text
assert response.json() == {
- "name": "mixed_dict test3 name",
- "test2": {
- "test": {"foo": "mixed_dict test foo", "bar": "mixed_dict test bar"},
+ "name": "mixed_dict model3 name",
+ "ref2": {
+ "ref": {"foo": "mixed_dict model foo", "bar": "mixed_dict model bar"},
},
}
-import os
-
from fastapi.testclient import TestClient
from docs_src.request_files.tutorial001 import app
assert response.json() == file_required
-def test_post_file(tmpdir):
- path = os.path.join(tmpdir, "test.txt")
- with open(path, "wb") as file:
- file.write(b"<file content>")
+def test_post_file(tmp_path):
+ path = tmp_path / "test.txt"
+ path.write_bytes(b"<file content>")
client = TestClient(app)
- response = client.post("/files/", files={"file": open(path, "rb")})
+ with path.open("rb") as file:
+ response = client.post("/files/", files={"file": file})
assert response.status_code == 200, response.text
assert response.json() == {"file_size": 14}
-def test_post_large_file(tmpdir):
+def test_post_large_file(tmp_path):
default_pydantic_max_size = 2 ** 16
- path = os.path.join(tmpdir, "test.txt")
- with open(path, "wb") as file:
- file.write(b"x" * (default_pydantic_max_size + 1))
+ path = tmp_path / "test.txt"
+ path.write_bytes(b"x" * (default_pydantic_max_size + 1))
client = TestClient(app)
- response = client.post("/files/", files={"file": open(path, "rb")})
+ with path.open("rb") as file:
+ response = client.post("/files/", files={"file": file})
assert response.status_code == 200, response.text
assert response.json() == {"file_size": default_pydantic_max_size + 1}
-def test_post_upload_file(tmpdir):
- path = os.path.join(tmpdir, "test.txt")
- with open(path, "wb") as file:
- file.write(b"<file content>")
+def test_post_upload_file(tmp_path):
+ path = tmp_path / "test.txt"
+ path.write_bytes(b"<file content>")
client = TestClient(app)
- response = client.post("/uploadfile/", files={"file": open(path, "rb")})
+ with path.open("rb") as file:
+ response = client.post("/uploadfile/", files={"file": file})
assert response.status_code == 200, response.text
assert response.json() == {"filename": "test.txt"}
-import os
-
from fastapi.testclient import TestClient
from docs_src.request_files.tutorial002 import app
assert response.json() == file_required
-def test_post_files(tmpdir):
- path = os.path.join(tmpdir, "test.txt")
- with open(path, "wb") as file:
- file.write(b"<file content>")
- path2 = os.path.join(tmpdir, "test2.txt")
- with open(path2, "wb") as file:
- file.write(b"<file content2>")
+def test_post_files(tmp_path):
+ path = tmp_path / "test.txt"
+ path.write_bytes(b"<file content>")
+ path2 = tmp_path / "test2.txt"
+ path2.write_bytes(b"<file content2>")
client = TestClient(app)
- response = client.post(
- "/files/",
- files=(
- ("files", ("test.txt", open(path, "rb"))),
- ("files", ("test2.txt", open(path2, "rb"))),
- ),
- )
+ with path.open("rb") as file, path2.open("rb") as file2:
+ response = client.post(
+ "/files/",
+ files=(
+ ("files", ("test.txt", file)),
+ ("files", ("test2.txt", file2)),
+ ),
+ )
assert response.status_code == 200, response.text
assert response.json() == {"file_sizes": [14, 15]}
-def test_post_upload_file(tmpdir):
- path = os.path.join(tmpdir, "test.txt")
- with open(path, "wb") as file:
- file.write(b"<file content>")
- path2 = os.path.join(tmpdir, "test2.txt")
- with open(path2, "wb") as file:
- file.write(b"<file content2>")
+def test_post_upload_file(tmp_path):
+ path = tmp_path / "test.txt"
+ path.write_bytes(b"<file content>")
+ path2 = tmp_path / "test2.txt"
+ path2.write_bytes(b"<file content2>")
client = TestClient(app)
- response = client.post(
- "/uploadfiles/",
- files=(
- ("files", ("test.txt", open(path, "rb"))),
- ("files", ("test2.txt", open(path2, "rb"))),
- ),
- )
+ with path.open("rb") as file, path2.open("rb") as file2:
+ response = client.post(
+ "/uploadfiles/",
+ files=(
+ ("files", ("test.txt", file)),
+ ("files", ("test2.txt", file2)),
+ ),
+ )
assert response.status_code == 200, response.text
assert response.json() == {"filenames": ["test.txt", "test2.txt"]}
-import os
-from pathlib import Path
-
from fastapi.testclient import TestClient
from docs_src.request_forms_and_files.tutorial001 import app
assert response.json() == file_and_token_required
-def test_post_file_no_token(tmpdir):
- path = os.path.join(tmpdir, "test.txt")
- with open(path, "wb") as file:
- file.write(b"<file content>")
+def test_post_file_no_token(tmp_path):
+ path = tmp_path / "test.txt"
+ path.write_bytes(b"<file content>")
client = TestClient(app)
- response = client.post("/files/", files={"file": open(path, "rb")})
+ with path.open("rb") as file:
+ response = client.post("/files/", files={"file": file})
assert response.status_code == 422, response.text
assert response.json() == token_required
-def test_post_files_and_token(tmpdir):
- patha = Path(tmpdir) / "test.txt"
- pathb = Path(tmpdir) / "testb.txt"
+def test_post_files_and_token(tmp_path):
+ patha = tmp_path / "test.txt"
+ pathb = tmp_path / "testb.txt"
patha.write_text("<file content>")
pathb.write_text("<file b content>")
client = TestClient(app)
- response = client.post(
- "/files/",
- data={"token": "foo"},
- files={
- "file": patha.open("rb"),
- "fileb": ("testb.txt", pathb.open("rb"), "text/plain"),
- },
- )
+ with patha.open("rb") as filea, pathb.open("rb") as fileb:
+ response = client.post(
+ "/files/",
+ data={"token": "foo"},
+ files={"file": filea, "fileb": ("testb.txt", fileb, "text/plain")},
+ )
assert response.status_code == 200, response.text
assert response.json() == {
"file_size": 14,