]> git.ipfire.org Git - pbs.git/blob - src/buildservice/jobs.py
jobs: Fix some direct accesses to self._data
[pbs.git] / src / buildservice / jobs.py
1 #!/usr/bin/python
2
3 import datetime
4 import hashlib
5 import logging
6 import os
7 import shutil
8 import uuid
9
10 import pakfire
11 import pakfire.config
12
13 log = logging.getLogger("builds")
14 log.propagate = 1
15
16 from . import arches
17 from . import base
18 from . import logs
19 from . import users
20
21 from .constants import *
22 from .decorators import *
23
class Jobs(base.Object):
	"""
	Interface to all build jobs that are stored in the "jobs" table.
	"""

	def _get_job(self, query, *args):
		# Runs a query that selects (at most) one jobs row and wraps
		# it into a Job object.
		res = self.db.get(query, *args)

		if res:
			return Job(self.backend, res.id, data=res)

	def _get_jobs(self, query, *args):
		# Runs a query that selects multiple jobs rows and yields
		# them as Job objects.
		res = self.db.query(query, *args)

		for row in res:
			yield Job(self.backend, row.id, data=row)

	def create(self, build, arch, test=False, superseeds=None):
		"""
		Creates a new job for the given build and architecture.

		If superseeds is given, that job is marked as superseeded by
		the newly created job.
		"""
		job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
			VALUES(%s, %s, %s, %s) RETURNING *", "%s" % uuid.uuid4(), build.id, arch, test)
		job.log("created")

		# Set cache for Build object.
		job.build = build

		# Mark if the new job superseeds some other job
		if superseeds:
			superseeds.superseeded_by = job

		# Jobs are by default in state "new" and wait for being checked
		# for dependencies. Packages that do have no build dependencies
		# can directly be forwarded to "pending" state.
		if not job.pkg.requires:
			job.state = "pending"

		return job

	def get_by_id(self, id, data=None):
		# Returns the job with the given id without checking whether
		# it actually exists.
		return Job(self.backend, id, data)

	def get_by_uuid(self, uuid):
		# Returns the job with the given UUID (or None).
		job = self.db.get("SELECT id FROM jobs WHERE uuid = %s", uuid)

		if job:
			return self.get_by_id(job.id)

	def get_active(self, host_id=None, builder=None, states=None):
		"""
		Returns all jobs that are currently in one of the active
		states, optionally limited to a single builder.
		"""
		if builder:
			host_id = builder.id

		if states is None:
			states = ["dispatching", "running", "uploading"]

		query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states))

		# Copy so that we never modify the caller's list.
		args = list(states)

		if host_id:
			# Bind the builder id as a query parameter instead of
			# interpolating it into the SQL string.
			query += " AND builder_id = %s"
			args.append(host_id)

		query += " ORDER BY \
			CASE \
				WHEN jobs.state = 'running' THEN 0 \
				WHEN jobs.state = 'uploading' THEN 1 \
				WHEN jobs.state = 'dispatching' THEN 2 \
				WHEN jobs.state = 'pending' THEN 3 \
				WHEN jobs.state = 'new' THEN 4 \
			END, time_started ASC"

		return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

	def get_latest(self, arch=None, builder=None, limit=None, age=None, date=None):
		"""
		Returns recently ended jobs (finished, failed or aborted),
		newest first.

		The result may be filtered by architecture, builder, a date
		string ("YYYY-MM-DD") or a maximum age (a PostgreSQL interval
		string like "24 hours").
		"""
		query = "SELECT * FROM jobs"
		args = []

		where = ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]

		if arch:
			where.append("arch = %s")
			args.append(arch)

		if builder:
			where.append("builder_id = %s")
			args.append(builder.id)

		if date:
			# Dates in an invalid format are silently ignored.
			try:
				year, month, day = date.split("-", 2)
				date = datetime.date(int(year), int(month), int(day))
			except ValueError:
				pass
			else:
				where.append("(time_created::date = %s OR \
					time_started::date = %s OR time_finished::date = %s)")
				args += (date, date, date)

		if age:
			# Bind the interval as a query parameter instead of
			# formatting it into the SQL string.
			where.append("time_finished >= NOW() - %s::interval")
			args.append(age)

		if where:
			query += " WHERE %s" % " AND ".join(where)

		query += " ORDER BY time_finished DESC"

		if limit:
			query += " LIMIT %s"
			args.append(limit)

		return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

	def get_average_build_time(self):
		"""
		Returns the average build time of all finished builds from the
		last 3 months.
		"""
		result = self.db.get("SELECT AVG(time_finished - time_started) as average \
			FROM jobs WHERE type = 'build' AND state = 'finished' AND \
			time_finished >= NOW() - '3 months'::interval")

		if result:
			return result.average

	def count(self, *states):
		"""
		Counts all jobs, optionally only those in the given states.
		"""
		query = "SELECT COUNT(*) AS count FROM jobs"
		args = []

		if states:
			# psycopg2 adapts the tuple to "(a, b, ...)" for IN.
			query += " WHERE state IN %s"
			args.append(states)

		jobs = self.db.get(query, *args)
		if jobs:
			return jobs.count

	def restart_failed(self):
		"""
		Restarts all build jobs that failed more than 72 hours ago,
		unless their build has been marked as broken.
		"""
		jobs = self._get_jobs("SELECT jobs.* FROM jobs \
			JOIN builds ON builds.id = jobs.build_id \
			WHERE \
				jobs.type = 'build' AND \
				jobs.state = 'failed' AND \
				NOT builds.state = 'broken' AND \
				jobs.time_finished < NOW() - '72 hours'::interval \
			ORDER BY \
				CASE \
					WHEN jobs.type = 'build' THEN 0 \
					WHEN jobs.type = 'test' THEN 1 \
				END, \
				builds.priority DESC, jobs.time_created ASC")

		# Restart the job
		for job in jobs:
			job.restart()
