]> git.ipfire.org Git - pbs.git/blob - src/buildservice/jobs.py
jobs: Use templates to send job status emails
[pbs.git] / src / buildservice / jobs.py
1 #!/usr/bin/python
2
3 import datetime
4 import hashlib
5 import logging
6 import os
7 import shutil
8 import uuid
9
10 import pakfire
11 import pakfire.config
12
13 log = logging.getLogger("builds")
14 log.propagate = 1
15
16 from . import arches
17 from . import base
18 from . import logs
19 from . import users
20
21 from .constants import *
22 from .decorators import *
23
class Jobs(base.Object):
    """Accessor collection for build jobs."""

    def _get_job(self, query, *args):
        # Wrap a single result row in a Job object; None when no match.
        row = self.db.get(query, *args)

        if row:
            return Job(self.backend, row.id, data=row)

    def _get_jobs(self, query, *args):
        # Lazily wrap every result row in a Job object.
        for row in self.db.query(query, *args):
            yield Job(self.backend, row.id, data=row)

    def create(self, build, arch, test=False, superseeds=None):
        """Create a new job for build/arch, optionally as a test job or
        as the successor of another job."""
        job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
            VALUES(%s, %s, %s, %s) RETURNING *", "%s" % uuid.uuid4(), build.id, arch, test)
        job.log("created")

        # Set cache for Build object.
        job.build = build

        # Mark if the new job superseeds some other job
        if superseeds:
            superseeds.superseeded_by = job

        return job

    def get_by_id(self, id):
        """Look up a single job by its database id."""
        return self._get_job("SELECT * FROM jobs WHERE id = %s", id)

    def get_by_uuid(self, uuid):
        """Look up a single job by its UUID."""
        return self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid)

    def get_active(self, limit=None):
        """Jobs that have started but not yet finished, oldest first."""
        return self._get_jobs("SELECT jobs.* FROM jobs \
            WHERE time_started IS NOT NULL AND time_finished IS NULL \
            ORDER BY time_started LIMIT %s", limit)

    def get_recently_ended(self, limit=None):
        """Finished jobs, most recently ended first."""
        return self._get_jobs("SELECT jobs.* FROM jobs \
            WHERE time_finished IS NOT NULL ORDER BY time_finished DESC LIMIT %s", limit)

    def restart_failed(self):
        """Restart build jobs that failed more than 72 hours ago,
        unless the whole build has been marked broken."""
        failed_jobs = self._get_jobs("SELECT jobs.* FROM jobs \
            JOIN builds ON builds.id = jobs.build_id \
            WHERE \
                jobs.type = 'build' AND \
                jobs.state = 'failed' AND \
                NOT builds.state = 'broken' AND \
                jobs.time_finished < NOW() - '72 hours'::interval \
            ORDER BY \
                CASE \
                    WHEN jobs.type = 'build' THEN 0 \
                    WHEN jobs.type = 'test' THEN 1 \
                END, \
                builds.priority DESC, jobs.time_created ASC")

        # Restart the job
        for job in failed_jobs:
            job.restart()
89
class Job(base.DataObject):
    """A single build job — one architecture of one build."""

    # Database table backing this object.
    table = "jobs"

    def __str__(self):
        return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)
