]> git.ipfire.org Git - people/jschlag/pbs.git/blame - src/buildservice/jobs.py
Refactor Bugzilla URL generation
[people/jschlag/pbs.git] / src / buildservice / jobs.py
CommitLineData
2a1e9ce2
MT
1#!/usr/bin/python
2
3import datetime
4import hashlib
5import logging
6import os
7import shutil
8import uuid
9
10import pakfire
11import pakfire.config
12
13log = logging.getLogger("builds")
14log.propagate = 1
15
16from . import arches
17from . import base
18from . import logs
19from . import users
20
21from .constants import *
22from .decorators import *
23
24class Jobs(base.Object):
def _get_job(self, query, *args):
    """
    Runs a query that is expected to match a single row and wraps
    that row in a Job object.

    Returns None when the query did not return anything.
    """
    row = self.db.get(query, *args)
    if not row:
        return None

    return Job(self.backend, row.id, data=row)
30
def _get_jobs(self, query, *args):
    """
    Generator that runs the given query and yields one Job object
    for each row that is returned.
    """
    for row in self.db.query(query, *args):
        yield Job(self.backend, row.id, data=row)
36
def create(self, build, arch, test=False, superseeds=None):
    """
    Inserts a new job for the given build and architecture into the
    database and returns it as a Job object.

    test       - value stored in the "test" column of the new job
    superseeds - an existing job that is replaced by the new one; its
                 superseeded_by attribute is pointed at the new job
    """
    # Every job gets a fresh random UUID
    job_uuid = "%s" % uuid.uuid4()

    job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
        VALUES(%s, %s, %s, %s) RETURNING *", job_uuid, build.id, arch, test)
    job.log("created")

    # Set cache for Build object.
    job.build = build

    # Mark if the new job supersedes some other job
    if superseeds:
        superseeds.superseeded_by = job

    return job
50
def get_by_id(self, id, data=None):
    """
    Returns a Job object for the given job ID.

    data is handed to the Job constructor unchanged. No database
    query is made by this method itself.
    """
    job = Job(self.backend, id, data)

    return job
53
def get_by_uuid(self, uuid):
    """
    Returns the job with the given UUID, or None if there is no
    such job.
    """
    query = "SELECT * FROM jobs WHERE uuid = %s"

    return self._get_job(query, uuid)
2a1e9ce2 56
2a1e9ce2
MT
def get_active(self, host_id=None, builder=None, states=None):
    """
    Returns a list of all jobs that are in one of the given states.

    host_id - only return jobs that are assigned to this builder ID
    builder - a builder object; its ID overrides host_id
    states  - list of states to look for. Defaults to
              dispatching, running and uploading.

    Running jobs come first, followed by uploading, dispatching,
    pending and new ones. Within a state, jobs are ordered by the
    time they were started.
    """
    if builder:
        host_id = builder.id

    if states is None:
        states = ["dispatching", "running", "uploading"]

    # Work on a copy so that the list of the caller is never modified
    args = list(states)

    # "IN ()" is not valid SQL and nothing could match anyway
    if not args:
        return []

    query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(args))

    if host_id:
        # Pass the ID as a query parameter instead of pasting it into the SQL
        query += " AND builder_id = %s"
        args.append(host_id)

    query += " ORDER BY \
        CASE \
            WHEN jobs.state = 'running' THEN 0 \
            WHEN jobs.state = 'uploading' THEN 1 \
            WHEN jobs.state = 'dispatching' THEN 2 \
            WHEN jobs.state = 'pending' THEN 3 \
            WHEN jobs.state = 'new' THEN 4 \
        END, time_started ASC"

    return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]
