]> git.ipfire.org Git - pbs.git/blob - src/buildservice/jobs.py
5d5d984c7de961e26f241ab815bd3b1cfdcd9535
[pbs.git] / src / buildservice / jobs.py
1 #!/usr/bin/python
2
3 import asyncio
4 import datetime
5 import hashlib
6 import logging
7 import os
8 import shutil
9
10 import pakfire
11 import pakfire.config
12
13 from . import base
14 from . import logs
15 from . import users
16
17 from .constants import *
18 from .decorators import *
19
20 # Setup logging
21 log = logging.getLogger("pakfire.buildservice.jobs")
22
class Jobs(base.Object):
	"""
	Collection interface for build jobs.

	Wraps the SQL queries that fetch or create Job objects.
	"""

	def _get_job(self, query, *args):
		"""
		Runs query and returns the first result wrapped as a Job,
		or None if there was no match.
		"""
		res = self.db.get(query, *args)

		if res:
			return Job(self.backend, res.id, data=res)

	def _get_jobs(self, query, *args):
		"""
		Runs query and yields every result wrapped as a Job
		"""
		res = self.db.query(query, *args)

		for row in res:
			yield Job(self.backend, row.id, data=row)

	def create(self, build, arch, test=False, superseeds=None):
		"""
		Creates a new job for build on arch.

		If superseeds is given, that job is marked as being
		superseeded by the newly created job.
		"""
		job = self._get_job("""
			INSERT INTO
				jobs
			(
				build_id,
				arch,
				test
			)
			VALUES
			(
				%s,
				%s,
				%s
			)
			RETURNING *""",
			build,
			arch,
			test,
		)

		# Set cache for Build object
		job.build = build

		# Mark if the new job superseeds some other job
		if superseeds:
			superseeds.superseeded_by = job

		return job

	def get_by_id(self, id):
		return self._get_job("SELECT * FROM jobs WHERE id = %s", id)

	def get_by_uuid(self, uuid):
		return self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid)

	@property
	def running(self):
		"""
		Returns all jobs that have been started but not finished, yet
		"""
		# BUGFIX(review): this used to ORDER BY finished_at, which is
		# NULL for every selected row and therefore sorted nothing.
		jobs = self._get_jobs("""
			SELECT
				jobs.*
			FROM
				jobs
			WHERE
				started_at IS NOT NULL
			AND
				finished_at IS NULL
			ORDER BY
				started_at
			""")

		return list(jobs)

	async def depcheck(self, jobs=None):
		"""
		Performs a dependency check on all given jobs concurrently
		"""
		# BUGFIX(review): the default jobs=None used to raise a
		# TypeError when iterated; treat it as "nothing to do".
		if not jobs:
			return

		await asyncio.gather(*(job.depcheck() for job in jobs))
94
95
class Job(base.DataObject):
	"""
	A single build job: one build of a package for one architecture.

	Each instance wraps one row of the "jobs" table.
	"""
	# Database table backing this object (consumed by base.DataObject)
	table = "jobs"
98
99 def __repr__(self):
100 return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)
101
102 def __str__(self):
103 return self.name
104
105 def __lt__(self, other):
106 if isinstance(other, self.__class__):
107 if not self.test and other.test:
108 return True
109
110 if self.build == other.build:
111 return arches.priority(self.arch) < arches.priority(other.arch)
112
113 return self.time_created < other.time_created
114
115 return NotImplemented
116
117 @property
118 def uuid(self):
119 return self.data.uuid
120
121 @property
122 def name(self):
123 return "%s-%s.%s" % (self.pkg.name, self.pkg.evr, self.arch)
124
125 @lazy_property
126 def build(self):
127 return self.backend.builds.get_by_id(self.data.build_id)
128
129 @property
130 def test(self):
131 return self.data.test
132
133 @property
134 def related_jobs(self):
135 """
136 Returns all sibling jobs
137 """
138 return [job for job in self.build.jobs if not self == job]
139
140 @property
141 def pkg(self):
142 return self.build.pkg
143
144 @lazy_property
145 def packages(self):
146 packages = self.backend.packages._get_packages("""
147 SELECT
148 packages.*
149 FROM
150 jobs_packages
151 LEFT JOIN
152 packages ON jobs_packages.pkg_id = packages.id
153 WHERE
154 jobs_packages.job_id = %s
155 ORDER BY
156 packages.name""",
157 self.id,
158 )
159
160 return list(packages)
161
162 @property
163 def size(self):
164 return sum((p.size for p in self.packages))
165
166 @lazy_property
167 def estimated_build_time(self):
168 """
169 Returns the time we expect this job to run for
170 """
171 res = self.db.get("""
172 SELECT
173 build_time
174 FROM
175 package_estimated_build_times
176 WHERE
177 name = %s
178 AND
179 arch = %s""",
180 self.pkg.name, self.arch,
181 )
182
183 if res:
184 return res.build_time
185
186 @property
187 def distro(self):
188 return self.build.distro
189
190 def get_superseeded_by(self):
191 if self.data.superseeded_by:
192 return self.backend.jobs.get_by_id(self.data.superseeded_by)
193
194 def set_superseeded_by(self, superseeded_by):
195 assert isinstance(superseeded_by, self.__class__)
196
197 self._set_attribute("superseeded_by", superseeded_by.id)
198
199 superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
200
201 @property
202 def created_at(self):
203 """
204 Returns when this job was created
205 """
206 return self.data.created_at
207
208 @property
209 def started_at(self):
210 """
211 Returns when this job was started
212 """
213 return self.data.started_at
214
215 @property
216 def finished_at(self):
217 """
218 Returns when this job finished
219 """
220 return self.data.finished_at
221
	def start(self, builder):
		"""
		Starts this job on builder
		"""
		log.info("Starting job %s on %s" % (self, builder))

		# Store the assigned builder
		# NOTE(review): the builder object itself is passed as builder_id -
		# presumably the database layer adapts it to its ID; verify.
		self._set_attribute("builder_id", builder)

		# Store the time
		self._set_attribute_now("started_at")