171
172
class Job(base.DataObject):
	"""
	A single build job (one build of one package for one architecture).

	Backed by one row of the "jobs" table.
	"""

	# Table this DataObject is backed by.
	table = "jobs"

	def __str__(self):
		# e.g. "<Job id=1 beep-1.3.x86_64>"
		return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)
178
179 def __eq__(self, other):
180 if isinstance(other, self.__class__):
181 return self.id == other.id
182
	def __lt__(self, other):
		# Queue ordering: regular build jobs sort before test jobs;
		# jobs of the same build are ordered by the priority of their
		# architecture; otherwise by creation time.
		# NOTE(review): implicitly returns None when "other" is not a
		# Job, or when self is a test job and other is not and the
		# builds differ only by time - confirm callers only compare
		# Job instances.
		if isinstance(other, self.__class__):
			if not self.test and other.test:
				return True

			if self.build == other.build:
				return arches.priority(self.arch) < arches.priority(other.arch)

			return self.time_created < other.time_created
192
	def __iter__(self):
		# Iterating a job yields all of its uploaded packages,
		# ordered by package name.
		packages = self.backend.packages._get_packages("SELECT packages.* FROM jobs_packages \
			LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id)

		return iter(packages)
199
	def __nonzero__(self):
		# Jobs are always truthy.  Without this, Python 2 would fall
		# back to __len__ and a job without packages would be False.
		return True
202
	def __len__(self):
		# Number of packages that belong to this job.
		res = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
			WHERE job_id = %s", self.id)

		return res.len
208
	@property
	def distro(self):
		# The distribution this job is built for (taken from its build).
		return self.build.distro
212
	def restart(self):
		"""
		Creates a copy of this job that will be built again.

		The new job superseeds this one.
		"""
		# Copy the job and let it build again
		return self.backend.jobs.create(self.build, self.arch,
			test=self.test, superseeds=self)
217
	def get_superseeded_by(self):
		# Returns the job that superseeded this one (or None).
		if self.data.superseeded_by:
			return self.backend.jobs.get_by_id(self.data.superseeded_by)

	def set_superseeded_by(self, superseeded_by):
		# Persists which job superseeds this one.
		assert isinstance(superseeded_by, self.__class__)

		self._set_attribute("superseeded_by", superseeded_by.id)
		# NOTE(review): assumes lazy_property caches this assignment
		# rather than re-invoking the setter - confirm in decorators.py.
		self.superseeded_by = superseeded_by

	superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
229
	def delete(self):
		# Flags this job for deletion - presumably processed later by
		# a cleanup task calling remove(); confirm against the caller.
		self._set_attribute("delete", True)
232
	def remove(self):
		"""
		Removes a job from the database

		Drops all dependent rows (buildroots, history, packages) and
		queues the log files for deletion before deleting the job row
		itself.
		"""
		self.__remove_buildroots()
		self.__remove_history()
		self.__remove_packages()
		self.__remove_logfiles()

		# Delete the job itself.
		self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)
