]> git.ipfire.org Git - people/jschlag/pbs.git/blob - src/buildservice/jobs.py
cd5958b63595b4286d19015828c03cd8c0164d95
[people/jschlag/pbs.git] / src / buildservice / jobs.py
1 #!/usr/bin/python
2
3 import datetime
4 import hashlib
5 import logging
6 import os
7 import shutil
8 import uuid
9
10 import pakfire
11 import pakfire.config
12
13 log = logging.getLogger("builds")
14 log.propagate = 1
15
16 from . import arches
17 from . import base
18 from . import logs
19 from . import users
20
21 from .constants import *
22 from .decorators import *
23
class Jobs(base.Object):
	"""
	Collection interface for build jobs.

	Wraps the "jobs" table and returns Job objects for matching rows.
	"""

	def _get_job(self, query, *args):
		"""Run a query that yields at most one jobs row and wrap it in a Job."""
		res = self.db.get(query, *args)

		if res:
			return Job(self.backend, res.id, data=res)

	def _get_jobs(self, query, *args):
		"""Run a query over the jobs table and yield a Job for every row."""
		res = self.db.query(query, *args)

		for row in res:
			yield Job(self.backend, row.id, data=row)

	def create(self, build, arch, test=False, superseeds=None):
		"""
		Create a new job for the given build and architecture.

		If superseeds is given, that (older) job is marked as being
		superseeded by the newly created job.
		"""
		job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
			VALUES(%s, %s, %s, %s) RETURNING *", "%s" % uuid.uuid4(), build.id, arch, test)
		job.log("created")

		# Set cache for Build object.
		job.build = build

		# Mark if the new job superseeds some other job
		if superseeds:
			superseeds.superseeded_by = job

		return job

	def get_by_id(self, id):
		"""Return the job with the given database id (or None)."""
		return self._get_job("SELECT * FROM jobs WHERE id = %s", id)

	def get_by_uuid(self, uuid):
		"""Return the job with the given UUID (or None)."""
		return self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid)

	def get_active(self, host_id=None, builder=None, states=None):
		"""
		Return all jobs that are currently in an active state, most
		active first, optionally restricted to a single builder.
		"""
		if builder:
			host_id = builder.id

		if states is None:
			states = ["dispatching", "running", "uploading"]

		query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states))
		args = list(states)

		if host_id:
			# Bind the builder id as a parameter instead of
			# interpolating it into the SQL string.
			query += " AND builder_id = %s"
			args.append(host_id)

		query += " ORDER BY \
			CASE \
				WHEN jobs.state = 'running' THEN 0 \
				WHEN jobs.state = 'uploading' THEN 1 \
				WHEN jobs.state = 'dispatching' THEN 2 \
				WHEN jobs.state = 'pending' THEN 3 \
				WHEN jobs.state = 'new' THEN 4 \
			END, time_started ASC"

		return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

	def get_latest(self, arch=None, builder=None, limit=None, age=None, date=None):
		"""
		Return finished/failed/aborted jobs, newest first, filtered by
		architecture, builder, maximum age or a YYYY-MM-DD date string.
		"""
		query = "SELECT * FROM jobs"
		args = []

		where = ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]

		if arch:
			where.append("arch = %s")
			args.append(arch)

		if builder:
			where.append("builder_id = %s")
			args.append(builder.id)

		if date:
			try:
				year, month, day = date.split("-", 2)
				date = datetime.date(int(year), int(month), int(day))
			except ValueError:
				# Ignore malformed date strings and skip the filter.
				pass
			else:
				where.append("(time_created::date = %s OR \
					time_started::date = %s OR time_finished::date = %s)")
				args += (date, date, date)

		if age:
			# Bind the age as a parameter instead of interpolating it
			# into the SQL string (the old "'%s'::interval" % age form
			# allowed SQL injection through the age argument).
			where.append("time_finished >= NOW() - %s::interval")
			args.append(age)

		if where:
			query += " WHERE %s" % " AND ".join(where)

		query += " ORDER BY time_finished DESC"

		if limit:
			query += " LIMIT %s"
			args.append(limit)

		return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

	def get_average_build_time(self):
		"""
		Returns the average build time of all finished builds from the
		last 3 months.
		"""
		result = self.db.get("SELECT AVG(time_finished - time_started) as average \
			FROM jobs WHERE type = 'build' AND state = 'finished' AND \
			time_finished >= NOW() - '3 months'::interval")

		if result:
			return result.average

	def count(self, *states):
		"""Count all jobs, or only those in one of the given states."""
		query = "SELECT COUNT(*) AS count FROM jobs"
		args = []

		if states:
			# psycopg2 adapts a tuple to a parenthesised value list.
			query += " WHERE state IN %s"
			args.append(states)

		jobs = self.db.get(query, *args)
		if jobs:
			return jobs.count

	def restart_failed(self):
		"""Restart all build jobs that failed more than 72 hours ago."""
		jobs = self._get_jobs("SELECT jobs.* FROM jobs \
			JOIN builds ON builds.id = jobs.build_id \
			WHERE \
				jobs.type = 'build' AND \
				jobs.state = 'failed' AND \
				NOT builds.state = 'broken' AND \
				jobs.time_finished < NOW() - '72 hours'::interval \
			ORDER BY \
				CASE \
					WHEN jobs.type = 'build' THEN 0 \
					WHEN jobs.type = 'test'  THEN 1 \
				END, \
				builds.priority DESC, jobs.time_created ASC")

		# Restart the job
		for job in jobs:
			job.restart()