80
def get_latest(self, arch=None, builder=None, limit=None, age=None, date=None):
    """
    Returns a list of jobs that have ended (i.e. are in finished,
    failed or aborted state), most recently finished first.

    arch    - only jobs for this architecture
    builder - only jobs that were assigned to this builder
    limit   - maximum number of jobs to return
    age     - an interval string (e.g. "24 hours"); only jobs that
              finished within that interval are returned
    date    - a "YYYY-MM-DD" string; only jobs that were created,
              started or finished on that day are returned. A value
              that cannot be parsed is ignored.
    """
    where = ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]
    args = []

    if arch:
        where.append("arch = %s")
        args.append(arch)

    if builder:
        where.append("builder_id = %s")
        args.append(builder.id)

    if date:
        try:
            year, month, day = date.split("-", 2)
            date = datetime.date(int(year), int(month), int(day))
        except ValueError:
            # Deliberately ignore dates that cannot be parsed
            pass
        else:
            where.append("(time_created::date = %s OR \
                time_started::date = %s OR time_finished::date = %s)")
            args += (date, date, date)

    if age:
        # The interval is passed as a query parameter so that it
        # cannot be used to inject SQL
        where.append("time_finished >= NOW() - %s::interval")
        args.append(age)

    query = "SELECT * FROM jobs WHERE %s" % " AND ".join(where)
    query += " ORDER BY time_finished DESC"

    if limit:
        query += " LIMIT %s"
        args.append(limit)

    return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]
119
def get_average_build_time(self):
    """
    Returns the average build time of all finished builds from the
    last 3 months.

    Returns None if the query did not return a row.
    """
    row = self.db.get("SELECT AVG(time_finished - time_started) as average \
        FROM jobs WHERE type = 'build' AND state = 'finished' AND \
        time_finished >= NOW() - '3 months'::interval")

    if not row:
        return None

    return row.average
131
def count(self, *states):
    """
    Returns the number of jobs.

    If any states are passed, only jobs in one of those states
    are counted.
    """
    if states:
        # The tuple of states is passed as one single parameter
        row = self.db.get("SELECT COUNT(*) AS count FROM jobs WHERE state IN %s", states)
    else:
        row = self.db.get("SELECT COUNT(*) AS count FROM jobs")

    if row:
        return row.count
143
def restart_failed(self):
    """
    Restarts all failed build jobs which finished more than
    72 hours ago and whose build is not in broken state.
    """
    failed_jobs = self._get_jobs("SELECT jobs.* FROM jobs \
        JOIN builds ON builds.id = jobs.build_id \
        WHERE \
            jobs.type = 'build' AND \
            jobs.state = 'failed' AND \
            NOT builds.state = 'broken' AND \
            jobs.time_finished < NOW() - '72 hours'::interval \
        ORDER BY \
            CASE \
                WHEN jobs.type = 'build' THEN 0 \
                WHEN jobs.type = 'test' THEN 1 \
            END, \
            builds.priority DESC, jobs.time_created ASC")

    # Restart each of them in the order the query returned them
    for failed_job in failed_jobs:
        failed_job.restart()
2a1e9ce2
MT
162
163
164class Job(base.DataObject):
165 table = "jobs"
166
def __str__(self):
    """
    Returns a short description of the form "<Class id=ID name>".
    """
    class_name = self.__class__.__name__

    return "<%s id=%s %s>" % (class_name, self.id, self.name)
169
def __eq__(self, other):
    """
    Two jobs are equal if they have the same ID.

    Comparing against an object of a different type returns
    NotImplemented so that Python can fall back to its default
    handling instead of receiving None.
    """
    if not isinstance(other, self.__class__):
        return NotImplemented

    return self.id == other.id
173
def __lt__(self, other):
    """
    Defines the order in which jobs are sorted:

      * Build jobs come before test jobs
      * Jobs of the same build are ordered by the priority
        of their architecture
      * Everything else is ordered by creation time

    Comparing against an object of a different type returns
    NotImplemented.
    """
    if not isinstance(other, self.__class__):
        return NotImplemented

    # Build jobs always come before test jobs...
    if not self.test and other.test:
        return True

    # ... and consequently a test job never comes before a build job.
    # Without this check, both a < b and b < a could be true.
    if self.test and not other.test:
        return False

    if self.build == other.build:
        return arches.priority(self.arch) < arches.priority(other.arch)

    return self.time_created < other.time_created
183
def __iter__(self):
    """
    Iterates over all packages that are attached to this job,
    ordered by package name.
    """
    query = "SELECT packages.* FROM jobs_packages \
        LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
        WHERE jobs_packages.job_id = %s ORDER BY packages.name"

    return iter(self.backend.packages._get_packages(query, self.id))
