From: Michael Tremer Date: Tue, 18 Oct 2022 16:41:37 +0000 (+0000) Subject: uploads: Improve buffering for smaller chunk sizes X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=08700a32f66fe103aa584b8f3d5a34b0ae8f4a7c;p=pbs.git uploads: Improve buffering for smaller chunk sizes Apache uses a chunk size of 8 KiB which I cannot change. This results in us creating a lof of overhead when receiving large uploads because opening and closing the temporary file that often is not feasible. Blocking is not feasible either. Therefore, the Upload class is now implemening another buffer mechanism so that we will only have to flush the data to disk once we reached a megabyte. This patch will increase memory consumption of the hub (especially with many simultaneous uploads), but it is truly non-blocking. Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/uploads.py b/src/buildservice/uploads.py index 1754a356..b910d3a6 100644 --- a/src/buildservice/uploads.py +++ b/src/buildservice/uploads.py @@ -3,8 +3,10 @@ import asyncio import hashlib import hmac +import io import logging import os +import shutil import tempfile from . import base @@ -15,6 +17,8 @@ from .decorators import * # Setup logging log = logging.getLogger("pakfire.buildservice.uploads") +MAX_BUFFER_SIZE = 1 * 1024 * 1024 # 1 MiB + class Uploads(base.Object): def _get_upload(self, query, *args): res = self.db.get(query, *args) @@ -35,8 +39,19 @@ class Uploads(base.Object): return iter(uploads) def get_by_uuid(self, uuid): - return self._get_upload("SELECT * FROM uploads \ - WHERE uuid = %s AND expires_at > CURRENT_TIMESTAMP", uuid) + return self._get_upload(""" + SELECT + * + FROM + uploads + WHERE + uuid = %s + AND + completed_at IS NOT NULL + AND + expires_at > CURRENT_TIMESTAMP + """, uuid, + ) def _allocate_file(self): """ @@ -174,26 +189,76 @@ class Upload(base.DataObject): def expires_at(self): return self.data.expires_at + @lazy_property + def _buffer(self): + """ + Returns something that buffers any uploaded data. + """ + return io.BytesIO() + + @lazy_property + def _filesize(self): + return os.path.getsize(self.path) + async def write(self, data): """ Takes a chunk of data and writes it to disk """ - log.debug("Received a new chunk of %s byte(s) of data for %s" % (len(data), self)) + #log.debug("Received a new chunk of %s byte(s) of data for %s" % (len(data), self)) + + # Check if we would exceed the filesize + if self._filesize + len(data) > self.size: + raise OverflowError - return await asyncio.to_thread(self._write, data) + # Append the chunk to the buffer + self._buffer.write(data) + self._filesize += len(data) - def _write(self, data): - # Write the data to disk + # Flush the buffer to disk after it has reached its threshold + if self._buffer.tell() >= MAX_BUFFER_SIZE: + await self.flush() + + async def flush(self): + """ + Flushes any buffered file content to disk + """ + # Nothing to do if the buffer is empty + if not self._buffer.tell(): + return + + #log.debug("Flushing buffer (%s byte(s))" % self._buffer.tell()) + + return await asyncio.to_thread(self._flush) + + def _flush(self): + # Move back to the beginning of the buffer + self._buffer.seek(0) + + # Append the buffer to the file with open(self.path, "ab") as f: - # Fetch the current size - filesize = f.tell() + shutil.copyfileobj(self._buffer, f) + + # Reset the buffer + self._buffer = io.BytesIO() + + async def completed(self): + """ + Called when all content of the upload is received + """ + # Flush anything that might have been left in the buffer + await self.flush() - # Check if we would exceed the filesize - if filesize + len(data) > self.size: - raise OverflowError + # Mark as completed + self._set_attribute_now("completed_at") - # Write data - f.write(data) + def is_completed(self): + """ + Returns True if this upload has completed + """ + if self.data.completed_at: + return True + + return False async def check_digest(self, algo, expected_digest): """ @@ -215,17 +280,21 @@ class Upload(base.DataObject): """ Computes the digest of this download """ + if not self.is_completed(): + raise RuntimeError("Upload has not been completed, yet") + log.debug("Computing '%s' digest for %s" % (algo, self)) return await asyncio.to_thread(self._digest, algo) def _digest(self, algo): + # Initialize the hash function h = hashlib.new(algo) # Read the entire file and pass it to the hash function with open(self.path, "rb") as f: while True: - buf = f.read(4096) + buf = f.read(65536) if not buf: break diff --git a/src/database.sql b/src/database.sql index 1f57fb89..a5b0b3d3 100644 --- a/src/database.sql +++ b/src/database.sql @@ -956,7 +956,8 @@ CREATE TABLE public.uploads ( path text NOT NULL, size bigint NOT NULL, created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, - expires_at timestamp without time zone DEFAULT (CURRENT_TIMESTAMP + '24:00:00'::interval) NOT NULL + expires_at timestamp without time zone DEFAULT (CURRENT_TIMESTAMP + '24:00:00'::interval) NOT NULL, + completed_at timestamp without time zone ); diff --git a/src/hub/uploads.py b/src/hub/uploads.py index f3531e11..1fe36cdc 100644 --- a/src/hub/uploads.py +++ b/src/hub/uploads.py @@ -57,6 +57,9 @@ class CreateHandler(BaseHandler): """ Called after the entire file has been received """ + # Consider the upload completed + await self.upload.completed() + # Fetch the digest argument algo, delim, hexdigest = self.get_argument("digest").partition(":") diff --git a/tests/test.py b/tests/test.py index 40782a56..504e4aa0 100644 --- a/tests/test.py +++ b/tests/test.py @@ -205,4 +205,7 @@ class TestCase(unittest.IsolatedAsyncioTestCase): upload.write(buf) + # Complete the upload + upload.completed() + return upload diff --git a/tests/upload.py b/tests/upload.py index d34d62d9..8f96e7b2 100755 --- a/tests/upload.py +++ b/tests/upload.py @@ -52,6 +52,10 @@ class UploadTestCase(test.TestCase): # Write some payload await upload.write(b"01234567890123456789") + # Complete the upload + with self.db.transaction(): + await upload.completed() + digest = bytes.fromhex("185c728c3fccb51875d74e21fec19f4cdfad8d5aa347a7a75c06473" "af6f73835b5a00515a34f0e09725d5b1e0715ce43a1a57d966a92400efd215e45dd19c09c")