162
163
class Job(base.DataObject):
	"""
	A single build job: one build of one package for one architecture.

	Rows are backed by the "jobs" table via base.DataObject.
	"""

	# Name of the database table backing this object.
	table = "jobs"

	def __str__(self):
		return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)
169
170 def __eq__(self, other):
171 if isinstance(other, self.__class__):
172 return self.id == other.id
173
	def __lt__(self, other):
		"""
		Order jobs for the queue: regular (non-test) jobs sort before
		test jobs, jobs of the same build sort by architecture priority,
		everything else by creation time.
		"""
		if isinstance(other, self.__class__):
			# A regular job always sorts before a test job.
			# NOTE(review): the reverse case (self.test and not
			# other.test) falls through to the time comparison below,
			# which makes this ordering asymmetric - confirm whether
			# that is intended.
			if not self.test and other.test:
				return True

			if self.build == other.build:
				return arches.priority(self.arch) < arches.priority(other.arch)

			return self.time_created < other.time_created
183
184 def __iter__(self):
185 packages = self.backend.packages._get_packages("SELECT packages.* FROM jobs_packages \
186 LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
187 WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id)
188
189 return iter(packages)
190
	def __nonzero__(self):
		# A job is always truthy. Without this, __len__ below would make
		# a job without packages evaluate to False in boolean context.
		return True
193
194 def __len__(self):
195 res = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
196 WHERE job_id = %s", self.id)
197
198 return res.len
199
200 @property
201 def distro(self):
202 return self.build.distro
203
204 def restart(self):
205 # Copy the job and let it build again
206 return self.backend.jobs.create(self.build, self.arch,
207 test=self.test, superseeds=self)
208
	def get_superseeded_by(self):
		"""Return the job that replaced this one (or None)."""
		if self.data.superseeded_by:
			return self.backend.jobs.get_by_id(self.data.superseeded_by)

	def set_superseeded_by(self, superseeded_by):
		"""Mark this job as superseeded by the given job."""
		assert isinstance(superseeded_by, self.__class__)

		self._set_attribute("superseeded_by", superseeded_by.id)
		# NOTE(review): this assignment assumes lazy_property caches the
		# value instead of re-invoking this setter (which would recurse)
		# - confirm against decorators.py.
		self.superseeded_by = superseeded_by

	superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
220
221 def delete(self):
222 """
223 Deletes a job from the database
224 """
225 # Remove the buildroot
226 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
227
228 # Remove the history
229 self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)
230
231 # Delete all packages
232 for pkg in self:
233 self.db.execute("DELETE FROM jobs_packages \
234 WHERE job_id = %s AND pkg_id = %s", self.id, pkg.id)
235 pkg.delete()
236
237 # Remove all logfiles
238 for logfile in self.logfiles:
239 self.db.execute("INSERT INTO queue_delete(path) VALUES(%s)", logfile.path)
240
241 self.db.execute("DELETE FROM logfiles WHERE job_id = %s", self.id)
242
243 # Delete the job itself.
244 self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)
245
246 ## Logging stuff
247
248 def log(self, action, user=None, state=None, builder=None, test_job=None):
249 user_id = None
250 if user:
251 user_id = user.id
252
253 builder_id = None
254 if builder:
255 builder_id = builder.id
256
257 test_job_id = None
258 if test_job:
259 test_job_id = test_job.id
260
261 self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
262 time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
263 self.id, action, state, user_id, builder_id, test_job_id)
264
265 def get_log(self, limit=None, offset=None, user=None):
266 query = "SELECT * FROM jobs_history"
267
268 conditions = ["job_id = %s",]
269 args = [self.id,]
270
271 if user:
272 conditions.append("user_id = %s")
273 args.append(user.id)
274
275 if conditions:
276 query += " WHERE %s" % " AND ".join(conditions)
277
278 query += " ORDER BY time DESC"
279
280 if limit:
281 if offset:
282 query += " LIMIT %s,%s"
283 args += [offset, limit,]
284 else:
285 query += " LIMIT %s"
286 args += [limit,]
287
288 entries = []
289 for entry in self.db.query(query, *args):
290 entry = logs.JobLogEntry(self.backend, entry)
291 entries.append(entry)
292
293 return entries
294
	@property
	def uuid(self):
		# Externally visible unique identifier of this job.
		return self.data.uuid

	@property
	def test(self):
		# True if this is a test job (packages may not be uploaded for it).
		return self.data.test

	@property
	def build_id(self):
		# Database id of the build this job belongs to.
		return self.data.build_id

	@lazy_property
	def build(self):
		"""The Build object this job belongs to (cached after first access)."""
		return self.backend.builds.get_by_id(self.build_id)