190
def __nonzero__(self):
    """
    A job is always true.

    This class also defines __len__, so without this method
    Python 2 would treat a job without packages as false.
    """
    return True
193
def __len__(self):
    """
    Returns the number of packages that are attached to this job.
    """
    row = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
        WHERE job_id = %s", self.id)

    return row.len
199
@property
def distro(self):
    """
    The distribution of the build this job belongs to.
    """
    build = self.build

    return build.distro
203
96cc81de
MT
def restart(self):
    """
    Creates a copy of this job (same build, architecture and test
    flag) which supersedes this one, and returns the new job.
    """
    jobs = self.backend.jobs

    # Copy the job and let it build again
    return jobs.create(self.build, self.arch, test=self.test, superseeds=self)
208
3f516e41
MT
def get_superseeded_by(self):
    """
    Returns the job that replaced this one, or None if the
    superseeded_by column is not set.
    """
    if self.data.superseeded_by:
        return self.backend.jobs.get_by_id(self.data.superseeded_by)

def set_superseeded_by(self, superseeded_by):
    """
    Stores the ID of the job which replaces this one.

    superseeded_by must be an instance of this class.
    """
    assert isinstance(superseeded_by, self.__class__)

    self._set_attribute("superseeded_by", superseeded_by.id)

    # NOTE(review): this assigns to the very property this function is
    # the setter of. Whether this updates a cache or re-enters the
    # setter depends on how lazy_property is implemented, which is not
    # visible here. set_builder() does not do this - please confirm.
    self.superseeded_by = superseeded_by

superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
220
def delete(self):
    """
    Deletes a job from the database

    This removes the saved buildroot, the history, all packages and
    all log files of the job before the job itself is deleted. Log
    files are not removed directly; their paths are added to the
    queue_delete table.
    """
    execute = self.db.execute
    job_id = self.id

    # Remove the buildroot
    execute("DELETE FROM jobs_buildroots WHERE job_id = %s", job_id)

    # Remove the history
    execute("DELETE FROM jobs_history WHERE job_id = %s", job_id)

    # Delete all packages
    for package in self:
        execute("DELETE FROM jobs_packages \
            WHERE job_id = %s AND pkg_id = %s", job_id, package.id)
        package.delete()

    # Remove all logfiles
    for logfile in self.logfiles:
        execute("INSERT INTO queue_delete(path) VALUES(%s)", logfile.path)

    execute("DELETE FROM logfiles WHERE job_id = %s", job_id)

    # Delete the job itself.
    execute("DELETE FROM jobs WHERE id = %s", job_id)
245
2a1e9ce2
MT
246 ## Logging stuff
247
def log(self, action, user=None, state=None, builder=None, test_job=None):
    """
    Adds an entry to the history of this job.

    action   - name of the event that is logged
    user     - the user who triggered the event
    state    - the state the job changed to
    builder  - the builder that is involved in the event
    test_job - the test job that is involved in the event

    For user, builder and test_job only the ID is stored.
    """
    user_id = user.id if user else None
    builder_id = builder.id if builder else None
    test_job_id = test_job.id if test_job else None

    self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
        time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
        self.id, action, state, user_id, builder_id, test_job_id)
264
def get_log(self, limit=None, offset=None, user=None):
    """
    Returns the history of this job as a list of JobLogEntry
    objects, newest entry first.

    limit  - maximum number of entries to return
    offset - number of entries to skip (only used together with limit)
    user   - only return entries that were caused by this user
    """
    conditions = ["job_id = %s"]
    args = [self.id]

    if user:
        conditions.append("user_id = %s")
        args.append(user.id)

    query = "SELECT * FROM jobs_history WHERE %s" % " AND ".join(conditions)
    query += " ORDER BY time DESC"

    if limit:
        query += " LIMIT %s"
        args.append(limit)

        # PostgreSQL does not understand the "LIMIT offset, count"
        # form of MySQL, so OFFSET is used instead
        if offset:
            query += " OFFSET %s"
            args.append(offset)

    return [logs.JobLogEntry(self.backend, entry)
        for entry in self.db.query(query, *args)]
