From 919937a85f4bff9931c99bdbe5a8209dfb396ad0 Mon Sep 17 00:00:00 2001
From: Michael Tremer
Date: Tue, 17 Jun 2025 14:24:16 +0000
Subject: [PATCH] API: Implement streaming the upload's payload

Signed-off-by: Michael Tremer
---
 src/api/uploads.py          | 20 ++++++++++
 src/buildservice/uploads.py | 79 +++++++++++++++++++++++++++++--------
 2 files changed, 82 insertions(+), 17 deletions(-)

diff --git a/src/api/uploads.py b/src/api/uploads.py
index e501c58d..8881738b 100644
--- a/src/api/uploads.py
+++ b/src/api/uploads.py
@@ -102,6 +102,26 @@ async def create(request: UploadRequest,
 async def get(upload: uploads.Upload = fastapi.Depends(get_upload)) -> uploads.Upload:
 	return upload
 
+@router.put("/{id}")
+async def put(request: fastapi.Request, upload: uploads.Upload = fastapi.Depends(get_upload)):
+	# Fail if this upload has already received its payload
+	#if upload.has_payload():
+	#	raise fastapi.HTTPException(422, "Upload has already received its payload")
+
+	# Stream the payload
+	try:
+		async with upload.stream() as f:
+			async for chunk in request.stream():
+				await f.write(chunk)
+
+	# Raise an error if we could not import the file
+	except ValueError as e:
+		raise fastapi.HTTPException(422, "%s" % e) from e
+
+	# Send 204
+	return fastapi.Response(status_code=204)
+
+
 @router.delete("/{id}")
 async def delete(upload: uploads.Upload = fastapi.Depends(get_upload)):
 	# Delete the upload
diff --git a/src/buildservice/uploads.py b/src/buildservice/uploads.py
index 3be627b1..7bca4d9a 100644
--- a/src/buildservice/uploads.py
+++ b/src/buildservice/uploads.py
@@ -271,23 +271,6 @@ class Upload(sqlmodel.SQLModel, database.BackendMixin, table=True):
 		# The data must exist on disk
 		return await self.backend.exists(self.path)
 
-	# Copy the payload from somewhere
-
-	async def copyfrom(self, src):
-		"""
-			Copies the content of this upload from the source file descriptor
-		"""
-		# Cannot do this if we already have a payload
-		if await self.has_payload():
-			raise FileExistsError("We already have the payload")
-
-		# Copy the entire content to a new temporary file
-		with self.backend.tempfile(prefix="upload-", sync=True, delete=False) as dst:
-			await asyncio.to_thread(shutil.copyfileobj, src, dst)
-
-		# Store the path
-		self.path = dst.name
-
 	# Copy the payload to somewhere else
 
 	async def copyinto(self, dst):
@@ -302,3 +285,65 @@ class Upload(sqlmodel.SQLModel, database.BackendMixin, table=True):
 		# Open the source file and copy it into the destination file descriptor
 		with open(self.path, "rb") as src:
 			shutil.copyfileobj(src, dst)
+
+	# Streaming
+
+	def stream(self):
+		# Return a new upload stream
+		return UploadStream(self)
+
+
+class UploadStream(object):
+	"""
+		This is a helper class which will take a stream from the client
+		and try to ingest it as an upload file.
+	"""
+	def __init__(self, upload):
+		self.upload = upload
+		self.backend = self.upload.backend
+
+		# The temporary file
+		self.f = None
+
+		# Create a new hasher
+		self.h = hashlib.new("blake2b512")
+
+	async def __aenter__(self):
+		return self
+
+	async def __aexit__(self, type, value, traceback):
+		# Close the file if there has been no exception
+		if type is None:
+			return await self.close()
+
+	async def write(self, chunk):
+		"""
+			Writes a chunk to the file and puts it into the hasher, too.
+		"""
+		# Open a temporary file unless already done so (we cannot do this in the constructor)
+		if self.f is None:
+			self.f = await self.backend.tempfile(prefix="upload-", delete_on_close=False)
+
+		# Write the chunk
+		await self.f.write(chunk)
+
+		# Put the data into the hasher
+		self.h.update(chunk)
+
+	async def close(self):
+		"""
+			Called when the stream has finished.
+		"""
+		# Close the file
+		if self.f:
+			await self.f.close()
+
+		# Compute the digest
+		digest = self.h.digest()
+
+		# Check the hash
+		if not hmac.compare_digest(self.upload.digest_blake2b512, digest):
+			raise ValueError("Digest Mismatch")
+
+		# Store the path to the temporary file
+		self.upload.path = self.f.name
-- 
2.47.2