#!/usr/bin/python

import datetime
import hashlib
import logging
import os
import shutil
import uuid

import pakfire
import pakfire.config

log = logging.getLogger("builds")
log.propagate = 1

from . import arches
from . import base
from . import logs
from . import users

from .constants import *
from .decorators import *

class Jobs(base.Object):
	def _get_job(self, query, *args):
		res = self.db.get(query, *args)

		if res:
			return Job(self.backend, res.id, data=res)

	def _get_jobs(self, query, *args):
		res = self.db.query(query, *args)

		for row in res:
			yield Job(self.backend, row.id, data=row)

	def create(self, build, arch, test=False, superseeds=None):
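		# INSERT ... RETURNING * hands the freshly created row straight back
		# to _get_job(), so the Job object can be built without a second query.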
		job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
			VALUES(%s, %s, %s, %s) RETURNING *", "%s" % uuid.uuid4(), build.id, arch, test)
		job.log("created")

		# Set cache for Build object.
		job.build = build

		# Mark if the new job supersedes some other job.
		if superseeds:
			superseeds.superseeded_by = job

		# Jobs are in state "new" by default and wait to be checked
		# for dependencies. Packages that have no build dependencies
		# can be forwarded directly to the "pending" state.
		if not job.pkg.requires:
			job.state = "pending"

		return job

	def get_by_id(self, id, data=None):
		return Job(self.backend, id, data)

	def get_by_uuid(self, uuid):
		job = self.db.get("SELECT id FROM jobs WHERE uuid = %s", uuid)

		if job:
			return self.get_by_id(job.id)

	def get_by_build(self, build_id, build=None, type=None):
		"""
			Get all jobs in the specified build.
		"""
		query = "SELECT * FROM jobs WHERE build_id = %s"
		args = [build_id,]

		if type:
			query += " AND type = %s"
			args.append(type)

		# Fetch all jobs of this build.
		jobs = []
		for job in self.db.query(query, *args):
			job = Job(self.backend, job.id, job)

			# If the Build object was given, cache it so it won't be
			# retrieved from the database again.
			if build:
				job._build = build

			jobs.append(job)

		# Return sorted list of jobs.
		return sorted(jobs)

	def get_active(self, host_id=None, builder=None, states=None):
		if builder:
			host_id = builder.id

		if states is None:
			states = ["dispatching", "running", "uploading"]

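		# Build one %s placeholder per state so each value is bound by the
		# database driver; for the default three states this expands to
		# "state IN (%s, %s, %s)".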
99 query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states))
100 args = states
101
102 if host_id:
103 query += " AND builder_id = %s" % host_id
104
105 query += " ORDER BY \
106 CASE \
107 WHEN jobs.state = 'running' THEN 0 \
108 WHEN jobs.state = 'uploading' THEN 1 \
109 WHEN jobs.state = 'dispatching' THEN 2 \
110 WHEN jobs.state = 'pending' THEN 3 \
111 WHEN jobs.state = 'new' THEN 4 \
112 END, time_started ASC"
113
114 return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]
115
116 def get_latest(self, arch=None, builder=None, limit=None, age=None, date=None):
117 query = "SELECT * FROM jobs"
118 args = []
119
120 where = ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]
121
122 if arch:
123 where.append("arch = %s")
124 args.append(arch)
125
126 if builder:
127 where.append("builder_id = %s")
128 args.append(builder.id)
129
130 if date:
131 try:
132 year, month, day = date.split("-", 2)
133 date = datetime.date(int(year), int(month), int(day))
134 except ValueError:
135 pass
136 else:
137 where.append("(time_created::date = %s OR \
138 time_started::date = %s OR time_finished::date = %s)")
139 args += (date, date, date)
140
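		# The age interval (e.g. "24 hours") is interpolated directly into the
		# SQL string rather than bound as a parameter, so it must only come
		# from trusted callers.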
		if age:
			where.append("time_finished >= NOW() - '%s'::interval" % age)

		if where:
			query += " WHERE %s" % " AND ".join(where)

		query += " ORDER BY time_finished DESC"

		if limit:
			query += " LIMIT %s"
			args.append(limit)

		return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

	def get_average_build_time(self):
		"""
			Returns the average build time of all finished builds from the
			last 3 months.
		"""
		result = self.db.get("SELECT AVG(time_finished - time_started) as average \
			FROM jobs WHERE type = 'build' AND state = 'finished' AND \
			time_finished >= NOW() - '3 months'::interval")

		if result:
			return result.average

	def count(self, *states):
		query = "SELECT COUNT(*) AS count FROM jobs"
		args = []

		if states:
			query += " WHERE state IN %s"
			args.append(states)

		jobs = self.db.get(query, *args)
		if jobs:
			return jobs.count

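	# Failed build jobs are retried automatically after 72 hours unless the
	# whole build has been marked as broken; build jobs are queued before
	# test jobs, ordered by build priority and age.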
	def restart_failed(self):
		jobs = self._get_jobs("SELECT jobs.* FROM jobs \
			JOIN builds ON builds.id = jobs.build_id \
			WHERE \
				jobs.type = 'build' AND \
				jobs.state = 'failed' AND \
				NOT builds.state = 'broken' AND \
				jobs.time_finished < NOW() - '72 hours'::interval \
			ORDER BY \
				CASE \
					WHEN jobs.type = 'build' THEN 0 \
					WHEN jobs.type = 'test' THEN 1 \
				END, \
				builds.priority DESC, jobs.time_created ASC")

		# Restart these jobs.
		for job in jobs:
			job.restart()


class Job(base.DataObject):
	table = "jobs"

	def __str__(self):
		return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)

	def __eq__(self, other):
		if isinstance(other, self.__class__):
			return self.id == other.id

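	# Sort order for jobs: regular build jobs come before test jobs; within
	# the same build, jobs are ordered by architecture priority, otherwise by
	# the time they were created.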
	def __lt__(self, other):
		if isinstance(other, self.__class__):
			if not self.test and other.test:
				return True

			if self.build == other.build:
				return arches.priority(self.arch) < arches.priority(other.arch)

			return self.time_created < other.time_created

	def __iter__(self):
		packages = self.backend.packages._get_packages("SELECT packages.* FROM jobs_packages \
			LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id)

		return iter(packages)

	def __nonzero__(self):
		return True

	def __len__(self):
		res = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
			WHERE job_id = %s", self.id)

		return res.len

	@property
	def distro(self):
		return self.build.distro

	def restart(self):
		# Copy the job and let it build again.
		return self.backend.jobs.create(self.build, self.arch,
			test=self.test, superseeds=self)

	def get_superseeded_by(self):
		if self.data.superseeded_by:
			return self.backend.jobs.get_by_id(self.data.superseeded_by)

	def set_superseeded_by(self, superseeded_by):
		assert isinstance(superseeded_by, self.__class__)

		self._set_attribute("superseeded_by", superseeded_by.id)
		self.superseeded_by = superseeded_by

	superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)

	def delete(self):
		self._set_attribute("delete", True)

	def remove(self):
		"""
			Removes a job from the database.
		"""
		self.__remove_buildroots()
		self.__remove_history()
		self.__remove_packages()
		self.__remove_logfiles()

		# Delete the job itself.
		self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)

	def __remove_buildroots(self):
		"""
			Removes all buildroots.
		"""
		self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

	def __remove_history(self):
		"""
			Removes all references in the history to this build job.
		"""
		self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)

	def __remove_packages(self):
		"""
			Deletes all uploaded files from the job.
		"""
		for pkg in self.packages:
			pkg.delete()

		self.db.execute("DELETE FROM jobs_packages WHERE job_id = %s", self.id)

	def __remove_logfiles(self):
		for logfile in self.logfiles:
			self.db.execute("INSERT INTO queue_delete(path) VALUES(%s)", logfile.path)

	## Logging stuff

	def log(self, action, user=None, state=None, builder=None, test_job=None):
		user_id = None
		if user:
			user_id = user.id

		builder_id = None
		if builder:
			builder_id = builder.id

		test_job_id = None
		if test_job:
			test_job_id = test_job.id

		self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
			time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
			self.id, action, state, user_id, builder_id, test_job_id)

	def get_log(self, limit=None, offset=None, user=None):
		query = "SELECT * FROM jobs_history"

		conditions = ["job_id = %s",]
		args = [self.id,]

		if user:
			conditions.append("user_id = %s")
			args.append(user.id)

		if conditions:
			query += " WHERE %s" % " AND ".join(conditions)

		query += " ORDER BY time DESC"

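		# PostgreSQL expects "LIMIT n OFFSET m"; the MySQL-style "LIMIT m,n"
		# form is not valid here.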
		if limit:
			if offset:
				query += " LIMIT %s OFFSET %s"
				args += [limit, offset,]
			else:
				query += " LIMIT %s"
				args += [limit,]

		entries = []
		for entry in self.db.query(query, *args):
			entry = logs.JobLogEntry(self.backend, entry)
			entries.append(entry)

		return entries

	@property
	def uuid(self):
		return self.data.uuid

	@property
	def test(self):
		return self.data.test

	@property
	def build_id(self):
		return self.data.build_id

	@lazy_property
	def build(self):
		return self.backend.builds.get_by_id(self.build_id)

	@property
	def related_jobs(self):
		ret = []

		for job in self.build.jobs:
			if job == self:
				continue

			ret.append(job)

		return ret

	@property
	def pkg(self):
		return self.build.pkg

	@property
	def name(self):
		return "%s-%s.%s" % (self.pkg.name, self.pkg.friendly_version, self.arch)

	@property
	def size(self):
		return sum((p.size for p in self.packages))

	@lazy_property
	def rank(self):
		"""
			Returns the rank in the build queue.
		"""
		if not self.state == "pending":
			return

		res = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)

		if res:
			return res.rank

	def is_running(self):
		"""
			Returns True if job is in a running state.
		"""
		return self.state in ("pending", "dispatching", "running", "uploading")

	def get_state(self):
		return self.data.state

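	# Changing the state is the central place for side effects: "dispatching"
	# stamps time_started, "pending" clears both timestamps, and the final
	# states ("aborted", "dependency_error", "finished", "failed") stamp
	# time_finished and trigger notification mails.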
	def set_state(self, state, user=None, log=True):
		# Nothing to do if the state remains.
		if not self.state == state:
			self.db.execute("UPDATE jobs SET state = %s WHERE id = %s", state, self.id)

			# Log the event.
			if log and not state == "new":
				self.log("state_change", state=state, user=user)

			# Update cache.
			if self._data:
				self._data["state"] = state

		# Always clear the message when the status is changed.
		self.update_message(None)

		# Update some more information.
		if state == "dispatching":
			# Set start time.
			self.db.execute("UPDATE jobs SET time_started = NOW(), time_finished = NULL \
				WHERE id = %s", self.id)

		elif state == "pending":
			self.db.execute("UPDATE jobs SET time_started = NULL, \
				time_finished = NULL WHERE id = %s", self.id)

		elif state in ("aborted", "dependency_error", "finished", "failed"):
			# Set the finish time.
			self.db.execute("UPDATE jobs SET time_finished = NOW() WHERE id = %s", self.id)

			# Send messages to the user.
			if state == "finished":
				self.send_finished_message()

			elif state == "failed":
				# Remove all package files if a job is set to failed state.
				self.__remove_packages()

				self.send_failed_message()

		# Automatically update the state of the build (not on test builds).
		if not self.test:
			self.build.auto_update_state()

	state = property(get_state, set_state)

	@property
	def message(self):
		return self.data.message

	def update_message(self, msg):
		self.db.execute("UPDATE jobs SET message = %s WHERE id = %s",
			msg, self.id)

		if self._data:
			self._data["message"] = msg

	def get_builder(self):
		if self.data.builder_id:
			return self.backend.builders.get_by_id(self.data.builder_id)

	def set_builder(self, builder, user=None):
		self.db.execute("UPDATE jobs SET builder_id = %s WHERE id = %s",
			builder.id, self.id)

		# Update cache.
		if self._data:
			self._data["builder_id"] = builder.id

		self._builder = builder

		# Log the event.
		if user:
			self.log("builder_assigned", builder=builder, user=user)

	builder = lazy_property(get_builder, set_builder)

	@property
	def arch(self):
		return self.data.arch

	@property
	def duration(self):
		if not self.time_started:
			return 0

		if self.time_finished:
			delta = self.time_finished - self.time_started
		else:
			delta = datetime.datetime.utcnow() - self.time_started

		return delta.total_seconds()

	@property
	def time_created(self):
		return self.data.time_created

	@property
	def time_started(self):
		return self.data.time_started

	@property
	def time_finished(self):
		return self.data.time_finished

	@property
	def expected_runtime(self):
		"""
			Returns the estimated time and standard deviation this job
			takes to finish.
		"""
		# Get the average build time.
		build_times = self.backend.builds.get_build_times_by_arch(self.arch,
			name=self.pkg.name)

		# If there is no statistical data, we cannot estimate anything.
		if not build_times:
			return None, None

		return build_times.average, build_times.stddev

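	# The ETA is the expected runtime minus the time the job has already been
	# running; it may become negative when a job takes longer than the
	# historical average.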
	@property
	def eta(self):
		expected_runtime, stddev = self.expected_runtime

		if expected_runtime:
			return expected_runtime - int(self.duration), stddev

	def get_pkg_by_uuid(self, uuid):
		pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
			JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
			self.id, uuid)

		if pkg:
			pkg.job = self
			return pkg

	@lazy_property
	def logfiles(self):
		logfiles = []

		for log in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
			log = logs.LogFile(self.backend, log.id)
			log._job = self

			logfiles.append(log)

		return logfiles

	def add_file(self, filename):
		"""
			Add the specified file to this job.

			The file is copied to the right directory by this function.
		"""
		assert os.path.exists(filename)

		if filename.endswith(".log"):
			self._add_file_log(filename)

		elif filename.endswith(".%s" % PACKAGE_EXTENSION):
			# It is not allowed to upload packages on test builds.
			if self.test:
				return

			self._add_file_package(filename)

	def _add_file_log(self, filename):
		"""
			Attach a log file to this job.
		"""
		target_dirname = os.path.join(self.build.path, "logs")

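		# Test logs get an incrementing index so repeated test runs of the
		# same job never overwrite an earlier log file.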
		if self.test:
			i = 1
			while True:
				target_filename = os.path.join(target_dirname,
					"test.%s.%s.%s.log" % (self.arch, i, self.uuid))

				if os.path.exists(target_filename):
					i += 1
				else:
					break
		else:
			target_filename = os.path.join(target_dirname,
				"build.%s.%s.log" % (self.arch, self.uuid))

		# Make sure the target directory exists.
		if not os.path.exists(target_dirname):
			os.makedirs(target_dirname)

		# Calculate a SHA512 hash from that file.
		f = open(filename, "rb")
		h = hashlib.sha512()
		while True:
			buf = f.read(BUFFER_SIZE)
			if not buf:
				break

			h.update(buf)
		f.close()

		# Copy the file to the final location.
		shutil.copy2(filename, target_filename)

		# Create an entry in the database.
		self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
			VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
			os.path.getsize(target_filename), h.hexdigest())

	def _add_file_package(self, filename):
		# Open package (creates entry in the database).
		pkg = self.backend.packages.create(filename)

		# Move package to the build directory.
		pkg.move(os.path.join(self.build.path, self.arch))

		# Attach the package to this job.
		self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
			self.id, pkg.id)

	def get_aborted_state(self):
		return self.data.aborted_state

	def set_aborted_state(self, state):
		self._set_attribute("aborted_state", state)

	aborted_state = property(get_aborted_state, set_aborted_state)

	@property
	def message_recipients(self):
		l = []

		# Add all people watching the build.
		l += self.build.message_recipients

		# Add the package maintainer on release builds.
		if self.build.type == "release":
			maint = self.pkg.maintainer

			if isinstance(maint, users.User):
				l.append("%s <%s>" % (maint.realname, maint.email))
			elif maint:
				l.append(maint)

			# XXX add committer and commit author.

		# Add the owner of the scratch build on scratch builds.
		elif self.build.type == "scratch" and self.build.user:
			l.append("%s <%s>" % \
				(self.build.user.realname, self.build.user.email))

		return set(l)

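	# The buildroot is recorded as (package name, package UUID) pairs so the
	# exact set of packages a job was built against can be looked up later.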
	def save_buildroot(self, pkgs):
		# Cleanup old stuff first (for rebuilding packages).
		self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

		for pkg_name, pkg_uuid in pkgs:
			self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
				VALUES(%s, %s, %s)", self.id, pkg_uuid, pkg_name)

	@lazy_property
	def buildroot(self):
		rows = self.db.query("SELECT * FROM jobs_buildroots \
			WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)

		pkgs = []
		for row in rows:
			# Search for this package in the packages table.
			pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
			pkgs.append((row.pkg_name, row.pkg_uuid, pkg))

		return pkgs

	def send_finished_message(self):
		# Send no finished mails for test jobs.
		if self.test:
			return

		logging.debug("Sending finished message for job %s to %s" % \
			(self.name, ", ".join(self.message_recipients)))

		info = {
			"build_name" : self.name,
			"build_host" : self.builder.name,
			"build_uuid" : self.uuid,
		}

		self.backend.messages.send_to_all(self.message_recipients,
			MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)

	def send_failed_message(self):
		logging.debug("Sending failed message for job %s to %s" % \
			(self.name, ", ".join(self.message_recipients)))

		build_host = "--"
		if self.builder:
			build_host = self.builder.name

		info = {
			"build_name" : self.name,
			"build_host" : build_host,
			"build_uuid" : self.uuid,
		}

		self.backend.messages.send_to_all(self.message_recipients,
			MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)

	def set_start_time(self, start_not_before):
		self._set_attribute("start_not_before", start_not_before)

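	# schedule() either restarts a job that did not finish ("rebuild") or
	# creates a new test job for a finished one ("test"); both variants
	# honour an optional start time.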
	def schedule(self, type, start_time=None, user=None):
		assert type in ("rebuild", "test")

		if type == "rebuild":
			if self.state == "finished":
				return

			job = self.restart()
			job.set_start_time(start_time)

			# Log the event.
			self.log("schedule_rebuild", user=user)

		elif type == "test":
			if not self.state == "finished":
				return

			# Create a new job with same build and arch.
			job = self.backend.jobs.create(self.build, self.arch, test=True)
			job.set_start_time(start_time)

			# Log the event.
			self.log("schedule_test_job", test_job=job, user=user)

		return job

	def schedule_test(self, start_not_before=None, user=None):
		# XXX to be removed
		return self.schedule("test", start_time=start_not_before, user=user)

	def schedule_rebuild(self, start_not_before=None, user=None):
		# XXX to be removed
		return self.schedule("rebuild", start_time=start_not_before, user=user)

	def get_build_repos(self):
		"""
			Returns a list of all repositories that should be used when
			building this job.
		"""
		repo_ids = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
			self.id)

		if not repo_ids:
			return self.distro.get_build_repos()

		repos = []
		for repo in self.distro.repositories:
			if repo.id in [r.repo_id for r in repo_ids]:
				repos.append(repo)

		return repos or self.distro.get_build_repos()

	def get_repo_config(self):
		"""
			Get repository configuration file that is sent to the builder.
		"""
		confs = []

		for repo in self.get_build_repos():
			confs.append(repo.get_conf())

		return "\n\n".join(confs)

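	# The configuration sent to a builder is the distribution configuration
	# followed by one section per enabled build repository, joined by blank
	# lines.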
	def get_config(self):
		"""
			Get configuration file that is sent to the builder.
		"""
		confs = []

		# Add the distribution configuration.
		confs.append(self.distro.get_config())

		# Then add all repositories for this build.
		confs.append(self.get_repo_config())

		return "\n\n".join(confs)

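	# resolvdep() runs the pakfire solver against the source package with the
	# job's own configuration: on success the job is moved to "pending"
	# (unless it already failed), on a dependency error the job is flagged
	# and the solver message is stored.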
	def resolvdep(self):
		config = pakfire.config.Config(files=["general.conf"])
		config.parse(self.get_config())

		# The filename of the source file.
		filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
		assert os.path.exists(filename), filename

		# Create a new pakfire instance with the configuration for
		# this build.
		p = pakfire.PakfireServer(config=config, arch=self.arch)

		# Try to solve the build dependencies.
		try:
			solver = p.resolvdep(filename)

		# Catch dependency errors and log the problem string.
		except DependencyError, e:
			self.state = "dependency_error"
			self.update_message(e)

		else:
			# If the build dependencies can be resolved, we set the job
			# to pending state.
			if solver.status is True:
				if self.state in ("failed",):
					return

				self.state = "pending"