]> git.ipfire.org Git - people/jschlag/pbs.git/blame - src/buildservice/jobs.py
Always use absolute path when deleting a file
[people/jschlag/pbs.git] / src / buildservice / jobs.py
CommitLineData
2a1e9ce2
MT
1#!/usr/bin/python
2
3import datetime
4import hashlib
5import logging
6import os
7import shutil
8import uuid
9
10import pakfire
11import pakfire.config
12
# Module-level logger; records are emitted under the "builds" logger name.
log = logging.getLogger("builds")
# Pass records on to the handlers of ancestor loggers as well.
log.propagate = 1
15
16from . import arches
17from . import base
18from . import logs
19from . import users
20
21from .constants import *
22from .decorators import *
23
class Jobs(base.Object):
    """
    Collection-level access to the "jobs" table: creating, looking up,
    listing and counting Job objects.
    """

    def _get_job(self, query, *args):
        """
        Runs a query that yields a single row and wraps it into a Job.

        Returns None if the query did not return a row.
        """
        res = self.db.get(query, *args)

        if res:
            return Job(self.backend, res.id, data=res)

    def _get_jobs(self, query, *args):
        """
        Generator that yields one Job per row returned by the query.
        """
        for row in self.db.query(query, *args):
            yield Job(self.backend, row.id, data=row)

    def create(self, build, arch, test=False, superseeds=None):
        """
        Inserts a new job for the given build and architecture.

        build      - the build object the job belongs to (its id is stored)
        arch       - architecture of the job
        test       - flag stored in the "test" column
        superseeds - optional Job which is replaced by the new job

        Returns the newly created Job.
        """
        job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
            VALUES(%s, %s, %s, %s) RETURNING *", "%s" % uuid.uuid4(), build.id, arch, test)
        job.log("created")

        # Set cache for Build object.
        job.build = build

        # Mark if the new job superseeds some other job
        if superseeds:
            superseeds.superseeded_by = job

        return job

    def get_by_id(self, id):
        """
        Returns the Job with the given database id or None.
        """
        return self._get_job("SELECT * FROM jobs WHERE id = %s", id)

    def get_by_uuid(self, uuid):
        """
        Returns the Job with the given UUID or None.
        """
        return self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid)

    def get_active(self, host_id=None, builder=None, states=None):
        """
        Returns a list of all jobs that are in one of the given states.

        host_id - only return jobs of the builder with this id
        builder - builder object; its id takes precedence over host_id
        states  - states to search for (defaults to "dispatching",
                  "running" and "uploading")
        """
        if builder:
            host_id = builder.id

        if states is None:
            states = ["dispatching", "running", "uploading"]

        # Work on a copy so the list of the caller is never modified
        args = list(states)

        # An empty list would render "IN ()" which is not valid SQL
        if not args:
            return []

        query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(args))

        if host_id:
            # Pass the id as a bound parameter instead of pasting it
            # into the query string
            query += " AND builder_id = %s"
            args.append(host_id)

        query += " ORDER BY \
            CASE \
                WHEN jobs.state = 'running' THEN 0 \
                WHEN jobs.state = 'uploading' THEN 1 \
                WHEN jobs.state = 'dispatching' THEN 2 \
                WHEN jobs.state = 'pending' THEN 3 \
                WHEN jobs.state = 'new' THEN 4 \
            END, time_started ASC"

        return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

    def get_latest(self, arch=None, builder=None, limit=None, age=None, date=None):
        """
        Returns the finished, failed and aborted jobs, newest first.

        arch    - only jobs of this architecture
        builder - only jobs that ran on this builder
        limit   - maximum number of jobs to return
        age     - interval (e.g. "24 hours"); only jobs that finished
                  within that interval
        date    - "YYYY-MM-DD"; only jobs created, started or finished
                  on that day. A string that cannot be parsed is ignored.
        """
        query = "SELECT * FROM jobs"
        args = []

        where = ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]

        if arch:
            where.append("arch = %s")
            args.append(arch)

        if builder:
            where.append("builder_id = %s")
            args.append(builder.id)

        if date:
            try:
                year, month, day = date.split("-", 2)
                date = datetime.date(int(year), int(month), int(day))
            except ValueError:
                # Deliberately ignore malformed dates
                pass
            else:
                where.append("(time_created::date = %s OR \
                    time_started::date = %s OR time_finished::date = %s)")
                args += (date, date, date)

        if age:
            # Bound parameter instead of string interpolation. The value is
            # stringified first, like the old "'%s'" formatting did.
            where.append("time_finished >= NOW() - %s::interval")
            args.append("%s" % age)

        query += " WHERE %s" % " AND ".join(where)
        query += " ORDER BY time_finished DESC"

        if limit:
            query += " LIMIT %s"
            args.append(limit)

        return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]

    def get_average_build_time(self):
        """
        Returns the average build time of all finished builds from the
        last 3 months.
        """
        result = self.db.get("SELECT AVG(time_finished - time_started) as average \
            FROM jobs WHERE type = 'build' AND state = 'finished' AND \
            time_finished >= NOW() - '3 months'::interval")

        if result:
            return result.average

    def count(self, *states):
        """
        Returns the number of jobs, optionally only those that are in
        one of the given states.
        """
        query = "SELECT COUNT(*) AS count FROM jobs"
        args = []

        if states:
            # NOTE(review): the whole tuple is passed as one parameter, so
            # this relies on the database layer expanding it - confirm
            query += " WHERE state IN %s"
            args.append(states)

        jobs = self.db.get(query, *args)
        if jobs:
            return jobs.count

    def restart_failed(self):
        """
        Restarts all failed build jobs that finished more than 72 hours
        ago and whose build is not in the "broken" state.
        """
        jobs = self._get_jobs("SELECT jobs.* FROM jobs \
            JOIN builds ON builds.id = jobs.build_id \
            WHERE \
                jobs.type = 'build' AND \
                jobs.state = 'failed' AND \
                NOT builds.state = 'broken' AND \
                jobs.time_finished < NOW() - '72 hours'::interval \
            ORDER BY \
                CASE \
                    WHEN jobs.type = 'build' THEN 0 \
                    WHEN jobs.type = 'test' THEN 1 \
                END, \
                builds.priority DESC, jobs.time_created ASC")

        # Restart the job
        for job in jobs:
            job.restart()
