import asyncio
import hashlib
import hmac
+import io
import logging
import os
+import shutil
import tempfile
from . import base
+from .decorators import lazy_property  # assumed import path for lazy_property, used by the new properties below
# Setup logging
log = logging.getLogger("pakfire.buildservice.uploads")
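+
+# Incoming chunks are collected in an in-memory buffer and flushed to disk once it reaches this size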
+MAX_BUFFER_SIZE = 1 * 1024 * 1024 # 1 MiB
+
class Uploads(base.Object):
	def _get_upload(self, query, *args):
		res = self.db.get(query, *args)

		# Wrap the row in an Upload object (assuming the backend's usual object pattern)
		if res:
			return Upload(self.backend, res.id, data=res)
def get_by_uuid(self, uuid):
- return self._get_upload("SELECT * FROM uploads \
- WHERE uuid = %s AND expires_at > CURRENT_TIMESTAMP", uuid)
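+		# Only return uploads that have been completed and have not expired, yet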
+ return self._get_upload("""
+ SELECT
+ *
+ FROM
+ uploads
+ WHERE
+ uuid = %s
+ AND
+ completed_at IS NOT NULL
+ AND
+ expires_at > CURRENT_TIMESTAMP
+ """, uuid,
+ )
def _allocate_file(self):
"""
def expires_at(self):
return self.data.expires_at
+ @lazy_property
+ def _buffer(self):
+ """
+ Returns something that buffers any uploaded data.
+ """
+ return io.BytesIO()
+
+ @lazy_property
+ def _filesize(self):
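+		# Start with whatever has already been written to disk for this upload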
+ return os.path.getsize(self.path)
+
async def write(self, data):
"""
Takes a chunk of data and writes it to disk
"""
- log.debug("Received a new chunk of %s byte(s) of data for %s" % (len(data), self))
+ #log.debug("Received a new chunk of %s byte(s) of data for %s" % (len(data), self))
+
+ # Check if we would exceed the filesize
+ if self._filesize + len(data) > self.size:
+ raise OverflowError
- return await asyncio.to_thread(self._write, data)
+ # Append the chunk to the buffer
+ self._buffer.write(data)
+ self._filesize += len(data)
- def _write(self, data):
- # Write the data to disk
+ # Flush the buffer to disk after it has reached its threshold
+ if self._buffer.tell() >= MAX_BUFFER_SIZE:
+ await self.flush()
+
+ async def flush(self):
+ """
+ Flushes any buffered file content to disk
+ """
+ # Nothing to do if the buffer is empty
+ if not self._buffer.tell():
+ return
+
+ #log.debug("Flushing buffer (%s byte(s))" % self._buffer.tell())
+
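+		# Write to disk in a separate thread so that the event loop is not blocked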
+ return await asyncio.to_thread(self._flush)
+
+ def _flush(self):
+ # Move back to the beginning of the buffer
+ self._buffer.seek(0)
+
+ # Append the buffer to the file
with open(self.path, "ab") as f:
- # Fetch the current size
- filesize = f.tell()
+ shutil.copyfileobj(self._buffer, f)
+
+ # Reset the buffer
+ self._buffer = io.BytesIO()
+
+ async def completed(self):
+ """
+ Called when all content of the upload is received
+ """
+ # Flush anything that might have been left in the buffer
+ await self.flush()
- # Check if we would exceed the filesize
- if filesize + len(data) > self.size:
- raise OverflowError
+ # Mark as completed
+ self._set_attribute_now("completed_at")
- # Write data
- f.write(data)
+ def is_completed(self):
+ """
+ Returns True if this upload has completed
+ """
+ if self.data.completed_at:
+ return True
+
+ return False
	async def check_digest(self, algo, expected_digest):
		"""
			Returns True if the digest of this upload matches the expected digest
		"""
		digest = await self.digest(algo)
		return hmac.compare_digest(digest, expected_digest)

	async def digest(self, algo):
		"""
			Computes the digest of this upload
		"""
+ if not self.is_completed():
+ raise RuntimeError("Upload has not been completed, yet")
+
log.debug("Computing '%s' digest for %s" % (algo, self))
return await asyncio.to_thread(self._digest, algo)
def _digest(self, algo):
+ # Initialize the hash function
h = hashlib.new(algo)
# Read the entire file and pass it to the hash function
with open(self.path, "rb") as f:
while True:
- buf = f.read(4096)
+ buf = f.read(65536)
if not buf:
break