233
	async def finished(self, success, message=None, log=None):
		"""
		Called when this job has finished.

		success: whether the build succeeded
		message: optional status message stored with the job
		log:     optional upload with the build log to import
		         (note: this parameter shadows the module-level logger
		         inside this method)
		"""
		# Store the time
		self._set_attribute_now("finished_at")

		# Import log
		if log:
			await self._import_log(log)

		# Did this build fail?
		if not success:
			# Clone the build
			self.clone()

			# Store message
			# NOTE(review): indentation reconstructed from a mangled dump -
			# confirm whether the message should also be stored on success.
			self._set_attribute("message", message)

		# Notify users
		if success:
			self.send_finished_message()
		else:
			self._set_attribute("failed", True)
			self.send_failed_message()

		# XXX propagate any changes to the build
261
262 def is_running(self):
263 """
264 Returns True if this job is running
265 """
266 return self.started_at and not self.finished_at
267
268 def has_finished(self):
269 if self.finished_at:
270 return True
271
272 return False
273
274 @property
275 def failed(self):
276 """
277 Indicates whether this job has failed
278 """
279 return self.data.failed
280
281 @property
282 def message(self):
283 return self.data.message
284
285 def delete(self):
286 """
287 Deletes a job from the database
288 """
289 # Remove the buildroot
290 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
291
292 # Remove the history
293 self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)
294
295 # Delete all packages
296 for pkg in self:
297 self.db.execute("DELETE FROM jobs_packages \
298 WHERE job_id = %s AND pkg_id = %s", self.id, pkg.id)
299 pkg.delete()
300
301 # Remove all logfiles
302 for logfile in self.logfiles:
303 path = self.backend.path("packages", logfile.path)
304 self.backend.delete_file(path)
305
306 self.db.execute("DELETE FROM logfiles WHERE job_id = %s", self.id)
307
308 # Delete the job itself.
309 self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)
310
311 def clone(self):
312 """
313 Clones this build job
314 """
315 job = self.backend.jobs.create(
316 build=self.build,
317 arch=self.arch,
318 test=self.test,
319 superseeds=self,
320 )
321
322 log.debug("Cloned job %s as %s" % (self, job))
323
324 return job
325
326 # Log
327
328 def has_log(self):
329 if self.log_path:
330 return True
331
332 return False
333
334 @property
335 def log_url(self):
336 return self.backend.path_to_url(self.log_path)
337
338 @property
339 def log_path(self):
340 return self.data.log_path
341
342 @property
343 def log_size(self):
344 return self.data.log_size
345
346 @property
347 def log_digest_blake2s(self):
348 return self.data.log_digest_blake2s
349
350 async def _import_log(self, upload):
351 # Create some destination path
352 path = self.backend.path(
353 "logs",
354 "jobs",
355 self.uuid[0:2],
356 self.uuid[2:4],
357 "%s.log" % self.uuid[4:],
358 )
359
360 # Copy file to its destination
361 await self.backend.copy(upload.path, path)
362
363 # Compute a digest for integrity
364 digest = await upload.digest("blake2s")
365
366 # Store everything in the database
367 self._set_attribute("log_path", path)
368 self._set_attribute("log_size", upload.size)
369 self._set_attribute("log_digest_blake2s", digest)
370
371 # Builder
372
373 @lazy_property
374 def builder(self):
375 if self.data.builder_id:
376 return self.backend.builders.get_by_id(self.data.builder_id)
377
378 @property
379 def arch(self):
380 return self.data.arch
381
382 @property
383 def duration(self):
384 """
385 Returns the total build duration or elapsed time
386 """
387 if self.has_finished():
388 return self.finished_at - self.started_at
389 else:
390 return datetime.datetime.utcnow() - self.started_at
391
	def add_file(self, filename):
		"""
		Add the specified file to this job.

		The file is copied to the right directory by this function.
		"""
		assert os.path.exists(filename)

		# Only package files are handled; anything else is ignored here
		# (PACKAGE_EXTENSION presumably comes from the .constants star import)
		if filename.endswith(".%s" % PACKAGE_EXTENSION):
			# It is not allowed to upload packages on test builds.
			if self.test:
				return

			# Open package (creates entry in the database)
			pkg = self.backend.packages.create(filename)

			# Move package to the build directory.
			pkg.move(os.path.join(self.build.path, self.arch))

			# Attach the package to this job.
			self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
				self.id, pkg.id)