244
	def __remove_buildroots(self):
		"""
		Removes all buildroots.
		"""
		self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
250
	def __remove_history(self):
		"""
		Removes all references in the history to this build job.
		"""
		self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)
256
	def __remove_packages(self):
		"""
		Deletes all uploaded files from the job.
		"""
		# NOTE(review): self.packages is not defined in this file
		# chunk - presumably a property on DataObject or defined
		# elsewhere; verify it exists.
		for pkg in self.packages:
			pkg.delete()

		self.db.execute("DELETE FROM jobs_packages WHERE job_id = %s", self.id)
265
	def __remove_logfiles(self):
		# The files are not deleted right away; their paths are queued
		# for asynchronous removal.
		# NOTE(review): the logfiles table rows are not deleted here -
		# confirm that is intentional.
		for logfile in self.logfiles:
			self.db.execute("INSERT INTO queue_delete(path) VALUES(%s)", logfile.path)
269
270 ## Logging stuff
271
272 def log(self, action, user=None, state=None, builder=None, test_job=None):
273 user_id = None
274 if user:
275 user_id = user.id
276
277 builder_id = None
278 if builder:
279 builder_id = builder.id
280
281 test_job_id = None
282 if test_job:
283 test_job_id = test_job.id
284
285 self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
286 time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
287 self.id, action, state, user_id, builder_id, test_job_id)
288
289 def get_log(self, limit=None, offset=None, user=None):
290 query = "SELECT * FROM jobs_history"
291
292 conditions = ["job_id = %s",]
293 args = [self.id,]
294
295 if user:
296 conditions.append("user_id = %s")
297 args.append(user.id)
298
299 if conditions:
300 query += " WHERE %s" % " AND ".join(conditions)
301
302 query += " ORDER BY time DESC"
303
304 if limit:
305 if offset:
306 query += " LIMIT %s,%s"
307 args += [offset, limit,]
308 else:
309 query += " LIMIT %s"
310 args += [limit,]
311
312 entries = []
313 for entry in self.db.query(query, *args):
314 entry = logs.JobLogEntry(self.backend, entry)
315 entries.append(entry)
316
317 return entries
318
	@property
	def uuid(self):
		# External (stable) identifier of this job.
		return self.data.uuid

	@property
	def test(self):
		# True if this is a test job (no package uploads allowed).
		return self.data.test

	@property
	def build_id(self):
		# Database id of the build this job belongs to.
		return self.data.build_id

	@lazy_property
	def build(self):
		# The Build object this job belongs to (cached; pre-seeded by
		# Jobs.create()).
		return self.backend.builds.get_by_id(self.build_id)
334
335 @property
336 def related_jobs(self):
337 ret = []
338
339 for job in self.build.jobs:
340 if job == self:
341 continue
342
343 ret.append(job)
344
345 return ret
346
	@property
	def pkg(self):
		# The source package of the build.
		return self.build.pkg

	@property
	def name(self):
		# Display name, e.g. "beep-1.3.x86_64".
		return "%s-%s.%s" % (self.pkg.name, self.pkg.friendly_version, self.arch)

	@property
	def size(self):
		# Total size of all packages this job produced.
		return sum((p.size for p in self.packages))
358
	@lazy_property
	def rank(self):
		"""
		Returns the rank in the build queue
		"""
		# Only pending jobs are in the queue.
		if not self.state == "pending":
			return

		res = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)

		if res:
			return res.rank