2a1e9ce2
MT
162
163
164class Job(base.DataObject):
165 table = "jobs"
166
def __str__(self):
    """
    Short human-readable form: class name, id and name of the job.
    """
    class_name = self.__class__.__name__

    return "<%s id=%s %s>" % (class_name, self.id, self.name)
169
def __eq__(self, other):
    """
    Two jobs are equal if they have the same database id.

    For objects of any other type NotImplemented is returned so that
    Python can fall back to its default comparison.
    """
    if isinstance(other, self.__class__):
        return self.id == other.id

    return NotImplemented

def __ne__(self, other):
    """
    Inverse of __eq__ (Python 2 does not derive != from ==).
    """
    result = self.__eq__(other)

    if result is NotImplemented:
        return result

    return not result
173
def __lt__(self, other):
    """
    Sort order of jobs:

    1. regular jobs come before test jobs
    2. jobs of the same build are ordered by architecture priority
    3. everything else is ordered by creation time

    Returns NotImplemented for objects that are not jobs.
    """
    if not isinstance(other, self.__class__):
        return NotImplemented

    # Apply the test rule in both directions. Checking only one side
    # allowed a < b and b < a to be true at the same time.
    if bool(self.test) != bool(other.test):
        return not self.test

    if self.build == other.build:
        return arches.priority(self.arch) < arches.priority(other.arch)

    return self.time_created < other.time_created
183
def __iter__(self):
    """
    Iterates over all packages that are attached to this job,
    ordered by package name.
    """
    fetch = self.backend.packages._get_packages

    return iter(fetch("SELECT packages.* FROM jobs_packages \
        LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
        WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id))
190
def __nonzero__(self):
    # Python 2 truth hook. Because this class also defines __len__(),
    # a job without packages would otherwise evaluate as False.
    return True
193
def __len__(self):
    """
    Number of packages that are attached to this job.
    """
    row = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
        WHERE job_id = %s", self.id)

    return row.len
199
@property
def distro(self):
    # The distribution is taken from the build this job belongs to.
    return self.build.distro
203
96cc81de
MT
def restart(self):
    """
    Creates a copy of this job (same build, architecture and test flag)
    which superseeds this one. Returns the new job.
    """
    jobs = self.backend.jobs

    return jobs.create(self.build, self.arch, test=self.test, superseeds=self)
