async def get(upload: uploads.Upload = fastapi.Depends(get_upload)) -> uploads.Upload:
return upload
+@router.put("/{id}")
+async def put(request: fastapi.Request, upload: uploads.Upload = fastapi.Depends(get_upload)):
+ # Fail if this upload has already received its payload
+ #if upload.has_payload():
+ # raise fastapi.HTTPException(422, "Upload has already received its payload")
+
+ # Stream the payload
+ try:
+ async with upload.stream() as f:
+ async for chunk in request.stream():
+ await f.write(chunk)
+
+ # Raise an error if we could not import the file
+ except ValueError as e:
+ raise fastapi.HTTPException(422, "%s" % e) from e
+
+ # Send 204
+ return fastapi.Response(status_code=204)
+
+
@router.delete("/{id}")
async def delete(upload: uploads.Upload = fastapi.Depends(get_upload)):
# Delete the upload
# The data must exist on disk
return await self.backend.exists(self.path)
- # Copy the payload from somewhere
-
- async def copyfrom(self, src):
- """
- Copies the content of this upload from the source file descriptor
- """
- # Cannot do this if we already have a payload
- if await self.has_payload():
- raise FileExistsError("We already have the payload")
-
- # Copy the entire content to a new temporary file
- with self.backend.tempfile(prefix="upload-", sync=True, delete=False) as dst:
- await asyncio.to_thread(shutil.copyfileobj, src, dst)
-
- # Store the path
- self.path = dst.name
-
# Copy the payload to somewhere else
async def copyinto(self, dst):
# Open the source file and copy it into the destination file descriptor
with open(self.path, "rb") as src:
shutil.copyfileobj(src, dst)
+
+ # Streaming
+
+ def stream(self):
+ # Return a new upload stream
+ return UploadStream(self)
+
+
+class UploadStream(object):
+ """
+ This is a helper class which will take a stream from the client
+ and try to ingest it as an upload file.
+ """
+ def __init__(self, upload):
+ self.upload = upload
+ self.backend = self.upload.backend
+
+ # The temporary file
+ self.f = None
+
+ # Create a new hasher
+ self.h = hashlib.new("blake2b512")
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, type, value, traceback):
+ # Close the file is there has been no exception
+ if type is None:
+ return await self.close()
+
+ async def write(self, chunk):
+ """
+ Writes a chunk to the file and puts it into the hasher, too.
+ """
+ # Open a temporary file unless already done so (we cannot do this in the constructor)
+ if self.f is None:
+ self.f = await self.backend.tempfile(prefix="upload-", delete_on_close=False)
+
+ # Write the chunk
+ await self.f.write(chunk)
+
+ # Put the data into the hasher
+ self.h.update(chunk)
+
+ async def close(self):
+ """
+ Called when the stream has finished.
+ """
+ # Close the file
+ if self.f:
+ await self.f.close()
+
+ # Compute the digest
+ digest = self.h.digest()
+
+ # Check the hash
+ if not hmac.compare_digest(self.upload.digest_blake2b512, digest):
+ raise ValueError("Digest Mismatch")
+
+ # Store the path to the temporary file
+ self.upload.path = self.f.name