371
	def is_running(self):
		"""
		Returns True if job is in a running state.
		"""
		return self.state in ("pending", "dispatching", "running", "uploading")
377
	def get_state(self):
		# Current state of this job (e.g. "new", "pending", "running",
		# "finished", "failed", ...).
		return self.data.state

	def set_state(self, state, user=None, log=True):
		"""
		Moves this job into the given state.

		Logs the transition (unless log is False or the state is
		"new"), clears any status message, records start/finish
		timestamps and sends notification mails.
		"""
		# Nothing to do if the state remains.
		if not self.state == state:
			self._set_attribute("state", state)

			# Log the event.
			if log and not state == "new":
				self.log("state_change", state=state, user=user)

			# Always clear the message when the status is changed.
			self.update_message(None)

		# Update some more informations.
		if state == "dispatching":
			# Set start time.
			self._set_attribute("time_started", datetime.datetime.utcnow())

		elif state in ("aborted", "dependency_error", "finished", "failed"):
			self._set_attribute("time_finished", datetime.datetime.utcnow())

		# Send messages to the user.
		if state == "finished":
			self.send_finished_message()

		elif state == "failed":
			# Remove all package files if a job is set to failed state.
			# NOTE(review): __delete_packages is not defined in this
			# chunk (only __remove_packages is) - verify it exists.
			self.__delete_packages()

			self.send_failed_message()

		# Automatically update the state of the build (not on test builds).
		if not self.test:
			self.build.auto_update_state()

	state = property(get_state, set_state)
416
	@property
	def message(self):
		# Free-text status message (e.g. a solver problem string).
		return self.data.message

	def update_message(self, message):
		# Replaces the status message; None clears it.
		self._set_attribute("message", message)
423
	def get_builder(self):
		# Returns the Builder this job is (or was) assigned to, or None.
		if self.data.builder_id:
			return self.backend.builders.get_by_id(self.data.builder_id)

	def set_builder(self, builder, user=None):
		# Assigns this job to the given builder.  The assignment is
		# logged only when a user triggered it.
		self._set_attribute("builder_id", builder.id)

		# Log the event.
		if user:
			self.log("builder_assigned", builder=builder, user=user)

	builder = lazy_property(get_builder, set_builder)
436
	@property
	def arch(self):
		# Architecture this job is built for (e.g. "x86_64").
		return self.data.arch

	@property
	def duration(self):
		"""
		Returns the run time of this job in seconds.

		Zero if the job has not been started yet; for still-running
		jobs the duration is measured up to now (UTC).
		"""
		if not self.time_started:
			return 0

		if self.time_finished:
			delta = self.time_finished - self.time_started
		else:
			delta = datetime.datetime.utcnow() - self.time_started

		return delta.total_seconds()

	@property
	def time_created(self):
		# Timestamp when this job was created.
		return self.data.time_created

	@property
	def time_started(self):
		# Timestamp when this job was dispatched (None if never started).
		return self.data.time_started

	@property
	def time_finished(self):
		# Timestamp when this job ended (None while still running).
		return self.data.time_finished
464
	@property
	def expected_runtime(self):
		"""
		Returns the estimated time and stddev, this job takes to finish.
		"""
		# Get the average build time.
		build_times = self.backend.builds.get_build_times_by_arch(self.arch,
			name=self.pkg.name)

		# If there is no statistical data, we cannot estimate anything.
		if not build_times:
			return None, None

		return build_times.average, build_times.stddev
479
	@property
	def eta(self):
		# Estimated remaining time (seconds) and its standard
		# deviation, or None when no estimate is available.
		expected_runtime, stddev = self.expected_runtime

		if expected_runtime:
			return expected_runtime - int(self.duration), stddev
486
	def get_pkg_by_uuid(self, uuid):
		# Returns the package with the given UUID, but only if it
		# belongs to this job (or None).
		pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
			JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
			self.id, uuid)

		if pkg:
			# Seed the back-reference cache.
			pkg.job = self
			return pkg