208
3f516e41
MT
def get_superseeded_by(self):
    """
    Returns the job that replaced this one, or None if there is none.
    """
    if self.data.superseeded_by:
        return self.backend.jobs.get_by_id(self.data.superseeded_by)

def set_superseeded_by(self, superseeded_by):
    """
    Stores the id of the job that replaces this one.
    """
    assert isinstance(superseeded_by, self.__class__)

    # Only write the attribute. Assigning self.superseeded_by in here
    # would call this setter again through the property and never end.
    self._set_attribute("superseeded_by", superseeded_by.id)

superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
220
def delete(self):
    """
    Removes the job and everything that is attached to it (buildroot,
    history, packages and log files) from the database.
    """
    # Remove the buildroot and the history
    for table in ("jobs_buildroots", "jobs_history"):
        self.db.execute("DELETE FROM %s WHERE job_id = %%s" % table, self.id)

    # Detach and delete all packages
    for package in self:
        self.db.execute("DELETE FROM jobs_packages \
            WHERE job_id = %s AND pkg_id = %s", self.id, package.id)
        package.delete()

    # Remove all logfiles from disk and then from the database
    for logfile in self.logfiles:
        path = os.path.join(PACKAGES_DIR, logfile.path)
        self.backend.delete_file(path)

    self.db.execute("DELETE FROM logfiles WHERE job_id = %s", self.id)

    # Delete the job itself.
    self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)
245
2a1e9ce2
MT
246 ## Logging stuff
247
def log(self, action, user=None, state=None, builder=None, test_job=None):
    """
    Writes an entry to the history of this job.

    Of user, builder and test_job only the ids are stored; omitted
    objects are stored as NULL.
    """
    user_id = user.id if user else None
    builder_id = builder.id if builder else None
    test_job_id = test_job.id if test_job else None

    self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
        time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
        self.id, action, state, user_id, builder_id, test_job_id)
264
def get_log(self, limit=None, offset=None, user=None):
    """
    Returns the history of this job as a list of JobLogEntry objects,
    newest entry first.

    limit  - maximum number of entries
    offset - number of entries to skip (only used together with limit)
    user   - only return entries that were caused by this user
    """
    query = "SELECT * FROM jobs_history"

    conditions = ["job_id = %s",]
    args = [self.id,]

    if user:
        conditions.append("user_id = %s")
        args.append(user.id)

    query += " WHERE %s" % " AND ".join(conditions)
    query += " ORDER BY time DESC"

    if limit:
        query += " LIMIT %s"
        args.append(limit)

        # "LIMIT offset,count" is MySQL-only syntax; the other queries in
        # this file are written for PostgreSQL, which needs OFFSET
        if offset:
            query += " OFFSET %s"
            args.append(offset)

    return [logs.JobLogEntry(self.backend, entry)
        for entry in self.db.query(query, *args)]
294
@property
def uuid(self):
    # UUID of the job as stored in its database row.
    return self.data.uuid
298
@property
def test(self):
    # Value of the "test" column; used as a flag for test jobs
    # throughout this class.
    return self.data.test
2a1e9ce2
MT
302
@property
def build_id(self):
    # Id of the build this job belongs to.
    return self.data.build_id
306
@lazy_property
def build(self):
    # Looks up the build object that belongs to build_id.
    return self.backend.builds.get_by_id(self.build_id)
310
@property
def related_jobs(self):
    """
    All other jobs of the same build (i.e. without this job).
    """
    return [job for job in self.build.jobs if not job == self]
322
@property
def pkg(self):
    # The package is taken from the build this job belongs to.
    return self.build.pkg
326
@property
def name(self):
    """
    Name of the job: <package name>-<friendly version>.<arch>
    """
    pkg = self.pkg
    parts = (pkg.name, pkg.friendly_version, self.arch)

    return "%s-%s.%s" % parts
330
@property
def size(self):
    # Sum of the sizes of all packages of this job.
    # NOTE(review): no "packages" attribute is defined in the visible part
    # of this class (iterating the job itself yields its packages) -
    # confirm that it is provided elsewhere.
    return sum((p.size for p in self.packages))
334
@lazy_property
def rank(self):
    """
    Position of this job in the build queue.

    Only pending jobs have a rank; None is returned for every other
    state or if the job is not listed in the queue.
    """
    if self.state == "pending":
        row = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)

        if row:
            return row.rank
347
def is_running(self):
    """
        Returns True if job is in a running state.

        Besides "dispatching", "running" and "uploading" this also
        includes "pending".
    """
    return self.state in ("pending", "dispatching", "running", "uploading")