414
415 @property
416 def message_recipients(self):
417 l = []
418
419 # Add all people watching the build.
420 l += self.build.message_recipients
421
422 # Add the package maintainer on release builds.
423 if self.build.type == "release":
424 maint = self.pkg.maintainer
425
426 if isinstance(maint, users.User):
427 l.append("%s <%s>" % (maint.realname, maint.email))
428 elif maint:
429 l.append(maint)
430
431 # XXX add committer and commit author.
432
433 # Add the owner of the scratch build on scratch builds.
434 elif self.build.type == "scratch" and self.build.user:
435 l.append("%s <%s>" % \
436 (self.build.user.realname, self.build.user.email))
437
438 return set(l)
439
440 def save_buildroot(self, pkgs):
441 # Cleanup old stuff first (for rebuilding packages)
442 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
443
444 for pkg_name, pkg_uuid in pkgs:
445 self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
446 VALUES(%s, %s, %s)", self.id, pkg_name, pkg_uuid)
447
448 @lazy_property
449 def buildroot(self):
450 rows = self.db.query("SELECT * FROM jobs_buildroots \
451 WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)
452
453 pkgs = []
454 for row in rows:
455 # Search for this package in the packages table.
456 pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
457 pkgs.append((row.pkg_name, row.pkg_uuid, pkg))
458
459 return pkgs
460
461 def send_finished_message(self):
462 # Send no finished mails for test jobs.
463 if self.test:
464 return
465
466 logging.debug("Sending finished message for job %s to %s" % \
467 (self.name, ", ".join(self.message_recipients)))
468
469 self.backend.messages.send_template_to_many(self.message_recipients,
470 "messages/jobs/finished", job=self)
471
472 def send_failed_message(self):
473 logging.debug("Sending failed message for job %s to %s" % \
474 (self.name, ", ".join(self.message_recipients)))
475
476 self.backend.messages.send_template_to_many(self.message_recipients,
477 "messages/jobs/failed", job=self)
478
	@property
	def pakfire(self):
		"""
		Generate the Pakfire configuration for this job
		"""
		# NOTE(review): _depcheck() uses this as "with self.pakfire() as p",
		# so backend.pakfire() presumably returns a callable factory rather
		# than a Pakfire instance - verify.
		return self.backend.pakfire(distro=self.distro, repos=[self.build.build_repo])
485
	async def depcheck(self):
		"""
		Perform dependency check
		"""
		log.info("Performing dependency check for %s" % self)

		# Run the blocking check in a worker thread so the event loop
		# stays responsive.
		# NOTE(review): the transaction is opened on the event-loop thread
		# while _depcheck runs in another thread - confirm the database
		# layer supports cross-thread use of one transaction.
		with self.db.transaction():
			return await asyncio.to_thread(self._depcheck)
494
	def _depcheck(self):
		"""
		Blocking part of the dependency check: performs a dry-run install
		of the source package and stores the result and a timestamp.
		"""
		# Create a Pakfire instance
		with self.pakfire() as p:
			# Try to install the source package
			try:
				p.install([self.pkg.path], dryrun=True)

			# XXX Pakfire should throw a better exception
			except Exception as e:
				self._set_attribute("depcheck_succeeded", False)

				# Store the message
				self._set_attribute("message", "%s" % e)

			# Everything OK
			else:
				self._set_attribute("depcheck_succeeded", True)

		# Store the timestamp
		self._set_attribute_now("depcheck_performed_at")
515
516 @property
517 def depcheck_succeeded(self):
518 return self.data.depcheck_succeeded
519
520 @property
521 def depcheck_performed_at(self):
522 return self.data.depcheck_performed_at