]> git.ipfire.org Git - pbs.git/blame - src/buildservice/uploads.py
uploads: Rewrite the whole thing
[pbs.git] / src / buildservice / uploads.py
CommitLineData
96bcb9e7 1#!/usr/bin/python3
9137135a 2
7ef2c528 3import asyncio
80d756c8 4import hashlib
7ef2c528 5import hmac
15032b28
MT
6import logging
7import os
08700a32 8import shutil
9137135a 9
2c909128 10from . import base
f062b044 11from . import builders
7ef2c528 12from . import users
2c909128 13from .constants import *
2f64fe68 14from .decorators import *
9137135a 15
# Setup logging
log = logging.getLogger("pbs.uploads")

# Maximum chunk size used when streaming upload data
MAX_BUFFER_SIZE = 1 * 1024 * 1024 # 1 MiB

# Digest algorithms that clients may use when announcing an upload
supported_digest_algos = (
	"blake2b512",
)

class UnsupportedDigestException(ValueError):
	"""
	Raised when an upload is announced with a digest algorithm
	that is not listed in supported_digest_algos.
	"""
	pass
class Uploads(base.Object):
	"""
	Provides access to all uploads in the database and creates new ones.
	"""

	def _get_upload(self, query, *args):
		"""
		Runs the query and wraps the first result into an Upload object.

		Returns None if the query did not match anything.
		"""
		res = self.db.get(query, *args)

		if res:
			return Upload(self.backend, res.id, data=res)

	def _get_uploads(self, query, *args):
		"""
		Runs the query and yields every result as an Upload object.
		"""
		res = self.db.query(query, *args)

		for row in res:
			yield Upload(self.backend, row.id, data=row)

	def __iter__(self):
		# Iterate over all uploads, newest first
		uploads = self._get_uploads("SELECT * FROM uploads \
			ORDER BY created_at DESC")

		return iter(uploads)

	def get_by_uuid(self, uuid):
		"""
		Returns the upload with the given UUID,
		or None if it does not exist or has already expired.
		"""
		return self._get_upload("""
			SELECT
				*
			FROM
				uploads
			WHERE
				uuid = %s
			AND
				expires_at > CURRENT_TIMESTAMP
			""", uuid,
		)

	async def create(self, filename, size, digest_algo, digest, owner=None):
		"""
		Allocates a new upload.

		owner may be a Builder or a User; for users the storage quota
		is checked before anything is allocated.

		Raises UnsupportedDigestException if digest_algo is not supported
		and ValueError if the digest is empty.
		"""
		builder = None
		user = None

		# Check if the digest algorithm is supported
		if digest_algo not in supported_digest_algos:
			raise UnsupportedDigestException(digest_algo)

		# Check if the digest is set
		elif not digest:
			raise ValueError("Empty digest")

		# Check uploader type
		if isinstance(owner, builders.Builder):
			builder = owner
		elif isinstance(owner, users.User):
			user = owner

		# Check quota for users
		if user:
			# This will raise an exception if the quota has been exceeded
			user.check_storage_quota(size)

		# Allocate a new temporary file
		upload = self._get_upload("""
			INSERT INTO
				uploads
			(
				filename,
				size,
				builder_id,
				user_id,
				digest_algo,
				digest
			)
			VALUES
			(
				%s,
				%s,
				%s,
				%s,
				%s,
				%s
			)
			RETURNING *""",
			filename,
			size,
			builder,
			user,
			digest_algo,
			digest,
		)

		# Return the newly created upload object
		return upload

	async def create_from_local(self, path, filename=None, **kwargs):
		"""
		Imports a file from the local filesystem
		"""
		# Collect attributes
		if filename is None:
			filename = os.path.basename(path)

		# Fetch the size
		size = os.path.getsize(path)

		# Create the new upload object
		upload = await self.create(filename=filename, size=size, **kwargs)

		# Import the data
		with open(path, "rb") as f:
			await upload.copyfrom(f)

		return upload

	async def cleanup(self):
		"""
		Deletes all uploads that have passed their expiry time.
		"""
		# Find all expired uploads
		uploads = self._get_uploads("""
			SELECT
				*
			FROM
				uploads
			WHERE
				expires_at <= CURRENT_TIMESTAMP
			ORDER BY
				created_at
			""")

		# Delete them all, each in its own transaction so one failure
		# does not roll back the others
		for upload in uploads:
			with self.db.transaction():
				await upload.delete()