353
def get_state(self):
    # Current state as stored in the database row of the job.
    return self.data.state

def set_state(self, state, user=None, log=True):
    """
        Puts the job into a new state and runs the follow-up actions.

        state - the new state
        user  - the user who caused the change (stored in the history)
        log   - set to False to skip the history entry

        When the state is assigned through the "state" property, only
        the state is passed, so user is None and log is True.
    """
    # Nothing to do if the state remains.
    if not self.state == state:
        self._set_attribute("state", state)

        # Log the event.
        if log and not state == "new":
            self.log("state_change", state=state, user=user)

    # Always clear the message when the status is changed.
    self.update_message(None)

    # Update some more informations.
    if state == "dispatching":
        # Set start time.
        self._set_attribute("time_started", datetime.datetime.utcnow())

    elif state in ("aborted", "dependency_error", "finished", "failed"):
        self._set_attribute("time_finished", datetime.datetime.utcnow())

        # Send messages to the user.
        if state == "finished":
            self.send_finished_message()

        elif state == "failed":
            # Remove all package files if a job is set to failed state.
            # NOTE(review): __delete_packages is not defined in the visible
            # part of this class - confirm that it exists.
            self.__delete_packages()

            self.send_failed_message()

    # Automatically update the state of the build (not on test builds).
    if not self.test:
        self.build.auto_update_state()

state = property(get_state, set_state)
392
@property
def message(self):
    # Message text stored with the job (see update_message()).
    return self.data.message
396
c2fb4460
MT
def update_message(self, message):
    # Stores the message on the job; set_state() passes None to clear it.
    self._set_attribute("message", message)
2a1e9ce2
MT
399
def get_builder(self):
    """
    Returns the builder that is assigned to this job or None.
    """
    builder_id = self.data.builder_id

    if not builder_id:
        return None

    return self.backend.builders.get_by_id(builder_id)

def set_builder(self, builder, user=None):
    """
    Assigns the job to a builder. The assignment is only written to
    the history if a user is given.
    """
    self._set_attribute("builder_id", builder.id)

    if not user:
        return

    # Log the event.
    self.log("builder_assigned", builder=builder, user=user)

builder = lazy_property(get_builder, set_builder)
412
@property
def arch(self):
    # Architecture of the job as stored in its database row.
    return self.data.arch
416
@property
def duration(self):
    """
    Runtime of the job in seconds.

    Returns 0 if the job has not been started. For jobs that have not
    finished, the time that has passed since the start is returned.
    """
    started = self.time_started
    if not started:
        return 0

    finished = self.time_finished or datetime.datetime.utcnow()

    return (finished - started).total_seconds()
428
@property
def time_created(self):
    # Creation timestamp from the database row of the job.
    return self.data.time_created
432
@property
def time_started(self):
    # Set by set_state() when the job enters the "dispatching" state.
    return self.data.time_started
436
@property
def time_finished(self):
    # Set by set_state() when the job is aborted, finished, failed or
    # runs into a dependency error.
    return self.data.time_finished
440
2a1e9ce2
MT
def get_pkg_by_uuid(self, uuid):
    """
    Returns the package with the given UUID if it is attached to this
    job, otherwise None. The job is cached on the returned package.
    """
    packages = self.backend.packages

    found = packages._get_package("SELECT packages.id FROM packages \
        JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
        WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
        self.id, uuid)

    if not found:
        return None

    found.job = self
    return found
450
@lazy_property
def logfiles(self):
    """
    List of LogFile objects for all log files of this job.
    """
    rows = self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id)

    result = []
    for row in rows:
        logfile = logs.LogFile(self.backend, row.id)

        # Cache the job on the log file
        logfile._job = self

        result.append(logfile)

    return result
462
def add_file(self, filename):
    """
    Attaches an uploaded file to this job.

    Log files (*.log) are always accepted, package files only if this
    is not a test job. Files of any other type are ignored. The file is
    copied or moved to the right directory by the respective helper.
    """
    assert os.path.exists(filename)

    if filename.endswith(".log"):
        self._add_file_log(filename)
        return

    if not filename.endswith(".%s" % PACKAGE_EXTENSION):
        return

    # It is not allowed to upload packages on test builds.
    if not self.test:
        self._add_file_package(filename)
