git.ipfire.org Git - pbs.git/commitdiff
uploads: Improve buffering for smaller chunk sizes
author Michael Tremer <michael.tremer@ipfire.org>
Tue, 18 Oct 2022 16:41:37 +0000 (16:41 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Tue, 18 Oct 2022 16:41:37 +0000 (16:41 +0000)
Apache uses a chunk size of 8 KiB, which I cannot change.

This creates a lot of overhead when receiving large uploads, because
opening and closing the temporary file for every single chunk is not
feasible.

Blocking the event loop is not feasible either.

Therefore, the Upload class now implements another buffering mechanism
so that data only has to be flushed to disk once about one megabyte has
been buffered.

This patch will increase the memory consumption of the hub (especially
with many simultaneous uploads), but it is truly non-blocking.
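
For illustration, here is a minimal standalone sketch of the buffering
pattern this patch applies (the BufferedFileWriter class and its "path"
parameter are made up for this example and are not part of the codebase):
small chunks are collected in an in-memory io.BytesIO and only appended to
the file once roughly one megabyte has accumulated, with the blocking file
I/O handed off to a worker thread via asyncio.to_thread so the event loop
is never blocked.

    import asyncio
    import io
    import shutil

    MAX_BUFFER_SIZE = 1 * 1024 * 1024  # flush threshold: 1 MiB

    class BufferedFileWriter:
        """
            Collects small chunks in memory and appends them to disk in large batches
        """
        def __init__(self, path):
            self.path = path
            self._buffer = io.BytesIO()

        async def write(self, data):
            # Cheap in-memory append; no disk I/O for small (e.g. 8 KiB) chunks
            self._buffer.write(data)

            # Touch the disk only once enough data has accumulated
            if self._buffer.tell() >= MAX_BUFFER_SIZE:
                await self.flush()

        async def flush(self):
            # Nothing to do if the buffer is empty
            if not self._buffer.tell():
                return

            # Run the blocking file I/O in a thread so the event loop stays responsive
            await asyncio.to_thread(self._flush)

        def _flush(self):
            # Append the buffered data to the file, then start over with an empty buffer
            self._buffer.seek(0)
            with open(self.path, "ab") as f:
                shutil.copyfileobj(self._buffer, f)
            self._buffer = io.BytesIO()

A caller would await write() for every incoming chunk and await flush()
once after the last one, which is what the new completed() hook in the
actual Upload class below does for real uploads.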

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/buildservice/uploads.py
src/database.sql
src/hub/uploads.py
tests/test.py
tests/upload.py

index 1754a3568220c9d4e17cf8a955217a341f24e263..b910d3a653a28b9bac1ca2cf93cecdb7d1a00ed4 100644 (file)
@@ -3,8 +3,10 @@
 import asyncio
 import hashlib
 import hmac
+import io
 import logging
 import os
+import shutil
 import tempfile
 
 from . import base
@@ -15,6 +17,8 @@ from .decorators import *
 # Setup logging
 log = logging.getLogger("pakfire.buildservice.uploads")
 
+MAX_BUFFER_SIZE = 1 * 1024 * 1024 # 1 MiB
+
 class Uploads(base.Object):
        def _get_upload(self, query, *args):
                res = self.db.get(query, *args)
@@ -35,8 +39,19 @@ class Uploads(base.Object):
                return iter(uploads)
 
        def get_by_uuid(self, uuid):
-               return self._get_upload("SELECT * FROM uploads \
-                       WHERE uuid = %s AND expires_at > CURRENT_TIMESTAMP", uuid)
+               return self._get_upload("""
+                       SELECT
+                               *
+                       FROM
+                               uploads
+                       WHERE
+                               uuid = %s
+                       AND
+                               completed_at IS NOT NULL
+                       AND
+                               expires_at > CURRENT_TIMESTAMP
+                       """, uuid,
+               )
 
        def _allocate_file(self):
                """
@@ -174,26 +189,76 @@ class Upload(base.DataObject):
        def expires_at(self):
                return self.data.expires_at
 
+       @lazy_property
+       def _buffer(self):
+               """
+                       Returns something that buffers any uploaded data.
+               """
+               return io.BytesIO()
+
+       @lazy_property
+       def _filesize(self):
+               return os.path.getsize(self.path)
+
        async def write(self, data):
                """
                        Takes a chunk of data and writes it to disk
                """
-               log.debug("Received a new chunk of %s byte(s) of data for %s" % (len(data), self))
+               #log.debug("Received a new chunk of %s byte(s) of data for %s" % (len(data), self))
+
+               # Check if we would exceed the filesize
+               if self._filesize + len(data) > self.size:
+                       raise OverflowError
 
-               return await asyncio.to_thread(self._write, data)
+               # Append the chunk to the buffer
+               self._buffer.write(data)
+               self._filesize += len(data)
 
-       def _write(self, data):
-               # Write the data to disk
+               # Flush the buffer to disk after it has reached its threshold
+               if self._buffer.tell() >= MAX_BUFFER_SIZE:
+                       await self.flush()
+
+       async def flush(self):
+               """
+                       Flushes any buffered file content to disk
+               """
+               # Nothing to do if the buffer is empty
+               if not self._buffer.tell():
+                       return
+
+               #log.debug("Flushing buffer (%s byte(s))" % self._buffer.tell())
+
+               return await asyncio.to_thread(self._flush)
+
+       def _flush(self):
+               # Move back to the beginning of the buffer
+               self._buffer.seek(0)
+
+               # Append the buffer to the file
                with open(self.path, "ab") as f:
-                       # Fetch the current size
-                       filesize = f.tell()
+                       shutil.copyfileobj(self._buffer, f)
+
+               # Reset the buffer
+               self._buffer = io.BytesIO()
+
+       async def completed(self):
+               """
+                       Called when all content of the upload is received
+               """
+               # Flush anything that might have been left in the buffer
+               await self.flush()
 
-                       # Check if we would exceed the filesize
-                       if filesize + len(data) > self.size:
-                               raise OverflowError
+               # Mark as completed
+               self._set_attribute_now("completed_at")
 
-                       # Write data
-                       f.write(data)
+       def is_completed(self):
+               """
+                       Returns True if this upload has completed
+               """
+               if self.data.completed_at:
+                       return True
+
+               return False
 
        async def check_digest(self, algo, expected_digest):
                """
@@ -215,17 +280,21 @@ class Upload(base.DataObject):
                """
                        Computes the digest of this download
                """
+               if not self.is_completed():
+                       raise RuntimeError("Upload has not been completed, yet")
+
                log.debug("Computing '%s' digest for %s" % (algo, self))
 
                return await asyncio.to_thread(self._digest, algo)
 
        def _digest(self, algo):
+               # Initialize the hash function
                h = hashlib.new(algo)
 
                # Read the entire file and pass it to the hash function
                with open(self.path, "rb") as f:
                        while True:
-                               buf = f.read(4096)
+                               buf = f.read(65536)
                                if not buf:
                                        break
 
index 1f57fb89260a091cd87985b22323d982fb1ddc65..a5b0b3d3ba41095db71d5a44f95f910b8424dc41 100644 (file)
@@ -956,7 +956,8 @@ CREATE TABLE public.uploads (
     path text NOT NULL,
     size bigint NOT NULL,
     created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
-    expires_at timestamp without time zone DEFAULT (CURRENT_TIMESTAMP + '24:00:00'::interval) NOT NULL
+    expires_at timestamp without time zone DEFAULT (CURRENT_TIMESTAMP + '24:00:00'::interval) NOT NULL,
+    completed_at timestamp without time zone
 );
 
 
index f3531e116c35daba7168f171dc711e727b242e8b..1fe36cdc3fc96b8aef11eceb2ca5036de30281c5 100644 (file)
@@ -57,6 +57,9 @@ class CreateHandler(BaseHandler):
                """
                        Called after the entire file has been received
                """
+               # Consider the upload completed
+               await self.upload.completed()
+
                # Fetch the digest argument
                algo, delim, hexdigest = self.get_argument("digest").partition(":")
 
index 40782a56dfcfdc4adde7ddfc4a4009b98497b352..504e4aa03fd9a62115ffe285975145f157ee2e76 100644 (file)
@@ -205,4 +205,7 @@ class TestCase(unittest.IsolatedAsyncioTestCase):
 
                                upload.write(buf)
 
+               # Complete the upload
+               upload.completed()
+
                return upload
index d34d62d984ca34452f744fcc466dfa987d9e6353..8f96e7b218d7e43cf4dcdf9d3f056b40834292f4 100755 (executable)
@@ -52,6 +52,10 @@ class UploadTestCase(test.TestCase):
                # Write some payload
                await upload.write(b"01234567890123456789")
 
+               # Complete the upload
+               with self.db.transaction():
+                       await upload.completed()
+
                digest = bytes.fromhex("185c728c3fccb51875d74e21fec19f4cdfad8d5aa347a7a75c06473"
                        "af6f73835b5a00515a34f0e09725d5b1e0715ce43a1a57d966a92400efd215e45dd19c09c")