496
497 @lazy_property
498 def logfiles(self):
499 logfiles = []
500
501 for log in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
502 log = logs.LogFile(self.backend, log.id)
503 log._job = self
504
505 logfiles.append(log)
506
507 return logfiles
508
	def add_file(self, filename):
		"""
		Add the specified file to this job.

		The file is copied to the right directory by this function.

		Log files and package files are handled; anything else is
		silently ignored.
		"""
		assert os.path.exists(filename)

		if filename.endswith(".log"):
			self._add_file_log(filename)

		elif filename.endswith(".%s" % PACKAGE_EXTENSION):
			# It is not allowed to upload packages on test builds.
			if self.test:
				return

			self._add_file_package(filename)
526
527 def _add_file_log(self, filename):
528 """
529 Attach a log file to this job.
530 """
531 target_dirname = os.path.join(self.build.path, "logs")
532
533 if self.test:
534 i = 1
535 while True:
536 target_filename = os.path.join(target_dirname,
537 "test.%s.%s.%s.log" % (self.arch, i, self.uuid))
538
539 if os.path.exists(target_filename):
540 i += 1
541 else:
542 break
543 else:
544 target_filename = os.path.join(target_dirname,
545 "build.%s.%s.log" % (self.arch, self.uuid))
546
547 # Make sure the target directory exists.
548 if not os.path.exists(target_dirname):
549 os.makedirs(target_dirname)
550
551 # Calculate a SHA512 hash from that file.
552 f = open(filename, "rb")
553 h = hashlib.sha512()
554 while True:
555 buf = f.read(BUFFER_SIZE)
556 if not buf:
557 break
558
559 h.update(buf)
560 f.close()
561
562 # Copy the file to the final location.
563 shutil.copy2(filename, target_filename)
564
565 # Create an entry in the database.
566 self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
567 VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
568 os.path.getsize(target_filename), h.hexdigest())
569
	def _add_file_package(self, filename):
		# Registers an uploaded package file and attaches it to this job.

		# Open package (creates entry in the database)
		pkg = self.backend.packages.create(filename)

		# Move package to the build directory.
		pkg.move(os.path.join(self.build.path, self.arch))

		# Attach the package to this job.
		self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
			self.id, pkg.id)
580
	def get_aborted_state(self):
		# The state this job was in when it was aborted (or None).
		return self.data.aborted_state

	def set_aborted_state(self, state):
		# Remembers the state this job was in when it was aborted.
		self._set_attribute("aborted_state", state)

	aborted_state = property(get_aborted_state, set_aborted_state)
588
	@property
	def message_recipients(self):
		"""
		Returns the set of people to notify about this job: everybody
		watching the build, plus the package maintainer (release
		builds) or the submitting user (scratch builds).
		"""
		l = []

		# Add all people watching the build.
		l += self.build.message_recipients

		# Add the package maintainer on release builds.
		if self.build.type == "release":
			maint = self.pkg.maintainer

			if isinstance(maint, users.User):
				l.append("%s <%s>" % (maint.realname, maint.email))
			elif maint:
				# Maintainer is a plain string (e.g. "Name <mail>").
				l.append(maint)

			# XXX add committer and commit author.

		# Add the owner of the scratch build on scratch builds.
		elif self.build.type == "scratch" and self.build.user:
			l.append("%s <%s>" % \
				(self.build.user.realname, self.build.user.email))

		# De-duplicate.
		return set(l)
