#!/usr/bin/python

import datetime
import hashlib
import logging
import os
import shutil
import uuid

import pakfire
import pakfire.config

log = logging.getLogger("builds")
log.propagate = 1

from . import arches
from . import base
from . import logs
from . import users

from .constants import *
from .decorators import *

class Jobs(base.Object):
	def _get_job(self, query, *args):
		res = self.db.get(query, *args)

		if res:
			return Job(self.backend, res.id, data=res)

	def _get_jobs(self, query, *args):
		res = self.db.query(query, *args)

		for row in res:
			yield Job(self.backend, row.id, data=row)

	def create(self, build, arch, type="build", superseeds=None):
		job = self._get_job("INSERT INTO jobs(uuid, type, build_id, arch, time_created) \
			VALUES(%s, %s, %s, %s, NOW()) RETURNING *", "%s" % uuid.uuid4(), type, build.id, arch)
		job.log("created")

		# Set cache for the Build object.
		job.build = build

		# Mark if the new job supersedes some other job.
		if superseeds:
			superseeds.superseeded_by = job

		# Jobs are in state "new" by default and wait to be checked
		# for dependencies. Packages that have no build dependencies
		# can be forwarded to the "pending" state directly.
		if not job.pkg.requires:
			job.state = "pending"

		return job

	def get_by_id(self, id, data=None):
		return Job(self.backend, id, data)

	def get_by_uuid(self, uuid):
		job = self.db.get("SELECT id FROM jobs WHERE uuid = %s", uuid)

		if job:
			return self.get_by_id(job.id)

	def get_by_build(self, build_id, build=None, type=None):
		"""
			Get all jobs in the specified build.
		"""
		query = "SELECT * FROM jobs WHERE build_id = %s"
		args = [build_id,]

		if type:
			query += " AND type = %s"
			args.append(type)

		# Fetch all jobs of this build.
		jobs = []
		for job in self.db.query(query, *args):
			job = Job(self.backend, job.id, job)

			# If the Build object was given, set it so it won't be retrieved
			# from the database again.
			if build:
				job._build = build

			jobs.append(job)

		# Return sorted list of jobs.
		return sorted(jobs)

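	# get_active() returns all jobs that are currently being processed,
	# optionally limited to a single builder. Running jobs are listed first,
	# followed by uploading and dispatching ones, oldest start time first.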
	def get_active(self, host_id=None, builder=None, states=None):
		if builder:
			host_id = builder.id

		if states is None:
			states = ["dispatching", "running", "uploading"]

		query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states))
		args = states

		if host_id:
			query += " AND builder_id = %s" % host_id

		query += " ORDER BY \
			CASE \
				WHEN jobs.state = 'running' THEN 0 \
				WHEN jobs.state = 'uploading' THEN 1 \
				WHEN jobs.state = 'dispatching' THEN 2 \
				WHEN jobs.state = 'pending' THEN 3 \
				WHEN jobs.state = 'new' THEN 4 \
			END, time_started ASC"

		return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

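	# get_latest() returns recently completed jobs (finished, failed or
	# aborted), optionally filtered by architecture, builder, a specific
	# date (YYYY-MM-DD) or a maximum age.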
	def get_latest(self, arch=None, builder=None, limit=None, age=None, date=None):
		query = "SELECT * FROM jobs"
		args = []

		where = ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]

		if arch:
			where.append("arch = %s")
			args.append(arch)

		if builder:
			where.append("builder_id = %s")
			args.append(builder.id)

		if date:
			try:
				year, month, day = date.split("-", 2)
				date = datetime.date(int(year), int(month), int(day))
			except ValueError:
				pass
			else:
				where.append("(time_created::date = %s OR \
					time_started::date = %s OR time_finished::date = %s)")
				args += (date, date, date)

		if age:
			where.append("time_finished >= NOW() - '%s'::interval" % age)

		if where:
			query += " WHERE %s" % " AND ".join(where)

		query += " ORDER BY time_finished DESC"

		if limit:
			query += " LIMIT %s"
			args.append(limit)

		return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

	def get_average_build_time(self):
		"""
			Returns the average build time of all finished builds from the
			last 3 months.
		"""
		result = self.db.get("SELECT AVG(time_finished - time_started) as average \
			FROM jobs WHERE type = 'build' AND state = 'finished' AND \
			time_finished >= NOW() - '3 months'::interval")

		if result:
			return result.average

	def count(self, *states):
		query = "SELECT COUNT(*) AS count FROM jobs"
		args = []

		if states:
			query += " WHERE state IN %s"
			args.append(states)

		jobs = self.db.get(query, *args)
		if jobs:
			return jobs.count

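	# restart_failed() is meant to be called periodically. It picks up build
	# jobs that failed more than 72 hours ago (unless the whole build is
	# broken) and puts them back into the "new" state so they get scheduled
	# again, most important builds first.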
	def restart_failed(self):
		jobs = self._get_jobs("SELECT jobs.* FROM jobs \
			JOIN builds ON builds.id = jobs.build_id \
			WHERE \
				jobs.type = 'build' AND \
				jobs.state = 'failed' AND \
				NOT builds.state = 'broken' AND \
				jobs.time_finished < NOW() - '72 hours'::interval \
			ORDER BY \
				CASE \
					WHEN jobs.type = 'build' THEN 0 \
					WHEN jobs.type = 'test' THEN 1 \
				END, \
				builds.priority DESC, jobs.time_created ASC")

		# Restart the job
		for job in jobs:
			job.set_state("new", log=False)


class Job(base.DataObject):
	table = "jobs"

	def __str__(self):
		return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)

	def __eq__(self, other):
		if isinstance(other, self.__class__):
			return self.id == other.id

	def __lt__(self, other):
		if isinstance(other, self.__class__):
			if not self.test and other.test:
				return True

			if self.build == other.build:
				return arches.priority(self.arch) < arches.priority(other.arch)

			return self.time_created < other.time_created

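	# Iterating over a Job yields all of its packages; len() returns how many
	# packages have been uploaded for it.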
	def __iter__(self):
		packages = self.backend.packages._get_packages("SELECT packages.* FROM jobs_packages \
			LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id)

		return iter(packages)

	def __nonzero__(self):
		return True

	def __len__(self):
		res = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
			WHERE job_id = %s", self.id)

		return res.len

	@property
	def distro(self):
		return self.build.distro

	def get_superseeded_by(self):
		if self.data.superseeded_by:
			return self.backend.jobs.get_by_id(self.data.superseeded_by)

	def set_superseeded_by(self, superseeded_by):
		assert isinstance(superseeded_by, self.__class__)

		self._set_attribute("superseeded_by", superseeded_by.id)
		self.superseeded_by = superseeded_by

	superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)

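	# delete() only marks the job for deletion; remove() below actually drops
	# the job and all of its associated records from the database.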
	def delete(self):
		self._set_attribute("delete", True)

	def remove(self):
		"""
			Removes a job from the database
		"""
		self.__remove_buildroots()
		self.__remove_history()
		self.__remove_packages()
		self.__remove_logfiles()

		# Delete the job itself.
		self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)

	def __remove_buildroots(self):
		"""
			Removes all buildroots.
		"""
		self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

	def __remove_history(self):
		"""
			Removes all references in the history to this build job.
		"""
		self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)

	def __remove_packages(self):
		"""
			Deletes all uploaded files from the job.
		"""
		for pkg in self.packages:
			pkg.delete()

		self.db.execute("DELETE FROM jobs_packages WHERE job_id = %s", self.id)

	def __remove_logfiles(self):
		for logfile in self.logfiles:
			self.db.execute("INSERT INTO queue_delete(path) VALUES(%s)", logfile.path)

	## Logging stuff

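	# log() appends an entry to the jobs_history table; get_log() reads those
	# entries back, newest first, optionally filtered by user.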
	def log(self, action, user=None, state=None, builder=None, test_job=None):
		user_id = None
		if user:
			user_id = user.id

		builder_id = None
		if builder:
			builder_id = builder.id

		test_job_id = None
		if test_job:
			test_job_id = test_job.id

		self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
			time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
			self.id, action, state, user_id, builder_id, test_job_id)

	def get_log(self, limit=None, offset=None, user=None):
		query = "SELECT * FROM jobs_history"

		conditions = ["job_id = %s",]
		args = [self.id,]

		if user:
			conditions.append("user_id = %s")
			args.append(user.id)

		if conditions:
			query += " WHERE %s" % " AND ".join(conditions)

		query += " ORDER BY time DESC"

		if limit:
			# Use PostgreSQL-style LIMIT/OFFSET (the MySQL-style "LIMIT x,y"
			# syntax is not understood by PostgreSQL).
			if offset:
				query += " LIMIT %s OFFSET %s"
				args += [limit, offset,]
			else:
				query += " LIMIT %s"
				args += [limit,]

		entries = []
		for entry in self.db.query(query, *args):
			entry = logs.JobLogEntry(self.backend, entry)
			entries.append(entry)

		return entries

	@property
	def uuid(self):
		return self.data.uuid

	@property
	def test(self):
		return self.data.test

	@property
	def build_id(self):
		return self.data.build_id

	@lazy_property
	def build(self):
		return self.backend.builds.get_by_id(self.build_id)

	@property
	def related_jobs(self):
		ret = []

		for job in self.build.jobs:
			if job == self:
				continue

			ret.append(job)

		return ret

	@property
	def pkg(self):
		return self.build.pkg

	@property
	def name(self):
		return "%s-%s.%s" % (self.pkg.name, self.pkg.friendly_version, self.arch)

	@property
	def size(self):
		return sum((p.size for p in self.packages))

	@lazy_property
	def rank(self):
		"""
			Returns the rank in the build queue.
		"""
		if not self.state == "pending":
			return

		res = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)

		if res:
			return res.rank

	def is_running(self):
		"""
			Returns True if the job is in a running state.
		"""
		return self.state in ("pending", "dispatching", "running", "uploading")

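	# Roughly, a job moves from "new" to "pending" once its dependencies can
	# be resolved, then through "dispatching", "running" and "uploading", and
	# ends up in "finished", "failed", "aborted" or "dependency_error".
	# set_state() keeps the timestamps, the message, the history log and the
	# build state in sync with these transitions.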
	def get_state(self):
		return self.data.state

	def set_state(self, state, user=None, log=True):
		# Nothing to do if the state does not change.
		if not self.state == state:
			self.db.execute("UPDATE jobs SET state = %s WHERE id = %s", state, self.id)

			# Log the event.
			if log and not state == "new":
				self.log("state_change", state=state, user=user)

			# Update cache.
			if self._data:
				self._data["state"] = state

		# Always clear the message when the status is changed.
		self.update_message(None)

		# Update some more information.
		if state == "dispatching":
			# Set start time.
			self.db.execute("UPDATE jobs SET time_started = NOW(), time_finished = NULL \
				WHERE id = %s", self.id)

		elif state == "pending":
			self.db.execute("UPDATE jobs SET time_started = NULL, \
				time_finished = NULL WHERE id = %s", self.id)

		elif state in ("aborted", "dependency_error", "finished", "failed"):
			# Set the finish time.
			self.db.execute("UPDATE jobs SET time_finished = NOW() WHERE id = %s", self.id)

			# Send messages to the user.
			if state == "finished":
				self.send_finished_message()

			elif state == "failed":
				# Remove all package files if a job is set to failed state.
				self.__remove_packages()

				self.send_failed_message()

		# Automatically update the state of the build (not on test builds).
		if not self.test:
			self.build.auto_update_state()

	state = property(get_state, set_state)

	@property
	def message(self):
		return self.data.message

	def update_message(self, msg):
		self.db.execute("UPDATE jobs SET message = %s WHERE id = %s",
			msg, self.id)

		if self._data:
			self._data["message"] = msg

	def get_builder(self):
		if self.data.builder_id:
			return self.backend.builders.get_by_id(self.data.builder_id)

	def set_builder(self, builder, user=None):
		self.db.execute("UPDATE jobs SET builder_id = %s WHERE id = %s",
			builder.id, self.id)

		# Update cache.
		if self._data:
			self._data["builder_id"] = builder.id

		self._builder = builder

		# Log the event.
		if user:
			self.log("builder_assigned", builder=builder, user=user)

	builder = lazy_property(get_builder, set_builder)

	@property
	def arch(self):
		return self.data.arch

	@property
	def duration(self):
		if not self.time_started:
			return 0

		if self.time_finished:
			delta = self.time_finished - self.time_started
		else:
			delta = datetime.datetime.utcnow() - self.time_started

		return delta.total_seconds()

	@property
	def time_created(self):
		return self.data.time_created

	@property
	def time_started(self):
		return self.data.time_started

	@property
	def time_finished(self):
		return self.data.time_finished

	@property
	def expected_runtime(self):
		"""
			Returns the estimated time (and standard deviation) this job
			takes to finish.
		"""
		# Get the average build time.
		build_times = self.backend.builds.get_build_times_by_arch(self.arch,
			name=self.pkg.name)

		# If there is no statistical data, we cannot estimate anything.
		if not build_times:
			return None, None

		return build_times.average, build_times.stddev

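	# eta is the expected remaining runtime: the estimated total runtime minus
	# the time the job has already been running, together with the standard
	# deviation of the estimate.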
	@property
	def eta(self):
		expected_runtime, stddev = self.expected_runtime

		if expected_runtime:
			return expected_runtime - int(self.duration), stddev

	def get_pkg_by_uuid(self, uuid):
		pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
			JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
			self.id, uuid)

		if pkg:
			pkg.job = self
			return pkg

	@lazy_property
	def logfiles(self):
		logfiles = []

		for log in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
			log = logs.LogFile(self.backend, log.id)
			log._job = self

			logfiles.append(log)

		return logfiles

	def add_file(self, filename):
		"""
			Add the specified file to this job.

			The file is copied to the right directory by this function.
		"""
		assert os.path.exists(filename)

		if filename.endswith(".log"):
			self._add_file_log(filename)

		elif filename.endswith(".%s" % PACKAGE_EXTENSION):
			# It is not allowed to upload packages on test builds.
			if self.test:
				return

			self._add_file_package(filename)

	def _add_file_log(self, filename):
		"""
			Attach a log file to this job.
		"""
		target_dirname = os.path.join(self.build.path, "logs")

		if self.test:
			i = 1
			while True:
				target_filename = os.path.join(target_dirname,
					"test.%s.%s.%s.log" % (self.arch, i, self.uuid))

				if os.path.exists(target_filename):
					i += 1
				else:
					break
		else:
			target_filename = os.path.join(target_dirname,
				"build.%s.%s.log" % (self.arch, self.uuid))

		# Make sure the target directory exists.
		if not os.path.exists(target_dirname):
			os.makedirs(target_dirname)

		# Calculate a SHA512 hash from that file.
		f = open(filename, "rb")
		h = hashlib.sha512()
		while True:
			buf = f.read(BUFFER_SIZE)
			if not buf:
				break

			h.update(buf)
		f.close()

		# Copy the file to the final location.
		shutil.copy2(filename, target_filename)

		# Create an entry in the database.
		self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
			VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
			os.path.getsize(target_filename), h.hexdigest())

	def _add_file_package(self, filename):
		# Open package (creates entry in the database)
		pkg = self.backend.packages.create(filename)

		# Move package to the build directory.
		pkg.move(os.path.join(self.build.path, self.arch))

		# Attach the package to this job.
		self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
			self.id, pkg.id)

	def get_aborted_state(self):
		return self.data.aborted_state

	def set_aborted_state(self, state):
		self._set_attribute("aborted_state", state)

	aborted_state = property(get_aborted_state, set_aborted_state)

	@property
	def message_recipients(self):
		l = []

		# Add all people watching the build.
		l += self.build.message_recipients

		# Add the package maintainer on release builds.
		if self.build.type == "release":
			maint = self.pkg.maintainer

			if isinstance(maint, users.User):
				l.append("%s <%s>" % (maint.realname, maint.email))
			elif maint:
				l.append(maint)

			# XXX add committer and commit author.

		# Add the owner of the scratch build on scratch builds.
		elif self.build.type == "scratch" and self.build.user:
			l.append("%s <%s>" % \
				(self.build.user.realname, self.build.user.email))

		return set(l)

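	# save_buildroot() records which packages (name and UUID) were installed
	# in the build environment; the buildroot property returns that list
	# again, resolved against the packages table where possible.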
	def save_buildroot(self, pkgs):
		# Cleanup old stuff first (for rebuilding packages)
		self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

		for pkg_name, pkg_uuid in pkgs:
			self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
				VALUES(%s, %s, %s)", self.id, pkg_name, pkg_uuid)

	@lazy_property
	def buildroot(self):
		rows = self.db.query("SELECT * FROM jobs_buildroots \
			WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)

		pkgs = []
		for row in rows:
			# Search for this package in the packages table.
			pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
			pkgs.append((row.pkg_name, row.pkg_uuid, pkg))

		return pkgs

	def send_finished_message(self):
		# Send no finished mails for test jobs.
		if self.test:
			return

		logging.debug("Sending finished message for job %s to %s" % \
			(self.name, ", ".join(self.message_recipients)))

		info = {
			"build_name" : self.name,
			"build_host" : self.builder.name,
			"build_uuid" : self.uuid,
		}

		self.backend.messages.send_to_all(self.message_recipients,
			MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)

	def send_failed_message(self):
		logging.debug("Sending failed message for job %s to %s" % \
			(self.name, ", ".join(self.message_recipients)))

		build_host = "--"
		if self.builder:
			build_host = self.builder.name

		info = {
			"build_name" : self.name,
			"build_host" : build_host,
			"build_uuid" : self.uuid,
		}

		self.backend.messages.send_to_all(self.message_recipients,
			MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)

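	# schedule() either puts this job back into the queue for a rebuild
	# (unless it already finished successfully) or creates a new test job for
	# the same build and architecture, which is only allowed for jobs that
	# have finished successfully.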
	def set_start_time(self, start_not_before):
		self._set_attribute("start_not_before", start_not_before)

	def schedule(self, type, start_time=None, user=None):
		assert type in ("rebuild", "test")

		if type == "rebuild":
			if self.state == "finished":
				return

			self.set_state("new", user=user, log=False)
			self.set_start_time(start_time)

			# Log the event.
			self.log("schedule_rebuild", user=user)

		elif type == "test":
			if not self.state == "finished":
				return

			# Create a new job with the same build and arch.
			job = self.backend.jobs.create(self.build, self.arch, type="test")
			job.set_start_time(start_time)

			# Log the event.
			self.log("schedule_test_job", test_job=job, user=user)

			return job

	def schedule_test(self, start_not_before=None, user=None):
		# XXX to be removed
		return self.schedule("test", start_time=start_not_before, user=user)

	def schedule_rebuild(self, start_not_before=None, user=None):
		# XXX to be removed
		return self.schedule("rebuild", start_time=start_not_before, user=user)

	def get_build_repos(self):
		"""
			Returns a list of all repositories that should be used when
			building this job.
		"""
		repo_ids = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
			self.id)

		if not repo_ids:
			return self.distro.get_build_repos()

		repos = []
		for repo in self.distro.repositories:
			if repo.id in [r.repo_id for r in repo_ids]:
				repos.append(repo)

		return repos or self.distro.get_build_repos()

	def get_repo_config(self):
		"""
			Get repository configuration file that is sent to the builder.
		"""
		confs = []

		for repo in self.get_build_repos():
			confs.append(repo.get_conf())

		return "\n\n".join(confs)

	def get_config(self):
		"""
			Get configuration file that is sent to the builder.
		"""
		confs = []

		# Add the distribution configuration.
		confs.append(self.distro.get_config())

		# Then add all repositories for this build.
		confs.append(self.get_repo_config())

		return "\n\n".join(confs)

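	# resolvdep() tries to resolve the build dependencies of the source
	# package with a Pakfire instance that is configured like the builder
	# would be. On success the job becomes "pending"; on a dependency problem
	# it is put into "dependency_error" and the solver message is stored.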
	def resolvdep(self):
		config = pakfire.config.Config(files=["general.conf"])
		config.parse(self.get_config())

		# The filename of the source file.
		filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
		assert os.path.exists(filename), filename

		# Create a new pakfire instance with the configuration for
		# this build.
		p = pakfire.PakfireServer(config=config, arch=self.arch)

		# Try to solve the build dependencies.
		try:
			solver = p.resolvdep(filename)

		# Catch dependency errors and log the problem string.
		except DependencyError, e:
			self.state = "dependency_error"
			self.update_message(e)

		else:
			# If the build dependencies can be resolved, we set the build in
			# pending state.
			if solver.status is True:
				if self.state in ("failed",):
					return

				self.state = "pending"