95
96 def __eq__(self, other):
97 if isinstance(other, self.__class__):
98 return self.id == other.id
99
100 def __lt__(self, other):
101 if isinstance(other, self.__class__):
102 if not self.test and other.test:
103 return True
104
105 if self.build == other.build:
106 return arches.priority(self.arch) < arches.priority(other.arch)
107
108 return self.time_created < other.time_created
109
110 def __iter__(self):
111 packages = self.backend.packages._get_packages("SELECT packages.* FROM jobs_packages \
112 LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
113 WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id)
114
115 return iter(packages)
116
117 def __nonzero__(self):
118 return True
119
120 def __len__(self):
121 res = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
122 WHERE job_id = %s", self.id)
123
124 return res.len
125
126 @property
127 def uuid(self):
128 return self.data.uuid
129
130 @property
131 def name(self):
132 return "%s-%s.%s" % (self.pkg.name, self.pkg.friendly_version, self.arch)
133
134 @property
135 def build_id(self):
136 return self.data.build_id
137
138 @lazy_property
139 def build(self):
140 return self.backend.builds.get_by_id(self.build_id)
141
142 @property
143 def test(self):
144 return self.data.test
145
146 @property
147 def related_jobs(self):
148 ret = []
149
150 for job in self.build.jobs:
151 if job == self:
152 continue
153
154 ret.append(job)
155
156 return ret
157
158 @property
159 def pkg(self):
160 return self.build.pkg
161
162 @property
163 def size(self):
164 return sum((p.size for p in self.packages))
165
166 @lazy_property
167 def rank(self):
168 """
169 Returns the rank in the build queue
170 """
171 if not self.state == "pending":
172 return
173
174 res = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)
175
176 if res:
177 return res.rank
178
179 @property
180 def distro(self):
181 return self.build.distro
182
183 def get_superseeded_by(self):
184 if self.data.superseeded_by:
185 return self.backend.jobs.get_by_id(self.data.superseeded_by)
186
187 def set_superseeded_by(self, superseeded_by):
188 assert isinstance(superseeded_by, self.__class__)
189
190 self._set_attribute("superseeded_by", superseeded_by.id)
191
192 superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
193
194 def start(self, builder):
195 """
196 Starts this job on builder
197 """
198 self.builder = builder
199
200 # Start to dispatch the build job
201 self.state = "dispatching"
202
203 def running(self):
204 self.state = "running"
205
206 # Set start time
207 self.time_started = datetime.datetime.utcnow()
208 self.time_finished = None
209
210 def finished(self):
211 self.state = "finished"
212
213 # Log end time
214 self.time_finished = datetime.datetime.utcnow()
215
216 # Notify users
217 self.send_finished_message()
218
219 def failed(self, message):
220 self.state = "failed"
221 self.message = message
222
223 # Log end time
224 self.time_finished = datetime.datetime.utcnow()
225
226 # Notify users
227 self.send_failed_message()
228
229 def restart(self, test=None, start_not_before=None):
230 # Copy the job and let it build again
231 job = self.backend.jobs.create(self.build, self.arch,
232 test=test or self.test, superseeds=self)
233
234 if start_not_before:
235 job.start_not_before = start_not_before
236
237 return job
238
239 def delete(self):
240 """
241 Deletes a job from the database
242 """
243 # Remove the buildroot
244 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
245
246 # Remove the history
247 self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)
248
249 # Delete all packages
250 for pkg in self:
251 self.db.execute("DELETE FROM jobs_packages \
252 WHERE job_id = %s AND pkg_id = %s", self.id, pkg.id)
253 pkg.delete()
254
255 # Remove all logfiles
256 for logfile in self.logfiles:
257 self.backend.delete_file(os.path.join(PACKAGES_DIR, logfile.path))
258
259 self.db.execute("DELETE FROM logfiles WHERE job_id = %s", self.id)
260
261 # Delete the job itself.
262 self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)
263
264 ## Logging stuff
265
266 def log(self, action, user=None, state=None, builder=None, test_job=None):
267 user_id = None
268 if user:
269 user_id = user.id
270
271 builder_id = None
272 if builder:
273 builder_id = builder.id
274
275 test_job_id = None
276 if test_job:
277 test_job_id = test_job.id
278
279 self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
280 time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
281 self.id, action, state, user_id, builder_id, test_job_id)
282
283 def get_log(self, limit=None, offset=None, user=None):
284 query = "SELECT * FROM jobs_history"
285
286 conditions = ["job_id = %s",]
287 args = [self.id,]
288
289 if user:
290 conditions.append("user_id = %s")
291 args.append(user.id)
292
293 if conditions:
294 query += " WHERE %s" % " AND ".join(conditions)
295
296 query += " ORDER BY time DESC"
297
298 if limit:
299 if offset:
300 query += " LIMIT %s,%s"
301 args += [offset, limit,]
302 else:
303 query += " LIMIT %s"
304 args += [limit,]
305
306 entries = []
307 for entry in self.db.query(query, *args):
308 entry = logs.JobLogEntry(self.backend, entry)
309 entries.append(entry)
310
311 return entries
312
313 def is_running(self):
314 """
315 Returns True if job is in a running state.
316 """
317 return self.state in ("pending", "dispatching", "running", "uploading")
318
319 def get_state(self):
320 return self.data.state
321
322 def set_state(self, state):
323 self._set_attribute("state", state)
324
325 # Automatically update the state of the build (not on test builds)
326 if not self.test:
327 self.build.auto_update_state()
328
329 state = property(get_state, set_state)
330
331 def set_message(self, message):
332 if message:
333 message = "%s" % message
334
335 self._set_attribute("message", message)
336
337 message = property(lambda s: s.data.message, set_message)
338
339 def get_builder(self):
340 if self.data.builder_id:
341 return self.backend.builders.get_by_id(self.data.builder_id)
342
343 def set_builder(self, builder, user=None):
344 log.info("Builder %s has been assigned to %s" % (builder.name, self.name))
345
346 self._set_attribute("builder_id", builder.id)
347
348 # Log the event.
349 if user:
350 self.log("builder_assigned", builder=builder, user=user)
351
352 builder = lazy_property(get_builder, set_builder)
353
354 @property
355 def candidate_builders(self):
356 """
357 Returns all active builders that could build this job
358 """
359 builders = self.backend.builders.get_for_arch(self.arch)
360
361 # Remove all builders that are not available
362 builders = (b for b in builders if b.enabled and b.is_online())
363
364 # Remove all builders that have too many jobs
365 builders = (b for b in builders if not b.too_many_jobs)
366
367 # Sort them by the fastest builder first
368 return sorted(builders, key=lambda b: -b.performance_index)
369
370 @property
371 def designated_builder(self):
372 """
373 Returns the fastest candidate builder builder
374 """
375 if self.candidate_builders:
376 return self.candidate_builders[0]
377
378 @property
379 def arch(self):
380 return self.data.arch
381
382 @property
383 def duration(self):
384 if not self.time_started:
385 return 0
386
387 if self.time_finished:
388 delta = self.time_finished - self.time_started
389 else:
390 delta = datetime.datetime.utcnow() - self.time_started
391
392 return delta.total_seconds()
393
394 @property
395 def time_created(self):
396 return self.data.time_created
397
398 def set_time_started(self, time_started):
399 self._set_attribute("time_started", time_started)
400
401 time_started = property(lambda s: s.data.time_started, set_time_started)
402
403 def set_time_finished(self, time_finished):
404 self._set_attribute("time_finished", time_finished)
405
406 time_finished = property(lambda s: s.data.time_finished, set_time_finished)
407
408 def set_start_not_before(self, start_not_before):
409 self._set_attribute("start_not_before", start_not_before)
410
411 start_not_before = property(lambda s: s.data.start_not_before, set_start_not_before)
412
413 def get_pkg_by_uuid(self, uuid):
414 pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
415 JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
416 WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
417 self.id, uuid)
418
419 if pkg:
420 pkg.job = self
421 return pkg
422
423 @lazy_property
424 def logfiles(self):
425 logfiles = []
426
427 for log in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
428 log = logs.LogFile(self.backend, log.id)
429 log._job = self
430
431 logfiles.append(log)
432
433 return logfiles
434
435 def add_file(self, filename):
436 """
437 Add the specified file to this job.
438
439 The file is copied to the right directory by this function.
440 """
441 assert os.path.exists(filename)
442
443 if filename.endswith(".log"):
444 self._add_file_log(filename)
445
446 elif filename.endswith(".%s" % PACKAGE_EXTENSION):
447 # It is not allowed to upload packages on test builds.
448 if self.test:
449 return
450
451 self._add_file_package(filename)
452
453 def _add_file_log(self, filename):
454 """
455 Attach a log file to this job.
456 """
457 target_dirname = os.path.join(self.build.path, "logs")
458
459 if self.test:
460 i = 1
461 while True:
462 target_filename = os.path.join(target_dirname,
463 "test.%s.%s.%s.log" % (self.arch, i, self.uuid))
464
465 if os.path.exists(target_filename):
466 i += 1
467 else:
468 break
469 else:
470 target_filename = os.path.join(target_dirname,
471 "build.%s.%s.log" % (self.arch, self.uuid))
472
473 # Make sure the target directory exists.
474 if not os.path.exists(target_dirname):
475 os.makedirs(target_dirname)
476
477 # Calculate a SHA512 hash from that file.
478 f = open(filename, "rb")
479 h = hashlib.sha512()
480 while True:
481 buf = f.read(BUFFER_SIZE)
482 if not buf:
483 break
484
485 h.update(buf)
486 f.close()
487
488 # Copy the file to the final location.
489 shutil.copy2(filename, target_filename)
490
491 # Create an entry in the database.
492 self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
493 VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
494 os.path.getsize(target_filename), h.hexdigest())
495
496 def _add_file_package(self, filename):
497 # Open package (creates entry in the database)
498 pkg = self.backend.packages.create(filename)
499
500 # Move package to the build directory.
501 pkg.move(os.path.join(self.build.path, self.arch))
502
503 # Attach the package to this job.
504 self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
505 self.id, pkg.id)
506
    def get_aborted_state(self):
        # State the job was in when it was aborted.
        return self.data.aborted_state

    def set_aborted_state(self, state):
        # Remember the pre-abort state so it can be inspected later.
        self._set_attribute("aborted_state", state)

    # State at abort time — presumably NULL/None when the job was never
    # aborted; confirm against the schema.
    aborted_state = property(get_aborted_state, set_aborted_state)
