]> git.ipfire.org Git - pbs.git/blob - src/buildservice/jobs.py
jobs: Drop type field and replace it by test field
[pbs.git] / src / buildservice / jobs.py
1 #!/usr/bin/python
2
3 import datetime
4 import hashlib
5 import logging
6 import os
7 import shutil
8 import uuid
9
10 import pakfire
11 import pakfire.config
12
13 log = logging.getLogger("builds")
14 log.propagate = 1
15
16 from . import arches
17 from . import base
18 from . import logs
19 from . import users
20
21 from .constants import *
22 from .decorators import *
23
24 class Jobs(base.Object):
25 def _get_job(self, query, *args):
26 res = self.db.get(query, *args)
27
28 if res:
29 return Job(self.backend, res.id, data=res)
30
31 def _get_jobs(self, query, *args):
32 res = self.db.query(query, *args)
33
34 for row in res:
35 yield Job(self.backend, row.id, data=row)
36
37 def create(self, build, arch, type="build", superseeds=None):
38 job = self._get_job("INSERT INTO jobs(uuid, type, build_id, arch, time_created) \
39 VALUES(%s, %s, %s, %s, NOW()) RETURNING *", "%s" % uuid.uuid4(), type, build.id, arch)
40 job.log("created")
41
42 # Set cache for Build object.
43 job.build = build
44
45 # Mark if the new job superseeds some other job
46 if superseeds:
47 superseeds.superseeded_by = job
48
49 # Jobs are by default in state "new" and wait for being checked
50 # for dependencies. Packages that do have no build dependencies
51 # can directly be forwarded to "pending" state.
52 if not job.pkg.requires:
53 job.state = "pending"
54
55 return job
56
57 def get_by_id(self, id, data=None):
58 return Job(self.backend, id, data)
59
60 def get_by_uuid(self, uuid):
61 job = self.db.get("SELECT id FROM jobs WHERE uuid = %s", uuid)
62
63 if job:
64 return self.get_by_id(job.id)
65
66 def get_by_build(self, build_id, build=None, type=None):
67 """
68 Get all jobs in the specifies build.
69 """
70 query = "SELECT * FROM jobs WHERE build_id = %s"
71 args = [build_id,]
72
73 if type:
74 query += " AND type = %s"
75 args.append(type)
76
77 # Get IDs of all builds in this group.
78 jobs = []
79 for job in self.db.query(query, *args):
80 job = Job(self.backend, job.id, job)
81
82 # If the Build object was set, we set it so it won't be retrieved
83 # from the database again.
84 if build:
85 job._build = build
86
87 jobs.append(job)
88
89 # Return sorted list of jobs.
90 return sorted(jobs)
91
92 def get_active(self, host_id=None, builder=None, states=None):
93 if builder:
94 host_id = builder.id
95
96 if states is None:
97 states = ["dispatching", "running", "uploading"]
98
99 query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states))
100 args = states
101
102 if host_id:
103 query += " AND builder_id = %s" % host_id
104
105 query += " ORDER BY \
106 CASE \
107 WHEN jobs.state = 'running' THEN 0 \
108 WHEN jobs.state = 'uploading' THEN 1 \
109 WHEN jobs.state = 'dispatching' THEN 2 \
110 WHEN jobs.state = 'pending' THEN 3 \
111 WHEN jobs.state = 'new' THEN 4 \
112 END, time_started ASC"
113
114 return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]
115
116 def get_latest(self, arch=None, builder=None, limit=None, age=None, date=None):
117 query = "SELECT * FROM jobs"
118 args = []
119
120 where = ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]
121
122 if arch:
123 where.append("arch = %s")
124 args.append(arch)
125
126 if builder:
127 where.append("builder_id = %s")
128 args.append(builder.id)
129
130 if date:
131 try:
132 year, month, day = date.split("-", 2)
133 date = datetime.date(int(year), int(month), int(day))
134 except ValueError:
135 pass
136 else:
137 where.append("(time_created::date = %s OR \
138 time_started::date = %s OR time_finished::date = %s)")
139 args += (date, date, date)
140
141 if age:
142 where.append("time_finished >= NOW() - '%s'::interval" % age)
143
144 if where:
145 query += " WHERE %s" % " AND ".join(where)
146
147 query += " ORDER BY time_finished DESC"
148
149 if limit:
150 query += " LIMIT %s"
151 args.append(limit)
152
153 return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]
154
155 def get_average_build_time(self):
156 """
157 Returns the average build time of all finished builds from the
158 last 3 months.
159 """
160 result = self.db.get("SELECT AVG(time_finished - time_started) as average \
161 FROM jobs WHERE type = 'build' AND state = 'finished' AND \
162 time_finished >= NOW() - '3 months'::interval")
163
164 if result:
165 return result.average
166
167 def count(self, *states):
168 query = "SELECT COUNT(*) AS count FROM jobs"
169 args = []
170
171 if states:
172 query += " WHERE state IN %s"
173 args.append(states)
174
175 jobs = self.db.get(query, *args)
176 if jobs:
177 return jobs.count
178
179 def restart_failed(self, max_tries=9):
180 jobs = self._get_jobs("SELECT jobs.* FROM jobs \
181 JOIN builds ON builds.id = jobs.build_id \
182 WHERE \
183 jobs.type = 'build' AND \
184 jobs.state = 'failed' AND \
185 jobs.tries <= %s AND \
186 NOT builds.state = 'broken' AND \
187 jobs.time_finished < NOW() - '72 hours'::interval \
188 ORDER BY \
189 CASE \
190 WHEN jobs.type = 'build' THEN 0 \
191 WHEN jobs.type = 'test' THEN 1 \
192 END, \
193 builds.priority DESC, jobs.time_created ASC",
194 max_tries)
195
196 # Restart the job
197 for job in jobs:
198 job.set_state("new", log=False)
199
200
201 class Job(base.DataObject):
202 table = "jobs"
203
204 def __str__(self):
205 return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)
206
207 def __eq__(self, other):
208 if isinstance(other, self.__class__):
209 return self.id == other.id
210
211 def __lt__(self, other):
212 if isinstance(other, self.__class__):
213 if not self.test and other.test:
214 return True
215
216 if self.build == other.build:
217 return arches.priority(self.arch) < arches.priority(other.arch)
218
219 return self.time_created < other.time_created
220
221 def __iter__(self):
222 packages = self.backend.packages._get_packages("SELECT packages.* FROM jobs_packages \
223 LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
224 WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id)
225
226 return iter(packages)
227
228 def __nonzero__(self):
229 return True
230
231 def __len__(self):
232 res = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
233 WHERE job_id = %s", self.id)
234
235 return res.len
236
237 @property
238 def distro(self):
239 return self.build.distro
240
241 def get_superseeded_by(self):
242 if self.data.superseeded_by:
243 return self.backend.jobs.get_by_id(self.data.superseeded_by)
244
245 def set_superseeded_by(self, superseeded_by):
246 assert isinstance(superseeded_by, self.__class__)
247
248 self._set_attribute("superseeded_by", superseeded_by.id)
249 self.superseeded_by = superseeded_by
250
251 superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
252
253 def delete(self):
254 self.__delete_buildroots()
255 self.__delete_history()
256 self.__delete_packages()
257 self.__delete_logfiles()
258
259 # Delete the job itself.
260 self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)
261
262 def __delete_buildroots(self):
263 """
264 Removes all buildroots.
265 """
266 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
267
268 def __delete_history(self):
269 """
270 Removes all references in the history to this build job.
271 """
272 self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)
273
274 def __delete_packages(self):
275 """
276 Deletes all uploaded files from the job.
277 """
278 for pkg in self.packages:
279 pkg.delete()
280
281 self.db.execute("DELETE FROM jobs_packages WHERE job_id = %s", self.id)
282
283 def __delete_logfiles(self):
284 for logfile in self.logfiles:
285 self.db.execute("INSERT INTO queue_delete(path) VALUES(%s)", logfile.path)
286
287 def reset(self, user=None):
288 self.__delete_buildroots()
289 self.__delete_packages()
290 self.__delete_history()
291 self.__delete_logfiles()
292
293 self.state = "new"
294 self.log("reset", user=user)
295
296 ## Logging stuff
297
298 def log(self, action, user=None, state=None, builder=None, test_job=None):
299 user_id = None
300 if user:
301 user_id = user.id
302
303 builder_id = None
304 if builder:
305 builder_id = builder.id
306
307 test_job_id = None
308 if test_job:
309 test_job_id = test_job.id
310
311 self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
312 time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
313 self.id, action, state, user_id, builder_id, test_job_id)
314
315 def get_log(self, limit=None, offset=None, user=None):
316 query = "SELECT * FROM jobs_history"
317
318 conditions = ["job_id = %s",]
319 args = [self.id,]
320
321 if user:
322 conditions.append("user_id = %s")
323 args.append(user.id)
324
325 if conditions:
326 query += " WHERE %s" % " AND ".join(conditions)
327
328 query += " ORDER BY time DESC"
329
330 if limit:
331 if offset:
332 query += " LIMIT %s,%s"
333 args += [offset, limit,]
334 else:
335 query += " LIMIT %s"
336 args += [limit,]
337
338 entries = []
339 for entry in self.db.query(query, *args):
340 entry = logs.JobLogEntry(self.backend, entry)
341 entries.append(entry)
342
343 return entries
344
345 @property
346 def uuid(self):
347 return self.data.uuid
348
349 @property
350 def test(self):
351 return self.data.test
352
353 @property
354 def build_id(self):
355 return self.data.build_id
356
357 @lazy_property
358 def build(self):
359 return self.backend.builds.get_by_id(self.build_id)
360
361 @property
362 def related_jobs(self):
363 ret = []
364
365 for job in self.build.jobs:
366 if job == self:
367 continue
368
369 ret.append(job)
370
371 return ret
372
373 @property
374 def pkg(self):
375 return self.build.pkg
376
377 @property
378 def name(self):
379 return "%s-%s.%s" % (self.pkg.name, self.pkg.friendly_version, self.arch)
380
381 @property
382 def size(self):
383 return sum((p.size for p in self.packages))
384
385 @lazy_property
386 def rank(self):
387 """
388 Returns the rank in the build queue
389 """
390 if not self.state == "pending":
391 return
392
393 res = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)
394
395 if res:
396 return res.rank
397
398 def is_running(self):
399 """
400 Returns True if job is in a running state.
401 """
402 return self.state in ("pending", "dispatching", "running", "uploading")
403
404 def get_state(self):
405 return self.data.state
406
407 def set_state(self, state, user=None, log=True):
408 # Nothing to do if the state remains.
409 if not self.state == state:
410 self.db.execute("UPDATE jobs SET state = %s WHERE id = %s", state, self.id)
411
412 # Log the event.
413 if log and not state == "new":
414 self.log("state_change", state=state, user=user)
415
416 # Update cache.
417 if self._data:
418 self._data["state"] = state
419
420 # Always clear the message when the status is changed.
421 self.update_message(None)
422
423 # Update some more informations.
424 if state == "dispatching":
425 # Set start time.
426 self.db.execute("UPDATE jobs SET time_started = NOW(), time_finished = NULL \
427 WHERE id = %s", self.id)
428
429 elif state == "pending":
430 self.db.execute("UPDATE jobs SET tries = tries + 1, time_started = NULL, \
431 time_finished = NULL WHERE id = %s", self.id)
432
433 elif state in ("aborted", "dependency_error", "finished", "failed"):
434 # Set finish time and reset builder..
435 self.db.execute("UPDATE jobs SET time_finished = NOW() WHERE id = %s", self.id)
436
437 # Send messages to the user.
438 if state == "finished":
439 self.send_finished_message()
440
441 elif state == "failed":
442 # Remove all package files if a job is set to failed state.
443 self.__delete_packages()
444
445 self.send_failed_message()
446
447 # Automatically update the state of the build (not on test builds).
448 if not self.test:
449 self.build.auto_update_state()
450
451 state = property(get_state, set_state)
452
453 @property
454 def message(self):
455 return self.data.message
456
457 def update_message(self, msg):
458 self.db.execute("UPDATE jobs SET message = %s WHERE id = %s",
459 msg, self.id)
460
461 if self._data:
462 self._data["message"] = msg
463
464 def get_builder(self):
465 if self.data.builder_id:
466 return self.backend.builders.get_by_id(self.data.builder_id)
467
468 def set_builder(self, builder, user=None):
469 self.db.execute("UPDATE jobs SET builder_id = %s WHERE id = %s",
470 builder.id, self.id)
471
472 # Update cache.
473 if self._data:
474 self._data["builder_id"] = builder.id
475
476 self._builder = builder
477
478 # Log the event.
479 if user:
480 self.log("builder_assigned", builder=builder, user=user)
481
482 builder = lazy_property(get_builder, set_builder)
483
484 @property
485 def arch(self):
486 return self.data.arch
487
488 @property
489 def duration(self):
490 if not self.time_started:
491 return 0
492
493 if self.time_finished:
494 delta = self.time_finished - self.time_started
495 else:
496 delta = datetime.datetime.utcnow() - self.time_started
497
498 return delta.total_seconds()
499
500 @property
501 def time_created(self):
502 return self.data.time_created
503
504 @property
505 def time_started(self):
506 return self.data.time_started
507
508 @property
509 def time_finished(self):
510 return self.data.time_finished
511
512 @property
513 def expected_runtime(self):
514 """
515 Returns the estimated time and stddev, this job takes to finish.
516 """
517 # Get the average build time.
518 build_times = self.backend.builds.get_build_times_by_arch(self.arch,
519 name=self.pkg.name)
520
521 # If there is no statistical data, we cannot estimate anything.
522 if not build_times:
523 return None, None
524
525 return build_times.average, build_times.stddev
526
527 @property
528 def eta(self):
529 expected_runtime, stddev = self.expected_runtime
530
531 if expected_runtime:
532 return expected_runtime - int(self.duration), stddev
533
534 @property
535 def tries(self):
536 return self.data.tries
537
538 def get_pkg_by_uuid(self, uuid):
539 pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
540 JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
541 WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
542 self.id, uuid)
543
544 if pkg:
545 pkg.job = self
546 return pkg
547
548 @lazy_property
549 def logfiles(self):
550 logfiles = []
551
552 for log in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
553 log = logs.LogFile(self.backend, log.id)
554 log._job = self
555
556 logfiles.append(log)
557
558 return logfiles
559
560 def add_file(self, filename):
561 """
562 Add the specified file to this job.
563
564 The file is copied to the right directory by this function.
565 """
566 assert os.path.exists(filename)
567
568 if filename.endswith(".log"):
569 self._add_file_log(filename)
570
571 elif filename.endswith(".%s" % PACKAGE_EXTENSION):
572 # It is not allowed to upload packages on test builds.
573 if self.test:
574 return
575
576 self._add_file_package(filename)
577
578 def _add_file_log(self, filename):
579 """
580 Attach a log file to this job.
581 """
582 target_dirname = os.path.join(self.build.path, "logs")
583
584 if self.test:
585 i = 1
586 while True:
587 target_filename = os.path.join(target_dirname,
588 "test.%s.%s.%s.log" % (self.arch, i, self.tries))
589
590 if os.path.exists(target_filename):
591 i += 1
592 else:
593 break
594 else:
595 target_filename = os.path.join(target_dirname,
596 "build.%s.%s.log" % (self.arch, self.tries))
597
598 # Make sure the target directory exists.
599 if not os.path.exists(target_dirname):
600 os.makedirs(target_dirname)
601
602 # Calculate a SHA512 hash from that file.
603 f = open(filename, "rb")
604 h = hashlib.sha512()
605 while True:
606 buf = f.read(BUFFER_SIZE)
607 if not buf:
608 break
609
610 h.update(buf)
611 f.close()
612
613 # Copy the file to the final location.
614 shutil.copy2(filename, target_filename)
615
616 # Create an entry in the database.
617 self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
618 VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
619 os.path.getsize(target_filename), h.hexdigest())
620
621 def _add_file_package(self, filename):
622 # Open package (creates entry in the database)
623 pkg = self.backend.packages.create(filename)
624
625 # Move package to the build directory.
626 pkg.move(os.path.join(self.build.path, self.arch))
627
628 # Attach the package to this job.
629 self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
630 self.id, pkg.id)
631
632 def get_aborted_state(self):
633 return self.data.aborted_state
634
635 def set_aborted_state(self, state):
636 self._set_attribute("aborted_state", state)
637
638 aborted_state = property(get_aborted_state, set_aborted_state)
639
640 @property
641 def message_recipients(self):
642 l = []
643
644 # Add all people watching the build.
645 l += self.build.message_recipients
646
647 # Add the package maintainer on release builds.
648 if self.build.type == "release":
649 maint = self.pkg.maintainer
650
651 if isinstance(maint, users.User):
652 l.append("%s <%s>" % (maint.realname, maint.email))
653 elif maint:
654 l.append(maint)
655
656 # XXX add committer and commit author.
657
658 # Add the owner of the scratch build on scratch builds.
659 elif self.build.type == "scratch" and self.build.user:
660 l.append("%s <%s>" % \
661 (self.build.user.realname, self.build.user.email))
662
663 return set(l)
664
665 def save_buildroot(self, pkgs):
666 rows = []
667
668 for pkg_name, pkg_uuid in pkgs:
669 rows.append((self.id, self.tries, pkg_uuid, pkg_name))
670
671 # Cleanup old stuff first (for rebuilding packages).
672 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s AND tries = %s",
673 self.id, self.tries)
674
675 self.db.executemany("INSERT INTO \
676 jobs_buildroots(job_id, tries, pkg_uuid, pkg_name) \
677 VALUES(%s, %s, %s, %s)", rows)
678
679 def has_buildroot(self, tries=None):
680 if tries is None:
681 tries = self.tries
682
683 res = self.db.get("SELECT COUNT(*) AS num FROM jobs_buildroots \
684 WHERE jobs_buildroots.job_id = %s AND jobs_buildroots.tries = %s",
685 self.id, tries)
686
687 if res:
688 return res.num
689
690 return 0
691
692 def get_buildroot(self, tries=None):
693 if tries is None:
694 tries = self.tries
695
696 rows = self.db.query("SELECT * FROM jobs_buildroots \
697 WHERE jobs_buildroots.job_id = %s AND jobs_buildroots.tries = %s \
698 ORDER BY pkg_name", self.id, tries)
699
700 pkgs = []
701 for row in rows:
702 # Search for this package in the packages table.
703 pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
704 pkgs.append((row.pkg_name, row.pkg_uuid, pkg))
705
706 return pkgs
707
708 def send_finished_message(self):
709 # Send no finished mails for test jobs.
710 if self.test:
711 return
712
713 logging.debug("Sending finished message for job %s to %s" % \
714 (self.name, ", ".join(self.message_recipients)))
715
716 info = {
717 "build_name" : self.name,
718 "build_host" : self.builder.name,
719 "build_uuid" : self.uuid,
720 }
721
722 self.backend.messages.send_to_all(self.message_recipients,
723 MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)
724
725 def send_failed_message(self):
726 logging.debug("Sending failed message for job %s to %s" % \
727 (self.name, ", ".join(self.message_recipients)))
728
729 build_host = "--"
730 if self.builder:
731 build_host = self.builder.name
732
733 info = {
734 "build_name" : self.name,
735 "build_host" : build_host,
736 "build_uuid" : self.uuid,
737 }
738
739 self.backend.messages.send_to_all(self.message_recipients,
740 MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)
741
742 def set_start_time(self, start_not_before):
743 self._set_attribute("start_not_before", start_not_before)
744
745 def schedule(self, type, start_time=None, user=None):
746 assert type in ("rebuild", "test")
747
748 if type == "rebuild":
749 if self.state == "finished":
750 return
751
752 self.set_state("new", user=user, log=False)
753 self.set_start_time(start_time)
754
755 # Log the event.
756 self.log("schedule_rebuild", user=user)
757
758 elif type == "test":
759 if not self.state == "finished":
760 return
761
762 # Create a new job with same build and arch.
763 job = self.create(self.backend, self.build, self.arch, type="test")
764 job.set_start_time(start_time)
765
766 # Log the event.
767 self.log("schedule_test_job", test_job=job, user=user)
768
769 return job
770
771 def schedule_test(self, start_not_before=None, user=None):
772 # XXX to be removed
773 return self.schedule("test", start_time=start_not_before, user=user)
774
775 def schedule_rebuild(self, start_not_before=None, user=None):
776 # XXX to be removed
777 return self.schedule("rebuild", start_time=start_not_before, user=user)
778
779 def get_build_repos(self):
780 """
781 Returns a list of all repositories that should be used when
782 building this job.
783 """
784 repo_ids = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
785 self.id)
786
787 if not repo_ids:
788 return self.distro.get_build_repos()
789
790 repos = []
791 for repo in self.distro.repositories:
792 if repo.id in [r.id for r in repo_ids]:
793 repos.append(repo)
794
795 return repos or self.distro.get_build_repos()
796
797 def get_repo_config(self):
798 """
799 Get repository configuration file that is sent to the builder.
800 """
801 confs = []
802
803 for repo in self.get_build_repos():
804 confs.append(repo.get_conf())
805
806 return "\n\n".join(confs)
807
808 def get_config(self):
809 """
810 Get configuration file that is sent to the builder.
811 """
812 confs = []
813
814 # Add the distribution configuration.
815 confs.append(self.distro.get_config())
816
817 # Then add all repositories for this build.
818 confs.append(self.get_repo_config())
819
820 return "\n\n".join(confs)
821
822 def resolvdep(self):
823 config = pakfire.config.Config(files=["general.conf"])
824 config.parse(self.get_config())
825
826 # The filename of the source file.
827 filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
828 assert os.path.exists(filename), filename
829
830 # Create a new pakfire instance with the configuration for
831 # this build.
832 p = pakfire.PakfireServer(config=config, arch=self.arch)
833
834 # Try to solve the build dependencies.
835 try:
836 solver = p.resolvdep(filename)
837
838 # Catch dependency errors and log the problem string.
839 except DependencyError, e:
840 self.state = "dependency_error"
841 self.update_message(e)
842
843 else:
844 # If the build dependencies can be resolved, we set the build in
845 # pending state.
846 if solver.status is True:
847 if self.state in ("failed",):
848 return
849
850 self.state = "pending"