import http.cookies
import json
import os
-import re
import stat
import sys
import warnings
self.max_size = max_size
-_RANGE_PATTERN = re.compile(r"(\d*)-(\d*)")
-
-
class FileResponse(Response):
chunk_size = 64 * 1024
def _should_use_range(self, http_if_range: str) -> bool:
return http_if_range == self.headers["last-modified"] or http_if_range == self.headers["etag"]
- @staticmethod
- def _parse_range_header(http_range: str, file_size: int) -> list[tuple[int, int]]:
+ @classmethod
+ def _parse_range_header(cls, http_range: str, file_size: int) -> list[tuple[int, int]]:
ranges: list[tuple[int, int]] = []
try:
units, range_ = http_range.split("=", 1)
if units != "bytes":
raise MalformedRangeHeader("Only support bytes range")
- ranges = [
- (
- int(_[0]) if _[0] else file_size - int(_[1]),
- int(_[1]) + 1 if _[0] and _[1] and int(_[1]) < file_size else file_size,
- )
- for _ in _RANGE_PATTERN.findall(range_)
- if _ != ("", "")
- ]
+ ranges = cls._parse_ranges(range_, file_size)
if len(ranges) == 0:
raise MalformedRangeHeader("Range header: range must be requested")
return result
+ @classmethod
+ def _parse_ranges(cls, range_: str, file_size: int) -> list[tuple[int, int]]:
+ ranges: list[tuple[int, int]] = []
+
+ for part in range_.split(","):
+ part = part.strip()
+
+ # If the range is empty or a single dash, we ignore it.
+ if not part or part == "-":
+ continue
+
+ # If the range is not in the format "start-end", we ignore it.
+ if "-" not in part:
+ continue
+
+ start_str, end_str = part.split("-", 1)
+ start_str = start_str.strip()
+ end_str = end_str.strip()
+
+ try:
+ start = int(start_str) if start_str else file_size - int(end_str)
+ end = int(end_str) + 1 if start_str and end_str and int(end_str) < file_size else file_size
+ ranges.append((start, end))
+ except ValueError:
+ # If the range is not numeric, we ignore it.
+ continue
+
+ return ranges
+
def generate_multipart(
self,
ranges: Sequence[tuple[int, int]],
]
+def test_file_response_range_without_dash(file_response_client: TestClient) -> None:
+ response = file_response_client.get("/", headers={"Range": "bytes=100, 0-50"})
+ assert response.status_code == 206
+ assert response.headers["content-range"] == f"bytes 0-50/{len(README.encode('utf8'))}"
+
+
+def test_file_response_range_empty_start_and_end(file_response_client: TestClient) -> None:
+ response = file_response_client.get("/", headers={"Range": "bytes= - , 0-50"})
+ assert response.status_code == 206
+ assert response.headers["content-range"] == f"bytes 0-50/{len(README.encode('utf8'))}"
+
+
+def test_file_response_range_ignore_non_numeric(file_response_client: TestClient) -> None:
+ response = file_response_client.get("/", headers={"Range": "bytes=abc-def, 0-50"})
+ assert response.status_code == 206
+ assert response.headers["content-range"] == f"bytes 0-50/{len(README.encode('utf8'))}"
+
+
+def test_file_response_suffix_range(file_response_client: TestClient) -> None:
+ # Test suffix range (last N bytes) - line 523 with empty start_str
+ response = file_response_client.get("/", headers={"Range": "bytes=-100"})
+ assert response.status_code == 206
+ file_size = len(README.encode("utf8"))
+ assert response.headers["content-range"] == f"bytes {file_size - 100}-{file_size - 1}/{file_size}"
+ assert response.headers["content-length"] == "100"
+ assert response.content == README.encode("utf8")[-100:]
+
+
@pytest.mark.anyio
async def test_file_response_multi_small_chunk_size(readme_file: Path) -> None:
class SmallChunkSizeFileResponse(FileResponse):