310
311 @property
312 def related_jobs(self):
313 ret = []
314
315 for job in self.build.jobs:
316 if job == self:
317 continue
318
319 ret.append(job)
320
321 return ret
322
	@property
	def pkg(self):
		# Source package of the build this job belongs to.
		return self.build.pkg

	@property
	def name(self):
		# Human-readable job name, e.g. "foo-1.0-1.x86_64".
		return "%s-%s.%s" % (self.pkg.name, self.pkg.friendly_version, self.arch)

	@property
	def size(self):
		# Total size of all packages of this job.
		# NOTE(review): "self.packages" is not defined anywhere in this
		# file (only __iter__ is) - presumably provided by DataObject or
		# another chunk; confirm it exists.
		return sum((p.size for p in self.packages))
334
335 @lazy_property
336 def rank(self):
337 """
338 Returns the rank in the build queue
339 """
340 if not self.state == "pending":
341 return
342
343 res = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)
344
345 if res:
346 return res.rank
347
348 def is_running(self):
349 """
350 Returns True if job is in a running state.
351 """
352 return self.state in ("pending", "dispatching", "running", "uploading")
353
	def get_state(self):
		"""Return the current state string of this job."""
		return self.data.state

	def set_state(self, state, user=None, log=True):
		"""
		Move the job into a new state and perform the side effects of
		the transition (history logging, timestamps, notification mails,
		build state propagation).
		"""
		# Nothing to do if the state remains.
		if not self.state == state:
			self._set_attribute("state", state)

			# Log the event.
			if log and not state == "new":
				self.log("state_change", state=state, user=user)

			# Always clear the message when the status is changed.
			self.update_message(None)

		# Update some more informations.
		if state == "dispatching":
			# Set start time.
			self._set_attribute("time_started", datetime.datetime.utcnow())

		elif state in ("aborted", "dependency_error", "finished", "failed"):
			# Every terminal state stamps the finish time.
			self._set_attribute("time_finished", datetime.datetime.utcnow())

		# Send messages to the user.
		if state == "finished":
			self.send_finished_message()

		elif state == "failed":
			# Remove all package files if a job is set to failed state.
			# NOTE(review): __delete_packages (name-mangled to
			# _Job__delete_packages) is not defined in this chunk -
			# verify it exists elsewhere in the class.
			self.__delete_packages()

			self.send_failed_message()

		# Automatically update the state of the build (not on test builds).
		if not self.test:
			self.build.auto_update_state()

	state = property(get_state, set_state)
392
	@property
	def message(self):
		# Free-form status message (e.g. dependency problem text).
		return self.data.message

	def update_message(self, message):
		"""Set (or clear, with None) the status message of this job."""
		self._set_attribute("message", message)
399
	def get_builder(self):
		"""Return the builder this job is/was assigned to (or None)."""
		if self.data.builder_id:
			return self.backend.builders.get_by_id(self.data.builder_id)

	def set_builder(self, builder, user=None):
		"""Assign the job to a builder; logs the assignment if a user is given."""
		self._set_attribute("builder_id", builder.id)

		# Log the event.
		if user:
			self.log("builder_assigned", builder=builder, user=user)

	builder = lazy_property(get_builder, set_builder)