480
def _add_file_log(self, filename):
    """
    Attach a log file to this job.

    The file is copied into the "logs" directory of the build and
    registered in the logfiles table together with its size and its
    SHA512 checksum. The stored path is relative to PACKAGES_DIR.
    """
    target_dirname = os.path.join(self.build.path, "logs")

    if self.test:
        # Count up until a filename is found that does not exist yet
        i = 1
        while True:
            target_filename = os.path.join(target_dirname,
                "test.%s.%s.%s.log" % (self.arch, i, self.uuid))

            if not os.path.exists(target_filename):
                break

            i += 1
    else:
        target_filename = os.path.join(target_dirname,
            "build.%s.%s.log" % (self.arch, self.uuid))

    # Make sure the target directory exists.
    if not os.path.exists(target_dirname):
        os.makedirs(target_dirname)

    # Calculate a SHA512 hash from that file. The with statement makes
    # sure the file is closed even if reading fails.
    h = hashlib.sha512()
    with open(filename, "rb") as f:
        while True:
            buf = f.read(BUFFER_SIZE)
            if not buf:
                break

            h.update(buf)

    # Copy the file to the final location.
    shutil.copy2(filename, target_filename)

    # Create an entry in the database.
    self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
        VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
        os.path.getsize(target_filename), h.hexdigest())
523
def _add_file_package(self, filename):
    """
    Imports a package file, moves it into the architecture directory
    of the build and attaches it to this job.
    """
    # Open package (creates entry in the database)
    package = self.backend.packages.create(filename)

    # Move package to the build directory.
    target = os.path.join(self.build.path, self.arch)
    package.move(target)

    # Attach the package to this job.
    self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
        self.id, package.id)
534
def get_aborted_state(self):
    # Value of the "aborted_state" column of the job.
    return self.data.aborted_state

def set_aborted_state(self, state):
    # Writes the value through _set_attribute().
    self._set_attribute("aborted_state", state)

# Plain property; reads and writes go through the two functions above.
aborted_state = property(get_aborted_state, set_aborted_state)
542
@property
def message_recipients(self):
    """
    Set of all recipients that get messages about this job.
    """
    build = self.build

    # Start with all people watching the build.
    recipients = list(build.message_recipients)

    # Add the package maintainer on release builds.
    if build.type == "release":
        maintainer = self.pkg.maintainer

        if isinstance(maintainer, users.User):
            recipients.append("%s <%s>" % (maintainer.realname, maintainer.email))
        elif maintainer:
            recipients.append(maintainer)

        # XXX add committer and commit author.

    # Add the owner of the scratch build on scratch builds.
    elif build.type == "scratch" and build.user:
        owner = build.user
        recipients.append("%s <%s>" % (owner.realname, owner.email))

    return set(recipients)
567
def save_buildroot(self, pkgs):
    """
    Stores the packages that were installed in the buildroot.

    pkgs - iterable of (package name, package UUID) tuples

    Entries saved by an earlier run of this job are replaced.
    """
    # Cleanup old stuff first (for rebuilding packages)
    self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

    for pkg_name, pkg_uuid in pkgs:
        # The values must follow the order of the column list
        # (UUID first, then the name)
        self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
            VALUES(%s, %s, %s)", self.id, pkg_uuid, pkg_name)