294
@property
def uuid(self):
    """
    The UUID of this job as stored in the database row.
    """
    data = self.data

    return data.uuid
298
@property
def test(self):
    """
    The value of the "test" column of this job.
    """
    data = self.data

    return data.test
2a1e9ce2
MT
302
@property
def build_id(self):
    """
    The ID of the build this job belongs to.
    """
    data = self.data

    return data.build_id
306
@lazy_property
def build(self):
    """
    The build this job belongs to, looked up by build_id.
    """
    builds = self.backend.builds

    return builds.get_by_id(self.build_id)
310
@property
def related_jobs(self):
    """
    Returns a list of all other jobs of the same build.
    """
    # "not ==" is used on purpose: the comparison must go through __eq__
    return [job for job in self.build.jobs if not job == self]
322
@property
def pkg(self):
    """
    The source package of the build this job belongs to.
    """
    build = self.build

    return build.pkg
326
@property
def name(self):
    """
    The name of the job in the form "<pkg name>-<version>.<arch>".
    """
    pkg = self.pkg

    return "%s-%s.%s" % (pkg.name, pkg.friendly_version, self.arch)
330
@property
def size(self):
    """
    Returns the sum of the sizes of all packages in self.packages.
    """
    # NOTE(review): no "packages" attribute is defined in the visible
    # part of this class (packages are reached via __iter__) - confirm
    # that the base class or another part of the file provides it.
    sizes = (package.size for package in self.packages)

    return sum(sizes)
334
@lazy_property
def rank(self):
    """
    Returns the rank in the build queue

    Only pending jobs have a rank. For all other jobs, and for jobs
    that are not found in jobs_queue, None is returned.
    """
    if not self.state == "pending":
        return None

    row = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)
    if not row:
        return None

    return row.rank
347
def is_running(self):
    """
    Returns True if job is in a running state.

    That is any of pending, dispatching, running or uploading.
    """
    running_states = ("pending", "dispatching", "running", "uploading")

    return self.state in running_states
353
def get_state(self):
    """
    Returns the state of the job as stored in the database row.
    """
    return self.data.state

def set_state(self, state, user=None, log=True):
    """
    Changes the state of the job and runs everything that is
    attached to a state change.

    state - the new state
    user  - the user who caused the change (only used for the log entry)
    log   - if False, no history entry is written

    Everything below the first if-block is executed even if the
    state did not actually change.
    """
    # Nothing to do if the state remains.
    if not self.state == state:
        self._set_attribute("state", state)

        # Log the event.
        if log and not state == "new":
            self.log("state_change", state=state, user=user)

    # Always clear the message when the status is changed.
    self.update_message(None)

    # Update some more information.
    if state == "dispatching":
        # Set start time.
        self._set_attribute("time_started", datetime.datetime.utcnow())

    elif state in ("aborted", "dependency_error", "finished", "failed"):
        # Set finish time.
        self._set_attribute("time_finished", datetime.datetime.utcnow())

        # Send messages to the user.
        if state == "finished":
            self.send_finished_message()

        elif state == "failed":
            # Remove all package files if a job is set to failed state.
            # NOTE(review): __delete_packages is not defined in the
            # visible part of this class - confirm that it exists.
            self.__delete_packages()

            self.send_failed_message()

    # Automatically update the state of the build (not on test builds).
    if not self.test:
        self.build.auto_update_state()

state = property(get_state, set_state)
392
@property
def message(self):
    """
    The message that is stored for this job.
    """
    data = self.data

    return data.message
396
c2fb4460
MT
def update_message(self, message):
    """
    Stores a new message for this job. Pass None to clear it.
    """
    self._set_attribute("message", message)
2a1e9ce2
MT
399
def get_builder(self):
    """
    Returns the builder this job is assigned to, or None if no
    builder ID is set.
    """
    builder_id = self.data.builder_id
    if not builder_id:
        return None

    return self.backend.builders.get_by_id(builder_id)