412
	@property
	def arch(self):
		# Architecture this job builds for.
		return self.data.arch

	@property
	def duration(self):
		"""
		Seconds this job has been (or was) running.
		Returns 0 when the job has not been started yet.
		"""
		if not self.time_started:
			return 0

		if self.time_finished:
			delta = self.time_finished - self.time_started
		else:
			# Still running - measure against the current time.
			# NOTE(review): assumes timestamps are naive UTC (set_state
			# writes datetime.utcnow()) - confirm the DB stores UTC.
			delta = datetime.datetime.utcnow() - self.time_started

		return delta.total_seconds()

	@property
	def time_created(self):
		# Timestamp of job creation (set by the database on INSERT).
		return self.data.time_created

	@property
	def time_started(self):
		# Timestamp of dispatch (None if the job never started).
		return self.data.time_started

	@property
	def time_finished(self):
		# Timestamp of reaching a terminal state (or None).
		return self.data.time_finished
440
	def get_pkg_by_uuid(self, uuid):
		"""Return the package with the given UUID attached to this job (or None)."""
		pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
			JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
			self.id, uuid)

		if pkg:
			# Cache the back-reference so the package knows its job.
			pkg.job = self
			return pkg
450
451 @lazy_property
452 def logfiles(self):
453 logfiles = []
454
455 for log in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
456 log = logs.LogFile(self.backend, log.id)
457 log._job = self
458
459 logfiles.append(log)
460
461 return logfiles
462
463 def add_file(self, filename):
464 """
465 Add the specified file to this job.
466
467 The file is copied to the right directory by this function.
468 """
469 assert os.path.exists(filename)
470
471 if filename.endswith(".log"):
472 self._add_file_log(filename)
473
474 elif filename.endswith(".%s" % PACKAGE_EXTENSION):
475 # It is not allowed to upload packages on test builds.
476 if self.test:
477 return
478
479 self._add_file_package(filename)
480
481 def _add_file_log(self, filename):
482 """
483 Attach a log file to this job.
484 """
485 target_dirname = os.path.join(self.build.path, "logs")
486
487 if self.test:
488 i = 1
489 while True:
490 target_filename = os.path.join(target_dirname,
491 "test.%s.%s.%s.log" % (self.arch, i, self.uuid))
492
493 if os.path.exists(target_filename):
494 i += 1
495 else:
496 break
497 else:
498 target_filename = os.path.join(target_dirname,
499 "build.%s.%s.log" % (self.arch, self.uuid))
500
501 # Make sure the target directory exists.
502 if not os.path.exists(target_dirname):
503 os.makedirs(target_dirname)
504
505 # Calculate a SHA512 hash from that file.
506 f = open(filename, "rb")
507 h = hashlib.sha512()
508 while True:
509 buf = f.read(BUFFER_SIZE)
510 if not buf:
511 break
512
513 h.update(buf)
514 f.close()
515
516 # Copy the file to the final location.
517 shutil.copy2(filename, target_filename)
518
519 # Create an entry in the database.
520 self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
521 VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
522 os.path.getsize(target_filename), h.hexdigest())
523
524 def _add_file_package(self, filename):
525 # Open package (creates entry in the database)
526 pkg = self.backend.packages.create(filename)
527
528 # Move package to the build directory.
529 pkg.move(os.path.join(self.build.path, self.arch))
530
531 # Attach the package to this job.
532 self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
533 self.id, pkg.id)
534
	def get_aborted_state(self):
		"""Return the state the job was in when it got aborted (or None)."""
		return self.data.aborted_state

	def set_aborted_state(self, state):
		"""Remember the state the job had before it was aborted."""
		self._set_attribute("aborted_state", state)

	aborted_state = property(get_aborted_state, set_aborted_state)