2a1e9ce2 575
6990cac2
MT
@lazy_property
def buildroot(self):
    """
    List of (name, UUID, package) tuples for every package that was
    saved as part of the buildroot, ordered by package name.
    """
    rows = self.db.query("SELECT * FROM jobs_buildroots \
        WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)

    result = []
    for row in rows:
        # Search for this package in the packages table.
        package = self.backend.packages.get_by_uuid(row.pkg_uuid)

        result.append((row.pkg_name, row.pkg_uuid, package))

    return result
588
def send_finished_message(self):
    """
    Notifies all message recipients that the job has finished.

    Nothing is sent for test jobs.
    """
    # Send no finished mails for test jobs.
    if self.test:
        return

    recipients = self.message_recipients

    logging.debug("Sending finished message for job %s to %s" % \
        (self.name, ", ".join(recipients)))

    # A job may have no builder assigned; use the same placeholder as
    # send_failed_message() instead of failing
    build_host = "--"
    if self.builder:
        build_host = self.builder.name

    info = {
        "build_name" : self.name,
        "build_host" : build_host,
        "build_uuid" : self.uuid,
    }

    self.backend.messages.send_to_all(recipients,
        MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)
605
def send_failed_message(self):
    """
    Notifies all message recipients that the job has failed.
    """
    logging.debug("Sending failed message for job %s to %s" % \
        (self.name, ", ".join(self.message_recipients)))

    # The job may not have been assigned to a builder
    builder = self.builder
    build_host = builder.name if builder else "--"

    info = {
        "build_name" : self.name,
        "build_host" : build_host,
        "build_uuid" : self.uuid,
    }

    self.backend.messages.send_to_all(self.message_recipients,
        MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)
622
def set_start_time(self, start_not_before):
    # Stores the value in the "start_not_before" attribute of the job
    # (presumably the earliest time the job may start - confirm).
    self._set_attribute("start_not_before", start_not_before)
625
def schedule(self, type, start_time=None, user=None):
    """
    Schedules a rebuild of, or a test job for, this job.

    type       - "rebuild" or "test"
    start_time - value passed to set_start_time() of the new job
    user       - the user who requested this (stored in the history)

    Returns the new job. None is returned if nothing was scheduled:
    finished jobs are not rebuilt and test jobs are only created for
    finished jobs.
    """
    assert type in ("rebuild", "test")

    if type == "rebuild":
        if self.state == "finished":
            return

        job = self.restart()
        job.set_start_time(start_time)

        # Log the event.
        self.log("schedule_rebuild", user=user)

    elif type == "test":
        if not self.state == "finished":
            return

        # Create a new job with same build and arch. Jobs are created by
        # the Jobs collection (see restart()); this class has no create().
        job = self.backend.jobs.create(self.build, self.arch, test=True)
        job.set_start_time(start_time)

        # Log the event.
        self.log("schedule_test_job", test_job=job, user=user)

    return job
651
def schedule_test(self, start_not_before=None, user=None):
    # Thin wrapper around schedule("test", ...).
    # XXX to be removed
    return self.schedule("test", start_time=start_not_before, user=user)
655
def schedule_rebuild(self, start_not_before=None, user=None):
    # Thin wrapper around schedule("rebuild", ...).
    # XXX to be removed
    return self.schedule("rebuild", start_time=start_not_before, user=user)
659
def get_build_repos(self):
    """
    Returns a list of all repositories that should be used when
    building this job.

    If no repositories have been selected for this job, or none of the
    selected ones belongs to the distribution, the build repositories
    of the distribution are used.
    """
    rows = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
        self.id)

    # The query only selects the repo_id column. Collect the ids once
    # instead of building the list again for every repository.
    repo_ids = set(row.repo_id for row in rows)

    if not repo_ids:
        return self.distro.get_build_repos()

    repos = [repo for repo in self.distro.repositories if repo.id in repo_ids]

    return repos or self.distro.get_build_repos()
677
def get_config(self, local=False):
    """
    Builds the configuration that is sent to the builder.

    The result starts with the configuration of the distribution,
    followed by one section per build repository. The sections are
    separated by an empty line. The local flag is passed through to
    the repositories.
    """
    # The distribution configuration comes first.
    sections = [self.distro.get_config()]

    # Then add all repositories for this build.
    sections.extend(repo.get_conf(local=local) for repo in self.get_build_repos())

    return "\n\n".join(sections)
693
def resolvdep(self):
    """
        Tries to resolve the build dependencies of this job with pakfire.

        On a DependencyError, the job is put into the "dependency_error"
        state and the error text is stored as its message. If the solver
        reports success, the job is set to "pending" unless it currently
        is in the "failed" state.
    """
    config = pakfire.config.Config(files=["general.conf"])
    # Use the local variant of the repository configuration.
    config.parse(self.get_config(local=True))

    # The filename of the source file.
    filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
    assert os.path.exists(filename), filename

    # Create a new pakfire instance with the configuration for
    # this build.
    p = pakfire.PakfireServer(config=config, arch=self.arch)

    # Try to solve the build dependencies.
    try:
        solver = p.resolvdep(filename)

    # Catch dependency errors and log the problem string.
    except DependencyError, e:
        # The state has to be set first because changing the state
        # clears the message.
        self.state = "dependency_error"
        self.update_message("%s" % e)

    else:
        # If the build dependencies can be resolved, we set the build in
        # pending state.
        if solver.status is True:
            # Failed jobs stay failed.
            if self.state in ("failed",):
                return

            self.state = "pending"