514
515 @property
516 def message_recipients(self):
517 l = []
518
519 # Add all people watching the build.
520 l += self.build.message_recipients
521
522 # Add the package maintainer on release builds.
523 if self.build.type == "release":
524 maint = self.pkg.maintainer
525
526 if isinstance(maint, users.User):
527 l.append("%s <%s>" % (maint.realname, maint.email))
528 elif maint:
529 l.append(maint)
530
531 # XXX add committer and commit author.
532
533 # Add the owner of the scratch build on scratch builds.
534 elif self.build.type == "scratch" and self.build.user:
535 l.append("%s <%s>" % \
536 (self.build.user.realname, self.build.user.email))
537
538 return set(l)
539
540 def save_buildroot(self, pkgs):
541 # Cleanup old stuff first (for rebuilding packages)
542 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
543
544 for pkg_name, pkg_uuid in pkgs:
545 self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
546 VALUES(%s, %s, %s)", self.id, pkg_name, pkg_uuid)
547
548 @lazy_property
549 def buildroot(self):
550 rows = self.db.query("SELECT * FROM jobs_buildroots \
551 WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)
552
553 pkgs = []
554 for row in rows:
555 # Search for this package in the packages table.
556 pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
557 pkgs.append((row.pkg_name, row.pkg_uuid, pkg))
558
559 return pkgs
560
561 def send_finished_message(self):
562 # Send no finished mails for test jobs.
563 if self.test:
564 return
565
566 logging.debug("Sending finished message for job %s to %s" % \
567 (self.name, ", ".join(self.message_recipients)))
568
569 self.backend.messages.send_template_to_many(self.message_recipients,
570 "messages/jobs/finished", job=self)
571
572 def send_failed_message(self):
573 logging.debug("Sending failed message for job %s to %s" % \
574 (self.name, ", ".join(self.message_recipients)))
575
576 self.backend.messages.send_template_to_many(self.message_recipients,
577 "messages/jobs/failed", job=self)
578
579 def get_build_repos(self):
580 """
581 Returns a list of all repositories that should be used when
582 building this job.
583 """
584 repo_ids = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
585 self.id)
586
587 if not repo_ids:
588 return self.distro.get_build_repos()
589
590 repos = []
591 for repo in self.distro.repositories:
592 if repo.id in [r.id for r in repo_ids]:
593 repos.append(repo)
594
595 return repos or self.distro.get_build_repos()
596
597 def get_config(self, local=False):
598 """
599 Get configuration file that is sent to the builder.
600 """
601 confs = []
602
603 # Add the distribution configuration.
604 confs.append(self.distro.get_config())
605
606 # Then add all repositories for this build.
607 for repo in self.get_build_repos():
608 conf = repo.get_conf(local=local)
609 confs.append(conf)
610
611 return "\n\n".join(confs)
612
    def set_dependency_check_succeeded(self, value):
        # Store the result together with a timestamp of when it was taken.
        self._set_attribute("dependency_check_succeeded", value)
        self._set_attribute("dependency_check_at", datetime.datetime.utcnow())

        # Reset the message (a successful check clears any previous
        # dependency error text).
        if value is True:
            self.message = None

    # Result of the last dependency check; setting it also records the
    # check timestamp and may clear the job message.
    dependency_check_succeeded = property(
        lambda s: s.data.dependency_check_succeeded,
        set_dependency_check_succeeded)
624
    def resolvdep(self):
        """
        Run the build-dependency check for this job.

        Builds a pakfire configuration from the distribution and the
        job's build repositories, then tries to resolve the dependencies
        of the source package. The outcome is stored in
        dependency_check_succeeded; on failure the solver's problem text
        becomes the job message.
        """
        log.info("Processing dependencies for %s..." % self)

        config = pakfire.config.Config(files=["general.conf"])
        config.parse(self.get_config(local=True))

        # The filename of the source file.
        filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
        assert os.path.exists(filename), filename

        # Create a new pakfire instance with the configuration for
        # this build.
        p = pakfire.PakfireServer(config=config, arch=self.arch)

        # Try to solve the build dependencies.
        try:
            # NOTE(review): the return value is never used.
            solver = p.resolvdep(filename)

        # Catch dependency errors and log the problem string.
        # (Python 2 except syntax; DependencyError presumably comes from
        # the constants star-import — confirm.)
        except DependencyError, e:
            self.dependency_check_succeeded = False
            self.message = e

        # The dependency check has succeeded
        else:
            self.dependency_check_succeeded = True