#!/usr/bin/python

import datetime
import hashlib
import logging
import os
import shutil
import uuid

import pakfire
import pakfire.config

log = logging.getLogger("builds")
log.propagate = 1

from . import arches
from . import base
from . import logs
from . import users

from .constants import *
from .decorators import *

class Jobs(base.Object):
	def _get_job(self, query, *args):
		res = self.db.get(query, *args)

		if res:
			return Job(self.backend, res.id, data=res)

	def _get_jobs(self, query, *args):
		res = self.db.query(query, *args)

		for row in res:
			yield Job(self.backend, row.id, data=row)

	def create(self, build, arch, test=False, superseeds=None):
		job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
			VALUES(%s, %s, %s, %s) RETURNING *", "%s" % uuid.uuid4(), build.id, arch, test)
		job.log("created")

		# Set cache for Build object.
		job.build = build

		# Mark if the new job supersedes some other job
		if superseeds:
			superseeds.superseeded_by = job

		return job

	def get_by_id(self, id, data=None):
		return Job(self.backend, id, data)

	def get_by_uuid(self, uuid):
		job = self.db.get("SELECT id FROM jobs WHERE uuid = %s", uuid)

		if job:
			return self.get_by_id(job.id)

	def get_active(self, host_id=None, builder=None, states=None):
		if builder:
			host_id = builder.id

		if states is None:
			states = ["dispatching", "running", "uploading"]

		query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states))
		args = states

		if host_id:
			query += " AND builder_id = %s" % host_id

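		# Show running jobs first, then uploading, dispatching, pending and new ones.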
		query += " ORDER BY \
			CASE \
				WHEN jobs.state = 'running' THEN 0 \
				WHEN jobs.state = 'uploading' THEN 1 \
				WHEN jobs.state = 'dispatching' THEN 2 \
				WHEN jobs.state = 'pending' THEN 3 \
				WHEN jobs.state = 'new' THEN 4 \
			END, time_started ASC"

		return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

	def get_latest(self, arch=None, builder=None, limit=None, age=None, date=None):
		query = "SELECT * FROM jobs"
		args = []

		where = ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]

		if arch:
			where.append("arch = %s")
			args.append(arch)

		if builder:
			where.append("builder_id = %s")
			args.append(builder.id)

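		# Filter by a given day (YYYY-MM-DD); malformed date strings are silently ignored.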
		if date:
			try:
				year, month, day = date.split("-", 2)
				date = datetime.date(int(year), int(month), int(day))
			except ValueError:
				pass
			else:
				where.append("(time_created::date = %s OR \
					time_started::date = %s OR time_finished::date = %s)")
				args += (date, date, date)

		if age:
			where.append("time_finished >= NOW() - '%s'::interval" % age)

		if where:
			query += " WHERE %s" % " AND ".join(where)

		query += " ORDER BY time_finished DESC"

		if limit:
			query += " LIMIT %s"
			args.append(limit)

		return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

	def get_average_build_time(self):
		"""
			Returns the average build time of all finished builds from the
			last 3 months.
		"""
		result = self.db.get("SELECT AVG(time_finished - time_started) as average \
			FROM jobs WHERE type = 'build' AND state = 'finished' AND \
			time_finished >= NOW() - '3 months'::interval")

		if result:
			return result.average

	def count(self, *states):
		query = "SELECT COUNT(*) AS count FROM jobs"
		args = []

		if states:
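			# The database layer is expected to expand the tuple of states
			# into a parenthesised list for the IN clause.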
			query += " WHERE state IN %s"
			args.append(states)

		jobs = self.db.get(query, *args)
		if jobs:
			return jobs.count

	def restart_failed(self):
		jobs = self._get_jobs("SELECT jobs.* FROM jobs \
			JOIN builds ON builds.id = jobs.build_id \
			WHERE \
				jobs.type = 'build' AND \
				jobs.state = 'failed' AND \
				NOT builds.state = 'broken' AND \
				jobs.time_finished < NOW() - '72 hours'::interval \
			ORDER BY \
				CASE \
					WHEN jobs.type = 'build' THEN 0 \
					WHEN jobs.type = 'test' THEN 1 \
				END, \
				builds.priority DESC, jobs.time_created ASC")

		# Restart the job
		for job in jobs:
			job.restart()


class Job(base.DataObject):
	table = "jobs"

	def __str__(self):
		return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)

	def __eq__(self, other):
		if isinstance(other, self.__class__):
			return self.id == other.id

	def __lt__(self, other):
		if isinstance(other, self.__class__):
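			# Regular build jobs sort before test jobs; jobs of the same
			# build are ordered by architecture priority, all others by
			# creation time.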
			if not self.test and other.test:
				return True

			if self.build == other.build:
				return arches.priority(self.arch) < arches.priority(other.arch)

			return self.time_created < other.time_created

	def __iter__(self):
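		# Iterating over a job yields the packages it has produced, ordered by name.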
		packages = self.backend.packages._get_packages("SELECT packages.* FROM jobs_packages \
			LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id)

		return iter(packages)

	def __nonzero__(self):
		return True

	def __len__(self):
		res = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
			WHERE job_id = %s", self.id)

		return res.len

	@property
	def distro(self):
		return self.build.distro

	def restart(self):
		# Copy the job and let it build again
		return self.backend.jobs.create(self.build, self.arch,
			test=self.test, superseeds=self)

	def get_superseeded_by(self):
		if self.data.superseeded_by:
			return self.backend.jobs.get_by_id(self.data.superseeded_by)

	def set_superseeded_by(self, superseeded_by):
		assert isinstance(superseeded_by, self.__class__)

		self._set_attribute("superseeded_by", superseeded_by.id)
		self.superseeded_by = superseeded_by

	superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)

	def delete(self):
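		# This only flags the job for deletion; the row itself is presumably
		# removed later by a cleanup process.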
		self._set_attribute("delete", True)

	def remove(self):
		"""
			Removes a job from the database
		"""
		self.__remove_buildroots()
		self.__remove_history()
		self.__remove_packages()
		self.__remove_logfiles()

		# Delete the job itself.
		self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)

	def __remove_buildroots(self):
		"""
			Removes all buildroots.
		"""
		self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

	def __remove_history(self):
		"""
			Removes all references in the history to this build job.
		"""
		self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)

	def __remove_packages(self):
		"""
			Deletes all uploaded files from the job.
		"""
		for pkg in self.packages:
			pkg.delete()

		self.db.execute("DELETE FROM jobs_packages WHERE job_id = %s", self.id)

	def __remove_logfiles(self):
		for logfile in self.logfiles:
			self.db.execute("INSERT INTO queue_delete(path) VALUES(%s)", logfile.path)

	## Logging stuff

	def log(self, action, user=None, state=None, builder=None, test_job=None):
		user_id = None
		if user:
			user_id = user.id

		builder_id = None
		if builder:
			builder_id = builder.id

		test_job_id = None
		if test_job:
			test_job_id = test_job.id

		self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
			time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
			self.id, action, state, user_id, builder_id, test_job_id)

	def get_log(self, limit=None, offset=None, user=None):
		query = "SELECT * FROM jobs_history"

		conditions = ["job_id = %s",]
		args = [self.id,]

		if user:
			conditions.append("user_id = %s")
			args.append(user.id)

		if conditions:
			query += " WHERE %s" % " AND ".join(conditions)

		query += " ORDER BY time DESC"

		if limit:
			if offset:
				query += " LIMIT %s OFFSET %s"
				args += [limit, offset]
			else:
				query += " LIMIT %s"
				args += [limit]

		entries = []
		for entry in self.db.query(query, *args):
			entry = logs.JobLogEntry(self.backend, entry)
			entries.append(entry)

		return entries

	@property
	def uuid(self):
		return self.data.uuid

	@property
	def test(self):
		return self.data.test

	@property
	def build_id(self):
		return self.data.build_id

	@lazy_property
	def build(self):
		return self.backend.builds.get_by_id(self.build_id)

	@property
	def related_jobs(self):
		ret = []

		for job in self.build.jobs:
			if job == self:
				continue

			ret.append(job)

		return ret

	@property
	def pkg(self):
		return self.build.pkg

	@property
	def name(self):
		return "%s-%s.%s" % (self.pkg.name, self.pkg.friendly_version, self.arch)

	@property
	def size(self):
		return sum((p.size for p in self.packages))

	@lazy_property
	def rank(self):
		"""
			Returns the rank in the build queue
		"""
		if not self.state == "pending":
			return

		res = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)

		if res:
			return res.rank

	def is_running(self):
		"""
			Returns True if job is in a running state.
		"""
		return self.state in ("pending", "dispatching", "running", "uploading")

	def get_state(self):
		return self.data.state

	def set_state(self, state, user=None, log=True):
		# Nothing to do if the state remains.
		if not self.state == state:
			self._set_attribute("state", state)

			# Log the event.
			if log and not state == "new":
				self.log("state_change", state=state, user=user)

			# Always clear the message when the status is changed.
			self.update_message(None)

		# Update some more informations.
		if state == "dispatching":
			# Set start time.
			self._set_attribute("time_started", datetime.datetime.utcnow())

		elif state in ("aborted", "dependency_error", "finished", "failed"):
			self._set_attribute("time_finished", datetime.datetime.utcnow())

		# Send messages to the user.
		if state == "finished":
			self.send_finished_message()

		elif state == "failed":
			# Remove all package files if a job is set to failed state.
			self.__delete_packages()

			self.send_failed_message()

		# Automatically update the state of the build (not on test builds).
		if not self.test:
			self.build.auto_update_state()

	state = property(get_state, set_state)

	@property
	def message(self):
		return self.data.message

	def update_message(self, message):
		self._set_attribute("message", message)

	def get_builder(self):
		if self.data.builder_id:
			return self.backend.builders.get_by_id(self.data.builder_id)

	def set_builder(self, builder, user=None):
		self._set_attribute("builder_id", builder.id)

		# Log the event.
		if user:
			self.log("builder_assigned", builder=builder, user=user)

	builder = lazy_property(get_builder, set_builder)

	@property
	def arch(self):
		return self.data.arch

	@property
	def duration(self):
		if not self.time_started:
			return 0

		if self.time_finished:
			delta = self.time_finished - self.time_started
		else:
			delta = datetime.datetime.utcnow() - self.time_started

		return delta.total_seconds()

	@property
	def time_created(self):
		return self.data.time_created

	@property
	def time_started(self):
		return self.data.time_started

	@property
	def time_finished(self):
		return self.data.time_finished

	@property
	def expected_runtime(self):
		"""
			Returns the estimated time (and standard deviation) that this
			job takes to finish.
		"""
		# Get the average build time.
		build_times = self.backend.builds.get_build_times_by_arch(self.arch,
			name=self.pkg.name)

		# If there is no statistical data, we cannot estimate anything.
		if not build_times:
			return None, None

		return build_times.average, build_times.stddev

	@property
	def eta(self):
		expected_runtime, stddev = self.expected_runtime

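		# The ETA is the expected runtime minus the time the job has already
		# been running.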
		if expected_runtime:
			return expected_runtime - int(self.duration), stddev

	def get_pkg_by_uuid(self, uuid):
		pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
			JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
			self.id, uuid)

		if pkg:
			pkg.job = self
			return pkg

	@lazy_property
	def logfiles(self):
		logfiles = []

		for log in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
			log = logs.LogFile(self.backend, log.id)
			log._job = self

			logfiles.append(log)

		return logfiles

	def add_file(self, filename):
		"""
			Add the specified file to this job.

			The file is copied to the right directory by this function.
		"""
		assert os.path.exists(filename)

		if filename.endswith(".log"):
			self._add_file_log(filename)

		elif filename.endswith(".%s" % PACKAGE_EXTENSION):
			# It is not allowed to upload packages on test builds.
			if self.test:
				return

			self._add_file_package(filename)

	def _add_file_log(self, filename):
		"""
			Attach a log file to this job.
		"""
		target_dirname = os.path.join(self.build.path, "logs")

		if self.test:
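			# Pick the first filename with a free index so that earlier test
			# logs of this job are not overwritten.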
			i = 1
			while True:
				target_filename = os.path.join(target_dirname,
					"test.%s.%s.%s.log" % (self.arch, i, self.uuid))

				if os.path.exists(target_filename):
					i += 1
				else:
					break
		else:
			target_filename = os.path.join(target_dirname,
				"build.%s.%s.log" % (self.arch, self.uuid))

		# Make sure the target directory exists.
		if not os.path.exists(target_dirname):
			os.makedirs(target_dirname)

		# Calculate a SHA512 hash from that file.
		f = open(filename, "rb")
		h = hashlib.sha512()
		while True:
			buf = f.read(BUFFER_SIZE)
			if not buf:
				break

			h.update(buf)
		f.close()

		# Copy the file to the final location.
		shutil.copy2(filename, target_filename)

		# Create an entry in the database.
		self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
			VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
			os.path.getsize(target_filename), h.hexdigest())

	def _add_file_package(self, filename):
		# Open package (creates entry in the database)
		pkg = self.backend.packages.create(filename)

		# Move package to the build directory.
		pkg.move(os.path.join(self.build.path, self.arch))

		# Attach the package to this job.
		self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
			self.id, pkg.id)

	def get_aborted_state(self):
		return self.data.aborted_state

	def set_aborted_state(self, state):
		self._set_attribute("aborted_state", state)

	aborted_state = property(get_aborted_state, set_aborted_state)

	@property
	def message_recipients(self):
		l = []

		# Add all people watching the build.
		l += self.build.message_recipients

		# Add the package maintainer on release builds.
		if self.build.type == "release":
			maint = self.pkg.maintainer

			if isinstance(maint, users.User):
				l.append("%s <%s>" % (maint.realname, maint.email))
			elif maint:
				l.append(maint)

			# XXX add committer and commit author.

		# Add the owner of the scratch build on scratch builds.
		elif self.build.type == "scratch" and self.build.user:
			l.append("%s <%s>" % \
				(self.build.user.realname, self.build.user.email))

		return set(l)

	def save_buildroot(self, pkgs):
		# Cleanup old stuff first (for rebuilding packages)
		self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

		for pkg_name, pkg_uuid in pkgs:
			self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
				VALUES(%s, %s, %s)", self.id, pkg_uuid, pkg_name)

	@lazy_property
	def buildroot(self):
		rows = self.db.query("SELECT * FROM jobs_buildroots \
			WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)

		pkgs = []
		for row in rows:
			# Search for this package in the packages table.
			pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
			pkgs.append((row.pkg_name, row.pkg_uuid, pkg))

		return pkgs

	def send_finished_message(self):
		# Send no finished mails for test jobs.
		if self.test:
			return

		logging.debug("Sending finished message for job %s to %s" % \
			(self.name, ", ".join(self.message_recipients)))

		info = {
			"build_name" : self.name,
			"build_host" : self.builder.name,
			"build_uuid" : self.uuid,
		}

		self.backend.messages.send_to_all(self.message_recipients,
			MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)

	def send_failed_message(self):
		logging.debug("Sending failed message for job %s to %s" % \
			(self.name, ", ".join(self.message_recipients)))

		build_host = "--"
		if self.builder:
			build_host = self.builder.name

		info = {
			"build_name" : self.name,
			"build_host" : build_host,
			"build_uuid" : self.uuid,
		}

		self.backend.messages.send_to_all(self.message_recipients,
			MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)

	def set_start_time(self, start_not_before):
		self._set_attribute("start_not_before", start_not_before)

	def schedule(self, type, start_time=None, user=None):
		assert type in ("rebuild", "test")

		if type == "rebuild":
			if self.state == "finished":
				return

			job = self.restart()
			job.set_start_time(start_time)

			# Log the event.
			self.log("schedule_rebuild", user=user)

		elif type == "test":
			if not self.state == "finished":
				return

			# Create a new job with same build and arch.
			job = self.backend.jobs.create(self.build, self.arch, test=True)
			job.set_start_time(start_time)

			# Log the event.
			self.log("schedule_test_job", test_job=job, user=user)

		return job

	def schedule_test(self, start_not_before=None, user=None):
		# XXX to be removed
		return self.schedule("test", start_time=start_not_before, user=user)

	def schedule_rebuild(self, start_not_before=None, user=None):
		# XXX to be removed
		return self.schedule("rebuild", start_time=start_not_before, user=user)

	def get_build_repos(self):
		"""
			Returns a list of all repositories that should be used when
			building this job.
		"""
		repo_ids = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
			self.id)

		if not repo_ids:
			return self.distro.get_build_repos()

		repos = []
		for repo in self.distro.repositories:
			if repo.id in [r.id for r in repo_ids]:
				repos.append(repo)

		return repos or self.distro.get_build_repos()

	def get_repo_config(self):
		"""
			Get repository configuration file that is sent to the builder.
		"""
		confs = []

		for repo in self.get_build_repos():
			confs.append(repo.get_conf())

		return "\n\n".join(confs)

	def get_config(self):
		"""
			Get configuration file that is sent to the builder.
		"""
		confs = []

		# Add the distribution configuration.
		confs.append(self.distro.get_config())

		# Then add all repositories for this build.
		confs.append(self.get_repo_config())

		return "\n\n".join(confs)

	def resolvdep(self):
		config = pakfire.config.Config(files=["general.conf"])
		config.parse(self.get_config())

		# The filename of the source file.
		filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
		assert os.path.exists(filename), filename

		# Create a new pakfire instance with the configuration for
		# this build.
		p = pakfire.PakfireServer(config=config, arch=self.arch)

		# Try to solve the build dependencies.
		try:
			solver = p.resolvdep(filename)

		# Catch dependency errors and log the problem string.
		except DependencyError, e:
			self.state = "dependency_error"
			self.update_message(e)

		else:
			# If the build dependencies can be resolved, we set the build in
			# pending state.
			if solver.status is True:
				if self.state in ("failed",):
					return

				self.state = "pending"