542
543 @property
544 def message_recipients(self):
545 l = []
546
547 # Add all people watching the build.
548 l += self.build.message_recipients
549
550 # Add the package maintainer on release builds.
551 if self.build.type == "release":
552 maint = self.pkg.maintainer
553
554 if isinstance(maint, users.User):
555 l.append("%s <%s>" % (maint.realname, maint.email))
556 elif maint:
557 l.append(maint)
558
559 # XXX add committer and commit author.
560
561 # Add the owner of the scratch build on scratch builds.
562 elif self.build.type == "scratch" and self.build.user:
563 l.append("%s <%s>" % \
564 (self.build.user.realname, self.build.user.email))
565
566 return set(l)
567
568 def save_buildroot(self, pkgs):
569 # Cleanup old stuff first (for rebuilding packages)
570 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
571
572 for pkg_name, pkg_uuid in pkgs:
573 self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
574 VALUES(%s, %s, %s)", self.id, pkg_name, pkg_uuid)
575
576 @lazy_property
577 def buildroot(self):
578 rows = self.db.query("SELECT * FROM jobs_buildroots \
579 WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)
580
581 pkgs = []
582 for row in rows:
583 # Search for this package in the packages table.
584 pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
585 pkgs.append((row.pkg_name, row.pkg_uuid, pkg))
586
587 return pkgs
588
589 def send_finished_message(self):
590 # Send no finished mails for test jobs.
591 if self.test:
592 return
593
594 logging.debug("Sending finished message for job %s to %s" % \
595 (self.name, ", ".join(self.message_recipients)))
596
597 info = {
598 "build_name" : self.name,
599 "build_host" : self.builder.name,
600 "build_uuid" : self.uuid,
601 }
602
603 self.backend.messages.send_to_all(self.message_recipients,
604 MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)
605
606 def send_failed_message(self):
607 logging.debug("Sending failed message for job %s to %s" % \
608 (self.name, ", ".join(self.message_recipients)))
609
610 build_host = "--"
611 if self.builder:
612 build_host = self.builder.name
613
614 info = {
615 "build_name" : self.name,
616 "build_host" : build_host,
617 "build_uuid" : self.uuid,
618 }
619
620 self.backend.messages.send_to_all(self.message_recipients,
621 MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)
622
	def set_start_time(self, start_not_before):
		"""Tell the scheduler not to start this job before the given time."""
		self._set_attribute("start_not_before", start_not_before)
625
626 def schedule(self, type, start_time=None, user=None):
627 assert type in ("rebuild", "test")
628
629 if type == "rebuild":
630 if self.state == "finished":
631 return
632
633 job = self.restart()
634 job.set_start_time(start_time)
635
636 # Log the event.
637 self.log("schedule_rebuild", user=user)
638
639 elif type == "test":
640 if not self.state == "finished":
641 return
642
643 # Create a new job with same build and arch.
644 job = self.create(self.backend, self.build, self.arch, test=True)
645 job.set_start_time(start_time)
646
647 # Log the event.
648 self.log("schedule_test_job", test_job=job, user=user)
649
650 return job
651
	def schedule_test(self, start_not_before=None, user=None):
		"""Deprecated wrapper around schedule("test")."""
		# XXX to be removed
		return self.schedule("test", start_time=start_not_before, user=user)

	def schedule_rebuild(self, start_not_before=None, user=None):
		"""Deprecated wrapper around schedule("rebuild")."""
		# XXX to be removed
		return self.schedule("rebuild", start_time=start_not_before, user=user)
659
660 def get_build_repos(self):
661 """
662 Returns a list of all repositories that should be used when
663 building this job.
664 """
665 repo_ids = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
666 self.id)
667
668 if not repo_ids:
669 return self.distro.get_build_repos()
670
671 repos = []
672 for repo in self.distro.repositories:
673 if repo.id in [r.id for r in repo_ids]:
674 repos.append(repo)
675
676 return repos or self.distro.get_build_repos()
677
678 def get_config(self, local=False):
679 """
680 Get configuration file that is sent to the builder.
681 """
682 confs = []
683
684 # Add the distribution configuration.
685 confs.append(self.distro.get_config())
686
687 # Then add all repositories for this build.
688 for repo in self.get_build_repos():
689 conf = repo.get_conf(local=local)
690 confs.append(conf)
691
692 return "\n\n".join(confs)
693
	def resolvdep(self):
		"""
		Check whether the build dependencies of this job can be resolved
		and move the job into "pending" or "dependency_error" state
		accordingly.
		"""
		config = pakfire.config.Config(files=["general.conf"])
		config.parse(self.get_config(local=True))

		# The filename of the source file.
		filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
		assert os.path.exists(filename), filename

		# Create a new pakfire instance with the configuration for
		# this build.
		p = pakfire.PakfireServer(config=config, arch=self.arch)

		# Try to solve the build dependencies.
		try:
			solver = p.resolvdep(filename)

		# Catch dependency errors and log the problem string.
		# NOTE(review): DependencyError is not imported explicitly in
		# this file - presumably it arrives via "from .constants import *"
		# or pakfire; verify the name resolves.
		except DependencyError, e:
			self.state = "dependency_error"
			self.update_message("%s" % e)

		else:
			# If the build dependencies can be resolved, we set the build in
			# pending state.
			if solver.status is True:
				# Do not resurrect a job that has already failed.
				if self.state in ("failed",):
					return

				self.state = "pending"