613
614 def save_buildroot(self, pkgs):
615 # Cleanup old stuff first (for rebuilding packages)
616 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
617
618 for pkg_name, pkg_uuid in pkgs:
619 self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
620 VALUES(%s, %s, %s)", self.id, pkg_name, pkg_uuid)
621
	@lazy_property
	def buildroot(self):
		"""
		Returns the buildroot of this job as a list of
		(name, uuid, package) tuples, sorted by package name.
		"""
		rows = self.db.query("SELECT * FROM jobs_buildroots \
			WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)

		pkgs = []
		for row in rows:
			# Search for this package in the packages table.
			pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
			pkgs.append((row.pkg_name, row.pkg_uuid, pkg))

		return pkgs
634
	def send_finished_message(self):
		"""
		Sends a "build finished" mail to all message recipients.
		"""
		# Send no finished mails for test jobs.
		if self.test:
			return

		logging.debug("Sending finished message for job %s to %s" % \
			(self.name, ", ".join(self.message_recipients)))

		info = {
			"build_name" : self.name,
			"build_host" : self.builder.name,
			"build_uuid" : self.uuid,
		}

		self.backend.messages.send_to_all(self.message_recipients,
			MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)
651
	def send_failed_message(self):
		"""
		Sends a "build failed" mail to all message recipients.
		"""
		logging.debug("Sending failed message for job %s to %s" % \
			(self.name, ", ".join(self.message_recipients)))

		# The job might have failed before it was ever dispatched.
		build_host = "--"
		if self.builder:
			build_host = self.builder.name

		info = {
			"build_name" : self.name,
			"build_host" : build_host,
			"build_uuid" : self.uuid,
		}

		self.backend.messages.send_to_all(self.message_recipients,
			MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)
668
	def set_start_time(self, start_not_before):
		# Delays the job: it will not be dispatched before this time.
		self._set_attribute("start_not_before", start_not_before)
671
672 def schedule(self, type, start_time=None, user=None):
673 assert type in ("rebuild", "test")
674
675 if type == "rebuild":
676 if self.state == "finished":
677 return
678
679 job = self.restart()
680 job.set_start_time(start_time)
681
682 # Log the event.
683 self.log("schedule_rebuild", user=user)
684
685 elif type == "test":
686 if not self.state == "finished":
687 return
688
689 # Create a new job with same build and arch.
690 job = self.create(self.backend, self.build, self.arch, test=True)
691 job.set_start_time(start_time)
692
693 # Log the event.
694 self.log("schedule_test_job", test_job=job, user=user)
695
696 return job
697
	def schedule_test(self, start_not_before=None, user=None):
		# Deprecated wrapper around schedule().
		# XXX to be removed
		return self.schedule("test", start_time=start_not_before, user=user)

	def schedule_rebuild(self, start_not_before=None, user=None):
		# Deprecated wrapper around schedule().
		# XXX to be removed
		return self.schedule("rebuild", start_time=start_not_before, user=user)
705
706 def get_build_repos(self):
707 """
708 Returns a list of all repositories that should be used when
709 building this job.
710 """
711 repo_ids = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
712 self.id)
713
714 if not repo_ids:
715 return self.distro.get_build_repos()
716
717 repos = []
718 for repo in self.distro.repositories:
719 if repo.id in [r.id for r in repo_ids]:
720 repos.append(repo)
721
722 return repos or self.distro.get_build_repos()
723
724 def get_repo_config(self):
725 """
726 Get repository configuration file that is sent to the builder.
727 """
728 confs = []
729
730 for repo in self.get_build_repos():
731 confs.append(repo.get_conf())
732
733 return "\n\n".join(confs)
734
735 def get_config(self):
736 """
737 Get configuration file that is sent to the builder.
738 """
739 confs = []
740
741 # Add the distribution configuration.
742 confs.append(self.distro.get_config())
743
744 # Then add all repositories for this build.
745 confs.append(self.get_repo_config())
746
747 return "\n\n".join(confs)
748
	def resolvdep(self):
		"""
		Tries to resolve the build dependencies of this job.

		On success the job is moved into "pending" state; on a
		dependency problem it is moved into "dependency_error" and the
		solver message is stored as the job message.
		"""
		config = pakfire.config.Config(files=["general.conf"])
		config.parse(self.get_config())

		# The filename of the source file.
		filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
		assert os.path.exists(filename), filename

		# Create a new pakfire instance with the configuration for
		# this build.
		p = pakfire.PakfireServer(config=config, arch=self.arch)

		# Try to solve the build dependencies.
		try:
			solver = p.resolvdep(filename)

		# Catch dependency errors and log the problem string.
		# (Python 2 except syntax; DependencyError presumably comes in
		# via one of the wildcard imports - verify.)
		except DependencyError, e:
			self.state = "dependency_error"
			self.update_message(e)

		else:
			# If the build dependencies can be resolved, we set the build in
			# pending state.
			if solver.status is True:
				# Never resurrect a job that has already failed.
				if self.state in ("failed",):
					return

				self.state = "pending"