def set_builder(self, builder, user=None):
    """
    Assigns the job to the given builder.

    A history entry is only written if a user is passed.
    """
    self._set_attribute("builder_id", builder.id)

    # Log the event.
    if user:
        self.log("builder_assigned", builder=builder, user=user)

builder = lazy_property(get_builder, set_builder)
412
@property
def arch(self):
    """
    The architecture this job is built for.
    """
    data = self.data

    return data.arch
416
@property
def duration(self):
    """
    Returns the number of seconds this job has been running.

    For jobs that have not been started yet, 0 is returned. For jobs
    that have not finished yet, the time since the start is returned.
    """
    started = self.time_started
    if not started:
        return 0

    # Use the current time as long as the job has not finished
    ended = self.time_finished or datetime.datetime.utcnow()

    return (ended - started).total_seconds()
428
@property
def time_created(self):
    """
    The time_created value of the database row of this job.
    """
    data = self.data

    return data.time_created
432
@property
def time_started(self):
    """
    The time_started value of the database row of this job.
    """
    data = self.data

    return data.time_started
436
@property
def time_finished(self):
    """
    The time_finished value of the database row of this job.
    """
    data = self.data

    return data.time_finished
440
@property
def expected_runtime(self):
    """
    Returns the estimated time and stddev, this job takes to finish.

    Both values are None if there is no statistical data for this
    package on this architecture.
    """
    # Get the average build time.
    stats = self.backend.builds.get_build_times_by_arch(self.arch,
        name=self.pkg.name)

    # If there is no statistical data, we cannot estimate anything.
    if stats:
        return stats.average, stats.stddev

    return None, None
455
@property
def eta(self):
    """
    Returns a tuple of the expected remaining runtime of this job
    and the stddev of the estimation.

    Returns None if no expected runtime is available.
    """
    runtime, stddev = self.expected_runtime
    if not runtime:
        return None

    remaining = runtime - int(self.duration)

    return remaining, stddev
462
2a1e9ce2
MT
def get_pkg_by_uuid(self, uuid):
    """
    Returns the package with the given UUID if it is attached to
    this job. Otherwise None is returned.
    """
    pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
        JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
        WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
        self.id, uuid)

    if not pkg:
        return None

    # Let the package know which job it belongs to
    pkg.job = self

    return pkg
472
@lazy_property
def logfiles(self):
    """
    Returns a list of LogFile objects for all log files that are
    stored for this job.
    """
    result = []

    for row in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
        logfile = logs.LogFile(self.backend, row.id)

        # Set the job on the log file object right away
        logfile._job = self

        result.append(logfile)

    return result
484
def add_file(self, filename):
    """
    Add the specified file to this job.

    Files ending in ".log" are attached as log files and files with
    the package extension are attached as packages. Anything else
    is ignored.
    """
    assert os.path.exists(filename)

    if filename.endswith(".log"):
        self._add_file_log(filename)
        return

    if not filename.endswith(".%s" % PACKAGE_EXTENSION):
        return

    # It is not allowed to upload packages on test builds.
    if self.test:
        return

    self._add_file_package(filename)
502
def _add_file_log(self, filename):
    """
    Attach a log file to this job.

    The file is copied into the "logs" directory of the build and
    registered in the logfiles table together with its size and
    its SHA512 hash. The stored path is relative to PACKAGES_DIR.

    Test jobs may have several log files, which are numbered.
    Build jobs only have one.
    """
    target_dirname = os.path.join(self.build.path, "logs")

    if self.test:
        # Find the first number that has not been used, yet
        i = 1
        while True:
            target_filename = os.path.join(target_dirname,
                "test.%s.%s.%s.log" % (self.arch, i, self.uuid))

            if not os.path.exists(target_filename):
                break

            i += 1
    else:
        target_filename = os.path.join(target_dirname,
            "build.%s.%s.log" % (self.arch, self.uuid))

    # Make sure the target directory exists.
    if not os.path.exists(target_dirname):
        os.makedirs(target_dirname)

    # Calculate a SHA512 hash from that file.
    # The with statement closes the file even if reading fails.
    h = hashlib.sha512()
    with open(filename, "rb") as f:
        while True:
            buf = f.read(BUFFER_SIZE)
            if not buf:
                break

            h.update(buf)

    # Copy the file to the final location.
    shutil.copy2(filename, target_filename)

    # Create an entry in the database.
    self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
        VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
        os.path.getsize(target_filename), h.hexdigest())
