From: Michael Tremer Date: Thu, 19 Oct 2023 16:03:55 +0000 (+0000) Subject: uploads: Rewrite the whole thing X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=bd686a79a8956438d93864b276e7c16e94baefe1;p=pbs.git uploads: Rewrite the whole thing Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/uploads.py b/src/buildservice/uploads.py index 398d801c..25eb9119 100644 --- a/src/buildservice/uploads.py +++ b/src/buildservice/uploads.py @@ -18,6 +18,13 @@ log = logging.getLogger("pbs.uploads") MAX_BUFFER_SIZE = 1 * 1024 * 1024 # 1 MiB +supported_digest_algos = ( + "blake2b512", +) + +class UnsupportedDigestException(ValueError): + pass + class Uploads(base.Object): def _get_upload(self, query, *args): res = self.db.get(query, *args) @@ -50,10 +57,18 @@ class Uploads(base.Object): """, uuid, ) - async def create(self, filename, size, owner=None): + async def create(self, filename, size, digest_algo, digest, owner=None): builder = None user = None + # Check if the digest algorithm is supported + if not digest_algo in supported_digest_algos: + raise UnsupportedDigestException(digest_algo) + + # Check if the digest is set + elif not digest: + raise ValueError("Empty digest") + # Check uploader type if isinstance(owner, builders.Builder): builder = owner @@ -66,32 +81,34 @@ class Uploads(base.Object): user.check_storage_quota(size) # Allocate a new temporary file - async with self.backend.tempfile(delete=False) as f: - upload = self._get_upload(""" - INSERT INTO - uploads - ( - filename, - path, - size, - builder_id, - user_id - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s - ) - RETURNING *""", + upload = self._get_upload(""" + INSERT INTO + uploads + ( filename, - f.name, size, - builder, - user, + builder_id, + user_id, + digest_algo, + digest + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s ) + RETURNING *""", + filename, + size, + builder, + user, + digest_algo, + digest, + ) # Return the newly created upload object return upload @@ -157,6 +174,14 @@ class Upload(base.DataObject): def size(self): return self.data.size + @property + def digest_algo(self): + return self.data.digest_algo + + @property + def digest(self): + return self.data.digest + # Builder def get_builder(self): @@ -206,64 +231,51 @@ class Upload(base.DataObject): def expires_at(self): return self.data.expires_at - @lazy_property - def _filesize(self): - return os.path.getsize(self.path) - - async def check_digest(self, algo, expected_digest): + async def copyfrom(self, src): """ - Checks if the upload matches an expected digest + Copies the content of this upload from the source file descriptor """ - # Compute the digest - computed_digest = await self.digest(algo) + # Reset the source file handle + src.seek(0) - # Compare the digests - if hmac.compare_digest(computed_digest, expected_digest): - return True + # Setup the hash function + h = hashlib.new(self.digest_algo) - # The digests didn't match - log.error("Upload does not match its digest") + async with self.backend.tempfile(delete=False) as f: + try: + while True: + buffer = src.read(1024 ** 2) + if not buffer: + break - return False + # Put the buffer into the hash function + h.update(buffer) - async def digest(self, algo): - """ - Computes the digest of this download - """ - log.debug("Computing '%s' digest for %s" % (algo, self)) + # Write the buffer to disk + await f.write(buffer) - return await asyncio.to_thread(self._digest, algo) + # How many bytes did we write? 
+ received_size = await f.tell() - def _digest(self, algo): - # Initialize the hash function - h = hashlib.new(algo) + # Get the digest + computed_digest = h.digest() - # Read the entire file and pass it to the hash function - with open(self.path, "rb") as f: - while True: - buf = f.read(65536) - if not buf: - break + # Check if the filesize matches + if not received_size == self.size: + raise ValueError("File size differs") - h.update(buf) + # Check that the digest matches + if not hmac.compare_digest(computed_digest, self.digest): + raise ValueError("Invalid digest") - # Return the digest - return h.digest() + # If there has been any kind of exception, we want to delete the temporary file + except Exception as e: + await self.backend.unlink(f.name) - async def copyfrom(self, src): - """ - Copies the content of this upload from the source file descriptor - """ - return await asyncio.to_thread(self._copyfrom, src) - - def _copyfrom(self, src): - # Open the destination file and copy all source data into it - with open(self.path, "wb") as dst: - shutil.copyfileobj(src, dst) + raise e - # Check that we didn't copy too much - if dst.tell() > self.size: - raise OverflowError + # Store the path + self._set_attribute("path", f.name) async def copyinto(self, dst): """ diff --git a/src/database.sql b/src/database.sql index ba33e806..87a6791d 100644 --- a/src/database.sql +++ b/src/database.sql @@ -1047,10 +1047,12 @@ CREATE TABLE public.uploads ( user_id integer, builder_id integer, filename text NOT NULL, - path text NOT NULL, + path text, size bigint NOT NULL, created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, - expires_at timestamp without time zone DEFAULT (CURRENT_TIMESTAMP + '24:00:00'::interval) NOT NULL + expires_at timestamp without time zone DEFAULT (CURRENT_TIMESTAMP + '24:00:00'::interval) NOT NULL, + digest_algo text NOT NULL, + digest bytea NOT NULL ); @@ -1873,6 +1875,20 @@ CREATE INDEX sources_repo_id ON public.sources USING btree (repo_id) WHERE (dele CREATE UNIQUE INDEX sources_slug ON public.sources USING btree (slug) WHERE (deleted_at IS NULL); +-- +-- Name: uploads_builder_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX uploads_builder_id ON public.uploads USING btree (builder_id); + + +-- +-- Name: uploads_user_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX uploads_user_id ON public.uploads USING btree (user_id); + + -- -- Name: uploads_uuid; Type: INDEX; Schema: public; Owner: - -- diff --git a/src/web/uploads.py b/src/web/uploads.py index 69629cb3..9172473b 100644 --- a/src/web/uploads.py +++ b/src/web/uploads.py @@ -23,24 +23,14 @@ import io import tornado.web from . import base +from .. import uploads from .. 
import users


-@tornado.web.stream_request_body
 class APIv1IndexHandler(base.APIMixin, tornado.web.RequestHandler):
 	# Allow users to perform uploads
 	allow_users = True

-	def initialize(self):
-		# Buffer to cache the uploaded content
-		self.buffer = io.BytesIO()
-
-	def data_received(self, data):
-		"""
-			Called when some data is being received
-		"""
-		self.buffer.write(data)
-
-	@tornado.web.authenticated
+	@base.negotiate
 	def get(self):
 		uploads = []

@@ -59,10 +49,10 @@ class APIv1IndexHandler(base.APIMixin, tornado.web.RequestHandler):
 			"uploads" : uploads,
 		})

-	@tornado.web.authenticated
-	async def put(self):
+	@base.negotiate
+	async def post(self):
 		"""
-			Called after the entire file has been received
+			Creates a new upload and returns its UUID
 		"""
 		# Fetch the filename
 		filename = self.get_argument("filename")
@@ -70,14 +60,17 @@ class APIv1IndexHandler(base.APIMixin, tornado.web.RequestHandler):
 		# Fetch file size
 		size = self.get_argument_int("size")

-		# Fetch the digest argument
-		algo, delim, hexdigest = self.get_argument("digest").partition(":")
+		# Fetch the digest algorithm
+		digest_algo = self.get_argument("hexdigest_algo")

-		# Convert hexdigest
-		digest = bytes.fromhex(hexdigest)
+		# Fetch the digest
+		hexdigest = self.get_argument("hexdigest")

-		# Move to the beginning of the buffer
-		self.buffer.seek(0)
+		# Convert hexdigest
+		try:
+			digest = bytes.fromhex(hexdigest)
+		except ValueError as e:
+			raise tornado.web.HTTPError(400, "Invalid hexdigest") from e

 		# Create a new upload
 		with self.db.transaction():
@@ -86,19 +79,20 @@ class APIv1IndexHandler(base.APIMixin, tornado.web.RequestHandler):
 					filename,
 					size=size,
 					owner=self.current_user,
+					digest_algo=digest_algo,
+					digest=digest,
 				)

+			except uploads.UnsupportedDigestException as e:
+				raise tornado.web.HTTPError(400,
+					"Unsupported digest %s" % digest_algo) from e
+
 			except users.QuotaExceededError as e:
 				raise tornado.web.HTTPError(400,
 					"Quota exceeded for %s" % self.current_user) from e

-			# Import the payload from the buffer
-			await upload.copyfrom(self.buffer)
-
-			# Check the digest
-			if not await upload.check_digest(algo, digest):
-				# 422 - Unprocessable Entity
-				raise tornado.web.HTTPError(422, "Digest did not match")
+			except ValueError as e:
+				raise tornado.web.HTTPError(400, "%s" % e) from e

 		# Send the ID of the upload back to the client
 		self.finish({
@@ -107,11 +101,46 @@ class APIv1IndexHandler(base.APIMixin, tornado.web.RequestHandler):
 		})


+@tornado.web.stream_request_body
 class APIv1DetailHandler(base.APIMixin, tornado.web.RequestHandler):
 	# Allow users to perform uploads
 	allow_users = True

-	@tornado.web.authenticated
+	def initialize(self):
+		# Buffer to cache the uploaded content
+		self.buffer = io.BytesIO()
+
+	def data_received(self, data):
+		"""
+			Called when some data is being received
+		"""
+		self.buffer.write(data)
+
+	# Yes, this does not require authentication. You are reading this correctly.
+	# This is because we are using SPNEGO, which might cause a request to be sent
+	# more than once and therefore the payload to be transferred more than once.
+	# To avoid this, we require the digest when the upload is created and then
+	# generate a unique ID. An attacker would first have to guess that ID and
+	# then upload a file whose hash collides with the original file.
+ async def put(self, uuid): + """ + Called to store the received payload + """ + # Fetch the upload + upload = self.backend.uploads.get_by_uuid(uuid) + if not upload: + raise tornado.web.HTTPError(404, "Could not find upload %s" % uuid) + + # Import the payload from the buffer + with self.db.transaction(): + try: + await upload.copyfrom(self.buffer) + + except ValueError as e: + raise tornado.web.HTTPError(400, "%s" % e) from e + + @base.negotiate async def delete(self, uuid): """ Deletes an upload with a certain UUID
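
For anyone consuming this API, the rewrite splits an upload into two steps: the client first registers the upload (filename, size, digest algorithm and digest) via POST and receives a UUID, then transfers the payload in a separate PUT request, which the server verifies against the previously registered size and digest. The sketch below is only an illustration of that flow: the endpoint paths, the "id" field of the JSON response and the use of the requests library are assumptions that do not come from this diff; the argument names (filename, size, hexdigest_algo, hexdigest) and the blake2b512 algorithm do.

	import hashlib
	import os

	import requests  # assumed HTTP client, only for illustration

	PBS_URL = "https://pbs.example.org"  # assumed base URL

	def upload_file(session, path):
		# Hash the file up front so that the digest can be registered
		# before any payload is transferred
		h = hashlib.new("blake2b512")

		with open(path, "rb") as f:
			while True:
				buffer = f.read(1024 ** 2)
				if not buffer:
					break

				h.update(buffer)

			# After reading to EOF, the position equals the file size
			size = f.tell()

		# Step 1: Create the upload and receive its UUID
		response = session.post("%s/api/v1/uploads" % PBS_URL, params={
			"filename"       : os.path.basename(path),
			"size"           : size,
			"hexdigest_algo" : "blake2b512",
			"hexdigest"      : h.hexdigest(),
		})
		response.raise_for_status()

		# The exact shape of the response is not shown in this diff;
		# the "id" key is an assumption
		uuid = response.json()["id"]

		# Step 2: Send the payload; the server re-hashes it and rejects
		# the upload if size or digest do not match what was registered
		with open(path, "rb") as f:
			response = session.put("%s/api/v1/uploads/%s" % (PBS_URL, uuid), data=f)
			response.raise_for_status()

		return uuid

The session would be a requests.Session() with whatever authentication the server expects for the POST (SPNEGO via @base.negotiate); as the comment in APIv1DetailHandler explains, the PUT itself is deliberately unauthenticated and relies on the unguessable UUID plus the digest check.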