2f64fe68
MT
153
154
class Upload(base.DataObject):
	"""
	Represents a single row in the uploads table.
	"""
	table = "uploads"

	def __str__(self):
		return "%s" % self.uuid

	@property
	def uuid(self):
		return self.data.uuid

	@property
	def filename(self):
		return self.data.filename

	@property
	def path(self):
		# Path of the uploaded data on disk (set by copyfrom())
		return self.data.path

	@property
	def size(self):
		# Expected size in bytes, announced when the upload was created
		return self.data.size

	@property
	def digest_algo(self):
		return self.data.digest_algo

	@property
	def digest(self):
		# Expected digest, announced when the upload was created
		return self.data.digest

	# Builder

	def get_builder(self):
		if self.data.builder_id:
			return self.backend.builders.get_by_id(self.data.builder_id)

	def set_builder(self, builder):
		self._set_attribute("builder_id", builder.id)

	builder = lazy_property(get_builder, set_builder)

	# User

	def get_user(self):
		if self.data.user_id:
			return self.backend.users.get_by_id(self.data.user_id)

	def set_user(self, user):
		self._set_attribute("user_id", user.id)

	user = lazy_property(get_user, set_user)

	def has_perm(self, who):
		"""
		Returns True if "who" has permissions to use this upload
		"""
		if self.builder == who or self.user == who:
			return True

		# No permission
		return False

	async def delete(self):
		"""
		Removes the uploaded data from disk and the row from the database.
		"""
		log.info("Deleting upload %s (%s)" % (self, self.path))

		# Remove the uploaded data
		await self.backend.unlink(self.path)

		# Delete the upload from the database
		self.db.execute("DELETE FROM uploads WHERE id = %s", self.id)

	@property
	def created_at(self):
		return self.data.created_at

	@property
	def expires_at(self):
		return self.data.expires_at

	async def copyfrom(self, src):
		"""
		Copies the content of this upload from the source file descriptor

		Raises ValueError if the received data does not match the
		announced size or digest.
		"""
		# Reset the source file handle
		src.seek(0)

		# Setup the hash function
		h = hashlib.new(self.digest_algo)

		async with self.backend.tempfile(delete=False) as f:
			try:
				while True:
					# Read in chunks (was a magic 1024 ** 2 literal;
					# use the module-wide constant instead)
					buffer = src.read(MAX_BUFFER_SIZE)
					if not buffer:
						break

					# Put the buffer into the hash function
					h.update(buffer)

					# Write the buffer to disk
					await f.write(buffer)

				# How many bytes did we write?
				received_size = await f.tell()

				# Get the digest
				computed_digest = h.digest()

				# Check if the filesize matches
				if received_size != self.size:
					raise ValueError("File size differs")

				# Check that the digest matches (constant-time comparison)
				if not hmac.compare_digest(computed_digest, self.digest):
					raise ValueError("Invalid digest")

			# If there has been any kind of exception, we want to delete the temporary file
			except Exception:
				await self.backend.unlink(f.name)

				# Re-raise, keeping the original traceback
				raise

			# Store the path
			self._set_attribute("path", f.name)

	async def copyinto(self, dst):
		"""
		Copies the content of this upload into the destination file descriptor.
		"""
		# Run the blocking copy in a worker thread
		return await asyncio.to_thread(self._copyinto, dst)

	def _copyinto(self, dst):
		# Sanity check only - stripped under -O
		assert os.path.exists(self.path), "Upload has been deleted - %s" % self

		# Open the source file and copy it into the destination file descriptor
		with open(self.path, "rb") as src:
			shutil.copyfileobj(src, dst)