545
def _add_file_package(self, filename):
    """
    Attach a package file to this job.

    The package is imported, moved into the directory of the build
    for the architecture of this job and linked to the job.
    """
    # Open package (creates entry in the database)
    package = self.backend.packages.create(filename)

    # Move package to the build directory.
    target = os.path.join(self.build.path, self.arch)
    package.move(target)

    # Attach the package to this job.
    self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
        self.id, package.id)
556
def get_aborted_state(self):
    """
    Returns the aborted_state value of the database row of this job.
    """
    data = self.data

    return data.aborted_state

def set_aborted_state(self, state):
    """
    Stores a new aborted_state value for this job.
    """
    self._set_attribute("aborted_state", state)

aborted_state = property(get_aborted_state, set_aborted_state)
564
@property
def message_recipients(self):
    """
    Returns the set of all recipients who should receive messages
    about this job.
    """
    build = self.build

    # Add all people watching the build.
    recipients = list(build.message_recipients)

    # Add the package maintainer on release builds.
    if build.type == "release":
        maint = self.pkg.maintainer

        if isinstance(maint, users.User):
            recipients.append("%s <%s>" % (maint.realname, maint.email))
        elif maint:
            recipients.append(maint)

        # XXX add committer and commit author.

    # Add the owner of the scratch build on scratch builds.
    elif build.type == "scratch" and build.user:
        owner = build.user
        recipients.append("%s <%s>" % (owner.realname, owner.email))

    return set(recipients)
589
def save_buildroot(self, pkgs):
    """
    Stores the list of packages that were installed in the
    buildroot of this job.

    pkgs is an iterable of (package name, package UUID) tuples.
    Any previously stored buildroot of this job is replaced.
    """
    # Cleanup old stuff first (for rebuilding packages)
    self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

    for pkg_name, pkg_uuid in pkgs:
        # The values must be passed in the same order as the columns
        # are listed: the UUID comes before the name
        self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
            VALUES(%s, %s, %s)", self.id, pkg_uuid, pkg_name)
