git.ipfire.org Git - pbs.git/commitdiff
API: Implement streaming the upload's payload
author Michael Tremer <michael.tremer@ipfire.org>
Tue, 17 Jun 2025 14:24:16 +0000 (14:24 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Tue, 17 Jun 2025 14:24:16 +0000 (14:24 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/api/uploads.py
src/buildservice/uploads.py
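This commit adds a PUT route that streams the request body straight into the upload's payload. As a rough client-side sketch (not part of the commit), assuming httpx and only taking the /uploads/{id} route from the diff below; the base URL is a placeholder:

import httpx

async def put_payload(base_url, id, path):
        # Stream the local file to PUT /uploads/{id} in small chunks,
        # so the whole payload never has to fit into memory
        async def chunks():
                with open(path, "rb") as f:
                        while chunk := f.read(65536):
                                yield chunk

        async with httpx.AsyncClient(base_url=base_url) as client:
                # The server replies 204 on success, or 422 on a digest mismatch
                response = await client.put("/uploads/%s" % id, content=chunks())
                response.raise_for_status()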

diff --git a/src/api/uploads.py b/src/api/uploads.py
index e501c58df38f1e46e8cff07de768852975f9c37e..8881738b777a09b45f45f984c691fb260d078e75 100644
@@ -102,6 +102,26 @@ async def create(request: UploadRequest,
 async def get(upload: uploads.Upload = fastapi.Depends(get_upload)) -> uploads.Upload:
        return upload
 
+@router.put("/{id}")
+async def put(request: fastapi.Request, upload: uploads.Upload = fastapi.Depends(get_upload)):
+       # Fail if this upload has already received its payload
+       #if upload.has_payload():
+       #       raise fastapi.HTTPException(422, "Upload has already received its payload")
+
+       # Stream the payload
+       try:
+               async with upload.stream() as f:
+                       async for chunk in request.stream():
+                               await f.write(chunk)
+
+       # Raise an error if we could not import the file
+       except ValueError as e:
+               raise fastapi.HTTPException(422, "%s" % e) from e
+
+       # Send 204
+       return fastapi.Response(status_code=204)
+
+
 @router.delete("/{id}")
 async def delete(upload: uploads.Upload = fastapi.Depends(get_upload)):
        # Delete the upload
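Not part of the commit: a hedged test sketch of the new route's error path, assuming an httpx-based starlette TestClient (starlette >= 0.21) and hypothetical app/upload fixtures:

from fastapi.testclient import TestClient

def test_put_digest_mismatch(app, upload):
        client = TestClient(app)

        # A body whose BLAKE2b digest cannot match the digest registered for
        # the upload; the handler maps the resulting ValueError to a 422
        response = client.put("/uploads/%s" % upload.id, content=b"garbage")
        assert response.status_code == 422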
diff --git a/src/buildservice/uploads.py b/src/buildservice/uploads.py
index 3be627b132a1b2f2f0b1c7dd262f642271433015..7bca4d9a35b28a14b4e3f09d4905e3b9fca54557 100644
@@ -271,23 +271,6 @@ class Upload(sqlmodel.SQLModel, database.BackendMixin, table=True):
                # The data must exist on disk
                return await self.backend.exists(self.path)
 
-       # Copy the payload from somewhere
-
-       async def copyfrom(self, src):
-               """
-                       Copies the content of this upload from the source file descriptor
-               """
-               # Cannot do this if we already have a payload
-               if await self.has_payload():
-                       raise FileExistsError("We already have the payload")
-
-               # Copy the entire content to a new temporary file
-               with self.backend.tempfile(prefix="upload-", sync=True, delete=False) as dst:
-                       await asyncio.to_thread(shutil.copyfileobj, src, dst)
-
-               # Store the path
-               self.path = dst.name
-
        # Copy the payload to somewhere else
 
        async def copyinto(self, dst):
@@ -302,3 +285,68 @@ class Upload(sqlmodel.SQLModel, database.BackendMixin, table=True):
                # Open the source file and copy it into the destination file descriptor
                with open(self.path, "rb") as src:
                        shutil.copyfileobj(src, dst)
+
+       # Streaming
+
+       def stream(self):
+               # Return a new upload stream
+               return UploadStream(self)
+
+
+class UploadStream(object):
+       """
+               A helper class which takes a stream from the client
+               and tries to ingest it as the upload's payload.
+       """
+       def __init__(self, upload):
+               self.upload = upload
+               self.backend = self.upload.backend
+
+               # The temporary file
+               self.f = None
+
+               # Create a new hasher
+               self.h = hashlib.new("blake2b512")
+
+       async def __aenter__(self):
+               return self
+
+       async def __aexit__(self, exc_type, exc_value, traceback):
+               # Close the file if there has been no exception
+               if exc_type is None:
+                       return await self.close()
+
+       async def write(self, chunk):
+               """
+                       Writes a chunk to the file and puts it into the hasher, too.
+               """
+               # Open a temporary file unless we have already done so (we cannot await in the constructor)
+               if self.f is None:
+                       self.f = await self.backend.tempfile(prefix="upload-", delete_on_close=False)
+
+               # Write the chunk
+               await self.f.write(chunk)
+
+               # Put the data into the hasher
+               self.h.update(chunk)
+
+       async def close(self):
+               """
+                       Called when the stream has finished.
+               """
+               # Fail if we never received any payload
+               if self.f is None:
+                       raise ValueError("No payload received")
+
+               # Close the file
+               await self.f.close()
+
+               # Compute the digest
+               digest = self.h.digest()
+
+               # Check the hash
+               if not hmac.compare_digest(self.upload.digest_blake2b512, digest):
+                       raise ValueError("Digest Mismatch")
+
+               # Store the path to the temporary file
+               self.upload.path = self.f.name
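backend.tempfile() itself is not part of this diff. A minimal sketch of what such a helper might look like, assuming aiofiles; the class name and the mapping of delete_on_close onto aiofiles' delete flag are assumptions:

import aiofiles.tempfile

class Backend(object):
        async def tempfile(self, prefix=None, delete_on_close=True):
                # Hypothetical helper: open an asynchronous named temporary file.
                # Mapping delete_on_close to aiofiles' delete flag means a file
                # opened with delete_on_close=False survives close(), so that
                # Upload.path remains valid once the stream has finished.
                return await aiofiles.tempfile.NamedTemporaryFile(
                        prefix=prefix, delete=delete_on_close)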