2a1e9ce2 597
6990cac2
MT
@lazy_property
def buildroot(self):
    """
    Returns the saved buildroot of this job as a list of
    (package name, package UUID, package) tuples, ordered by
    package name.

    The last element is whatever packages.get_by_uuid() returns
    for the UUID.
    """
    rows = self.db.query("SELECT * FROM jobs_buildroots \
        WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)

    # Search for each package in the packages table.
    return [
        (row.pkg_name, row.pkg_uuid, self.backend.packages.get_by_uuid(row.pkg_uuid))
        for row in rows
    ]
610
def send_finished_message(self):
    """
    Sends a message to all recipients of this job that the
    job has finished.

    No message is sent for test jobs.
    """
    # Send no finished mails for test jobs.
    if self.test:
        return

    # Collect the recipients only once
    recipients = self.message_recipients

    logging.debug("Sending finished message for job %s to %s" % \
        (self.name, ", ".join(recipients)))

    # Do not crash if no builder is assigned to this job
    # (send_failed_message handles this in the same way)
    build_host = "--"
    if self.builder:
        build_host = self.builder.name

    info = {
        "build_name" : self.name,
        "build_host" : build_host,
        "build_uuid" : self.uuid,
    }

    self.backend.messages.send_to_all(recipients,
        MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)
627
def send_failed_message(self):
    """
    Sends a message to all recipients of this job that the
    job has failed.

    If no builder is assigned to the job, "--" is sent as the
    name of the build host.
    """
    logging.debug("Sending failed message for job %s to %s" % \
        (self.name, ", ".join(self.message_recipients)))

    build_host = self.builder.name if self.builder else "--"

    info = {
        "build_name" : self.name,
        "build_host" : build_host,
        "build_uuid" : self.uuid,
    }

    self.backend.messages.send_to_all(self.message_recipients,
        MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)
644
def set_start_time(self, start_not_before):
    """
    Stores the given value in the start_not_before attribute
    of this job.
    """
    self._set_attribute("start_not_before", start_not_before)
647
def schedule(self, type, start_time=None, user=None):
    """
    Schedules a rebuild of this job or a test job for it and
    returns the newly created job.

    type       - either "rebuild" or "test"
    start_time - passed to set_start_time() of the new job
    user       - the user who scheduled the job (for the log entry)

    A rebuild is not scheduled for finished jobs and a test is only
    scheduled for finished jobs. In those cases nothing happens
    and None is returned.
    """
    assert type in ("rebuild", "test")

    if type == "rebuild":
        if self.state == "finished":
            return

        job = self.restart()
        job.set_start_time(start_time)

        # Log the event.
        self.log("schedule_rebuild", user=user)

    elif type == "test":
        if not self.state == "finished":
            return

        # Create a new job with same build and arch.
        # Jobs are created by the Jobs collection of the backend (as
        # in restart()); this class has no create() method.
        job = self.backend.jobs.create(self.build, self.arch, test=True)
        job.set_start_time(start_time)

        # Log the event.
        self.log("schedule_test_job", test_job=job, user=user)

    return job
673
def schedule_test(self, start_not_before=None, user=None):
    """
    Shortcut for schedule("test", ...).
    """
    # XXX to be removed
    options = {"start_time" : start_not_before, "user" : user}

    return self.schedule("test", **options)
677
def schedule_rebuild(self, start_not_before=None, user=None):
    """
    Shortcut for schedule("rebuild", ...).
    """
    # XXX to be removed
    options = {"start_time" : start_not_before, "user" : user}

    return self.schedule("rebuild", **options)
681
def get_build_repos(self):
    """
    Returns a list of all repositories that should be used when
    building this job.

    If repositories have been selected for this job in jobs_repos,
    those of them that belong to the distribution are returned.
    Otherwise the default build repositories of the distribution
    are used.
    """
    rows = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
        self.id)

    if not rows:
        return self.distro.get_build_repos()

    # The query only returns the repo_id column. Collect the IDs
    # once instead of for every repository.
    repo_ids = set(row.repo_id for row in rows)

    repos = [repo for repo in self.distro.repositories if repo.id in repo_ids]

    return repos or self.distro.get_build_repos()
699
def get_repo_config(self):
    """
    Get repository configuration file that is sent to the builder.

    The configurations of all build repositories are joined,
    separated by an empty line.
    """
    confs = [repo.get_conf() for repo in self.get_build_repos()]

    return "\n\n".join(confs)
710
def get_config(self):
    """
    Get configuration file that is sent to the builder.

    It consists of the configuration of the distribution followed
    by the configuration of all repositories for this build.
    """
    # The distribution configuration comes first
    distro_conf = self.distro.get_config()

    # Then add all repositories for this build.
    repo_conf = self.get_repo_config()

    return "\n\n".join((distro_conf, repo_conf))
724
def resolvdep(self):
    """
    Tries to resolve the build dependencies of this job.

    On success, the job is set to pending state (unless it is in
    failed state). If the dependencies cannot be resolved, the job
    is set to dependency_error state and the error is stored as
    the message of the job.
    """
    config = pakfire.config.Config(files=["general.conf"])
    config.parse(self.get_config())

    # The filename of the source file.
    filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
    assert os.path.exists(filename), filename

    # Create a new pakfire instance with the configuration for
    # this build.
    p = pakfire.PakfireServer(config=config, arch=self.arch)

    # Try to solve the build dependencies.
    try:
        solver = p.resolvdep(filename)

    # Catch dependency errors and log the problem string.
    # ("except ... as" is understood by Python 2.6+ and Python 3)
    except DependencyError as e:
        self.state = "dependency_error"
        self.update_message(e)

    else:
        # If the build dependencies can be resolved, we set the build in
        # pending state.
        if solver.status is True:
            if self.state in ("failed",):
                return

            self.state = "pending"